当前位置: 首页 > news >正文

基于h5的个人网站建设上海好的网络推广公司

基于h5的个人网站建设,上海好的网络推广公司,html5 php 网站源码,做服装网站简介: kafkaStream:提供了对存储在kafka中的数据进行流式处理和分析的功能 特点: KafkasSream提供了一个非常简单轻量的Library,它可以非常方便的嵌入到java程序中,也可以任何方式打包部署 入门案例: 1、…

简介:

        kafkaStream:提供了对存储在kafka中的数据进行流式处理和分析的功能

特点:

        KafkasSream提供了一个非常简单轻量的Library,它可以非常方便的嵌入到java程序中,也可以任何方式打包部署

入门案例:

  1、新建工程kafka-demo

           引入kafkaStream依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--kafkaStream--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency></dependencies>

   2、新建流式处理类

          代码如下

package com.heima.kafkademo.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*
* 流式处理
* */
public class KafkaStreamQuickStart {public static void main(String[] args) {/*创建kafka配置中心并配置参数*/Properties prop = new Properties();//连接地址prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//key序列化prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//value序列化prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());//创建id名称prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream构造器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}//流式计算方法private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kafka对象,同时指定从哪个topic获取消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");//处理消息的valuestream.flatMapValues(new ValueMapper<String, Iterable<?>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})      //按照value进行聚合.groupBy((key,value)->value)//时间窗口,每隔10秒更新一次.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}

3、启动消费者类和流式处理类监听消息

        使用生产者类发送消息

       消费者和生产者类代码参考Kafka:安装和配置_Success___的博客-CSDN博客

4、测试

        成功接收到消息

 

 

http://www.shuangfujiaoyu.com/news/35414.html

相关文章:

  • 辽宁省建设工程交易网湖南正规关键词优化
  • 母婴网站模板dede最近七天的新闻大事
  • 网站建设狼雨今天新闻
  • 在线做爰 视频网站设计网页的软件
  • 南宁培训网站建设广西网络推广公司
  • 聚美优品一个专注于做特价的网站怎么开网站平台
  • 同ip网站做301网站怎么收录到百度
  • 南昌做网站优化哪家好百度seo报价方法
  • 学校网站的建设目标是什么意思最好用的搜索引擎
  • 交互设计大学世界排名企业seo职位
  • 国内知名设计网站百度经验登录入口
  • 做兼职推荐网站网站开发公司
  • 重庆网络公司网站建设百度贴吧官网
  • 快速建设网站精准引流的网络推广
  • 国外做ppt的网站有哪些网站优化要多少钱
  • 免费发布信息网有哪些网站常州seo关键词排名
  • 网站建设与管理提纲seo的名词解释
  • 从来没做过网站如何做淘宝搜索词排名查询
  • 做网站代理网络推广优化网站
  • 贵州华瑞网站建设有限公司淘宝权重查询
  • 如何建网站教程企业培训课程清单
  • django 做网站赚钱简述网络营销的特点及功能
  • 石家庄做手机网站推广网页设计个人主页
  • 网上做任务赚钱网站有哪些nba排名最新排名
  • 自己建网站可以赚钱吗哪家培训机构学校好
  • 电子政务 和网站建设总结南京seo网站管理
  • 空壳网站广告营销
  • 唐山网站建设外包公司重庆关键词搜索排名
  • 万网国际江北seo综合优化外包
  • 会展设计是什么专业安徽新站优化