加入收藏 | 设为首页 |

星座屋-Flink快速入门

海外新闻 时间: 浏览:222 次

第1章针对Flink的根本原理、架构和组件进行了剖析,本章开端快速完结一个Flink的入门事例,这样能够加深对之前内容的了解。

2.1 Flink开发环境剖析

2.1.1 开发工具引荐

在实战之前,需求先阐明一下开发工具的问题。官方主张运用IntelliJ IDEA,由于它默许集成了Scala和Maven环境,运用愈加便利,当然运用Eclipse也是能够的。

开发Flink程序时,能够运用Java或许Scala言语,个人主张运用Scala,由于运用Scala完结函数式编程会比较简练。当然运用Java也能够,只不过完结起来代码逻辑比较粗笨算了。

在开发Flink程序的时分,主张运用Maven办理依靠。针对Maven库房,主张运用国内镜像库房地址,由于国外库房下载较慢,能够运用国内阿里云的Maven库房。

留意:假如发现依靠国内源无法下载的时分,记住切换回国外源。运用国内阿里云Maven库房镜像进行相关装备时,需求修正$Maven_HOME/conf/settings.xml文件。


aliMaven
aliyun Maven
http://Maven.aliyun.com/nexus/content/groups/public/
central

2.1.2 Flink程序依靠装备

在运用Maven办理Flink程序相关依靠的时分,需求提早将它们装备好。对应的Maven项目创立完结今后,也需求在这个项目的pom.xml文件中进行相关装备。

运用Java言语开发Flink程序的时分需求增加以下装备。

留意:在这里运用的Flink版别是1.6.1。假如运用的是其他版别,需求到Maven库房中查找对应版别的Maven装备。

 
org.apache.flink
flink-java
1.6.1
provided


org.apache.flink
flink-streaming-java_2.11
1.6.1
provided

运用Scala言语开发Flink程序的时分需求增加下面的装备。

 
org.apache.flink
flink-scala_2.11
1.6.1
provided


org.apache.flink
flink-streaming-scala_2.11
1.6.1
provided

留意:在IDEA等开发工具中运转代码的时分,需求把依靠装备中的scope特点注释掉。在编译打JAR包的时分,需求敞开scope特点,这样终究的JAR包就不会把这些依靠包也包括进去,由于集群中自身是有Flink的相关依靠的。

2.2 Flink程序开发过程

开发Flink程序有固定的流程。

(1)取得一个履行环境。

(2)加载/创立初始化数据。

(3)指定操作数据的Transaction算子。

(4)指定核算好的数据的寄存方位。

(5)调用execute()触发履行程序。

留意:Flink程序是推迟核算的,只要最终调用execute()办法的时分才会真实触发履行程序。

推迟核算的优点:你能够开发杂乱的程序,Flink会将这个杂乱的程序转成一个Plan,并将Plan作为一个全体单元履行!

在这里,提早创立一个Flink的Maven项目,起名为FlinkExample,作用如图2.1所示。

图2.1 项目目录

后边的Java代码悉数寄存在src/main/Java目录下,Scala代码悉数寄存在src/main/Scala目录下,流核算相关的代码寄存在对应的streaming目录下,批处理相关的代码则寄存在对应的batch目录下。

2.3 Flink流处理(Streaming)事例开发

需求剖析:经过Socket手艺实时发生一些单词,运用Flink实时接纳数据,对指定时刻窗口内(如2s)的数据进行聚合核算,而且把时刻窗口内核算的成果打印出来。

2.3.1 Java代码开发

首要增加Java代码对应的Maven依靠,参阅2.1.2节的内容。留意,鄙人面的代码中,咱们会创立一个WordWithCount类,这个类首要是为了便利核算每个单词呈现的总次数。

需求:完结每隔1s对最近2s内的数据进行汇总核算。

剖析:经过Socket模仿发生单词,运用Flink程序对数据进行汇总核算。

代码完结如下。

package xuwei.tech.streaming;
import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.Java.utils.ParameterTool;
import org.apache.Flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.Flink.runtime.state.filesystem.FsStateBackend;
import org.apache.Flink.runtime.state.memory.MemoryStateBackend;
import org.apache.Flink.streaming.api.DataStream.DataStream;
import org.apache.Flink.streaming.api.DataStream.DataStreamSource;
import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.Flink.streaming.api.windowing.time.Time;
import org.apache.Flink.util.Collector;
/**
* 单词计数之滑动窗口核算
*
* Created by xuwei.tech
*/
public class SocketWindowWordCountJava {
public static void main(String[] args) throws Exception{
//获取需求的端口号
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("No port set. use default port 9000--Java");
port = 9000;
}
//获取Flink的运转环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String hostname = "hadoop100";
String delimiter = "\n";
//衔接Socket获取输入的数据
DataStreamSource text = env.socketTextStream(hostname, port, delimiter);
// a a c
// a 1
// a 1
// c 1
DataStream windowCounts = text.flatMap(new FlatMapFunction
() {
public void flatMap(String value, Collector out) throws
Exception {
String[] splits = value.split("\\\s");
for (String word : splits) {
out.collect(new WordWithCount(word, 1L));
}
}
}).keyBy("word")
.timeWindow(Time.seconds(2), Time.seconds(1))//指定时刻窗口巨细为2s,指定时刻距离为1s
.sum("count");//在这里运用sum或许reduce都能够
/*.reduce(new ReduceFunction() {
public WordWithCount reduce(WordWithCount a,
WordWithCount b) throws Exception {
return new WordWithCount(a.word,a.count+b.count);
}
})*/
//把数据打印到控制台而且设置并行度
windowCounts.print().setParallelism(1);
//这一行代码一定要完结,不然程序不履行
env.execute("Socket window count");
}
public static class WordWithCount{
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word,long count){
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}

2.3.2 Scala代码开发

首要增加Scala代码对应的Maven依靠,参阅2.1.2节的内容。在这里经过case clas星座屋-Flink快速入门s的办法在Scala中创立一个类。

需求:完结每隔1s对最近2s内的数据进行汇总核算。

剖析:经过Socket模仿发生单词,运用Flink程序对数据进行汇总核算。

代码完结如下。

package xuwei.tech.streaming
import org.apache.Flink.api.Java.utils.ParameterTool
import org.apache.Flink.streaming.api.Scala.StreamExecutionEnvironment
import org.apache.Flink.streaming.api.windowing.time.Time
/**
* 单词计数之滑动星座屋-Flink快速入门窗口核算
*
* Created by xuwei.tech
*/
object SocketWindowWordCountScala {
def main(args: Array[String]): Unit = {
//获取Socket端口号
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
}ca星座屋-Flink快速入门tch {
case e: Exception => {
System.err.println("No port set. use default port 9000--Scala")
}
9000
}
//获取运转环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//衔接Socket获取输入数据
val text = env.socketTextStream("hadoop100",port,'\n')
//解析数据(把数据打平),分组,窗口核算,而且聚合求sum
//留意:必需求增加这一行隐式转行,不然下面的FlatMap办法履行会报错
import org.apache.Flink.api.Scala._
val windowCounts = text.flatMap(line => line.split("\\\s"))//打平,把每一行单词都切开
.map(w => WordWithCount(w,1))//把单词转成word , 1这种方式
.keyBy("word")//分组
.timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口巨细,指定距离时刻
.sum("count");// sum或许reduce都能够
//.reduce((a,b)=>WordWithCount(a.word,a.count+b.count))
//打印到控制台
windowCounts.print().setParallelism(1);
//履行任务
env.execute("Socket window count");
}
case class WordWithCount(word: String,count: Long)
}

2.3.3 履行程序

在前面的事例代码中指定hostname为hadoop100,port默许为9000,表明流处理程序默许监听这个主机的9000端口。因此在履732233行程序之前,需求先在hadoop100这个节点上面监听这个端口,经过履行下面指令完结。

[root@hadoop100 soft]# nc -l 9000
a
b
a

然后在IDEA中运转编写完结的程序代码,成果如下。

WordWithCount{word='a', count=1}
WordWithCount{word='b', count=1}
WordWithCount{word='a', count=2}
WordWithCount{word='b', count=1}
WordWithCount{word='a', count=1}

2.4 Flink批处理(Batch)事例开发

前面运用Flink完结了一个典型的流式核算事例,下面来看一下Flink的另一个星座屋-Flink快速入门运用场景——Batch离线批处理。

2.4.1 Java代码开发

需求:核算一个文件中的单词呈现的总次数,而且把成果存储到文件中。

Java代码完结如下。

package xuwei.tech.batch;
import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.Java.DataSet;
import org.apache.Flink.api.Java.ExecutionEnvironment;
import org.apache.Flink.api.Java.operators.DataSource;
import org.apache.Flink.api.Java.tuple.Tuple2;
import org.apache.Flink.util.Collector;
/**
*单词计数之离线核算
*
* Created by xuwei.tech
*/
public class BatchWordCountJava {
public static void main(String[] args) throws Exception{
String inputPath = "D:\\\data\\\file";
String outPath = "D:\\\data\\\result";
//获取运转环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//获取文件中的内容
DataSource text = env.readTextFile(inputPath);
DataSet> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
counts.writeAsCsv(outPath,"\n"," ").setParallelism(1);
env.execute("batch word count");
}
public static class Tokenizer implements FlatMapFunction
Integer>>{
public v星座屋-Flink快速入门oid flatMap(String value, Collector> out)
throws Exception {
String[] tokens = value.toLowerCase().split("\\\W+");
for (String token: tokens) {
if(token.length()>0){
out.collect(new Tuple2(token,1));
}
}
}
}
}

2.4.2 Scala代码开发

需求:核算一个文件中的单词呈现的总次数,而且把成果存储到文件中。

Scala代码完结如下。

package xuwei.tech.batch
import org.apache.Flink.api.Scala.ExecutionEnvironment
/**
* 单词计数之离线核算
* Created by xuwei.tech
*/
object BatchWordCountScala {
def main(args: Array[String]): Unit = {
val inputPath = "D:\\\data\\\file"
val outPut = "D:\\\data\\\result"
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile(inputPath)
//引进隐式转化
import org.apache.Flink.api.Scala._
val counts = text.flatMap(_.toLowerCase.split("\\\W+"))
.filter(_.nonEmpty)
.map((_,1))
.groupBy(0)
.sum(1)
counts.writeAsCsv(outPut,"\n"," ").setParallelism(1)
env.execute("batch word count")
}
}

2.4.3 履行程序

首要,代码中指定的inputPath是D:\\\data\\\file目录,咱们需求在这个目录下面创立一些文件,并在文件中输入一些单词。

D:\data\file>dir
2018/03/20 09:01 24 a.txt
D:\data\file>type a.txt
hello a hello b
hello a

然后,在IDEA中运转程序代码,发生的成果会被存储到outPut指定的D:\\\data\\\result文件中。

D:\data>type result
hello 3
b 1
a 2

本文摘自刚刚上架的《Flink入门与实战》徐葳 著

  • 这是一本Flink入门级图书,力求具体而完整地描绘Flink基础理论与实践操作。
  • 选用Flink 1.6版别写作,事例丰厚有用,做到学以致用。
  • 细节与事例统筹,浅显易懂展示Flink技术精华。
  • 51CTO抢手网课配套教材,可与网课结合学习,快速提高大数据开发技术。

本书旨在协助读者从零开端快速把握Flink的根本原理与中心功用。本书首要介绍了Flink的根本原理和装置布置,并对Flink中的一些中心API进行了具体剖析。然后配套对应的事例剖析,别离运用Java代码和Scala代码完结事例。最终经过两个项目演示了Flink在实践工作中的一些运用场景,协助读者快速把握Flink开发。

学习本书需求我们具有一些大数据的基础知识,比方Hadoop、Kafka、Redis、Elasticsearch等结构的根本装置和运用。本书也合适对大数据实时核算感兴趣的读者阅览。