当前位置: 首页 > news >正文

怎样做网站赚流量长沙网站seo优化公司

怎样做网站赚流量,长沙网站seo优化公司,网站制作合同范本,服务器做jsp网站教程目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table & SQL connectors…

目录

StreamExecutionEnvironment

Watermark

watermark策略简介

使用 Watermark 策略

内置水印生成器

处理空闲数据源

算子处理 Watermark 的方式

创建DataStream的方式

通过list对象创建

​​​​​​使用DataStream connectors创建

使用Table & SQL connectors创建


StreamExecutionEnvironment

编写一个 Flink Python DataStream API 程序,首先需要声明一个执行环境StreamExecutionEnvironment,这是流式程序执行的上下文。

你将通过它来设置作业的属性(例如默认并发度、重启策略等)、创建源、并最终触发作业的执行。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)

创建了 StreamExecutionEnvironment 之后,你可以使用它来声明数据源。数据源从外部系统(如 Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取数据到 Flink 作业里。

为了简单起见,本教程读取文件作为数据源。

ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source"
)

Watermark

大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。

为了解决乱序数据,flink引入watermark。引入watermark机制则会等待晚到的数据一段时间,等待时间到则触发计算,如果数据延迟很大,通常也会被丢弃或者另外处理。

为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用 TimestampAssigner API 从元素中的某个字段去访问/提取时间戳。

watermark策略简介

时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 WatermarkGenerator 来配置 watermark 的生成方式。

使用 Flink API 时需要设置一个同时包含 TimestampAssigner WatermarkGenerator WatermarkStrategyWatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。

使用 Watermark 策略

WatermarkStrategy 可以在 Flink 应用程序中的两处使用,第一种是直接在数据源上使用,第二种是直接在非数据源的操作之后使用。

第一种方式相比会更好,因为数据源可以利用 watermark 生成逻辑中有关分片/分区(shards/partitions/splits)的信息。使用这种方式,数据源通常可以更精准地跟踪 watermark,整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy 意味着你必须使用特定数据源接口。

仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy

内置水印生成器

水印策略定义了如何在流源中生成水印。WatermarkStrategy是生成水印的WatermarkGenerator和分配记录内部时间戳的TimestampAssigner的生成器/工厂。

BoundedOutOfOrdernessDuration),为创建WatermarkStrategy常见的内置策略。

for_bound_out_of_ordernness(max_out_of_ordernesspyflink.common.time.Duration)为记录无序的情况创建水印策略,但可以设置事件无序程度的上限。

无序绑定B意味着一旦遇到时间戳为T的事件,就不会再出现早于(T-B)的事件。

for_bound_out_of_ordernness(5)

for_mononous_timestamps()为时间戳单调递增的情况创建水印策略。

水印是定期生成的,并严格遵循数据中的最新时间戳。该策略引入的延迟主要是生成水印的周期间隔。

WatermarkStrategy.for_monotonous_timestamps()

with_timestamp_assigner(timestamp_assigner:pyflink.common.watermark_strategy.TimestampAssigner)

创建一个新的WatermarkStrategy,该策略通过实现TimestampAssigner接口使用给定的TimestampAssigner。

参数: timestamp_assigner 给定的TimestampAssigner。

Return: 包装TimestampAssigner的WaterMarkStrategy。

watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()

    with_timestamp_assigner(MyTimestampAssigner())

处理空闲数据源

如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。

为了解决这个问题,你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。WatermarkStrategy 为此提供了一个工具接口withIdleness(Duration.ofMinutes(1))

with_idleness(idle_timeout:pyfrink.common.time.Duration)

创建一个新的丰富的WatermarkStrategy,它也在创建的WatermarkGenerator中执行空闲检测。

参数:idle_timeout–空闲超时。

Return:配置了空闲检测的新水印策略。

算子处理 Watermark 的方式

一般情况下,在将 watermark 转发到下游之前,需要算子对其进行触发的事件完全进行处理。例如,WindowOperator 将首先计算该 watermark 触发的所有窗口数据,当且仅当由此 watermark 触发计算进而生成的所有数据被转发到下游之后,其才会被发送到下游。换句话说,由于此 watermark 的出现而产生的所有数据元素都将在此 watermark 之前发出。

相同的规则也适用于 TwoInputstreamOperator。但是,在这种情况下,算子当前的 watermark 会取其两个输入的最小值。

创建DataStream的方式

通过list对象创建

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],type_info=Types.ROW([Types.INT(), Types.STRING()]))

​​​​​​使用DataStream connectors创建

使用add_source函数,此函数仅支持FlinkKafkaConsumer,仅在streaming执行模式下使用

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumerenv = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
deserialization_schema = JsonRowDeserializationSchema.builder() \.type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()kafka_consumer = FlinkKafkaConsumer(topics='test_source_topic',deserialization_schema=deserialization_schema,properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})ds = env.add_source(kafka_consumer)

使用from_source函数,此函数仅支持NumberSequenceSource和FileSource自定义数据源,仅在streaming执行模式下使用

from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSourceenv = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 1000)
ds = env.from_source(source=seq_num_source,watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name='seq_num_source',type_info=Types.LONG())

​​​​​​​使用Table & SQL connectors创建

首先用Table & SQL connectors创建表,再转换为DataStream.

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql("""CREATE TABLE my_source (a INT,b VARCHAR) WITH ('connector' = 'datagen','number-of-rows' = '10')""")ds = t_env.to_append_stream(t_env.from_path('my_source'),Types.ROW([Types.INT(), Types.STRING()]))

http://www.shuangfujiaoyu.com/news/181.html

相关文章:

  • 淘宝客网站制作教程seo实战密码第四版
  • 上海模板建站源码百度app推广方法
  • 制作网站建设的公司阿里妈妈推广网站
  • wordpress主题资源分享搜索引擎seo关键词优化效果
  • 网站悬浮广告代码百度网站搜索排名
  • 营销类网站推荐优化技术
  • 网站建设客户去哪里找济南百度seo
  • 微信公众号商城网站开发做神马seo快速排名软件
  • 小规模公司做网站成本是什么郑州网络推广公司
  • 这几年做啥网站致富百度上海总部
  • 胶州企业网站设计网页设计与制作考试试题及答案
  • 做区域链的网站爱站网关键词查询系统
  • 做网站需要做什么页面黄页推广引流
  • 新手如何自己做网站谁能给我个网址
  • 做包装看什么网站百度论坛
  • wordpress同ip弹一次广告seo优化多少钱
  • 用什么软件做网站前端正规电商平台有哪些
  • 怎样制作网站电话站长工具在线查询
  • 网站 aspx 模板抖音搜索关键词排名
  • 网站登录接口怎么做正规seo关键词排名网络公司
  • 武汉公司做网站百度客服中心人工在线咨询
  • 医院网站源码下载离我最近的广告公司
  • 怎么查找网站的根目录注册网站需要多少钱
  • 文安做网站google关键词工具
  • 中山手机网站制作哪家好seo专家是什么意思
  • 免费网站建设平台今日油价92汽油价格
  • 网站开发的技术指标大连seo外包平台
  • 旅游景区网站开发的政策可行性流量查询网站
  • 网站修改图片怎么做谷歌网站优化
  • 收藏网站的链接怎么做seo排名计费系统