百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

Flink StreamSQL 原理介绍 flink streaming

bigegpt 2024-10-27 08:22 60 浏览

引言

前面群里面同学说对flink感兴趣,特别邀请资深流专家张如聪给大家深入分析下Flink里面最重要部分:Flink SQL。

本文对Flink SQL深入浅出,相当有深度的技术分析文章,希望大家会喜欢,对Flink技术上有疑问的也可以联系专家帮忙解答。

一、Flink SQL简介

Flink SQL 是Fllink提供的SQL的SDK API。SQL是比Table更高阶的API,集成在Table library中提供,在流和批上都可以用此API开发业务。本文主要侧重于SQL在Stream上的能力,也就是介绍StreamSQL的能力。

二、 StreamSQL能力概述

Flink SQL的语法采用Apache Calcite的语法.很多开源组件如Samza、Storm、Apex都使用Calcite的语法作为其SQL的语法。

在Flink 1.3.0版本中流上的操作支持SELECT, FROM, WHERE,UNION、聚合和自定义能力,join能力预计在Flink 1.4.0(预计在9月份发布)版本中提供。详细的语法能力,请参见第6章节。

三、用户使用StreamSQL开发业务流应用流程

在使用StreamSQL开发业务流应用前,需要在pom.xml增加引用flink-table lib的依赖,具体如下:

<dependency>

使用StreamSQL开发流应用的过程如下:

1、获取table stream 环境;

val env = StreamExecutionEnvironment.getExecutionEnvironment

2、从source算子获取流式数据,并定义数据的schema:

val orderA: DataStream[Order] = env.fromCollection(Seq(

其中order类的定义为:

case class Order(user: Long, product: String, amount: Int)

当前StreamSQL支持的source算子包含公共的kafka source、CSV file source,以及自定义的source。 3、用StreamSQL书写数据处理逻辑,即定义业务应用:

val result = tEnv.sql(

4、定义处理结果的输出,即sink算子:

result.toAppendStream[Order].print()

5、提交到Flink系统执行:

env.execute()

四、StreamSQL 执行原理介绍

如上图所示,StreamSQL API的执行原理如下:

1、用户使用对外提供Stream SQL的语法开发业务应用;

2、用calcite对StreamSQL进行语法检验,语法检验通过后,转换成calcite的逻辑树节点;最终形成calcite的逻辑计划;

3、采用Flink自定义的优化规则和calcite火山模型、启发式模型共同对逻辑树进行优化,生成最优的Flink物理计划;

4、对物理计划采用janino codegen生成代码,生成用低阶API DataStream 描述的流应用,提交到Flink平台执行;

五、StreamSQL 编译执行流程介绍

1、使用calcite对Sql进行编译,在编译的过程中对语法进行校验,如果语法符合要求,编译结果为calcite的一个逻辑树,如:第四章节中的SQL“SELECT * FROM OrderA WHERE amount > 2 + 3”编译后为:

LogicalFilter(condition=[>($2, 2+3)])

2、首先用如下规则集将table scan节点转化为关系表达式。

val TABLE_CONV_RULES: RuleSet = RuleSets.ofList(

3、再用如下规则集,将逻辑计划树标准化。

 val DATASET_NORM_RULES: RuleSet = RuleSets.ofList(

4、再使用如下规则,将逻辑树进行优化,生成最优的逻辑计划树.

val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(

如逻辑计划

LogicalFilter(condition=[>($2, 2+3)])

优化为:

LogicalProject.NONE(input=rel#19:LogicalFilter.NONE(input=rel#10:LogicalTableScan.NONE(table=[OrderA]),condition=>($2, 5)),user=$0,product=$1,amount=$2)

主要做了如下几点优化: 1)过滤下压到tablescan节点,直接在输入的时候进行判断; 2)表达式提前预计算,如“2+3”之间计算出结果5; 5、再使用如下规则并使用janino codegen生成用DataStream API描述的物理执行计划;

val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(

6、最后使用如下规则对物理执行计划进行优化。主要是对聚合操作的优化。

val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(

7、最后提交到Flink平台执行;

六、StreamSQL 语法介绍

整体语法范式如下:

query:

语法操作

1)Scan、Projection、Filter以及自定义函数

OperatorsDescription
Scan/Select/AsSELECT a, c AS d FROM Orders;
Where/FilterSELECT * FROM Orders WHERE a % 2 = 0;
User-defined Scalar Functions (Scalar UDF)SELECT TimestampModifier(user) FROM Orders;

对于自定义函数,需要注册到table 的stream环境中。如:

object TimestampModifier extends ScalarFunction {

2)Aggregations及用户自定义的聚合函数

OperatorsDescription
GroupBy AggregationSELECT a, SUM(b) as d FROM Orders GROUP BY a;
**GroupBy Window Aggregation **SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user;
Over Window aggregationSELECT COUNT(amount) OVER (PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orders;
HavingSELECT SUM(amount) FROM Orders GROUP BY users HAVING SUM(amount) > 50;
User-defined Aggregate Functions (UDAGG)SELECT MyAggregate(amount) FROM Orders GROUP BY users;

说明: a)window当前支持group window和row window。group window就是所谓的跳跃窗口,固定周期触发输出。row window就是所谓的滑动窗口,每个数据流过来,都会触发输出;group window当前支持:

Group Window FunctionDescription
TUMBLE(time_attr, interval)Defines a tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (interval). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).
HOP(time_attr, interval, interval)Defines a hopping time window (called sliding window in the Table API). A hopping time window has a fixed duration (second interval parameter) and hops by a specified hop interval (first interval parameter). If the hop interval is smaller than the window size, hopping windows are overlapping. Thus, rows can be assigned to multiple windows. For example, a hopping window of 15 minutes size and 5 minute hop interval assigns each row to 3 different windows of 15 minute size, which are evaluated in an interval of 5 minutes. Hopping windows can be defined on event-time (stream + batch) or processing-time (stream).
SESSION(time_attr, interval)Defines a session time window. Session time windows do not have a fixed duration but their bounds are defined by a time interval of inactivity, i.e., a session window is closed if no event appears for a defined gap period. For example a session window with a 30 minute gap starts when a row is observed after 30 minutes inactivity (otherwise the row would be added to an existing window) and is closed if no row is added within 30 minutes. Session windows can work on event-time (stream + batch) or processing-time (stream).

b)聚合自定义函数和普通自定义函数类似。 3)Set

OperatorsDescription
UnionAllSELECT * FROM ((SELECT user FROM Orders WHERE a % 2 = 0)UNION ALL(SELECT user FROM Orders WHERE b = 0));

StreamSQL支持的数据类型

StreamSQL支持的数据类型如下:



版权归微信公众号/头条号《大数据和云计算技术》所有,转载请联系本人授权;欢迎关注,获取最新技术资讯和深度分析。

相关推荐

AI「自我复制」能力曝光,RepliBench警示:大模型正在学会伪造身份

科幻中AI自我复制失控场景,正成为现实世界严肃的研究课题。英国AISI推出RepliBench基准,分解并评估AI自主复制所需的四大核心能力。测试显示,当前AI尚不具备完全自主复制能力,但在获取资源...

【Python第三方库安装】介绍8种情况,这里最全看这里就够了!

**本图文作品主要解决CMD或pycharm终端下载安装第三方库可能出错的问题**本作品介绍了8种安装方法,这里最全的python第三方库安装教程,简单易上手,满满干货!希望大家能愉快地写代码,而不要...

pyvips,一个神奇的 Python 库!(pythonvip视频)

大家好,今天为大家分享一个神奇的Python库-pyvips。在图像处理领域,高效和快速的图像处理工具对于开发者来说至关重要。pyvips是一个强大的Python库,基于libvips...

mac 安装tesseract、pytesseract以及简单使用

一.tesseract-OCR的介绍1.tesseract-OCR是一个开源的OCR引擎,能识别100多种语言,专门用于对图片文字进行识别,并获取文本。但是它的缺点是对手写的识别能力比较差。2.用te...

实测o3/o4-mini:3分钟解决欧拉问题,OpenAI最强模型名副其实!

号称“OpenAI迄今为止最强模型”,o3/o4-mini真实能力究竟如何?就在发布后的几小时内,网友们的第一波实测已新鲜出炉。最强推理模型o3,即使遇上首位全职提示词工程师RileyGoodsid...

使用Python将图片转换为字符画并保存到文件

字符画(ASCIIArt)是将图片转换为由字符组成的艺术作品。利用Python,我们可以轻松实现图片转字符画的功能。本教程将带你一步步实现这个功能,并详细解释每一步的代码和实现原理。环境准备首先,你...

5分钟-python包管理器pip安装(python pip安装包)

pip是一个现代的,通用、普遍的Python包管理工具。提供了对Python包的查找、下载、安装、卸载的功能,是Python开发的基础。第一步:PC端打开网址:选择gz后缀的文件下载第二步:...

网络问题快速排查,你也能当好自己家的网络攻城狮

前面写了一篇关于网络基础和常见故障排查的,只列举了工具。没具体排查方式。这篇重点把几个常用工具的组合讲解一下。先有请今天的主角:nslookup及dig,traceroute,httping,teln...

终于把TCP/IP 协议讲的明明白白了,再也不怕被问三次握手了

文:涤生_Woo下周就开始和大家成体系的讲hadoop了,里面的每一个模块的技术细节我都会涉及到,希望大家会喜欢。当然了你也可以评论或者留言自己喜欢的技术,还是那句话,希望咱们一起进步。今天周五,讲讲...

记一次工控触摸屏故障的处理(工控触摸屏维修)

先说明一下,虽然我是自动化专业毕业,但已经很多年不从事现场一线的工控工作了。但自己在单位做的工作也牵涉到信息化与自动化的整合,所以平时也略有关注。上一周一个朋友接到一个活,一家光伏企业用于启动机组的触...

19、90秒快速“读懂”路由、交换命令行基础

命令行视图VRP分层的命令结构定义了很多命令行视图,每条命令只能在特定的视图中执行。本例介绍了常见的命令行视图。每个命令都注册在一个或多个命令视图下,用户只有先进入这个命令所在的视图,才能运行相应的命...

摄像头没图像的几个检查方法(摄像头没图像怎么修复)

背景描述:安防监控项目上,用户的摄像头运行了一段时间有部分摄像头不能进行预览,需要针对不能预览的摄像头进行排查,下面列出几个常见的排查方法。问题解决:一般情况为网络、供电、设备配置等情况。一,网络检查...

小谈:必需脂肪酸(必需脂肪酸主要包括)

必需脂肪酸是指机体生命活动必不可少,但机体自身又不能合成,必需由食物供给的多不饱和脂肪酸(PUFA)。必需脂肪酸主要包括两种,一种是ω-3系列的α-亚麻酸(18:3),一种是ω-6系列的亚油酸(18:...

期刊推荐:15本sci四区易发表的机械类期刊

  虽然,Sci四区期刊相比收录在sci一区、二区、三区的期刊来说要求不是那么高,投稿起来也相对容易一些。但,sci四区所收录的期刊中每本期刊的投稿难易程度也是不一样的。为方便大家投稿,本文给大家推荐...

be sick of 用法考察(be in lack of的用法)

besick表示病了,做谓语.本身是形容词,有多种意思.最通常的是:生病,恶心,呕吐,不适,晕,厌烦,无法忍受asickchild生病的孩子Hermother'sverysi...