flink处理分流处理(streaming)和批处理(batch)。本示例来展示flink流处理开发的一般步骤和方法。

一、版本和开发工具

flink版本:1.13.3
开发工具:Intellij IDEA
Java版本:1.8.0_261

二、Flink 程序开发步骤

1、获得一个执行环境
2、加载或者创建初始化数据
3、指定操作数据的Transaction算子
4、指定计算好的数据的存放位置
5、调用execute()触发程序执行

三、开发示例

1、创建项目,填写项目名称,存放路径、包名、版本号等
在这里插入图片描述
在这里插入图片描述

2、添加依赖,在本机注释掉scope,当发布到flink集群时,需要添加scope,因为集群已经存在这些依赖

<properties>
   <maven.compiler.source>8</maven.compiler.source>
   <maven.compiler.target>8</maven.compiler.target>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <flink.version>1.13.3</flink.version>
   <scala.version>2.12</scala.version>
</properties>
<dependencies>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
  </dependency>
</dependencies>

3、开发需求:通过Socket模拟产生单词,实现每隔1s对最近2s内的数据进行汇总计算
4、java程序结构如下,代码下载地址:Flink开发示例源代码
在这里插入图片描述
5、核心代码如下

public static void main(String[] args) throws Exception {
        // 获取socket端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.out.println("没有设置端口号。使用默认端口号9109");
            port = 9109;
        }

        // 获取Flink 运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取数据
        String hostname = "node01";
        String delimiter = "\n";
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // timeWindow 已过时,建议使用window
        DataStream<WordCount> wordCounts = text.flatMap(new FlatMapFunction<String, WordCount>() {
                @Override
                public void flatMap(String s, Collector<WordCount> collector) throws Exception {
                    String[] splits = s.split("\\s");
                    for (String word : splits) {
                        collector.collect(new WordCount(word, 1L));
                    }
                }
            }).keyBy(new StreamWordCoutKeySelector())
            .window(SlidingProcessingTimeWindows.of(Time.seconds(2),Time.seconds(1)))
            .sum("count");

        // 把数据打印到控制台并且设置并行度
        wordCounts.print().setParallelism(1);

        // 触发执行程序
        env.execute("Socket window count");
    }

四、运行

1、打开终端,输入命令:nc -l 9109
在这里插入图片描述
2、运行SocketWindowWorldCount,会出现异常,直接退出。如下图所示,查看日志有两处错误:
1)是由于依赖中缺少slf4j的依赖,添加依赖slf4j-simple
2)缺少flink执行环境的依赖,添加flink客户端依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-simple</artifactId>
   <version>1.7.32</version>
</dependency>

在这里插入图片描述

3、再次运行,在终端输入单词,可以看到统计的单词个数
在这里插入图片描述

在这里插入图片描述

Logo

智屏生态联盟致力于大屏生态发展,利用大屏快应用技术降低开发者开发、发布大屏应用门槛

更多推荐