这篇文章编写一个简单的 World Count
程序。
项目配置
项目约定
本工程使用的环境如下:
- Flink 版本:1.15
- IDE: IDEA 社区版
- JDK 版本:JDK 9
- 构建工具:Maven
创建项目
可以使用 maven 命令创建一个模板工程:1
2
3
4mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.15.0
或在 IDEA 中根据 Archetype
创建一个工程。
通过上面两种方式,会引入依赖的 Flink Jar, 同时生成一个骨架程序,便可在其中添加逻辑程序。
说明:
- 引入的 Flink Jar 包 Maven Scope 类型为
provided
, 执行时会报错误: 无法初始化主类 org.example.HelloWorldStreamJob
, 需要在Configurations
中加入Include dependencies with ‘Provided’ scope
选项; - 工程中引入了
maven-shade-plugin
插件,它会将所有的依赖连同代码打成一个 fat jar, 并指定启动类mainClass
.
骨架代码
程序的功能如下:从 TCP 9000 端口读取字符串,并统计每 15 S 单词出现的次数。
1 | public class HelloWorldStreamJob { |
代码流程如下:
- 设置执行环境为流执行环境;
- 从执行参加中读取
host
,port
参数; - 将时间设置为处理时间语义;
- 建立 TCP 连接,读取文本流;
- 将字符串按照空格分隔,得到二元组 <world, 1>, 第一个元素为单词,第二个元素为数字1,表示单词的数量;
- 将二元组按照第一个元素,即单词分组,然后按照时间进行分桶,统计每 15S 单词出现的次数;
- 设置并行度为1;
- 定义任务名称并启动任务。
任务执行
启动 nc
1 | # nc -lk 9000 |
启动任务
设置参数:
执行结果:
1 | (hello,1) |
代码部署
使用 Maven 打包输出 hello-count.jar, 有两种方式把任务提交到集群。
命令行模式
1 | # 切换到 Flink 目录 |
Flink Dashboard
Flink 提供了一个 Web Dashboard, 地址一般为:http://jobmanager:8081
, 可以在上面提交任务。
可以在 Task Managers
菜单中查看执行日志:
另外,也可以在页面上管理任务,如取消、停止任务等等。
参考: