当前位置: 首页 > news >正文

模拟创建一个公司竞价推广和seo的区别

模拟创建一个公司,竞价推广和seo的区别,个人博客网站模板免费,tale博客和wordpress文章目录 实时流式计算Kafka StreamKafka Streams 的关键概念KStreamKafka Stream入门案例编写SpringBoot 集成 Kafka Stream 实时流式计算 一般流式计算会与批量计算相比较 流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数…

文章目录

  • 实时流式计算
  • Kafka Stream
  • Kafka Streams 的关键概念
  • KStream
  • Kafka Stream入门案例编写
  • SpringBoot 集成 Kafka Stream


实时流式计算

一般流式计算会与批量计算相比较

在这里插入图片描述

流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。
流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

应用场景

  • 日志分析
    网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
  • 大屏看板统计
    可以实时的查看网站注册数量,订单数量,购买数量,金额等。
  • 公交实时数据
    可以随时更新公交车方位,计算多久到达站牌等
  • 实时文章分值计算
    头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

技术方案选型

  • Hadoop
  • Apche Storm
  • Flink
  • Kafka Stream

可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。

Kafka Stream

Kafka Stream:提供了对存储于 Kafka内 的数据进行流式处理分析的功能

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

在这里插入图片描述

Kafka Streams 的关键概念

在这里插入图片描述

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

KStream

  • 数据结构类似于map,如下图,key-value 键值对

在这里插入图片描述

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。

在这里插入图片描述

Kafka Stream入门案例编写

  1. 需求分析,求单词个数(word count)

在这里插入图片描述

  1. 创建原生的 kafka staream 入门案例

导入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>

在这里插入图片描述

在这里插入图片描述

package com.heima.kafka.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** 流式处理*/
public class KafkaStreamQuickStart {public static void main(String[] args) {//kafka的配置信心Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream 构建器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建kafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容:hello kafka  hello itcast* @param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//按照value进行聚合处理.groupBy((key,value)->value)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}
  1. 测试准备
  • 使用生产者在 topic 为:itcast_topic_input中发送多条消息
  • stream 接收 itcast_topic_input 的数据,进行聚合操作后,将处理结果发送到 itcast_topic_out
  • 使用消费者接收 topic 为:itcast_topic_out

结果:

  • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

SpringBoot 集成 Kafka Stream

  1. 配置

在这里插入图片描述

package com.heima.kafka.config;import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

application.yml

在这里插入图片描述

kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}
  1. 在配置类中定义方法
  • 可注入StreamsBuilder
  • 返回值必须是KStream且放入spring容器中

在这里插入图片描述

package com.heima.kafka.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}
}
  1. 测试

启动 springboot 项目即可自动监听

http://www.shuangfujiaoyu.com/news/56424.html

相关文章:

  • 香河建设局网站长沙网站优化价格
  • 东营网站seo外包宁波正规优化seo公司
  • 公司内部展厅设计长沙关键词优化方法
  • 北京做网页公司seo索引擎优化
  • 做网站商城需要多少钱品牌seo培训咨询
  • 口碑好的网站建设收费关键词优化一般收费价格
  • 佛山低价网站建设千锋教育培训收费一览表
  • 商业网站开发实训报告枫林seo工具
  • 湘潭商城网站建设定制自己如何建立网站
  • 淘宝运营的基础知识石家庄seo全网营销
  • seo外包公司怎么样武汉seo外包平台
  • 旅游型网站建设山西网页制作
  • 网站页面是自己做还是使用模板baidu百度首页官网
  • wordpress加入代码行舟山seo
  • 燕郊网站建设公司如何做seo搜索优化
  • 成品图片的网站在哪里找什么网站百度收录快
  • 电子商务网站设计说明最新新闻消息
  • 特产网站建设策划书世界球队最新排名
  • 现在的网站开发用什么技术sem和seo是什么职业岗位
  • 专业云南做网站挖掘关键词爱站网
  • 免费自助建站源码重庆排名seo公司
  • 佛山市做网站百家号排名
  • 网站页面设计基础教程经典软文案例标题加内容
  • 网站会员充值接口怎么做的百度搜索引擎推广步骤
  • 网站开发程序有哪些厦门百度seo
  • 江西省建设厅网站官网百度商家平台
  • 哈尔滨做网站哈尔滨学院推广产品
  • 网站建设专业培训药品销售推广方案
  • 做3d动画的斑马网站seo技术外包
  • 看电视剧的免费网站app下载站长工具爱站