这篇文章简要介绍 Flink TopN 计算流程。
概述
TopN 实现的功能用一句话来描述: 得到一个统计周期内排名前几位(TopN)的数据。它包括两个重要的步骤:
- 对数据进行分组,得到统计周期内(如 1分钟)一个分组(一个元素)的聚合结果(如1分钟内的平均值);
- 归并所有的分组,并对分组进行排序,输出排名前几位的数据。
代码实例
在这个实例中,采集传感器的数据,统计每一个传感器 15S 内的平均值,并根据平均值,获取排名前 3位的传感器,操作步骤如下:
- 采集传感器数据数据;
- 根据传感器 id 进行分组,并计算出每一个传感器每 15S 的温度平均值;
- 将得到的数据按照时间窗口进行分组,将同个窗口内的传感器数据存入状态列表;
- 待所有的传感器数据都加入到列表后,对列表进行排序,输出前几位的数据。
代码如下所示:
1 | public class TopNStreamJob { |
相关类说明:
AvgValue
, 代表传感器平均值对象,用于增量累加计算;SensorReading
, 代表传感器对象,包括传感器 id, 时间戳及温度值;TopSensor
, 代表开窗计算之后的传感器对象,包括传感器 id, 窗口开始时间,窗口结束时间及平均值。
在 TopSensorKeyedProcessFunction
对象中,使用一个列表状态对象 ListState
来存储 TopSensor
对象,什么时候触发排序操作呢?需要等待所有的传感器数据开窗计算结束。在这里,使用了一个技巧,定义了时间戳为 WindowEnd + 1
的定时器,它在开窗计算之后执行,从而保证在所有的数据已经加入到 ListState
之后再执行排序操作。
测试输出
输入以下测试数据:1
2
3
4
5
6
7
8
9sensor_1,1663041360,20.0
sensor_2,1663041360,30.0
sensor_3,1663041361,10.0
sensor_4,1663041361,36.0
sensor_1,1663041365,30.0
sensor_3,1663041365,30.0
sensor_1,1663041366,10.0
sensor_1,1663041377,10.0
sensor_1,1663041378,20.0
结果如下:1
2
3
4
5
6
7top>
====================================
时间: 1663041375000
No0: sensorId=sensor_4 平均值=36.0
No1: sensorId=sensor_2 平均值=30.0
No2: sensorId=sensor_1 平均值=20.0
====================================
结果分析:
窗口为 [1663041360000,1663041375000), 因为在代码中设置了 forBoundedOutOfOrderness(Duration.ofSeconds(2))
, 可以接受 2S 的乱序数据,即 Watermark
推迟 2S. 在该窗口下,sensor_1,1663041377,10.0
时间戳为 1663041377
S, 比窗口结束时间大 2S, 触发窗口计算,但此时未触发定时器执行,因为定时器时间比窗口时间大 1MS。收到 sensor_1,1663041378,20.0
后才真正定时器执行,得到 Top3 数据。
工程代码:https://github.com/noahsarkzhang-ts/flink-lab/tree/main/flink-topn-training)