flink流处理示例开发
flink 开发示例一、版本和开发工具二、Flink 程序开发步骤三、开发示例四、运行一、版本和开发工具flink版本:1.13.3开发工具:Intellij IDEAJava版本:1.8.0_261二、Flink 程序开发步骤1、获得一个执行环境2、加载或者创建初始化数据3、指定操作数据的Transaction算子4、指定计算好的数据的存放位置5、调用execute()触发程序执行三、开发示例1
·
flink流处理示例开发
flink处理分流处理(streaming)和批处理(batch)。本示例来展示flink流处理开发的一般步骤和方法。
一、版本和开发工具
flink版本:1.13.3
开发工具:Intellij IDEA
Java版本:1.8.0_261
二、Flink 程序开发步骤
1、获得一个执行环境
2、加载或者创建初始化数据
3、指定操作数据的Transaction算子
4、指定计算好的数据的存放位置
5、调用execute()触发程序执行
三、开发示例
1、创建项目,填写项目名称,存放路径、包名、版本号等
2、添加依赖,在本机注释掉scope,当发布到flink集群时,需要添加scope,因为集群已经存在这些依赖
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.3</flink.version>
<scala.version>2.12</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
3、开发需求:通过Socket模拟产生单词,实现每隔1s对最近2s内的数据进行汇总计算
4、java程序结构如下,代码下载地址:Flink开发示例源代码
5、核心代码如下
public static void main(String[] args) throws Exception {
// 获取socket端口号
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
} catch (Exception e) {
System.out.println("没有设置端口号。使用默认端口号9109");
port = 9109;
}
// 获取Flink 运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取数据
String hostname = "node01";
String delimiter = "\n";
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
// timeWindow 已过时,建议使用window
DataStream<WordCount> wordCounts = text.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String s, Collector<WordCount> collector) throws Exception {
String[] splits = s.split("\\s");
for (String word : splits) {
collector.collect(new WordCount(word, 1L));
}
}
}).keyBy(new StreamWordCoutKeySelector())
.window(SlidingProcessingTimeWindows.of(Time.seconds(2),Time.seconds(1)))
.sum("count");
// 把数据打印到控制台并且设置并行度
wordCounts.print().setParallelism(1);
// 触发执行程序
env.execute("Socket window count");
}
四、运行
1、打开终端,输入命令:nc -l 9109
2、运行SocketWindowWorldCount,会出现异常,直接退出。如下图所示,查看日志有两处错误:
1)是由于依赖中缺少slf4j的依赖,添加依赖slf4j-simple
2)缺少flink执行环境的依赖,添加flink客户端依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.32</version>
</dependency>
3、再次运行,在终端输入单词,可以看到统计的单词个数
更多推荐
已为社区贡献2条内容
所有评论(0)