4.9

2022/4/9 6:22:06


Extract
Create the installation directory /opt/module on the hadoop102 node server, then extract the Flink archive into that directory:
$ tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/
flink-1.13.0/
flink-1.13.0/log/
flink-1.13.0/LICENSE
flink-1.13.0/lib/
3. Start
Enter the extracted directory, run the startup script, and check the running processes:
$ cd flink-1.13.0/
$ bin/start-cluster.sh
Starting cluster.
Starting taskexecutor daemon on host hadoop102 (hadoop102 is the hostname)
$ jps
10369 StandaloneSessionClusterEntrypoint
10680 TaskManagerRunner
10717 Jps

package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the file
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");

        // 3. Transform: split each line on spaces and emit (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                // The lambda erases generic types, so give Flink an explicit type hint
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Group by word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream =
                wordAndOneTuple.keyBy(data -> data.f0);

        // 5. Sum the counts per word
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);

        // 6. Print
        sum.print();

        // 7. Trigger execution
        env.execute();
    }
}
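The core of the flatMap step is just splitting on spaces and summing per word. As a plain-Java sketch with no Flink dependency (the class and method names here are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Same splitting/counting logic as the flatMap + sum pipeline above,
    // collapsed into a single in-memory map.
    static Map<String, Long> countWords(String line) {
        Map<String, Long> counts = new HashMap<>();
        for (String word : line.split(" ")) {
            counts.merge(word, 1L, Long::sum);  // add 1 to the word's running count
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("hello flink hello world"));
    }
}
```

The difference in the real job is that the stream is unbounded across records and the sums live in Flink's keyed state rather than one local map.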
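keyBy in step 4 routes every tuple with the same word to the same parallel subtask by hashing the key. Flink's actual implementation hashes keys into key groups (murmur hash) rather than taking a plain modulo, but the routing idea can be sketched as follows (hypothetical helper, not Flink API):

```java
public class KeyRoutingSketch {
    // Simplified stand-in for keyBy routing: the same key always lands on the
    // same subtask index, which is what makes per-key state (e.g. sum) possible.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // The same word maps to the same subtask on every call
        System.out.println(subtaskFor("hello", 4) == subtaskFor("hello", 4));
    }
}
```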




