Apache Flink-基于Java专案模板建立Flink应用(流计算和批计算) Apache Flink建立模板专案有2种方式: 1. 通过Maven archetype命令建立; 2. 通过Flink 提供的Quickstart shell指令码建立; 关于Apache Flink的环境搭建,请参考相关连结: Apache Flink快速入门-基本架构、核心概念和执行流程 Apache Flink v1.8 本地单机环境安装和执行Flink应用 mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.8.0 -DgroupId=com.rickie -DartifactId=flink-tutorial -Dversion=0.1 -Dpackage=com.rickie.tutorial -DinteractiveMode=false 引数说明: 原型archetype有关引数表 专案相关引数: 通过上述mvn 命令建立的Java模板专案结构。 从上述专案结构可以看出,该专案是一个比较完善的Maven专案,其中Java程式码部分,BatchJob.java和StreamingJob.java 分别对应Flink 批量界面DataSet的例项程式码和流式界面DataStream的例项程式码。 开启StreamingJob.java档案,实现简单的单词统计(Word Count)业务功能。 具体程式码如下所示。 package com.rickie.tutorial; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; /** * Skeleton for a Flink Streaming Job. * * For a tutorial how to write a Flink streaming application, check the * tutorials and examples on the Flink Website. * * To package your application into a JAR file for execution, run * \mvn clean package\ on the command line. * * If you change the name of the main class (with the public static void main(String[] args)) * method, change the respective entry in the POM.xml file (simply search for \mainClass\). */ public class StreamingJob { public static void main(String[] args) throws Exception { // set up the streaming execution environment // 设定streaming执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 连线socket获取输入的资料 DataStream text = env.socketTextStream(127.0.0.1, 9000); // split(\\W+) 使用非数字字母切分字串 DataStream> dataStream = text.flatMap(new FlatMapFunction>() { @Override public void flatMap(String s, Collector> collector) throws Exception { String[] tokens = s.toLowerCase().split(\\W+); for(String token : tokens) { if(token.length() > 0) { collector.collect(new Tuple2(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1); dataStream.print(); // execute program env.execute(Flink Streaming Java API Skeleton); } } nc -l -p 9000 然后,在IDEA中执行StreamingJob,和本地server socket 9000埠建立连线,如下图所示。 接着,在nc命令视窗,输入一些单词,如下所示。 StreamingJob应用根据实现的业务逻辑,进行单词聚合,并输出。单词在5秒的时间视窗(翻滚时间视窗)中计算并打印到stdout。 说明:timeWindow(Time.seconds(5)),只有一个引数,表示是翻滚时间视窗(Tumbling window),即不重叠的时间视窗,只统计本视窗内的资料。 因为没启动Flink服务,所以去localhost:8081的web UI中进行监控。程式码 StreamExecutionEnvironment.getExecutionEnvironment()会建立一个LocalEnvironment,然后在Java虚拟机器上执行。 在Linux/Flink单机模式下执行 Linux 单机模式下启动Flink相当简单,直接执行bin/start-cluster.sh,会启动Flink的JobManager和TaskManager两个程序。如果想将上述程式提交到Flink,需要执行maven命令打成jar包,然后在命令列中,进入到bin目录下执行 flink run xxx/xxx/xxx.jar 即可,输出结果会在TaskManager的服务视窗中输出。 在IDEA中,开启BatchJob.java档案,编写如下程式码,实现批量计算逻辑。 package com.rickie.tutorial; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** * Skeleton for a Flink Batch Job. * * For a tutorial how to write a Flink batch application, check the * tutorials and examples on the Flink Website. * * To package your application into a JAR file for execution, * change the main class in the POM.xml file to this class (simply search for \mainClass\) * and run \mvn clean package\ on the command line. */ public class BatchJob { public static void main(String[] args) throws Exception { // set up the batch execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 通过字串构建资料集 DataSet text = env.fromElements( I am Rickie , Hello Rickie, Good morning ... Rickie ); // 分割字串,按照key进行分组,统计相同的key个数 DataSet> wordCount = text .flatMap(new LineSplitter()) .groupBy(0) .sum(1); // Print wordCount.print(); // execute program // env.execute(Flink Batch Java API Skeleton); } // 分割字串的方法 public static class LineSplitter implements FlatMapFunction> { @Override public void flatMap(String line, Collector> out) { for (String word : line.split( )) { out.collect(new Tuple2(word, 1)); } } } } 在IDEA中执行,检视输出的单词聚合结果。1. 通过Maven archetype建立Flink专案
#使用Maven建立2. 编写业务程式码
将上述专案汇入到IDEA中,Flink应用程序模板如下图所示。3. IDEA中执行应用、测试执行效果
首先,使用nc命令启动一个本地9000埠,和上一步程式码中的埠号保持一致。4. 使用IDEA开发批计算应用
Flink支援DataSet API 用于处理批量资料,资料集通过source进行初始化,例如读取档案或者序列化集合,然后通过transformation(filtering、mapping、joining、grouping)完成资料集转换操作,然后通过sink进行储存,既可以写入HDFS这种分散式档案系统,也可以打印控制台。