智能

Apache Flink-基于Java专案模板建立Flink应用流计算和批计算

Apache Flink-基于Java专案模板建立Flink应用(流计算和批计算)

Apache Flink建立模板专案有2种方式:

1. 通过Maven archetype命令建立;

2. 通过Flink 提供的Quickstart shell指令码建立;

关于Apache Flink的环境搭建,请参考相关连结:

Apache Flink快速入门-基本架构、核心概念和执行流程

Apache Flink v1.8 本地单机环境安装和执行Flink应用

1. 通过Maven archetype建立Flink专案

#使用Maven建立

mvn archetype:generate

-DarchetypeGroupId=org.apache.flink

-DarchetypeArtifactId=flink-quickstart-java

-DarchetypeVersion=1.8.0

-DgroupId=com.rickie

-DartifactId=flink-tutorial

-Dversion=0.1

-Dpackage=com.rickie.tutorial

-DinteractiveMode=false

引数说明:

原型archetype有关引数表

专案相关引数:

通过上述mvn 命令建立的Java模板专案结构。

从上述专案结构可以看出,该专案是一个比较完善的Maven专案,其中Java程式码部分,BatchJob.java和StreamingJob.java 分别对应Flink 批量界面DataSet的例项程式码和流式界面DataStream的例项程式码。

2. 编写业务程式码

将上述专案汇入到IDEA中,Flink应用程序模板如下图所示。

开启StreamingJob.java档案,实现简单的单词统计(Word Count)业务功能。

具体程式码如下所示。

package com.rickie.tutorial;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.util.Collector;

/**

* Skeleton for a Flink Streaming Job.

*

*

For a tutorial how to write a Flink streaming application, check the

* tutorials and examples on the Flink Website.

*

*

To package your application into a JAR file for execution, run

* \mvn clean package\ on the command line.

*

*

If you change the name of the main class (with the public static void main(String[] args))

* method, change the respective entry in the POM.xml file (simply search for \mainClass\).

*/

public class StreamingJob {

public static void main(String[] args) throws Exception {

// set up the streaming execution environment

// 设定streaming执行环境

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 连线socket获取输入的资料

DataStream text = env.socketTextStream(127.0.0.1, 9000);

// split(\\W+) 使用非数字字母切分字串

DataStream> dataStream = text.flatMap(new FlatMapFunction>() {

@Override

public void flatMap(String s, Collector> collector) throws Exception {

String[] tokens = s.toLowerCase().split(\\W+);

for(String token : tokens) {

if(token.length() > 0) {

collector.collect(new Tuple2(token, 1));

}

}

}

}).keyBy(0).timeWindow(Time.seconds(5)).sum(1);

dataStream.print();

// execute program

env.execute(Flink Streaming Java API Skeleton);

}

}

3. IDEA中执行应用、测试执行效果

首先,使用nc命令启动一个本地9000埠,和上一步程式码中的埠号保持一致。

nc -l -p 9000

然后,在IDEA中执行StreamingJob,和本地server socket 9000埠建立连线,如下图所示。

接着,在nc命令视窗,输入一些单词,如下所示。

StreamingJob应用根据实现的业务逻辑,进行单词聚合,并输出。单词在5秒的时间视窗(翻滚时间视窗)中计算并打印到stdout。

说明:timeWindow(Time.seconds(5)),只有一个引数,表示是翻滚时间视窗(Tumbling window),即不重叠的时间视窗,只统计本视窗内的资料。

因为没启动Flink服务,所以去localhost:8081的web UI中进行监控。程式码 StreamExecutionEnvironment.getExecutionEnvironment()会建立一个LocalEnvironment,然后在Java虚拟机器上执行。

在Linux/Flink单机模式下执行

Linux 单机模式下启动Flink相当简单,直接执行bin/start-cluster.sh,会启动Flink的JobManager和TaskManager两个程序。如果想将上述程式提交到Flink,需要执行maven命令打成jar包,然后在命令列中,进入到bin目录下执行 flink run xxx/xxx/xxx.jar 即可,输出结果会在TaskManager的服务视窗中输出。

4. 使用IDEA开发批计算应用

Flink支援DataSet API 用于处理批量资料,资料集通过source进行初始化,例如读取档案或者序列化集合,然后通过transformation(filtering、mapping、joining、grouping)完成资料集转换操作,然后通过sink进行储存,既可以写入HDFS这种分散式档案系统,也可以打印控制台。

在IDEA中,开启BatchJob.java档案,编写如下程式码,实现批量计算逻辑。

package com.rickie.tutorial;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.util.Collector;

/**

* Skeleton for a Flink Batch Job.

*

*

For a tutorial how to write a Flink batch application, check the

* tutorials and examples on the Flink Website.

*

*

To package your application into a JAR file for execution,

* change the main class in the POM.xml file to this class (simply search for \mainClass\)

* and run \mvn clean package\ on the command line.

*/

public class BatchJob {

public static void main(String[] args) throws Exception {

// set up the batch execution environment

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 通过字串构建资料集

DataSet text = env.fromElements(

I am Rickie ,

Hello Rickie,

Good morning ... Rickie

);

// 分割字串,按照key进行分组,统计相同的key个数

DataSet> wordCount = text

.flatMap(new LineSplitter())

.groupBy(0)

.sum(1);

// Print

wordCount.print();

// execute program

// env.execute(Flink Batch Java API Skeleton);

}

// 分割字串的方法

public static class LineSplitter implements FlatMapFunction> {

@Override

public void flatMap(String line, Collector> out) {

for (String word : line.split( )) {

out.collect(new Tuple2(word, 1));

}

}

}

}

在IDEA中执行,检视输出的单词聚合结果。

你可能也会喜欢...