百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

PyFlink 教程(三):PyFlink DataStream API - state & timer

bigegpt 2024-09-17 12:36 7 浏览

简介: 介绍如何在 Python DataStream API 中使用 state & timer 功能。

一、背景

Flink 1.13 已于近期正式发布,超过 200 名贡献者参与了 Flink 1.13 的开发,提交了超过 1000 个 commits,完成了若干重要功能。其中,PyFlink 模块在该版本中也新增了若干重要功能,比如支持了 state、自定义 window、row-based operation 等。随着这些功能的引入,PyFlink 功能已经日趋完善,用户可以使用 Python 语言完成绝大多数类型Flink作业的开发。接下来,我们详细介绍如何在 Python DataStream API 中使用 state & timer 功能。

二、state 功能介绍

作为流计算引擎,state 是 Flink 中最核心的功能之一。

  • 在 1.12 中,Python DataStream API 尚不支持 state,用户使用 Python DataStream API 只能实现一些简单的、不需要使用 state 的应用;
  • 而在 1.13 中,Python DataStream API 支持了此项重要功能。

state 使用示例

如下是一个简单的示例,说明如何在 Python DataStream API 作业中使用 state:

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        state_desc = ValueStateDescriptor('cnt', Types.LONG())
        # 定义value state
        self.cnt_state = runtime_context.get_state(state_desc)

    def map(self, value):
        cnt = self.cnt_state.value()
        if cnt is None:
            cnt = 0

        new_cnt = cnt + 1
        self.cnt_state.update(new_cnt)
        return value[0], new_cnt


def state_access_demo():
    # 1. 创建 StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()

    # 2. 创建数据源
    seq_num_source = NumberSequenceSource(1, 100)
    ds = env.from_source(
        source=seq_num_source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name='seq_num_source',
        type_info=Types.LONG())

    # 3. 定义执行逻辑
    ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
           .key_by(lambda a: a[0]) \
           .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))

    # 4. 将打印结果数据
    ds.print()

    # 5. 执行作业
    env.execute()


if __name__ == '__main__':
    state_access_demo()

在上面的例子中,我们定义了一个 MapFunction,该 MapFunction 中定义了一个名字为 “cnt_state” 的 ValueState,用于记录每一个 key 出现的次数。

说明:

  • 除了 ValueState 之外,Python DataStream API 还支持 ListState、MapState、ReducingState,以及 AggregatingState;
  • 定义 state 的 StateDescriptor 时,需要声明 state 中所存储的数据的类型(TypeInformation)。另外需要注意的是,当前 TypeInformation 字段并未被使用,默认使用 pickle 进行序列化,因此建议将 TypeInformation 字段定义为 Types.PICKLED_BYTE_ARRAY() 类型,与实际所使用的序列化器相匹配。这样的话,当后续版本支持使用 TypeInformation 之后,可以保持后向兼容性;
  • state 除了可以在 KeyedStream 的 map 操作中使用,还可以在其它操作中使用;除此之外,还可以在连接流中使用 state,比如:
ds1 = ...  # type DataStream
ds2 = ...  # type DataStream
ds1.connect(ds2) \
    .key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: a[0]) \
    .map(MyCoMapFunction())  # 可以在MyCoMapFunction中使用state

可以使用 state 的 API 列表如下:


操作

自定义函数

KeyedStream

map

MapFunction

flat_map

FlatMapFunction


reduce

ReduceFunction


filter

FilterFunction


process

KeyedProcessFunction


ConnectedStreams

map

CoMapFunction

flat_map

CoFlatMapFunction


process

KeyedCoProcessFunction


WindowedStream

apply

WindowFunction


process

ProcessWindowFunction

state 工作原理

上图是 PyFlink 中,state 工作原理的架构图。从图中我们可以看出,Python 自定义函数运行在 Python worker 进程中,而 state backend 运行在 JVM 进程中(由 Java 算子来管理)。当 Python 自定义函数需要访问 state 时,会通过远程调用的方式,访问 state backend。

我们知道,远程调用的开销是非常大的,为了提升 state 读写的性能,PyFlink 针对 state 读写做了以下几个方面的优化工作:

  • Lazy Read:对于包含多个 entry 的 state,比如 MapState,当遍历 state 时,state 数据并不会一次性全部读取到 Python worker 中,只有当真正需要访问时,才从 state backend 读取。
  • Async Write:当更新 state 时,更新后的 state,会先存储在 LRU cache 中,并不会同步地更新到远端的 state backend,这样做可以避免每次 state 更新操作都访问远端的 state backend;同时,针对同一个 key 的多次更新操作,可以合并执行,尽量避免无效的 state 更新。
  • LRU cache:在 Python worker 进程中维护了 state 读写的 cache。当读取某个 key 时,会先查看其是否已经被加载到读 cache 中;当更新某个 key 时,会先将其存放到写 cache 中。针对频繁读写的 key,LRU cache 可以避免每次读写操作,都访问远端的 state backend,对于有热点 key 的场景,可以极大提升 state 读写性能。
  • Flush on Checkpoint:为了保证 checkpoint 语义的正确性,当 Java 算子需要执行 checkpoint时,会将 Python worker中的写 cache 都 flush 回 state backend。

其中 LRU cache 可以细分为二级,如下图所示:

说明:

  • 二级 cache 为 global cache,二级 cache 中的读 cache 中存储着当前 Python worker 进程中所有缓存的原始 state 数据(未反序列化);二级 cache 中的写 cache 中存储着当前 Python worker 进程中所有创建的 state 对象。
  • 一级 cache 位于每一个 state 对象内,在 state 对象中缓存着该 state 对象已经从远端的 state backend 读取的 state 数据以及待更新回远端的 state backend 的 state 数据。

工作流程:

  • 当在 Python UDF 中,创建一个 state 对象时,首先会查看当前 key 所对应的 state 对象是否已经存在(在二级 cache 中的 “Global Write Cache” 中查找),如果存在,则返回对应的 state 对象;如果不存在,则创建新的 state 对象,并存入 “Global Write Cache”;
  • state 读取:当在 Python UDF 中,读取 state 对象时,如果待读取的 state 数据已经存在(一级 cache),比如对于 MapState,待读取的 map key/map value 已经存在,则直接返回对应的 map key/map value;否则,访问二级 cache,如果二级 cache 中也不存在待读取的 state 数据,则从远端的 state backend 读取;
  • state 写入:当在 Python UDF 中,更新 state 对象时,先写到 state 对象内部的写 cache 中(一级 cache);当 state 对象中待写回 state backend 的 state 数据的大小超过指定阈值或者当遇到 checkpoint 时,将待写回的 state 数据写回远端的 state backend。

state 性能调优

通过前一节的介绍,我们知道 PyFlink 使用了多种优化手段,用于提升 state 读写的性能,这些优化行为可以通过以下参数配置:

配置

说明

python.state.cache-size

Python worker 中读 cache 以及写 cache 的大小。(二级 cache)需要注意的是:读 cache、写 cache是独立的,当前不支持分别配置读 cache 以及写 cache 的大小。

python.map-state.iterate-response-batch-size

当遍历 MapState 时,每次从 state backend 读取并返回给 Python worker 的 entry 的最大个数。

python.map-state.read-cache-size

一个 MapState 的读 cache 中最大允许的 entry 个数(一级 cache)。当一个 MapState 中,读 cache 中的 entry 个数超过该阈值时,会通过 LRU 策略从读 cache 中删除最近最少访问过的 entry。

python.map-state.write-cache-size

一个 MapState 的写 cache 中最大允许的待更新 entry 的个数(一级 cache)。当一个 MapState 中,写 cache 中待更新的 entry 的个数超过该阈值时,会将该 MapState 下所有待更新 state 数据写回远端的 state backend。

需要注意的是,state 读写的性能不仅取决于以上参数,还受其它因素的影响,比如:

  • 输入数据中 key 的分布:输入数据的 key 越分散,读 cache 命中的概率越低,则性能越差。
  • Python UDF 中 state 读写次数:state 读写可能涉及到读写远端的 state backend,应该尽量优化 Python UDF 的实现,减少不必要的 state 读写。
  • checkpoint interval:为了保证 checkpoint 语义的正确性,当遇到 checkpoint 时,Python worker 会将所有缓存的待更新 state 数据,写回 state backend。如果配置的 checkpoint interval 过小,则可能并不能有效减少 Python worker 写回 state backend 的数据量。
  • bundle size / bundle time:当前 Python 算子会将输入数据划分成多个批次,发送给 Python worker 执行。当一个批次的数据处理完之后,会强制将 Python worker 进程中的待更新 state 写回 state backend。与 checkpoint interval 类似,该行为也可能会影响 state 写性能。批次的大小可以通过 python.fn-execution.bundle.size 和 python.fn-execution.bundle.time 参数控制。

三、timer 功能介绍

timer 使用示例

除了 state 之外,用户还可以在 Python DataStream API 中使用定时器 timer。

import datetime

from pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironment


class CountWithTimeoutFunction(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(ValueStateDescriptor(
            "my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = Row(value.f1, 0, 0)

        # update the state's count
        current[1] += 1

        # set the state's timestamp to the record's assigned event time timestamp
        current[2] = ctx.timestamp()

        # write the state back
        self.state.update(current)

        # schedule the next timer 60 seconds from the current event time
        ctx.timer_service().register_event_time_timer(current[2] + 60000)

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # get the state for the key that scheduled the timer
        result = self.state.value()

        # check if this is an outdated timer or the latest timer
        if timestamp == result[2] + 60000:
            # emit the state on timeout
            yield result[0], result[1]


class MyTimestampAssigner(TimestampAssigner):

    def __init__(self):
        self.epoch = datetime.datetime.utcfromtimestamp(0)

    def extract_timestamp(self, value, record_timestamp) -> int:
        return int((value[0] - self.epoch).total_seconds() * 1000)


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.execute_sql("""
            CREATE TABLE my_source (
              a TIMESTAMP(3),
              b VARCHAR,
              c VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'rows-per-second' = '10'
            )
        """)

    stream = t_env.to_append_stream(
        t_env.from_path('my_source'),
        Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
    watermarked_stream = stream.assign_timestamps_and_watermarks(
        WatermarkStrategy.for_monotonous_timestamps()
                         .with_timestamp_assigner(MyTimestampAssigner()))

    # apply the process function onto a keyed stream
    watermarked_stream.key_by(lambda value: value[1])\
        .process(CountWithTimeoutFunction()) \
        .print()

    env.execute()

在上述示例中,我们定义了一个 KeyedProcessFunction,该 KeyedProcessFunction 记录每一个 key 出现的次数,当一个 key 超过 60 秒没有更新时,会将该 key 以及其出现次数,发送到下游节点。

除了 event time timer 之外,用户还可以使用 processing time timer。

timer 工作原理

timer 的工作流程是这样的:

  • 与 state 访问使用单独的通信信道不同,当用户注册 timer 之后,注册消息通过数据通道发送到 Java 算子;
  • Java 算子收到 timer 注册消息之后,首先检查待注册 timer 的触发时间,如果已经超过当前时间,则直接触发;否则的话,将 timer 注册到 Java 算子的 timer service 中;
  • 当 timer 触发之后,触发消息通过数据通道发送到 Python worker,Python worker 回调用户 Python UDF 中的的 on_timer 方法。

需要注意的是:由于 timer 注册消息以及触发消息通过数据通道异步地在 Java 算子以及 Python worker 之间传输,这会造成在某些场景下,timer 的触发可能没有那么及时。比如当用户注册了一个 processing time timer,当 timer 触发之后,触发消息通过数据通道传输到 Python UDF 时,可能已经是几秒中之后了。

四、总结

在这篇文章中,我们主要介绍了如何在 Python DataStream API 作业中使用 state & timer,state & timer 的工作原理以及如何进行性能调优。接下来,我们会继续推出 PyFlink 系列文章,帮助 PyFlink 用户深入了解 PyFlink 中各种功能、应用场景以及最佳实践等。

本文为阿里云原创内容,未经允许不得转载。

相关推荐

方差分析简介(方差分析通俗理解)

介绍方差分析(ANOVA,AnalysisofVariance)是一种广泛使用的统计方法,用于比较两个或多个组之间的均值。单因素方差分析是方差分析的一种变体,旨在检测三个或更多分类组的均值是否存在...

正如404页面所预示,猴子正成为断网元凶--吧嗒吧嗒真好吃

吧嗒吧嗒,绘图:MakiNaro你可以通过加热、冰冻、水淹、模塑、甚至压溃压力来使网络光缆硬化。但用猴子显然是不行的。光缆那新挤压成型的塑料外皮太尼玛诱人了,无法阻挡一场试吃盛宴的举行。印度政府正...

Python数据可视化:箱线图多种库画法

概念箱线图通过数据的四分位数来展示数据的分布情况。例如:数据的中心位置,数据间的离散程度,是否有异常值等。把数据从小到大进行排列并等分成四份,第一分位数(Q1),第二分位数(Q2)和第三分位数(Q3)...

多组独立(完全随机设计)样本秩和检验的SPSS操作教程及结果解读

作者/风仕在上一期,我们已经讲完了两组独立样本秩和检验的SPSS操作教程及结果解读,这期开始讲多组独立样本秩和检验,我们主要从多组独立样本秩和检验介绍、两组独立样本秩和检验使用条件及案例的SPSS操作...

方差分析 in R语言 and Excel(方差分析r语言例题)

今天来写一篇实际中比较实用的分析方法,方差分析。通过方差分析,我们可以确定组别之间的差异是否超出了由于随机因素引起的差异范围。方差分析分为单因素方差分析和多因素方差分析,这一篇先介绍一下单因素方差分析...

可视化:前端数据可视化插件大盘点 图表/图谱/地图/关系图

前端数据可视化插件大盘点图表/图谱/地图/关系图全有在大数据时代,很多时候我们需要在网页中显示数据统计报表,从而能很直观地了解数据的走向,开发人员很多时候需要使用图表来表现一些数据。随着Web技术的...

matplotlib 必知的 15 个图(matplotlib各种图)

施工专题,我已完成20篇,施工系列几乎覆盖Python完整技术栈,目标只总结实践中最实用的东西,直击问题本质,快速帮助读者们入门和进阶:1我的施工计划2数字专题3字符串专题4列表专题5流程控制专题6编...

R ggplot2常用图表绘制指南(ggplot2绘制折线图)

ggplot2是R语言中强大的数据可视化包,基于“图形语法”(GrammarofGraphics),通过分层方式构建图表。以下是常用图表命令的详细指南,涵盖基本语法、常见图表类型及示例,适合...

Python数据可视化:从Pandas基础到Seaborn高级应用

数据可视化是数据分析中不可或缺的一环,它能帮助我们直观理解数据模式和趋势。本文将全面介绍Python中最常用的三种可视化方法。Pandas内置绘图功能Pandas基于Matplotlib提供了简洁的绘...

Python 数据可视化常用命令备忘录

本文提供了一个全面的Python数据可视化备忘单,适用于探索性数据分析(EDA)。该备忘单涵盖了单变量分析、双变量分析、多变量分析、时间序列分析、文本数据分析、可视化定制以及保存与显示等内容。所...

统计图的种类(统计图的种类及特点图片)

统计图是利用几何图形或具体事物的形象和地图等形式来表现社会经济现象数量特征和数量关系的图形。以下是几种常见的统计图类型及其适用场景:1.条形图(BarChart)条形图是用矩形条的高度或长度来表示...

实测,大模型谁更懂数据可视化?(数据可视化和可视化分析的主要模型)

大家好,我是Ai学习的老章看论文时,经常看到漂亮的图表,很多不知道是用什么工具绘制的,或者很想复刻类似图表。实测,大模型LaTeX公式识别,出乎预料前文,我用Kimi、Qwen-3-235B...

通过AI提示词让Deepseek快速生成各种类型的图表制作

在数据分析和可视化领域,图表是传达信息的重要工具。然而,传统图表制作往往需要专业的软件和一定的技术知识。本文将介绍如何通过AI提示词,利用Deepseek快速生成各种类型的图表,包括柱状图、折线图、饼...

数据可视化:解析箱线图(box plot)

箱线图/盒须图(boxplot)是数据分布的图形表示,由五个摘要组成:最小值、第一四分位数(25th百分位数)、中位数、第三四分位数(75th百分位数)和最大值。箱子代表四分位距(IQR)。IQR是...

[seaborn] seaborn学习笔记1-箱形图Boxplot

1箱形图Boxplot(代码下载)Boxplot可能是最常见的图形类型之一。它能够很好表示数据中的分布规律。箱型图方框的末尾显示了上下四分位数。极线显示最高和最低值,不包括异常值。seaborn中...