3.SQL 语法篇
3.1.DDL:Create 子句
CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
目前 Flink SQL 支持下列 CREATE 语句:
- ⭐ CREATE TABLE
- ⭐ CREATE DATABASE
- ⭐ CREATE VIEW
- ⭐ CREATE FUNCTION
此节重点介绍建表,建数据库、视图和 UDF 会在后面的扩展章节进行介绍。
3.1.1.建表语句
下面的 SQL 语句就是建表语句的定义,根据指定的表名创建一个表,如果同名表已经在 catalog 中存在了,则无法注册。
代码语言:javascript复制CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] ]
<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<metadata_column_definition>:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<source_table>:
[catalog_name.][db_name.]table_name
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
3.1.2.表中的列
- ⭐ 常规列(即物理列)
物理列是数据库中所说的常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。
其他类型的列可以在物理列之间声明,但不会影响最终的物理列的读取。
举一个仅包含常规列的表的案例:
代码语言:javascript复制CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING
) WITH (
...
);
- ⭐ 元数据列
元数据列是 SQL 标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由 METADATA 关键字标识。
例如,我们可以使用元数据列从 Kafka 数据中读取 Kafka 数据自带的时间戳(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记),然后我们可以在 Flink SQL 中使用这个时间戳,比如进行基于时间的窗口操作。
举例:
代码语言:javascript复制CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 读取 kafka 本身自带的时间戳
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka'
...
);
元数据列可以用于后续数据的处理,或者写入到目标表中。
举例:
代码语言:javascript复制INSERT INTO MyTable
SELECT
user_id
, name
, record_time INTERVAL '1' SECOND
FROM MyTable;
如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样的话,FROM xxx 子句是可以被省略的。
举例:
代码语言:javascript复制CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 读取 kafka 本身自带的时间戳
`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
'connector' = 'kafka'
...
);
关于 Flink SQL 的每种 Connector 都提供了哪些 metadata 字段,详细可见官网文档 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/
如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致的话,程序运行时会自动 cast 强转。但是这要求两种数据类型是可以强转的。举例如下:
代码语言:javascript复制CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 将时间戳强转为 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);
默认情况下,Flink SQL planner 认为 metadata 列是可以 读取 也可以 写入 的。但是有些外部存储系统的元数据信息是只能用于读取,不能写入的。
那么在往一个表写入的场景下,我们就可以使用 VIRTUAL 关键字来标识某个元数据列不写入到外部存储中(不持久化)。
以 Kafka 举例:
代码语言:javascript复制CREATE TABLE MyTable (
-- sink 时会写入
`timestamp` BIGINT METADATA,
-- sink 时不写入
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);
在上面这个案例中,Kafka 引擎的 offset 是只读的。所以我们在把 MyTable 作为数据源(输入)表时,schema 中是包含 offset 的。在把 MyTable 作为数据汇(输出)表时,schema 中是不包含 offset 的。如下:
-- 当做数据源(输入)的 schema
MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)
-- 当做数据汇(输出)的 schema
MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)
所以这里在写入时需要注意,不要在 SQL 的 INSERT INTO 语句中写入 offset 列,否则 Flink SQL 任务会直接报错。
- ⭐ 计算列
计算列其实就是在写建表的 DDL 时,可以拿已有的一些列经过一些自定义的运算生成的新列。这些列本身是没有以物理形式存储到数据源中的。
举例:
代码语言:javascript复制CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
-- cost 就是使用 price 和 quanitity 生成的计算列,计算方式为 price * quanitity
`cost` AS price * quanitity,
) WITH (
'connector' = 'kafka'
...
);
注意!!! 计算列可以包含其他列、常量或者函数,但是不能写一个子查询进去。
小伙伴萌这时会问到一个问题,既然只能包含列、常量或者函数计算,我就直接在 DML query 代码中写就完事了呗,为啥还要专门在 DDL 中定义呢?
结论:没错,如果只是简单的四则运算的话直接写在 DML 中就可以,但是计算列一般是用于定义时间属性的(因为在 SQL 任务中时间属性只能在 DDL 中定义,不能在 DML 语句中定义)。比如要把输入数据的时间格式标准化。处理时间、事件时间分别举例如下:
- ⭐ 处理时间:使用
PROCTIME()函数来定义处理时间列 - ⭐ 事件时间:事件时间的时间戳可以在声明 Watermark 之前进行预处理。比如如果字段不是 TIMESTAMP(3) 类型或者时间戳是嵌套在 JSON 字符串中的,则可以使用计算列进行预处理。
注意!!!和虚拟 metadata 列是类似的,计算列也是只能读不能写的。
也就是说,我们在把 MyTable 作为数据源(输入)表时,schema 中是包含 cost 的。
在把 MyTable 作为数据汇(输出)表时,schema 中是不包含 cost 的。举例:
-- 当做数据源(输入)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)
-- 当做数据汇(输出)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)
3.1.3.定义 Watermark
Watermark 是在 Create Table 中进行定义的。具体 SQL 语法标准是 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。
其中:
- ⭐
rowtime_column_name:表的事件时间属性字段。该列必须是TIMESTAMP(3)、TIMESTAMP_LTZ(3)类,这个时间可以是一个计算列。 - ⭐
watermark_strategy_expression:定义 Watermark 的生成策略。Watermark 的一般都是由rowtime_column_name列减掉一段固定时间间隔。SQL 中 Watermark 的生产策略是:当前 Watermark 大于上次发出的 Watermark 时发出当前 Watermark。
注意:
- 如果你使用的是事件时间语义,那么必须要设设置事件时间属性和 WATERMARK 生成策略。
- Watermark 的发出频率:Watermark 发出一般是间隔一定时间的,Watermark 的发出间隔时间可以由
pipeline.auto-watermark-interval进行配置,如果设置为 200ms 则每 200ms 会计算一次 Watermark,然如果比之前发出的 Watermark 大,则发出。如果间隔设为 0ms,则 Watermark 只要满足触发条件就会发出,不会受到间隔时间控制。
Flink SQL 提供了几种 WATERMARK 生产策略:
- ⭐ 有界无序:设置方式为
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit。此类策略就可以用于设置最大乱序时间,假如设置为WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND,则生成的是运行 5s 延迟的 Watermark。。一般都用这种 Watermark 生成策略,此类 Watermark 生成策略通常用于有数据乱序的场景中,而对应到实际的场景中,数据都是会存在乱序的,所以基本都使用此类策略。 - ⭐ 严格升序:设置方式为
WATERMARK FOR rowtime_column AS rowtime_column。一般基本不用这种方式。如果你能保证你的数据源的时间戳是严格升序的,那就可以使用这种方式。严格升序代表 Flink 任务认为时间戳只会越来越大,也不存在相等的情况,只要相等或者小于之前的,就认为是迟到的数据。 - ⭐ 递增:设置方式为
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND。一般基本不用这种方式。如果设置此类,则允许有相同的时间戳出现。
3.1.4.Create Table With 子句
先看一个案例:
代码语言:javascript复制CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
可以看到 DDL 中 With 子句就是在建表时,描述数据源、数据汇的具体外部存储的元数据信息的。
一般 With 中的配置项由 Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的 With 配置项都是不同的。
注意:
- Flink SQL 中 Connector 其实就是 Flink 用于链接外部数据源的接口。举一个类似的例子,在 Java 中想连接到 MySQL,需要使用 mysql-connector-java 包提供的 Java API 去链接。映射到 Flink SQL 中,在 Flink SQL 中要连接到 Kafka,需要使用 kafka connector
- Flink SQL 已经提供了一系列的内置 Connector,具体可见 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/
回到上述案例中,With 声明了以下几项信息:
- ⭐
'connector' = 'kafka':声明外部存储是 Kafka - ⭐
'topic' = 'user_behavior':声明 Flink SQL 任务要连接的 Kafka 表的 topic 是 user_behavior - ⭐
'properties.bootstrap.servers' = 'localhost:9092':声明 Kafka 的 server ip 是 localhost:9092 - ⭐
'properties.group.id' = 'testGroup':声明 Flink SQL 任务消费这个 Kafka topic,会使用 testGroup 的 group id 去消费 - ⭐
'scan.startup.mode' = 'earliest-offset':声明 Flink SQL 任务消费这个 Kafka topic 会从最早位点开始消费 - ⭐
'format' = 'csv':声明 Flink SQL 任务读入或者写出时对于 Kafka 消息的序列化方式是 csv 格式
从这里也可以看出来 With 中具体要配置哪些配置项都是和每种 Connector 决定的。
3.1.4.Create Table Like 子句
Like 子句是 Create Table 子句的一个延伸。举例:
下面定义了一张 Orders 表:
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);
但是忘记定义 Watermark 了,那如果想加上 Watermark,就可以用 Like 子句定义一张带 Watermark 的新表:
CREATE TABLE Orders_with_watermark (
-- 1. 添加了 WATERMARK 定义
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- 2. 覆盖了原 Orders 表中 scan.startup.mode 参数
'scan.startup.mode' = 'latest-offset'
)
-- 3. Like 子句声明是在原来的 Orders 表的基础上定义 Orders_with_watermark 表
LIKE Orders;
上面这个语句的效果就等同于:
代码语言:javascript复制CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
不过这种不常使用。就不过多介绍了。如果小伙伴萌感兴趣,直接去官网参考具体注意事项:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#like
3.2.DML:With 子句
- ⭐ 应用场景(支持 BatchStreaming):With 语句和离线 Hive SQL With 语句一样的,xdm,语法糖 1,使用它可以让你的代码逻辑更加清晰。
- ⭐ 直接上案例:
-- 语法糖 1
WITH orders_with_total AS (
SELECT
order_id
, price tax AS total
FROM Orders
)
SELECT
order_id
, SUM(total)
FROM orders_with_total
GROUP BY
order_id;
3.3.DML:SELECT & WHERE 子句
- ⭐ 应用场景(支持 BatchStreaming):SELECT & WHERE 语句和离线 Hive SQL 语句一样的,xdm,常用作 ETL,过滤,字段清洗标准化
- ⭐ 直接上案例:
INSERT INTO target_table
SELECT * FROM Orders
INSERT INTO target_table
SELECT order_id, price tax FROM Orders
INSERT INTO target_table
-- 自定义 Source 的数据
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)
INSERT INTO target_table
SELECT price tax FROM Orders WHERE id = 10
-- 使用 UDF 做字段标准化处理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 过滤条件
Where id > 3
- ⭐
SQL 语义:
其实理解一个 SQL 最后生成的任务是怎样执行的,最好的方式就是理解其语义。
以下面的 SQL 为例,我们来介绍下其在离线中和在实时中执行的区别,对比学习一下,大家就比较清楚了
代码语言:javascript复制INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3
这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:
- ⭐
数据源算子(From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的过滤和字段标准化算子 - ⭐
过滤和字段标准化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子发的一条一条的数据,然后判断 id > 3?将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,一条一条将计算结果数据发给下游数据汇算子 - ⭐
数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中
可以看到这个实时任务的所有算子是以一种 pipeline 模式运行的,所有的算子在同一时刻都是处于 running 状态的,24 小时一直在运行,实时任务中也没有离线中常见的分区概念。

select & where
关于看如何看一段 Flink SQL 最终的执行计划: 最好的方法就如上图,看 Flink web ui 的算子图,算子图上详细的标记清楚了每一个算子做的事情。以上图来说,我们可以看到主要有三个算子:
- ⭐ Source 算子:Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]) ,其中 Source 表名称为
table=[[default_catalog, default_database, Orders],字段为select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time],Watermark 策略为rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]。 - ⭐ 过滤算子:Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id]),其中过滤条件为
where=[(order_id > 3)],结果字段为select=[order_id, name, row_time] - ⭐ Sink 算子:Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time]),其中最终产出的表名称为
table=[default_catalog.default_database.target_table],表字段为fields=[order_id, name, row_time]
可以看到 Flink SQL 具体执行了哪些操作是非常详细的标记在算子图上。所以小伙伴萌一定要学会看算子图,这是掌握 debug、调优前最基础的一个技巧。
那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个类似的算子(虽然实际可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),离线和实时任务的执行方式完全不同:
- ⭐
数据源算子(From Order):数据源从 Order Hive 表(通常都是读一天、一小时的分区数据)中一次性读取所有的数据,然后将读到的数据全部发给下游过滤和字段标准化算子,然后数据源算子就运行结束了,释放资源了 - ⭐
过滤和字段标准化算子(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子的所有数据,然后遍历所有数据判断 id > 3?将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,将所有数据发给下游数据汇算子,然后过滤和字段标准化算子就运行结束了,释放资源了 - ⭐
数据汇算子(INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 表中,然后整个任务就运行结束了,整个任务的资源也就都释放了
可以看到离线任务的算子是分阶段(stage)进行运行的,每一个 stage 运行结束之后,然后下一个 stage 开始运行,全部的 stage 运行完成之后,这个离线任务就跑结束了。
注意: 很多小伙伴都是之前做过离线数仓的,熟悉了离线的分区、计算任务定时调度运行这两个概念,所以在最初接触 Flink SQL 时,会以为 Flink SQL 实时任务也会存在这两个概念,这里博主做一下解释
- 分区概念:离线由于能力限制问题,通常都是进行一批一批的数据计算,每一批数据的数据量都是有限的集合,这一批一批的数据自然的划分方式就是时间,比如按小时、天进行划分分区。但是
在实时任务中,是没有分区的概念的,实时任务的上游、下游都是无限的数据流。 - 计算任务定时调度概念:同上,离线就是由于计算能力限制,数据要一批一批算,一批一批输入、产出,所以要按照小时、天定时的调度和计算。但是
在实时任务中,是没有定时调度的概念的,实时任务一旦运行起来就是 24 小时不间断,不间断的处理上游无限的数据,不简单的产出数据给到下游。
flink sql 知其所以然(七):不会连最适合 flink sql 的 ETL 和 group agg 场景都没见过吧?
3.4.DML:SELECT DISTINCT 子句
- ⭐ 应用场景(支持 BatchStreaming):语句和离线 Hive SQL SELECT DISTINCT 语句一样的,xdm,用作根据 key 进行数据去重
- ⭐ 直接上案例:
INSERT into target_table
SELECT
DISTINCT id
FROM Orders
- ⭐
SQL 语义:
也是拿离线和实时做对比。
这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:
- ⭐
数据源算子(From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的去重算子 - ⭐
去重算子(DISTINCT id):接收到上游算子发的一条一条的数据,然后判断这个 id 之前是否已经来过了,判断方式就是使用 Flink 中的 state 状态,如果状态中已经有这个 id 了,则说明已经来过了,不往下游算子发,如果状态中没有这个 id,则说明没来过,则往下游算子发,也是一条一条发给下游数据汇算子 - ⭐
数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中

select distinct
注意: 对于实时任务,计算时的状态可能会无限增长。 状态大小取决于不同 key(上述案例为 id 字段)的数量。为了防止状态无限变大,我们可以设置状态的 TTL。但是这可能会影响查询结果的正确性,比如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。
那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个相同的算子(虽然可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),但是其和实时任务的执行方式完全不同:
- ⭐
数据源算子(From Order):数据源从 Order Hive 表(通常都有天、小时分区限制)中一次性读取所有的数据,然后将读到的数据全部发给下游去重算子,然后数据源算子就运行结束了,释放资源了 - ⭐
去重算子(DISTINCT id):接收到上游算子的所有数据,然后遍历所有数据进行去重,将去重完的所有结果数据发给下游数据汇算子,然后去重算子就运行结束了,释放资源了 - ⭐
数据汇算子(INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 中,然后整个任务就运行结束了,整个任务的资源也就都释放了
3.5.DML:窗口聚合
由于窗口涉及到的知识内容比较多,所以博主先为大家说明介绍下面内容时的思路,大家跟着思路走。思路如下:
- ⭐ 先介绍 Flink SQL 支持的 4 种时间窗口
- ⭐ 分别详细介绍上述的 4 种时间窗口的功能及 SQL 语法
- ⭐ 结合实际案例介绍 4 种时间窗口
首先来看看 Flink SQL 中支持的 4 种窗口的运算。
- ⭐ 滚动窗口(TUMBLE)
- ⭐ 滑动窗口(HOP)
- ⭐ Session 窗口(SESSION)
- ⭐ 渐进式窗口(CUMULATE)
3.5.1.滚动窗口(TUMBLE)
- ⭐ 滚动窗口定义:滚动窗口将每个元素指定给指定窗口大小的窗口。滚动窗口具有固定大小,且不重叠。例如,指定一个大小为 5 分钟的滚动窗口。在这种情况下,Flink 将每隔 5 分钟开启一个新的窗口,其中每一条数都会划分到唯一一个 5 分钟的窗口中,如下图所示。

tumble window
- ⭐ 应用场景:常见的按照一分钟对数据进行聚合,计算一分钟内 PV,UV 数据。
- ⭐ 实际案例:简单且常见的分维度分钟级别同时在线用户数、总销售额
那么上面这个案例的 SQL 要咋写呢?
关于滚动窗口,在 1.13 版本之前和 1.13 及之后版本有两种 Flink SQL 实现方式,分别是:
- ⭐ Group Window Aggregation(1.13 之前只有此类方案,此方案在 1.13 及之后版本已经标记为废弃,不推荐小伙伴萌使用)
- ⭐ Windowing TVF(1.13 及之后建议使用 Windowing TVF)
博主这里两种方法都会介绍:
- ⭐ Group Window Aggregation 方案(支持 BatchStreaming 任务):
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 数据处理逻辑
insert into sink_table
select
dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
tumble(row_time, interval '1' minute)
可以看到 Group Window Aggregation 滚动窗口的 SQL 语法就是把 tumble window 的声明写在了 group by 子句中,即 tumble(row_time, interval '1' minute),第一个参数为事件时间的时间戳;第二个参数为滚动窗口大小。
- ⭐ Window TVF 方案(1.13 只支持 Streaming 任务):
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 数据处理逻辑
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
count(distinct user_id) as uv
FROM TABLE(TUMBLE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND))
GROUP BY window_start,
window_end,
dim
可以看到 Windowing TVF 滚动窗口的写法就是把 tumble window 的声明写在了数据源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND)),包含三部分参数。
第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL '60' SECOND 声明滚动窗口大小为 1 min。
可以直接在公众号后台回复1.13.2 最全 flink sql获取源代码。所有的源码都开源到 github 上面了。里面包含了非常多的案例。可以直接拿来在本地运行的!!!肥肠的方便。
- ⭐
SQL 语义:
由于离线没有相同的时间窗口聚合概念,这里就直接说实时场景 SQL 语义,假设 Orders 为 kafka,target_table 也为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子:
- ⭐
数据源算子(From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的窗口聚合算子 - ⭐
窗口聚合算子(TUMBLE 算子):接收到上游算子发的一条一条的数据,然后将每一条数据按照时间戳划分到对应的窗口中(根据事件时间、处理时间的不同语义进行划分),上述案例为事件时间,事件时间中,滚动窗口算子接收到上游的 Watermark 大于窗口的结束时间时,则说明当前这一分钟的滚动窗口已经结束了,将窗口计算完的结果发往下游算子(一条一条发给下游数据汇算子) - ⭐
数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中
这个实时任务也是 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。
注意: 事件时间中滚动窗口的窗口计算触发是由 Watermark 推动的。
3.5.2.滑动窗口(HOP)
- ⭐ 滑动窗口定义:滑动窗口也是将元素指定给固定长度的窗口。与滚动窗口功能一样,也有窗口大小的概念。不一样的地方在于,滑动窗口有另一个参数控制窗口计算的频率(滑动窗口滑动的步长)。因此,如果滑动的步长小于窗口大小,则滑动窗口之间每个窗口是可以重叠。在这种情况下,一条数据就会分配到多个窗口当中。举例,有 10 分钟大小的窗口,滑动步长为 5 分钟。这样,每 5 分钟会划分一次窗口,这个窗口包含的数据是过去 10 分钟内的数据,如下图所示。

hop window
- ⭐ 应用场景:比如计算同时在线的数据,要求结果的输出频率是 1 分钟一次,每次计算的数据是过去 5 分钟的数据(有的场景下用户可能在线,但是可能会 2 分钟不活跃,但是这也要算在同时在线数据中,所以取最近 5 分钟的数据就能计算进去了)
- ⭐ 实际案例:简单且常见的分维度分钟级别同时在线用户数,1 分钟输出一次,计算最近 5 分钟的数据
依然是 Group Window Aggregation、Windowing TVF 两种方案:
- ⭐ Group Window Aggregation 方案(支持 BatchStreaming 任务):
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 数据处理逻辑
insert into sink_table
SELECT dim,
UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '1' minute, interval '5' minute) AS STRING)) * 1000 as window_start,
count(distinct user_id) as uv
FROM source_table
GROUP BY dim
, hop(row_time, interval '1' minute, interval '5' minute)
可以看到 Group Window Aggregation 滚动窗口的写法就是把 hop window 的声明写在了 group by 子句中,即 hop(row_time, interval '1' minute, interval '5' minute)。其中:
第一个参数为事件时间的时间戳;第二个参数为滑动窗口的滑动步长;第三个参数为滑动窗口大小。
- ⭐ Windowing TVF 方案(1.13 只支持 Streaming 任务):
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 数据处理逻辑
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
count(distinct user_id) as bucket_uv
FROM TABLE(HOP(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))
GROUP BY window_start,
window_end,
dim
可以看到 Windowing TVF 滚动窗口的写法就是把 hop window 的声明写在了数据源的 Table 子句中,即 TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '1' MINUTES, INTERVAL '5' MINUTES)),包含四部分参数:
第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL '1' MINUTES 声明滚动窗口滑动步长大小为 1 min。第四个参数 INTERVAL '5' MINUTES 声明滚动窗口大小为 5 min。
- ⭐
SQL 语义:
滑动窗口语义和滚动窗口类似,这里不再赘述。
flink sql 知其所以然(八):flink sql tumble window 的奇妙解析之路
flink sql 知其所以然(九):window tvf tumble window 的奇思妙解
3.5.3.Session 窗口(SESSION)
- ⭐ Session 窗口定义:Session 时间窗口和滚动、滑动窗口不一样,其没有固定的持续时间,如果在定义的间隔期(Session Gap)内没有新的数据出现,则 Session 就会窗口关闭。如下图对比所示:

session window
- ⭐ 实际案例:计算每个用户在活跃期间(一个 Session)总共购买的商品数量,如果用户 5 分钟没有活动则视为 Session 断开
目前 1.13 版本中 Flink SQL 不支持 Session 窗口的 Window TVF,所以这里就只介绍 Group Window Aggregation 方案:
- ⭐ Group Window Aggregation 方案(支持 BatchStreaming 任务):
-- 数据源表,用户购买行为记录表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT, -- 购买商品数量
window_start bigint
) WITH (
'connector' = 'print'
);
-- 数据处理逻辑
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' minute) AS STRING)) * 1000 as window_start,
count(1) as pv
FROM source_table
GROUP BY dim
, session(row_time, interval '5' minute)
注意: 上述 SQL 任务是在整个 Session 窗口结束之后才会把数据输出。Session 窗口即支持
处理时间也支持事件时间。但是处理时间只支持在 Streaming 任务中运行,Batch 任务不支持。
可以看到 Group Window Aggregation 中 Session 窗口的写法就是把 session window 的声明写在了 group by 子句中,即 session(row_time, interval '5' minute)。其中:
第一个参数为事件时间的时间戳;第二个参数为 Session gap 间隔。
- ⭐
SQL 语义:
Session 窗口语义和滚动窗口类似,这里不再赘述。
可以直接在公众号后台回复1.13.2 最全 flink sql获取源代码。所有的源码都开源到 github 上面了。里面包含了非常多的案例。可以直接拿来在本地运行的!!!肥肠的方便。
3.5.4.渐进式窗口(CUMULATE)
- ⭐ 渐进式窗口定义(1.13 只支持 Streaming 任务):渐进式窗口在其实就是
固定窗口间隔内提前触发的的滚动窗口,其实就是Tumble Window early-fire的一个事件时间的版本。例如,从每日零点到当前这一分钟绘制累积 UV,其中 10:00 时的 UV 表示从 00:00 到 10:00 的 UV 总数。渐进式窗口可以认为是首先开一个最大窗口大小的滚动窗口,然后根据用户设置的触发的时间间隔将这个滚动窗口拆分为多个窗口,这些窗口具有相同的窗口起点和不同的窗口终点。如下图所示:

cumulate window
- ⭐ 应用场景:周期内累计 PV,UV 指标(如每天累计到当前这一分钟的 PV,UV)。这类指标是一段周期内的累计状态,对分析师来说更具统计分析价值,而且几乎所有的复合指标都是基于此类指标的统计(不然离线为啥都要累计一天的数据,而不要一分钟累计的数据呢)。
- ⭐ 实际案例:每天的截止当前分钟的累计 money(sum(money)),去重 id 数(count(distinct id))。每天代表渐进式窗口大小为 1 天,分钟代表渐进式窗口移动步长为分钟级别。举例如下:
明细输入数据:
time | id | money |
|---|---|---|
2021-11-01 00:01:00 | A | 3 |
2021-11-01 00:01:00 | B | 5 |
2021-11-01 00:01:00 | A | 7 |
2021-11-01 00:02:00 | C | 3 |
2021-11-01 00:03:00 | C | 10 |
预期经过渐进式窗口计算的输出数据:
time | count distinct id | sum money |
|---|---|---|
2021-11-01 00:01:00 | 2 | 15 |
2021-11-01 00:02:00 | 3 | 18 |
2021-11-01 00:03:00 | 3 | 28 |
转化为折线图长这样:

当日累计
可以看到,其特点就在于,每一分钟的输出结果都是当天零点累计到当前的结果。
渐进式窗口目前只有 Windowing TVF 方案支持:
- ⭐ Windowing TVF 方案(1.13 只支持 Streaming 任务):
-- 数据源表
CREATE TABLE source_table (
-- 用户 id
user_id BIGINT,
-- 用户
money BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 数据汇表
CREATE TABLE sink_table (
window_end bigint,
window_start bigint,
sum_money BIGINT,
count_distinct_id bigint
) WITH (
'connector' = 'print'
);
-- 数据处理逻辑
insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
window_start,
sum(money) as sum_money,
count(distinct id) as count_distinct_id
FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start,
window_end
可以看到 Windowing TVF 滚动窗口的写法就是把 cumulate window 的声明写在了数据源的 Table 子句中,即 TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND, INTERVAL '1' DAY)),其中包含四部分参数:
第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL '60' SECOND 声明渐进式窗口触发的渐进步长为 1 min。第四个参数 INTERVAL '1' DAY 声明整个渐进式窗口的大小为 1 天,到了第二天新开一个窗口重新累计。
- ⭐
SQL 语义:
渐进式窗口语义和滚动窗口类似,这里不再赘述。
3.5.5.Window TVF 支持 Grouping Sets、Rollup、Cube
具体应用场景:实际的案例场景中,经常会有多个维度进行组合(cube)计算指标的场景。如果把每个维度组合的代码写一遍,然后 union all 起来,这样写起来非常麻烦,而且会导致一个数据源读取多遍。
这时,有离线 Hive SQL 使用经验的小伙伴萌就会想到,如果有了 Grouping Sets,我们就可以直接用 Grouping Sets 将维度组合写在一条 SQL 中,写起来方便并且执行效率也高。当然,Flink 支持这个功能。
但是目前 Grouping Sets 只在 Window TVF 中支持,不支持 Group Window Aggregation。
来一个实际案例感受一下,计算每日零点累计到当前这一分钟的分汇总、age、sex、age sex 维度的用户数。
代码语言:javascript复制-- 用户访问明细表
CREATE TABLE source_table (
age STRING,
sex STRING,
user_id BIGINT,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.age.length' = '1',
'fields.sex.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000'
);
CREATE TABLE sink_table (
age STRING,
sex STRING,
uv BIGINT,
window_end bigint
) WITH (
'connector' = 'print'
);
insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
if (age is null, 'ALL', age) as age,
if (sex is null, 'ALL', sex) as sex,
count(distinct user_id) as bucket_uv
FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '5' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start,
window_end,
-- grouping sets 写法
GROUPING SETS (
()
, (age)
, (sex)
, (age, sex)
)
小伙伴萌这里需要注意下!!!
Flink SQL 中 Grouping Sets 的语法和 Hive SQL 的语法有一些不同,如果我们使用 Hive SQL 实现上述 SQL 的语义,其实现如下:
代码语言:javascript复制insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
if (age is null, 'ALL', age) as age,
if (sex is null, 'ALL', sex) as sex,
count(distinct user_id) as bucket_uv
FROM source_table
GROUP BY
age
, sex
-- hive sql grouping sets 写法
GROUPING SETS (
()
, (age)
, (sex)
, (age, sex)
)
3.6.DML:Group 聚合
- ⭐ Group 聚合定义(支持 BatchStreaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中
按颜色分 key(横向)就是 Group 聚合,按窗口划分(纵向)就是窗口聚合。

tumble window key
- ⭐ 应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 count、sum 等聚合操作。
那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?
首先来举一个例子看看怎么将窗口聚合转换为 Group 聚合。假如一个窗口聚合是按照 1 分钟的粒度进行聚合,如下 SQL:
- ⭐ 滚动窗口(TUMBLE)
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 数据处理逻辑
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
-- 按照 Flink SQL tumble 窗口写法划分窗口
tumble(row_time, interval '1' minute)
转换为 Group 聚合的写法如下:
- ⭐ Group 聚合
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 数据处理逻辑
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
dim,
-- 将秒级别时间戳 / 60 转化为 1min
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)
确实没错,上面这个转换是一点问题都没有的。
但是窗口聚合和 Group by 聚合的差异在于:
- ⭐ 本质区别:窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出
- ⭐ 运行层面:窗口聚合是和
时间绑定的,窗口聚合其中窗口的计算结果触发都是由时间(Watermark)推动的。Group by 聚合完全由数据推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。
- ⭐
SQL 语义
也是拿离线和实时做对比,Orders 为 kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子:
- ⭐
数据源算子(From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中 - ⭐
Group 聚合算子(group by key sumcountmaxmin):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sumcountmaxmin 结果。如果有结果oldResult,拿出来和当前的数据进行sumcountmaxmin计算出这个 key 的新结果newResult,并将新结果[key, newResult]更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息-[key, oldResult],然后再将新结果发往下游[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 summaxmin 结果newResult,并将新结果[key, newResult]更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送[key, newResult]。 - ⭐
数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中
这个实时任务也是 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。
特别注意:
- Group by 聚合涉及到了回撤流(也叫 retract 流),会产生回撤流是因为从整个 SQL 的语义来看,上游的 Kafka 数据是源源不断的,无穷无尽的,那么每次这个 SQL 任务产出的结果都是一个中间结果,所以每次结果发生更新时,都需要将上一次发出的中间结果给撤回,然后将最新的结果发下去。
- Group by 聚合涉及到了状态:状态大小也取决于不同 key 的数量。为了防止状态无限变大,我们可以设置状态的 TTL。以上面的 SQL 为例,上面 SQL 是按照分钟进行聚合的,理论上到了今天,通常我们就可以不用关心昨天的数据了,那么我们可以设置状态过期时间为一天。关于状态过期时间的设置参数可以参考下文
运行时参数小节。
如果这个 SQL 放在 Hive 中执行时,其中 Orders 为 Hive,target_table 也为 Hive,其也会生成三个相同的算子,但是其和实时任务的执行方式完全不同:
- ⭐
数据源算子(From Order):数据源算子从 Order Hive 中读取到所有的数据,然后所有数据发送给下游的Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个算子中,然后这个算子就运行结束了,释放资源了 - ⭐
Group 聚合算子(group by sumcountmaxmin):接收到上游算子发的所有数据,然后遍历计算 sumcountmaxmin 结果,批量发给下游数据汇算子,这个算子也就运行结束了,释放资源了 - ⭐
数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Hive 中,整个任务也就运行结束了,整个任务的资源也就都释放了
3.6.1.Group 聚合支持 Grouping sets、Rollup、Cube
Group 聚合也支持 Grouping sets、Rollup、Cube
举一个 Grouping sets 的案例:
SELECT
supplier_id
, rating
, product_id
, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)
3.7.DML:Over 聚合
- ⭐ Over 聚合定义(支持 BatchStreaming):可以理解为是一种特殊的滑动窗口聚合函数。
那这里我们拿 Over 聚合 与 窗口聚合 做一个对比,其之间的最大不同之处在于:
- ⭐ 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到
- ⭐ Over 聚合:能够保留原始字段
注意: 其实在生产环境中,Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在离线数仓经常使用嘛?
- ⭐ 应用场景:计算最近一段滑动窗口的聚合结果数据。
- ⭐ 实际案例:查询每个产品最近一小时订单的金额总和:
SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders
Over 聚合的语法总结如下:
代码语言:javascript复制SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
其中:
- ⭐ ORDER BY:必须是时间戳列(事件时间、处理时间)
- ⭐ PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合
- ⭐ range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为
按照行数聚合,第二种为按照时间区间聚合。如下案例所示:
a. ⭐ 时间区间聚合:
按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。
代码语言:javascript复制CREATE TABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '10',
'fields.product.min' = '1',
'fields.product.max' = '2'
);
CREATE TABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_prod_amount_sum BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 标识统计范围是一个 product 的最近 1 小时的数据
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table
结果如下:
代码语言:javascript复制 I[2, 2021-12-24T22:08:26.583, 7, 73]
I[2, 2021-12-24T22:08:27.583, 7, 80]
I[2, 2021-12-24T22:08:28.583, 4, 84]
I[2, 2021-12-24T22:08:29.584, 7, 91]
I[2, 2021-12-24T22:08:30.583, 8, 99]
I[1, 2021-12-24T22:08:31.583, 9, 138]
I[2, 2021-12-24T22:08:32.584, 6, 105]
I[1, 2021-12-24T22:08:33.584, 7, 145]
b. ⭐ 行数聚合:
按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 行数据的 amount 之和。
代码语言:javascript复制CREATE TABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '2',
'fields.product.min' = '1',
'fields.product.max' = '2'
);
CREATE TABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_prod_amount_sum BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 标识统计范围是一个 product 的最近 5 行数据
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table
预跑结果如下:
代码语言:javascript复制 I[2, 2021-12-24T22:18:19.147, 1, 9]
I[1, 2021-12-24T22:18:20.147, 2, 11]
I[1, 2021-12-24T22:18:21.147, 2, 12]
I[1, 2021-12-24T22:18:22.147, 2, 12]
I[1, 2021-12-24T22:18:23.148, 2, 12]
I[1, 2021-12-24T22:18:24.147, 1, 11]
I[1, 2021-12-24T22:18:25.146, 1, 10]
I[1, 2021-12-24T22:18:26.147, 1, 9]
I[2, 2021-12-24T22:18:27.145, 2, 11]
I[2, 2021-12-24T22:18:28.148, 1, 10]
I[2, 2021-12-24T22:18:29.145, 2, 10]
当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:
代码语言:javascript复制SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
-- 使用下面子句,定义 Over Window
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
3.8.DML:Joins
Flink 也支持了非常多的数据 Join 方式,主要包括以下三种:
- ⭐ 动态表(流)与动态表(流)的 Join
- ⭐ 动态表(流)与外部维表(比如 Redis)的 Join
- ⭐ 动态表字段的列转行(一种特殊的 Join)
细分 Flink SQL 支持的 Join:
- ⭐ Regular Join:流与流的 Join,包括 Inner Equal Join、Outer Equal Join
- ⭐ Interval Join:流与流的 Join,两条流一段时间区间内的 Join
- ⭐ Temporal Join:流与流的 Join,包括事件时间,处理时间的 Temporal Join,类似于离线中的快照 Join
- ⭐ Lookup Join:流与外部维表的 Join
- ⭐ Array Expansion:表字段的列转行,类似于 Hive 的 explode 数据炸开的列转行
- ⭐ Table Function:自定义函数的表字段的列转行,支持 Inner Join 和 Left Outer Join
3.8.1.Regular Join
- ⭐ Regular Join 定义(支持 BatchStreaming):Regular Join 其实就是和离线 Hive SQL 一样的 Regular Join,通过条件关联两条流数据输出。
- ⭐ 应用场景:Join 其实在我们的数仓建设过程中应用是非常广泛的。离线数仓可以说基本上是离不开 Join 的。那么实时数仓的建设也必然离不开 Join,比如日志关联扩充维度数据,构建宽表;日志通过 ID 关联计算 CTR。
- ⭐ Regular Join 包含以下几种(以
L作为左流中的数据标识,R作为右流中的数据标识):
- ⭐ Inner Join(Inner Equal Join):流任务中,只有两条流 Join 到才输出,输出
[L, R] - ⭐ Left Join(Outer Equal Join):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,都会输出(Join 到输出
[L, R],没 Join 到输出[L, null]),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出-[L, null],然后输出[L, R] - ⭐ Right Join(Outer Equal Join):有 Left Join 一样,左表和右表的执行逻辑完全相反
- ⭐ Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出
[L, R],没 Join 到输出[null, R];对左流来说:Join 到输出[L, R],没 Join 到输出[L, null])。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤-[null, R],输出[L, R],右流数据到达为例:回撤-[L, null],输出[L, R])。
- ⭐ 实际案例:案例为曝光日志关联点击日志筛选既有曝光又有点击的数据,并且补充点击的扩展参数(show inner click):
下面这个案例为 Inner Join 案例:
-- 曝光日志数据
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '2',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '100'
);
-- 点击日志数据
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '2',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
-- 流的 INNER JOIN,条件为 log_id
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
输出结果如下:
代码语言:javascript复制 I[5, d, 5, f]
I[5, d, 5, 8]
I[5, d, 5, 2]
I[3, 4, 3, 0]
I[3, 4, 3, 3]
...
如果为 Left Join 案例:
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '3',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '3',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
输出结果如下:
代码语言:javascript复制 I[5, f3c, 5, c05]
I[5, 6e2, 5, 1f6]
I[5, 86b, 5, 1f6]
I[5, f3c, 5, 1f6]
-D[3, 4ab, null, null]
-D[3, 6f2, null, null]
I[3, 4ab, 3, 765]
I[3, 6f2, 3, 765]
I[2, 3c4, null, null]
I[3, 4ab, 3, a8b]
I[3, 6f2, 3, a8b]
I[2, c03, null, null]
...
如果为 Full Join 案例:
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '2',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '2',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table
FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
输出结果如下:
代码语言:javascript复制 I[null, null, 7, 6]
I[6, 5, null, null]
-D[1, c, null, null]
I[1, c, 1, 2]
I[3, 1, null, null]
I[null, null, 7, d]
I[10, 0, null, null]
I[null, null, 2, 6]
-D[null, null, 7, 6]
-D[null, null, 7, d]
...
关于 Regular Join 的注意事项:
- ⭐ 实时 Regular Join 可以不是
等值 join。等值 join和非等值 join区别在于,等值 join数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联 - ⭐ Join 的流程是左流新来一条数据之后,会和右流中符合条件的所有数据做 Join,然后输出。
- ⭐ 流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。
- ⭐
SQL 语义:
详细的 SQL 语义案例可以参考:
flink sql 知其所以然(十二):流 join 很难嘛???(上)
flink sql 知其所以然(十三):流 join 很难嘛???(下)
3.8.2.Interval Join(时间区间 Join)
- ⭐ Interval Join 定义(支持 BatchStreaming):Interval Join 在离线的概念中是没有的。Interval Join 可以让一条流去 Join 另一条流中前后一段时间内的数据。
- ⭐ 应用场景:为什么有 Regular Join 还要 Interval Join 呢?刚刚的案例也讲了,Regular Join 会产生回撤流,但是在实时数仓中一般写入的 sink 都是类似于 Kafka 这样的消息队列,然后后面接 clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。所以博主理解 Interval Join 就是用于消灭回撤流的。
- ⭐ Interval Join 包含以下几种(以
L作为左流中的数据标识,R作为右流中的数据标识):
- ⭐ Inner Interval Join:流任务中,只有两条流 Join 到(满足 Join on 中的条件:两条流的数据在时间区间 满足其他等值条件)才输出,输出
[L, R] - ⭐ Left Interval Join:流任务中,左流数据到达之后,如果没有 Join 到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 Join 到,则会输出
[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出[L, null],如果右流 State 中的数据过期了,就直接从 State 中删除。 - ⭐ Right Interval Join:和 Left Interval Join 执行逻辑一样,只不过左表和右表的执行逻辑完全相反
- ⭐ Full Interval Join:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出
[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出[L, null],右流过期输出-[null, R])
可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。
- ⭐ 实际案例:还是刚刚的案例,曝光日志关联点击日志筛选既有曝光又有点击的数据,条件是曝光关联之后发生 4 小时之内的点击,并且补充点击的扩展参数(show inner interval click):
下面为 Inner Interval Join:
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '4' HOUR AND click_log_table.row_time;
输出结果如下:
代码语言:javascript复制6> I[2, a, 2, 6]
6> I[2, 6, 2, 6]
2> I[4, 1, 4, 5]
2> I[10, 8, 10, d]
2> I[10, 7, 10, d]
2> I[10, d, 10, d]
2> I[5, b, 5, d]
6> I[1, a, 1, 7]
如果是 Left Interval Join:
CREATE TABLE show_log (
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log (
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log.log_id as s_id,
show_log.show_params as s_params,
click_log.log_id as c_id,
click_log.click_params as c_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time INTERVAL '5' SECOND;
输出结果如下:
代码语言:javascript复制 I[6, e, 6, 7]
I[11, d, null, null]
I[7, b, null, null]
I[8, 0, 8, 3]
I[13, 6, null, null]
如果是 Full Interval Join:
CREATE TABLE show_log (
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '1',
'fields.log_id.min' = '5',
'fields.log_id.max' = '15'
);
CREATE TABLE click_log (
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log.log_id as s_id,
show_log.show_params as s_params,
click_log.log_id as c_id,
click_log.click_params as c_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time INTERVAL '5' SECOND;
输出结果如下:
代码语言:javascript复制 I[6, 1, null, null]
I[7, 3, 7, 8]
I[null, null, 6, 6]
I[null, null, 4, d]
I[8, d, null, null]
I[null, null, 3, b]
关于 Interval Join 的注意事项: ⭐ 实时 Interval Join 可以不是
等值 join。等值 join和非等值 join区别在于,等值 join数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join数据 shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出
- ⭐ SQL 语义:
关于详细的 SQL 语义可以参考。
flink sql 知其所以然(十三):流 join 很难嘛???(下)
3.8.3.Temporal Join(快照 Join)
- ⭐ Temporal Join 定义(支持 BatchStreaming):Temporal Join 在离线的概念中其实是没有类似的 Join 概念的,但是离线中常常会维护一种表叫做
拉链快照表,使用一个明细表去 join 这个拉链快照表的 join 方式就叫做 Temporal Join。而 Flink SQL 中也有对应的概念,表叫做Versioned Table,使用一个明细表去 join 这个Versioned Table的 join 操作就叫做 Temporal Join。Temporal Join 中,Versioned Table其实就是对同一条 key(在 DDL 中以 primary key 标记同一个 key)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,可以根据明细表中的时间版本选择Versioned Table对应时间区间内的快照数据进行 join。 - ⭐ 应用场景:比如常见的汇率数据(实时的根据汇率计算总金额),在 12:00 之前(事件时间),人民币和美元汇率是 7:1,在 12:00 之后变为 6:1,那么在 12:00 之前数据就要按照 7:1 进行计算,12:00 之后就要按照 6:1 计算。在事件时间语义的任务中,事件时间 12:00 之前的数据,要按照 7:1 进行计算,12:00 之后的数据,要按照 6:1 进行计算。这其实就是离线中快照的概念,维护具体汇率的表在 Flink SQL 体系中就叫做
Versioned Table。 - ⭐ Verisoned Table:Verisoned Table 中存储的数据通常是来源于 CDC 或者会发生更新的数据。Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据。举一个汇率的场景的案例来看一下一个 Versioned Table 的两种定义方式。
- ⭐ PRIMARY KEY 定义方式:
-- 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time,
-- PRIMARY KEY 定义方式
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
- ⭐ Deduplicate 定义方式:
-- 定义一个 append-only 的数据源表
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
-- 将数据源表按照 Deduplicate 方式定义为 Versioned Table
CREATE VIEW versioned_rates AS
SELECT currency, conversion_rate, update_time -- 1. 定义 `update_time` 为时间字段
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- 2. 定义 `currency` 为主键
ORDER BY update_time DESC -- 3. ORDER BY 中必须是时间戳列
) AS rownum
FROM currency_rates)
WHERE rownum = 1;
- ⭐ Temporal Join 支持的时间语义:事件时间、处理时间
- ⭐ 实际案例:就是上文提到的汇率计算。
以 事件时间 任务举例:
-- 1. 定义一个输入订单表
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
-- 2. 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time,
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
SELECT
order_id,
price,
currency,
conversion_rate,
order_time,
FROM orders
-- 3. Temporal Join 逻辑
-- SQL 语法为:FOR SYSTEM_TIME AS OF
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
结果如下,可以看到相同的货币汇率会根据具体数据的事件时间不同 Join 到对应时间的汇率:
代码语言:javascript复制order_id price 货币 汇率 order_time
======== ===== ======== =============== =========
o_001 11.11 EUR 1.14 12:00:00
o_002 12.51 EUR 1.10 12:06:00
注意:
- ⭐ 事件时间的 Temporal Join 一定要给左右两张表都设置 Watermark。
- ⭐ 事件时间的 Temporal Join 一定要把 Versioned Table 的主键包含在 Join on 的条件中。
还是相同的案例,如果是 处理时间 语义:
10:15> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
10:30> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
-- 10:42 时,Euro 的汇率从 114 变为 116
10:52> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 116 <==== 从 114 变为 116
Yen 1
-- 从 Orders 表查询数据
SELECT * FROM Orders;
amount currency
====== =========
2 Euro <== 在处理时间 10:15 到达的一条数据
1 US Dollar <== 在处理时间 10:30 到达的一条数据
2 Euro <== 在处理时间 10:52 到达的一条数据
-- 执行关联查询
SELECT
o.amount, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency
-- 结果如下:
amount currency rate amount*rate
====== ========= ======= ============
2 Euro 114 228 <== 在处理时间 10:15 到达的一条数据
1 US Dollar 102 102 <== 在处理时间 10:30 到达的一条数据
2 Euro 116 232 <== 在处理时间 10:52 到达的一条数据
可以发现处理时间就比较好理解了,因为处理时间语义中是根据左流数据到达的时间决定拿到的汇率值。Flink 就只为 LatestRates 维护了最新的状态数据,不需要关心历史版本的数据。
3.8.4.Lookup Join(维表 Join)
- ⭐ Lookup Join 定义(支持 BatchStreaming):Lookup Join 其实就是维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。
- ⭐ 应用场景:小伙伴萌会问,我们既然已经有了上面介绍的 Regular Join,Interval Join 等,为啥还需要一种 Lookup Join?因为上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种存储介质的 Join。Lookup 的意思就是实时查找,而实时的画像数据一般都是存储在 Redis,Mysql,HBase 中,这就是 Lookup Join 的由来
- ⭐ 实际案例:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。
来一波输入数据:
曝光用户日志流(show_log)数据(数据存储在 kafka 中):
代码语言:javascript复制log_id timestamp user_id
1 2021-11-01 00:01:03 a
2 2021-11-01 00:03:00 b
3 2021-11-01 00:05:00 c
4 2021-11-01 00:06:00 b
5 2021-11-01 00:07:00 c
用户画像维表(user_profile)数据(数据存储在 redis 中):
代码语言:javascript复制user_id(主键) age sex
a 12-18 男
b 18-24 女
c 18-24 男
注意: redis 中的数据结构存储是按照 key,value 去存储的。其中 key 为 user_id,value 为 age,sex 的 json。
具体 SQL:
代码语言:javascript复制CREATE TABLE show_log (
log_id BIGINT,
`timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),
user_id STRING,
proctime AS PROCTIME()
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE user_profile (
user_id STRING,
age STRING,
sex STRING
) WITH (
'connector' = 'redis',
'hostname' = '127.0.0.1',
'port' = '6379',
'format' = 'json',
'lookup.cache.max-rows' = '500',
'lookup.cache.ttl' = '3600',
'lookup.max-retries' = '1'
);
CREATE TABLE sink_table (
log_id BIGINT,
`timestamp` TIMESTAMP(3),
user_id STRING,
proctime TIMESTAMP(3),
age STRING,
sex STRING
) WITH (
'connector' = 'print'
);
-- lookup join 的 query 逻辑
INSERT INTO sink_table
SELECT
s.log_id as log_id
, s.`timestamp` as `timestamp`
, s.user_id as user_id
, s.proctime as proctime
, u.sex as sex
, u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id
输出数据如下:
代码语言:javascript复制log_id timestamp user_id age sex
1 2021-11-01 00:01:03 a 12-18 男
2 2021-11-01 00:03:00 b 18-24 女
3 2021-11-01 00:05:00 c 18-24 男
4 2021-11-01 00:06:00 b 18-24 女
5 2021-11-01 00:07:00 c 18-24 男
注意: 实时的 lookup 维表关联能使用
处理时间去做关联。
- ⭐ SQL 语义:
详细 SQL 语义及案例可见:
flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码
flink sql 知其所以然(十五):改了改源码,实现了个 batch lookup join(附源码)
其实,Flink 官方并没有提供 redis 的维表 connector 实现。
没错,博主自己实现了一套。关于 redis 维表的 connector 实现,直接参考下面的文章。都是可以从 github 上找到源码拿来用的!
注意:
- ⭐ 同一条数据关联到的维度数据可能不同:实时数仓中常用的实时维表都是在不断的变化中的,当前流表数据关联完维表数据后,如果同一个 key 的维表的数据发生了变化,已关联到的维表的结果数据不会再同步更新。举个例子,维表中 user_id 为 1 的数据在 08:00 时 age 由 12-18 变为了 18-24,那么当我们的任务在 08:01 failover 之后从 07:59 开始回溯数据时,原本应该关联到 12-18 的数据会关联到 18-24 的 age 数据。这是有可能会影响数据质量的。所以小伙伴萌在评估你们的实时任务时要考虑到这一点。
- ⭐ 会发生实时的新建及更新的维表博主建议小伙伴萌应该建立起数据延迟的监控机制,防止出现流表数据先于维表数据到达,导致关联不到维表数据
再说说维表常见的性能问题及优化思路。
所有的维表性能问题都可以总结为:高 qps 下访问维表存储引擎产生的任务背压,数据产出延迟问题。
举个例子:
- ⭐ 在没有使用维表的情况下:一条数据从输入 Flink 任务到输出 Flink 任务的时延假如为
0.1 ms,那么并行度为 1 的任务的吞吐可以达到1 query / 0.1 ms = 1w qps。 - ⭐ 在使用维表之后:每条数据访问维表的外部存储的时长为
2 ms,那么一条数据从输入 Flink 任务到输出 Flink 任务的时延就会变成2.1 ms,那么同样并行度为 1 的任务的吞吐只能达到1 query / 2.1 ms = 476 qps。两者的吞吐量相差21 倍。
这就是为什么维表 join 的算子会产生背压,任务产出会延迟。
那么当然,解决方案也是有很多的。抛开 Flink SQL 想一下,如果我们使用 DataStream API,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:
- ⭐ 按照 redis 维表的 key 分桶 local cache:通过按照 key 分桶的方式,让大多数据的维表关联的数据访问走之前访问过得 local cache 即可。这样就可以把访问外部存储 2.1 ms 处理一个 query 变为访问内存的 0.1 ms 处理一个 query 的时长。
- ⭐ 异步访问外存:DataStream api 有异步算子,可以利用线程池去同时多次请求维表外部存储。这样就可以把 2.1 ms 处理 1 个 query 变为 2.1 ms 处理 10 个 query。吞吐可变优化到 10 / 2.1 ms = 4761 qps。
- ⭐ 批量访问外存:除了异步访问之外,我们还可以批量访问外部存储。举一个例子:在访问 redis 维表的 1 query 占用 2.1 ms 时长中,其中可能有 2 ms 都是在网络请求上面的耗时 ,其中只有 0.1 ms 是 redis server 处理请求的时长。那么我们就可以使用 redis 提供的 pipeline 能力,在客户端(也就是 flink 任务 lookup join 算子中),攒一批数据,使用 pipeline 去同时访问 redis sever。这样就可以把 2.1 ms 处理 1 个 query 变为 7ms(2ms 50 * 0.1ms) 处理 50 个 query。吞吐可变为 50 query / 7 ms = 7143 qps。博主这里测试了下使用 redis pipeline 和未使用的时长消耗对比。如下图所示。
博主认为上述优化效果中,最好用的是 1 3,2 相比 3 还是一条一条发请求,性能会差一些。
既然 DataStream 可以这样做,Flink SQL 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作
- ⭐ 按照 redis 维表的 key 分桶 local cache:sql 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udaf 中做访问 redis 处理,并且 udaf 产出的结果只能是一条,所以这种实现起来非常复杂。我们选择不做 keyby 分桶。但是我们可以直接使用 local cache 去做本地缓存,虽然【直接缓存】的效果比【先按照 key 分桶再做缓存】的效果差,但是也能一定程度上减少访问 redis 压力。在博主实现的 redis connector 中,内置了 local cache 的实现,小伙伴萌可以参考下面这部篇文章进行配置。
- ⭐ 异步访问外存:目前博主实现的 redis connector 不支持异步访问,但是官方实现的 hbase connector 支持这个功能,参考下面链接文章的,点开之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/
- ⭐ 批量访问外存:这玩意官方必然没有实现啊,但是,但是,但是,经过博主周末两天的疯狂 debug,改了改源码,搞定了基于 redis 的批量访问外存优化的功能。具体可以参考下文。
flink sql 知其所以然(十五):改了改源码,实现了个 batch lookup join(附源码)
3.8.5.Array Expansion(数组列转行)
- ⭐ 应用场景(支持 BatchStreaming):将表中 ARRAY 类型字段(列)拍平,转为多行
- ⭐ 实际案例:比如某些场景下,日志是合并、攒批上报的,就可以使用这种方式将一个 Array 转为多行。
CREATE TABLE show_log_table (
log_id BIGINT,
show_params ARRAY<STRING>
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
log_id BIGINT,
show_param STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
log_id,
t.show_param as show_param
FROM show_log_table
-- array 炸开语法
CROSS JOIN UNNEST(show_params) AS t (show_param)
show_log_table 原始数据:
代码语言:javascript复制 I[7, [a, b, c]]
I[5, [d, e, f]]
输出结果如下所示:
代码语言:javascript复制-- I[7, [a, b, c]] 一行转为 3 行
I[7, a]
I[7, b]
I[7, b]
-- I[5, [d, e, f]] 一行转为 3 行
I[5, d]
I[5, e]
I[5, f]
3.8.6.Table Function(自定义列转行)
- ⭐ 应用场景(支持 BatchStreaming):这个其实和 Array Expansion 功能类似,但是 Table Function 本质上是个 UDTF 函数,和离线 Hive SQL 一样,我们可以自定义 UDTF 去决定列转行的逻辑
- ⭐ Table Function 使用分类:
- ⭐ Inner Join Table Function:如果 UDTF 返回结果为空,则相当于 1 行转为 0 行,这行数据直接被丢弃
- ⭐ Left Join Table Function:如果 UDTF 返回结果为空,折行数据不会被丢弃,只会在结果中填充 null 值
- ⭐ 实际案例:直接上 SQL 。
public class TableFunctionInnerJoin_Test {
public static void main(String[] args) throws Exception {
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
String sql = "CREATE FUNCTION user_profile_table_func AS 'flink.examples.sql._07.query._06_joins._06_table_function"
"._01_inner_join.TableFunctionInnerJoin_Test$UserProfileTableFunction';n"
"n"
"CREATE TABLE source_table (n"
" user_id BIGINT NOT NULL,n"
" name STRING,n"
" row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),n"
" WATERMARK FOR row_time AS row_time - INTERVAL '5' SECONDn"
") WITH (n"
" 'connector' = 'datagen',n"
" 'rows-per-second' = '10',n"
" 'fields.name.length' = '1',n"
" 'fields.user_id.min' = '1',n"
" 'fields.user_id.max' = '10'n"
");n"
"n"
"CREATE TABLE sink_table (n"
" user_id BIGINT,n"
" name STRING,n"
" age INT,n"
" row_time TIMESTAMP(3)n"
") WITH (n"
" 'connector' = 'print'n"
");n"
"n"
"INSERT INTO sink_tablen"
"SELECT user_id,n"
" name,n"
" age,n"
" row_timen"
"FROM source_table,n"
// Table Function Join 语法对应 LATERAL TABLE
"LATERAL TABLE(user_profile_table_func(user_id)) t(age)";
Arrays.stream(sql.split(";"))
.forEach(flinkEnv.streamTEnv()::executeSql);
}
public static class UserProfileTableFunction extends TableFunction<Integer> {
public void eval(long userId) {
// 自定义输出逻辑
if (userId <= 5) {
// 一行转 1 行
collect(1);
} else {
// 一行转 3 行
collect(1);
collect(2);
collect(3);
}
}
}
}
执行结果如下:
代码语言:javascript复制-- <= 5,则只有 1 行结果
I[3, 7, 1, 2021-05-01T18:23:42.560]
-- > 5,则有行 3 结果
I[8, e, 1, 2021-05-01T18:23:42.560]
I[8, e, 2, 2021-05-01T18:23:42.560]
I[8, e, 3, 2021-05-01T18:23:42.560]
-- <= 5,则只有 1 行结果
I[4, 9, 1, 2021-05-01T18:23:42.561]
-- > 5,则有行 3 结果
I[8, c, 1, 2021-05-01T18:23:42.561]
I[8, c, 2, 2021-05-01T18:23:42.561]
I[8, c, 3, 2021-05-01T18:23:42.561]
3.9 DML:集合操作
集合操作支持 BatchStreaming 任务。
- ⭐ UNION:将集合合并并且去重。

union
- ⭐ UNION ALL:将集合合并,不做去重。
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');
Flink SQL> (SELECT s FROM t1) UNION (SELECT s FROM t2);
---
| s|
---
| c|
| a|
| b|
| d|
| e|
---
Flink SQL> (SELECT s FROM t1) UNION ALL (SELECT s FROM t2);
---
| c|
---
| c|
| a|
| b|
| b|
| c|
| d|
| e|
| a|
| b|
| b|
---
- ⭐ Intersect:交集并且去重
- ⭐ Intersect ALL:交集不做去重
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');
Flink SQL> (SELECT s FROM t1) INTERSECT (SELECT s FROM t2);
---
| s|
---
| a|
| b|
---
Flink SQL> (SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);
---
| s|
---
| a|
| b|
| b|
---
- ⭐ Except:差集并且去重
- ⭐ Except ALL:差集不做去重
Flink SQL> (SELECT s FROM t1) EXCEPT (SELECT s FROM t2);
---
| s |
---
| c |
---
Flink SQL> (SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);
---
| s |
---
| c |
| c |
---
上述 SQL 在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据給撤回。这也是一个回撤流。
- ⭐ In 子查询:这个大家比较熟悉了,但是注意,In 子查询的结果集只能有一列
SELECT user, amount
FROM Orders
WHERE product IN (
SELECT product FROM NewProducts
)
上述 SQL 的 In 子句其实就和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,大家注意设置 State 的 TTL。
3.10.DML:Order By、Limit 子句
3.10.1.Order By 子句
支持 BatchStreaming,但在实时任务中一般用的非常少。
实时任务中,Order By 子句中必须要有时间属性字段,并且时间属性必须为升序时间属性,即 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 或者 WATERMARK FOR rowtime_column AS rowtime_column。
举例:
代码语言:javascript复制CREATE TABLE source_table_1 (
user_id BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10'
);
CREATE TABLE sink_table (
user_id BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Order By row_time, user_id desc
3.10.2.Limit 子句
支持 BatchStreaming,但实时场景一般不使用,但是此处依然举一个例子:
代码语言:javascript复制CREATE TABLE source_table_1 (
user_id BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10'
);
CREATE TABLE sink_table (
user_id BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Limit 3
结果如下,只有 3 条输出:
代码语言:javascript复制 I[5]
I[9]
I[4]
3.11.DML:TopN 子句
- ⭐ TopN 定义(支持 BatchStreaming):TopN 其实就是对应到离线数仓中的 row_number(),可以使用 row_number() 对某一个分组的数据进行排序
- ⭐ 应用场景:根据
某个排序条件,计算某个分组下的排行榜数据 - ⭐ SQL 语法标准:
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
- ⭐
ROW_NUMBER():标识 TopN 排序子句 - ⭐
PARTITION BY col1[, col2...]:标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序取 topN,比如下述案例中的partition by key,就是根据需求中的搜索关键词(key)做为分区 - ⭐
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进行排序 - ⭐
WHERE rownum <= N:这个子句是一定需要的,只有加上了这个子句,Flink 才能将其识别为一个 TopN 的查询,其中 N 代表 TopN 的条目数 - ⭐
[AND conditions]:其他的限制条件也可以加上
- ⭐ 实际案例:取某个搜索关键词下的搜索热度前 10 名的词条数据。
输入数据为搜索词条数据的搜索热度数据,当搜索热度发生变化时,会将变化后的数据写入到数据源的 Kafka 中:
数据源 schema:
代码语言:javascript复制-- 字段名 备注
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(比如 3000)
-- timestamp 消费词条时间戳
CREATE TABLE source_table (
name BIGINT NOT NULL,
search_cnt BIGINT NOT NULL,
key BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
...
);
-- 数据汇 schema:
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(比如 3000)
-- timestamp 消费词条时间戳
CREATE TABLE sink_table (
key BIGINT,
name BIGINT,
search_cnt BIGINT,
`timestamp` TIMESTAMP(3)
) WITH (
...
);
-- DML 逻辑
INSERT INTO sink_table
SELECT key, name, search_cnt, row_time as `timestamp`
FROM (
SELECT key, name, search_cnt, row_time,
-- 根据热搜关键词 key 作为 partition key,然后按照 search_cnt 倒排取前 100 名
ROW_NUMBER() OVER (PARTITION BY key
ORDER BY search_cnt desc) AS rownum
FROM source_table)
WHERE rownum <= 100
输出结果:
代码语言:javascript复制-D[关键词1, 词条1, 4944]
I[关键词1, 词条1, 8670]
I[关键词1, 词条2, 1735]
-D[关键词1, 词条3, 6641]
I[关键词1, 词条3, 6928]
-D[关键词1, 词条4, 6312]
I[关键词1, 词条4, 7287]
可以看到输出数据是有回撤数据的,为什么会出现回撤,我们来看看 SQL 语义。
- ⭐ SQL 语义
上面的 SQL 会翻译成以下三个算子:
- ⭐
数据源:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,按照 partition key 将数据进行 hash 分发到下游排序算子,相同的 key 数据将会发送到一个并发中 - ⭐
排序算子:为每个 Key 维护了一个 TopN 的榜单数据,接受到上游的一条数据后,如果 TopN 榜单还没有到达 N 条,则将这条数据加入 TopN 榜单后,直接下发数据,如果到达 N 条之后,经过 TopN 计算,发现这条数据比原有的数据排序靠前,那么新的 TopN 排名就会有变化,就变化了的这部分数据之前下发的排名数据撤回(即回撤数据),然后下发新的排名数据 - ⭐
数据汇:接收到上游的数据之后,然后输出到外部存储引擎中
上面三个算子也是会 24 小时一直运行的。
3.12.DML:Window TopN
- ⭐ Window TopN 定义(支持 Streaming):Window TopN 是一种特殊的 TopN,它的返回结果是每一个窗口内的 N 个最小值或者最大值。
- ⭐ 应用场景:小伙伴萌会问了,我有了 TopN 为啥还需要 Window TopN 呢?还记得上文介绍 TopN 说道的 TopN 时会出现中间结果,从而出现回撤数据的嘛?Window TopN 不会出现回撤数据,因为 Window TopN 实现是在窗口结束时输出最终结果,不会产生中间结果。而且注意,因为是窗口上面的操作,Window TopN 在窗口结束时,会自动把 State 给清除。
- ⭐ SQL 语法标准:
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name) -- windowing TVF
WHERE rownum <= N [AND conditions]
- ⭐ 实际案例:取当前这一分钟的搜索关键词下的搜索热度前 10 名的词条数据
输入表字段:
代码语言:javascript复制-- 字段名 备注
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(比如 3000)
-- timestamp 消费词条时间戳
CREATE TABLE source_table (
name BIGINT NOT NULL,
search_cnt BIGINT NOT NULL,
key BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
...
);
-- 输出表字段:
-- 字段名 备注
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(比如 3000)
-- window_start 窗口开始时间戳
-- window_end 窗口结束时间戳
CREATE TABLE sink_table (
key BIGINT,
name BIGINT,
search_cnt BIGINT,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
...
);
-- 处理 sql:
INSERT INTO sink_table
SELECT key, name, search_cnt, window_start, window_end
FROM (
SELECT key, name, search_cnt, window_start, window_end,
ROW_NUMBER() OVER (PARTITION BY window_start, window_end, key
ORDER BY search_cnt desc) AS rownum
FROM (
SELECT window_start, window_end, key, name, max(search_cnt) as search_cnt
-- window tvf 写法
FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '1' MINUTES))
GROUP BY window_start, window_end, key, name
)
)
WHERE rownum <= 100
输出结果:
代码语言:javascript复制 I[关键词1, 词条1, 8670, 2021-1-28T22:34, 2021-1-28T22:35]
I[关键词1, 词条2, 6928, 2021-1-28T22:34, 2021-1-28T22:35]
I[关键词1, 词条3, 1735, 2021-1-28T22:34, 2021-1-28T22:35]
I[关键词1, 词条4, 7287, 2021-1-28T22:34, 2021-1-28T22:35]
...
可以看到结果是符合预期的,其中没有回撤数据。
- ⭐ SQL 语义
- ⭐
数据源:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,将数据按照窗口聚合的 key 通过 hash 分发策略发送到下游窗口聚合算子 - ⭐
窗口聚合算子:进行窗口聚合计算,随着时间的推进,将窗口聚合结果计算完成发往下游窗口排序算子 - ⭐
窗口排序算子:这个算子其实也是一个窗口算子,只不过这个窗口算子为每个 Key 维护了一个 TopN 的榜单数据,接受到上游发送的窗口结果数据进行排序,随着时间的推进,窗口的结束,将排序的结果输出到下游数据汇算子。 - ⭐
数据汇:接收到上游的数据之后,然后输出到外部存储引擎中
3.13.DML:Deduplication
- ⭐ Deduplication 定义(支持 BatchStreaming):Deduplication 其实就是去重,也即上文介绍到的 TopN 中 row_number = 1 的场景,但是这里有一点不一样在于其排序字段一定是时间属性列,不能是其他非时间属性的普通列。在 row_number = 1 时,如果排序字段是普通列 planner 会翻译成 TopN 算子,如果是时间属性列 planner 会翻译成 Deduplication,这两者最终的执行算子是不一样的,Deduplication 相比 TopN 算子专门做了对应的优化,性能会有很大提升。
- ⭐ 应用场景:比如上游数据发重了,或者计算 DAU 明细数据等场景,都可以使用 Deduplication 语法去做去重。
- ⭐ SQL 语法标准:
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1
其中:
- ⭐
ROW_NUMBER():标识当前数据的排序值 - ⭐
PARTITION BY col1[, col2...]:标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序 - ⭐
ORDER BY time_attr [asc|desc]:标识排序规则,必须为时间戳列,当前 Flink SQL 支持处理时间、事件时间,ASC 代表保留第一行,DESC 代表保留最后一行 - ⭐
WHERE rownum = 1:这个子句是一定需要的,而且必须为 rownum = 1
- ⭐ 实际案例:
博主这里举两个案例:
- ⭐ 案例 1(事件时间):是腾讯 QQ 用户等级的场景,每一个 QQ 用户都有一个 QQ 用户等级,需要求出当前用户等级在
星星,月亮,太阳的用户数分别有多少。
-- 数据源:当每一个用户的等级初始化及后续变化的时候的数据,即用户等级变化明细数据。
CREATE TABLE source_table (
user_id BIGINT COMMENT '用户 id',
level STRING COMMENT '用户等级',
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)) COMMENT '事件时间戳',
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.level.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '1000000'
);
-- 数据汇:输出即每一个等级的用户数
CREATE TABLE sink_table (
level STRING COMMENT '等级',
uv BIGINT COMMENT '当前等级用户数',
row_time timestamp(3) COMMENT '时间戳'
) WITH (
'connector' = 'print'
);
-- 处理逻辑:
INSERT INTO sink_table
select
level
, count(1) as uv
, max(row_time) as row_time
from (
SELECT
user_id,
level,
row_time,
row_number() over(partition by user_id order by row_time) as rn
FROM source_table
)
where rn = 1
group by
level
输出结果:
代码语言:javascript复制 I[等级 1, 6928, 2021-1-28T22:34]
-I[等级 1, 6928, 2021-1-28T22:34]
I[等级 1, 8670, 2021-1-28T22:34]
-I[等级 1, 8670, 2021-1-28T22:34]
I[等级 1, 77287, 2021-1-28T22:34]
...
可以看到其有回撤数据。
其对应的 SQL 语义如下:
- ⭐
数据源:消费到 Kafka 中数据后,将数据按照 partition by 的 key 通过 hash 分发策略发送到下游去重算子 - ⭐
Deduplication 去重算子:接受到上游数据之后,根据 order by 中的条件判断当前的这条数据和之前数据时间戳大小,以上面案例来说,如果当前数据时间戳大于之前数据时间戳,则撤回之前向下游发的中间结果,然后将最新的结果发向下游(发送策略也为 hash,具体的 hash 策略为按照 group by 中 key 进行发送),如果当前数据时间戳小于之前数据时间戳,则不做操作。次算子产出的结果就是每一个用户的对应的最新等级信息。 - ⭐
Group by 聚合算子:接受到上游数据之后,根据 Group by 聚合粒度对数据进行聚合计算结果(每一个等级的用户数),发往下游数据汇算子 - ⭐
数据汇:接收到上游的数据之后,然后输出到外部存储引擎中 - ⭐ 案例 2(处理时间):最原始的日志是明细数据,需要我们根据用户 id 筛选出这个用户当天的第一条数据,发往下游,下游可以据此计算分各种维度的 DAU
-- 数据源:原始日志明细数据
CREATE TABLE source_table (
user_id BIGINT COMMENT '用户 id',
name STRING COMMENT '用户姓名',
server_timestamp BIGINT COMMENT '用户访问时间戳',
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10',
'fields.server_timestamp.min' = '1',
'fields.server_timestamp.max' = '100000'
);
-- 数据汇:根据 user_id 去重的第一条数据
CREATE TABLE sink_table (
user_id BIGINT,
name STRING,
server_timestamp BIGINT
) WITH (
'connector' = 'print'
);
-- 处理逻辑:
INSERT INTO sink_table
select user_id,
name,
server_timestamp
from (
SELECT
user_id,
name,
server_timestamp,
row_number() over(partition by user_id order by proctime) as rn
FROM source_table
)
where rn = 1
输出结果:
代码语言:javascript复制 I[1, 用户 1, 2021-1-28T22:34]
I[2, 用户 2, 2021-1-28T22:34]
I[3, 用户 3, 2021-1-28T22:34]
...
可以看到这个处理逻辑是没有回撤数据的。其对应的 SQL 语义如下:
- ⭐
数据源:消费到 Kafka 中数据后,将数据按照 partition by 的 key 通过 hash 分发策略发送到下游去重算子 - ⭐
Deduplication 去重算子:处理时间语义下,如果是当前 key 的第一条数据,则直接发往下游,如果判断(根据 state 中是否存储过改 key)不是第一条,则直接丢弃 - ⭐
数据汇:接收到上游的数据之后,然后输出到外部存储引擎中
注意: 在 Deduplication 关于是否会出现回撤流,博主总结如下:
- ⭐ Order by 事件时间 DESC:会出现回撤流,因为当前 key 下
可能会有比当前事件时间还大的数据 - ⭐ Order by 事件时间 ASC:会出现回撤流,因为当前 key 下
可能会有比当前事件时间还小的数据 - ⭐ Order by 处理时间 DESC:会出现回撤流,因为当前 key 下
可能会有比当前处理时间还大的数据 - ⭐ Order by 处理时间 ASC:不会出现回撤流,因为当前 key 下
不可能会有比当前处理时间还小的数据
3.14.EXPLAIN 子句
- ⭐ 应用场景:EXPLAIN 子句其实就是用于查看当前这个 sql 查询的逻辑计划以及优化的执行计划。
- ⭐ SQL 语法标准:
EXPLAIN PLAN FOR <query_statement_or_insert_statement>
- ⭐ 实际案例:
public class Explain_Test {
public static void main(String[] args) throws Exception {
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
flinkEnv.env().setParallelism(1);
String sql = "CREATE TABLE source_table (n"
" user_id BIGINT COMMENT '用户 id',n"
" name STRING COMMENT '用户姓名',n"
" server_timestamp BIGINT COMMENT '用户访问时间戳',n"
" proctime AS PROCTIME()n"
") WITH (n"
" 'connector' = 'datagen',n"
" 'rows-per-second' = '1',n"
" 'fields.name.length' = '1',n"
" 'fields.user_id.min' = '1',n"
" 'fields.user_id.max' = '10',n"
" 'fields.server_timestamp.min' = '1',n"
" 'fields.server_timestamp.max' = '100000'n"
");n"
"n"
"CREATE TABLE sink_table (n"
" user_id BIGINT,n"
" name STRING,n"
" server_timestamp BIGINTn"
") WITH (n"
" 'connector' = 'print'n"
");n"
"n"
"EXPLAIN PLAN FORn"
"INSERT INTO sink_tablen"
"select user_id,n"
" name,n"
" server_timestampn"
"from (n"
" SELECTn"
" user_id,n"
" name,n"
" server_timestamp,n"
" row_number() over(partition by user_id order by proctime) as rnn"
" FROM source_tablen"
")n"
"where rn = 1";
/**
* 算子 {@link org.apache.flink.streaming.api.operators.KeyedProcessOperator}
* -- {@link org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction}
*/
for (String innerSql : sql.split(";")) {
TableResult tableResult = flinkEnv.streamTEnv().executeSql(innerSql);
tableResult.print();
}
}
}
上述代码执行结果如下:
代码语言:javascript复制1. 抽象语法树
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2])
- LogicalFilter(condition=[=($3, 1)])
- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
- LogicalTableScan(table=[[default_catalog, default_database, source_table]])
2. 优化后的物理计划
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
- Calc(select=[user_id, name, server_timestamp])
- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
- Exchange(distribution=[hash[user_id]])
- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])
3. 优化后的执行计划
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
- Calc(select=[user_id, name, server_timestamp])
- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
- Exchange(distribution=[hash[user_id]])
- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])
3.15.USE 子句
- ⭐ 应用场景:如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,USE 子句通常被用于切换库,那么在 Flink SQL 体系中,它的作用也是和 MySQL 中 USE 子句的功能基本一致,用于切换 Catalog,DataBase,使用 Module
- ⭐ SQL 语法标准:
- ⭐ 切换 Catalog
USE CATALOG catalog_name
- ⭐ 使用 Module
USE MODULES module_name1[, module_name2, ...]
- ⭐ 切换 Database
USE db名称
- ⭐ 实际案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// create a catalog
tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
tEnv.executeSql("SHOW CATALOGS").print();
// -----------------
// | catalog name |
// -----------------
// | default_catalog |
// | cat1 |
// -----------------
// change default catalog
tEnv.executeSql("USE CATALOG cat1");
tEnv.executeSql("SHOW DATABASES").print();
// databases are empty
// ---------------
// | database name |
// ---------------
// ---------------
// create a database
tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
tEnv.executeSql("SHOW DATABASES").print();
// ---------------
// | database name |
// ---------------
// | db1 |
// ---------------
// change default database
tEnv.executeSql("USE db1");
// change module resolution order and enabled status
tEnv.executeSql("USE MODULES hive");
tEnv.executeSql("SHOW FULL MODULES").print();
// ------------- -------
// | module name | used |
// ------------- -------
// | hive | true |
// | core | false |
// ------------- -------
3.16.SHOW 子句
- ⭐ 应用场景:如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,SHOW 子句常常用于查询库、表、函数等,在 Flink SQL 体系中也类似。Flink SQL 支持 SHOW 以下内容。
- ⭐ SQL 语法标准:
SHOW CATALOGS:展示所有 Catalog
SHOW CURRENT CATALOG:展示当前的 Catalog
SHOW DATABASES:展示当前 Catalog 下所有 Database
SHOW CURRENT DATABASE:展示当前的 Database
SHOW TABLES:展示当前 Database 下所有表
SHOW VIEWS:展示所有视图
SHOW FUNCTIONS:展示所有的函数
SHOW MODULES:展示所有的 Module(Module 是用于 UDF 扩展)
- ⭐ 实际案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// show catalogs
tEnv.executeSql("SHOW CATALOGS").print();
// -----------------
// | catalog name |
// -----------------
// | default_catalog |
// -----------------
// show current catalog
tEnv.executeSql("SHOW CURRENT CATALOG").print();
// ----------------------
// | current catalog name |
// ----------------------
// | default_catalog |
// ----------------------
// show databases
tEnv.executeSql("SHOW DATABASES").print();
// ------------------
// | database name |
// ------------------
// | default_database |
// ------------------
// show current database
tEnv.executeSql("SHOW CURRENT DATABASE").print();
// -----------------------
// | current database name |
// -----------------------
// | default_database |
// -----------------------
// create a table
tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");
// show tables
tEnv.executeSql("SHOW TABLES").print();
// ------------
// | table name |
// ------------
// | my_table |
// ------------
// create a view
tEnv.executeSql("CREATE VIEW my_view AS ...");
// show views
tEnv.executeSql("SHOW VIEWS").print();
// -----------
// | view name |
// -----------
// | my_view |
// -----------
// show functions
tEnv.executeSql("SHOW FUNCTIONS").print();
// ---------------
// | function name |
// ---------------
// | mod |
// | sha256 |
// | ... |
// ---------------
// create a user defined function
tEnv.executeSql("CREATE FUNCTION f1 AS ...");
// show user defined functions
tEnv.executeSql("SHOW USER FUNCTIONS").print();
// ---------------
// | function name |
// ---------------
// | f1 |
// | ... |
// ---------------
// show modules
tEnv.executeSql("SHOW MODULES").print();
// -------------
// | module name |
// -------------
// | core |
// -------------
// show full modules
tEnv.executeSql("SHOW FULL MODULES").print();
// ------------- -------
// | module name | used |
// ------------- -------
// | core | true |
// | hive | false |
// ------------- -------
3.17.LOAD、UNLOAD 子句
- ⭐ 应用场景:我们可以使用 LOAD 子句去加载 Flink SQL 体系内置的或者用户自定义的 Module,UNLOAD 子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module
- ⭐ SQL 语法标准:
-- 加载
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
-- 卸载
UNLOAD MODULE module_name
- ⭐ 实际案例:
- ⭐ LOAD 案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 加载 Flink SQL 体系内置的 Hive module
tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
tEnv.executeSql("SHOW MODULES").print();
// -------------
// | module name |
// -------------
// | core |
// | hive |
// -------------
- ⭐ UNLOAD 案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 卸载唯一的一个 CoreModule
tEnv.executeSql("UNLOAD MODULE core");
tEnv.executeSql("SHOW MODULES").print();
// 结果啥 Moudle 都没有了
3.18.SET、RESET 子句
- ⭐ 应用场景:SET 子句可以用于修改一些 Flink SQL 的环境配置,RESET 子句是可以将所有的环境配置恢复成默认配置,但只能在 SQL CLI 中进行使用,主要是为了让用户更纯粹的使用 SQL 而不必使用其他方式或者切换系统环境。
- ⭐ SQL 语法标准:
SET (key = value)?
RESET (key)?
- ⭐ 实际案例:
启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:
代码语言:javascript复制Flink SQL> SET table.planner = blink;
[INFO] Session property has been set.
Flink SQL> SET;
table.planner=blink;
Flink SQL> RESET table.planner;
[INFO] Session property has been reset.
Flink SQL> RESET;
[INFO] All session properties have been set to their default values.
3.19.SQL Hints
- ⭐ 应用场景:比如有一个 kafka 数据源表 kafka_table1,用户想直接从
latest-offsetselect 一些数据出来预览,其元数据已经存储在 Hive MetaStore 中,但是 Hive MetaStore 中存储的配置中的scan.startup.mode是earliest-offset,通过 SQL Hints,用户可以在 DML 语句中将scan.startup.mode改为latest-offset查询,因此可以看出 SQL Hints 常用语这种比较临时的参数修改,比如 Ad-hoc 这种临时查询中,方便用户使用自定义的新的表参数而不是 Catalog 中已有的表参数。 - ⭐ SQL 语法标准:
以下 DML SQL 中的 /* OPTIONS(key=val [, key=val]*) */ 就是 SQL Hints。
SELECT *
FROM table_path /* OPTIONS(key=val [, key=val]*) */
- ⭐ 实际案例:
启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:
代码语言:javascript复制CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);
-- 1. 使用 'scan.startup.mode'='earliest-offset' 覆盖原来的 scan.startup.mode
select id, name from kafka_table1 /* OPTIONS('scan.startup.mode'='earliest-offset') */;
-- 2. 使用 'scan.startup.mode'='earliest-offset' 覆盖原来的 scan.startup.mode
select * from
kafka_table1 /* OPTIONS('scan.startup.mode'='earliest-offset') */ t1
join
kafka_table2 /* OPTIONS('scan.startup.mode'='earliest-offset') */ t2
on t1.id = t2.id;
-- 3. 使用 'sink.partitioner'='round-robin' 覆盖原来的 Sink 表的 sink.partitioner
insert into kafka_table1 /* OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;


