Flink-java(api)
2022/7/24 14:23:48
This article walks through the Flink Java API (called from Scala). It should be a useful reference for anyone solving similar programming problems; interested programmers, follow along!
1. Map
package com.wt.flink.tf

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

object Demo1Map {
  def main(args: Array[String]): Unit = {
    // create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    // Java API style: pass an anonymous MapFunction
    val kvDS: DataStream[(String, Int)] = studentDS.map(new MapFunction[String, (String, Int)] {
      /**
       * The map method is called once per record: one record in, one record out.
       *
       * @param value : one line of input
       * @return a (class, 1) pair
       */
      override def map(value: String): (String, Int) = {
        val clazz: String = value.split(",")(4)
        (clazz, 1)
      }
    })

    kvDS.print()

    env.execute()
  }
}
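For comparison, the Scala DataStream API also accepts a plain function where the Java API needs an anonymous MapFunction. A minimal sketch of the equivalent one-liner, assuming the same students.txt layout (fifth comma-separated field is the class):

// equivalent Scala-lambda form of the MapFunction above (sketch)
val kvDS: DataStream[(String, Int)] = studentDS.map(line => (line.split(",")(4), 1))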
2. Java API
package com.wt.flink.tf

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object Demo2JavaApi {
  @throws[Exception]
  def main(args: Array[String]): Unit = {
    // create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val studentDS: DataStreamSource[String] = env.readTextFile("data/students.txt")

    // pure Java API types: DataStreamSource, SingleOutputStreamOperator, Java Tuple2
    val kvDS: SingleOutputStreamOperator[Tuple2[String, Integer]] =
      studentDS.map(new MapFunction[String, Tuple2[String, Integer]]() {
        @throws[Exception]
        override def map(value: String): Tuple2[String, Integer] = {
          val clazz: String = value.split(",")(4)
          Tuple2.of(clazz, 1)
        }
      })

    kvDS.print

    env.execute
  }
}
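Note what changed versus Demo1Map: this version sticks to the pure Java API types (DataStreamSource, SingleOutputStreamOperator, java.lang.Integer, and Flink's Java Tuple2) instead of the Scala DataStream and tuple types. A Java Tuple2 exposes its fields as f0/f1 rather than Scala's _1/_2; a tiny sketch (the values are made up):

// Flink's Java Tuple2 uses public fields f0/f1 (hypothetical values)
val t: Tuple2[String, Integer] = Tuple2.of("文科六班", 1)
println(t.f0 + " -> " + t.f1)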
3. FlatMapFunction
package com.wt.flink.tf

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object Demo3FlatMapFunction {
  def main(args: Array[String]): Unit = {
    // create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.readTextFile("data/words.txt")

    // Java API style: pass an anonymous FlatMapFunction
    val wordsDS: DataStream[String] = linesDS.flatMap(new FlatMapFunction[String, String] {
      /**
       * flatMap is called once per record; one input record may produce any number of outputs.
       *
       * @param value : one line of input
       * @param out   : collector used to emit records downstream
       */
      override def flatMap(value: String, out: Collector[String]): Unit = {
        val split: Array[String] = value.split(",")
        for (word <- split) {
          // emit the word downstream
          out.collect(word)
        }
      }
    })

    wordsDS.print()

    env.execute()
  }
}
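As with map, the Scala API lets a lambda stand in for the FlatMapFunction; this shorthand is what the later demos use. A minimal sketch:

// equivalent Scala-lambda form (sketch): split returns an Array, which flatMap flattens
val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))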
4. KeyBy
package com.wt.flink.tf

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._

object Demo5KeyBy {
  def main(args: Array[String]): Unit = {
    // create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))

    // Java API style: pass a KeySelector
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(new KeySelector[(String, Int), String] {
      override def getKey(value: (String, Int)): String = {
        value._1
      }
    })

    // aggregate after keying
    val sumDS: DataStream[(String, Int)] = keyByDS.sum(1)

    sumDS.print()

    env.execute()
  }
}
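The KeySelector can likewise be replaced by a lambda that extracts the key, which is the form the next demo uses. A minimal sketch:

// equivalent Scala-lambda form (sketch): key by the first tuple field
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)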
5. Reduce
package com.wt.flink.tf

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

object Demo6Reduce {
  def main(args: Array[String]): Unit = {
    // create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))

    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)

    // aggregate after grouping
    //val reduceDS: DataStream[(String, Int)] = keyByDS.reduce((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))

    // Java API style: pass a ReduceFunction
    val reduceDS: DataStream[(String, Int)] = keyByDS.reduce(new ReduceFunction[(String, Int)] {
      override def reduce(kv1: (String, Int), kv2: (String, Int)): (String, Int) = {
        (kv1._1, kv1._2 + kv2._2)
      }
    })

    reduceDS.print()

    env.execute()
  }
}
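reduce is not limited to summation: any function that combines two records of the same type into one will do, and Flink emits the running result for each incoming record. A sketch of a hypothetical variant (not part of the original demo) that keeps the larger count per key instead of adding:

// hypothetical variant: keep the record with the larger count per key
val maxDS: DataStream[(String, Int)] = keyByDS.reduce(
  (kv1, kv2) => if (kv1._2 >= kv2._2) kv1 else kv2
)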
6. Window
package com.wt.flink.tf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object Demo7Window {
  def main(args: Array[String]): Unit = {
    // create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))

    /**
     * Count the words seen in the last 10 seconds, updated every 5 seconds.
     */
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)

    // sliding window: 10-second size, 5-second slide
    val windowDS: WindowedStream[(String, Int), String, TimeWindow] = keyByDS
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

    // aggregate after windowing
    val countDS: DataStream[(String, Int)] = windowDS.sum(1)

    countDS.print()

    env.execute()
  }
}
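With a 10-second window sliding every 5 seconds, each record lands in two overlapping windows. When the windows should not overlap (size equals slide), a tumbling window is the usual choice; a minimal sketch, assuming an extra import of TumblingProcessingTimeWindows from the same assigners package:

// hypothetical tumbling variant: one non-overlapping 10-second window per key
val tumblingCountDS: DataStream[(String, Int)] = keyByDS
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .sum(1)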
7. Union
package com.wt.flink.tf

import org.apache.flink.streaming.api.scala._

object Demo8Union {
  def main(args: Array[String]): Unit = {
    // create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val ds1: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5))
    val ds2: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5))

    // both streams must have the same element type
    val unionDS: DataStream[Int] = ds1.union(ds2)

    unionDS.print()

    env.execute()
  }
}
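union is variadic, so more than two streams of the same element type can be merged in one call. A small sketch (ds3 is made up for illustration):

// merging three streams of the same element type at once (ds3 is hypothetical)
val ds3: DataStream[Int] = env.fromCollection(List(6, 7, 8))
val allDS: DataStream[Int] = ds1.union(ds2, ds3)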
That wraps up this introduction to Flink-java(api). Hopefully the examples above are a helpful reference.