Flink 流处理 API
2021/11/14 23:40:23
本文主要是介绍Flink 流处理 API,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Flink 流处理 API
- 1. Environment
- getExecutionEnvironment
- createLocalEnvironment
- createRemoteEnvironment
- 2. Source
- 从集合读取数据
- 从文件读取数据
- 从 kafka 读取数据
- 自定义 Source
- 3. Transform
- map
- flatMap
- Fliter
- keyBy
- 滚动聚合算子
- Reduce
- split 和 select
- connect 和 coMap、coFlatMap
- union
- 支持的数据类型
- UDF 函数(更细粒度的控制流)
- 函数类(Function Classes)
- 匿名函数(Lambda Functions)
- 富函数(Rich Functions)
- 数据重分区
- 4. Sink
- kafka
- Redis
- JDBC 自定义 Sink
Flink 流处理 API 的调用过程主要分为以下几个步骤:
所以 Flink API 也是围绕以下几个环节进行介绍。
1. Environment
Environment,即 Flink 程序的执行环境。
getExecutionEnvironment
创建一个执行环境,表示当前执行程序的上下文。
如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境。即 getExecutionEnvironment
会根据查询执行的方式返回什么样的运行环境,最常用的一种创建执行环境的方式。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
如果没有设置并行度,会以flink-conf.yaml
中的配置为准,默认为 1。
可以通过 setParallelism()
方法更改并行度。
env.setParallelism(2);
createLocalEnvironment
返回本地执行环境,可以在调用时指定默认的并行度。
LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment(); // 指定并行度为 2 LocalStreamEnvironment localEnvironment2 = StreamExecutionEnvironment.createLocalEnvironment(2);
createRemoteEnvironment
返回集群执行环境,将 Jar 提交到远程服务器。
需要在调用时指定JobManager 的 IP 和 Port,并指定要在集群中运行到 Jar 包。
StreamExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081, "WordCount.jar");
2. Source
Source,即数据源,通过数据源读入数据至 Flink 程序中执行,Source 支持多种方式读取数据。
存在如下一个类,用于记录用户日志。
import lombok.Data; @Data public class UserLog { private String id; private Long timeStamp; private String log; public UserLog() { } public UserLog(String id, Long timeStamp, String log) { this.id = id; this.timeStamp = timeStamp; this.log = log; } }
从集合读取数据
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); List<UserLog> userLogs = Arrays.asList( new UserLog("user_1", 1547718199L, "aaa"), new UserLog("user_2", 1547718199L, "bbb"), new UserLog("user_3", 1547718199L, "ccc") ); // 从集合中读取数据 DataStream<UserLog> dataStream = env.fromCollection(userLogs); // 打印输出 dataStream.print("userLogs"); // 执行 env.execute(); }
从文件读取数据
# userLog.txt user_1, 1547718212, aaa user_1, 1547718213, bbb user_1, 1547718214, ccc user_2, 1547718212, ddd user_2, 1547718213, eee user_3, 1547718214, fff
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从文件中读取数据 DataStream<String> dataStream = env.readTextFile("userLog.txt"); // 打印输出 dataStream.print(); // 执行 env.execute(); }
从 kafka 读取数据
需要引入 kafka 连接器到依赖。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.12</artifactId> <version>1.10.1</version> </dependency>
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "consumer-group"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest"); // 从文件中读取数据 DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<String>( "user_log", new SimpleStringSchema(), properties)); // 打印输出 dataStream.print(); // 执行 env.execute(); }
自定义 Source
除了以上的source
数据来源,我们还可以自定义source
。只需要传入一个SourceFunction
就可以。具体调用如下:
DataStream<UserLog> dataStream = env.addSource(new SourceFunction<UserLog>() { @Override public void run(SourceContext<UserLog> ctx) throws Exception { } @Override public void cancel() { } });
我们通过重写run
方法控制数据的生成,通过cancel
方法控制不再读入数据。
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从自定义读取数据 final UserLogSource userLogSource = new UserLogSource(); DataStream<UserLog> dataStream = env.addSource(userLogSource); // 打印输出 dataStream.print(); // 执行 env.execute(); } // 实现自定义的 SourceFunction public static class UserLogSource implements SourceFunction<UserLog> { // 定义标识位,用来控制数据的产生 private volatile boolean running = true; @Override public void run(SourceContext<UserLog> ctx) throws Exception { int num = 0; while (running) { UserLog userLog = new UserLog(); userLog.setId(num++ + ""); userLog.setTimeStamp(System.currentTimeMillis()); userLog.setLog("log:" + num); ctx.collect(userLog); } } @Override public void cancel() { running = false; } }
可以看到,自定义source
的方式非常灵活,可以实现任意形式的数据读入。如从 kafka、文件、mysql、redis、es 等都可以通过自定义source
等的方式进行读入。
3. Transform
在将数据读入后,我们可以需要进行大量的转换操作,而转换操作就是通过如下 API 实现的。
map
将数据元素转换为另一种格式的数据元素。
// 将元素由 string 转换为对应 string 的长度 DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return value.length(); } });
test.txt a bb ccc dddd
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从文件中读取数据 DataStream<String> inputStream = env.readTextFile("test.txt"); // map 把 String 转换成长度输出 DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return value.length(); } }); // 打印输出 mapStream.print("map"); env.execute(); }
flatMap
map
只支持一对一的转换,如果需要进行一对多转换,则需要通过flatMap
实现。
test.txt a,1 bb,2 ccc,3 dddd,4
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从文件中读取数据 DataStream<String> inputStream = env.readTextFile("test.txt"); // flatMap,按逗号分字段 DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> collector) { String[] fields = value.split(","); for (String field : fields) { collector.collect(field); } } }); // 打印输出 flatMapStream.print("flatMap"); env.execute(); }
Fliter
过滤掉数据流中的元素。
test.txt a1 b1 c1 d1
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从文件中读取数据 DataStream<String> inputStream =env.readTextFile("test.txt"); // filter,筛选 a 开头的数据 DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() { @Override public boolean filter(String value) { return value.startsWith("a"); } }); // 打印输出 filterStream.print("filter"); env.execute(); }
keyBy
逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同的key
的元素,在内部以hash
的形式实现。
调用keyBy
,数据类型由DataStream
转换为KeyedStream
。
滚动聚合算子
这些算子可以针对KeyedStream
的每一个支流做聚合。
- sum()
- min()
- max()
- minBy()
- maxBy()
test.txt user_1,100000,log_1 user_1,300000,log_3 user_1,200000,log_2 user_2,200000,log_5 user_2,100000,log_4 user_3,300000,log_6
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从文件中读取数据 DataStream<String> inputStream = env.readTextFile("test.txt"); // 转换成UserLog类型 DataStream<UserLog> dataStream = inputStream.map(new MapFunction<String, UserLog>() { public UserLog map(String value) throws Exception { String[] array = value.split(","); return new UserLog(array[0], Long.parseLong(array[1]), array[2]); } }); // 分组 KeyedStream<UserLog, Tuple> keyedStream = dataStream.keyBy("id"); // 滚动聚合,取当前最大的时间戳 DataStream<UserLog> resultStream = keyedStream.max("timeStamp"); resultStream.print(); env.execute(); }
通过输出可以看到,随着数据流的到来,成功获得最大的数据。
Reduce
一个分组数据流的聚合操作,合并当前元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
有如下一个需求,我们不仅要知道各科目最大的成绩,还要知道最大成绩的所属学生。如果只通过 max
的话,是无法满足我们的需求的,此时,就需要通过 reduce
来实现该需求。
@Data public class Score { private String course; private String student; private int score; public Score() { } public Score(String course, String student, int score) { this.course = course; this.student = student; this.score = score; } }
test.txt 语文,小明,80 语文,小王,70 语文,小张,90 语文,小李,100 数学,小明,100 数学,小王,90 数学,小张,80 数学,小李,70 英语,小明,70 英语,小王,80 英语,小张,90 英语,小李,100
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从文件中读取数据 DataStream<String> inputStream = env.readTextFile("test.txt"); // 转换成Score类型 DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() { @Override public Score map(String value) throws Exception { String[] array = value.split(","); return new Score(array[0], array[1], Integer.parseInt(array[2])); } }); // 分组 KeyedStream<Score, Tuple> keyedStream = dataStream.keyBy("course"); // reduce 聚合,取最大成绩 DataStream<Score> reduceStream = keyedStream.reduce(new ReduceFunction<Score>() { @Override public Score reduce(Score value1, Score value2) throws Exception { String student = value1.getStudent(); if (value1.getScore() < value2.getScore()) { student = value2.getStudent(); } return new Score(value1.getCourse(), student, Math.max(value1.getScore(), value2.getScore())); } }); reduceStream.print(); env.execute(); }
split 和 select
split
:根据某些特征把一个DataStream
拆分成两个或者多个DataStream
。
select
:从一个 SplitStream
中获取一个或者多个DataStream
。
test.txt 语文,小明,80 语文,小王,30 语文,小张,90 语文,小李,100 数学,小明,100 数学,小王,40 数学,小张,80 数学,小李,70 英语,小明,70 英语,小王,80 英语,小张,50 英语,小李,100
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从文件中读取数据 DataStream<String> inputStream = env.readTextFile("test.txt"); // 转换成SensorReading类型 DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() { @Override public Score map(String value) throws Exception { String[] array = value.split(","); return new Score(array[0], array[1], Integer.parseInt(array[2])); } }); // 分流,按照分数60分为界,分成两条流 SplitStream<Score> splitStream = dataStream.split(new OutputSelector<Score>() { @Override public Iterable<String> select(Score score) { return score.getScore() > 60 ? Collections.singletonList("pass") : Collections.singletonList("unPass"); } }); DataStream<Score> passStream = splitStream.select("pass"); DataStream<Score> unPassStream = splitStream.select("unPass"); passStream.print("pass"); unPassStream.print("unPass"); env.execute(); }
connect 和 coMap、coFlatMap
connect
:连接两个保持他们类型的数据流,两个数据流被 connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
coMap
、coFlatMap
:作用于ConnectedStreams
上,功能与map
和flatMap
一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap 处理。
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从文件中读取数据 DataStream<String> inputStream = env.readTextFile("test.txt"); // 转换成SensorReading类型 DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() { @Override public Score map(String value) throws Exception { String[] array = value.split(","); return new Score(array[0], array[1], Integer.parseInt(array[2])); } }); // 1、分流,按照分数60分为界,分成两条流 SplitStream<Score> splitStream = dataStream.split(new OutputSelector<Score>() { @Override public Iterable<String> select(Score score) { return score.getScore() > 60 ? Collections.singletonList("pass") : Collections.singletonList("unPass"); } }); DataStream<Score> passStream = splitStream.select("pass"); DataStream<Score> unPassStream = splitStream.select("unPass"); // 2、合流,将及格对数据流转换成二元组类型 DataStream<Tuple2<String, Integer>> warningStream = passStream.map(new MapFunction<Score, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(Score score) throws Exception { return new Tuple2<String, Integer>(score.getCourse(), score.getScore()); } }); // 3、与不及格对数据流连接合并之后,输出状态信息 ConnectedStreams<Tuple2<String, Integer>, Score> connectedStream = warningStream.connect(unPassStream); DataStream<Object> resultStream = connectedStream.map(new CoMapFunction<Tuple2<String, Integer>, Score, Object>() { @Override public Object map1(Tuple2<String, Integer> value) throws Exception { return new Tuple3<>(value.f0, value.f1, "pass"); } @Override public Object map2(Score value) throws Exception { return new Tuple3<>(value.getCourse(), value.getScore(), "unpass"); } }); resultStream.print("resultStream"); }
union
union
:对两个或者两个以上的DataStream
进行union
操作,产生一个包含所有DataStream
元素的新 DataStream
。
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从文件中读取数据 DataStream<String> inputStream = env.readTextFile("text.txt"); // 转换成SensorReading类型 DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() { @Override public Score map(String value) throws Exception { String[] array = value.split(","); return new Score(array[0], array[1], Integer.parseInt(array[2])); } }); // 1、分流,按照分数60分为界,分成两条流 SplitStream<Score> splitStream = dataStream.split(new OutputSelector<Score>() { @Override public Iterable<String> select(Score score) { return score.getScore() > 60 ? Collections.singletonList("pass") : Collections.singletonList("unPass"); } }); DataStream<Score> passStream = splitStream.select("pass"); DataStream<Score> unPassStream = splitStream.select("unPass"); // 2、union 联合多条流 DataStream<Score> allScoreStream = passStream.union(unPassStream); allScoreStream.print("allScoreStream"); env.execute(); }
支持的数据类型
Flink
流应用程序处理的是以数据对象表示的事件流。在Flink
内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送他们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明确知道应用程序所处理的数据类型。Flink 使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
Flink 还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda
函数或范型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink 支持 Java 和 Scala 中常见的数据类型,使用最广泛的类型有以下几种。
- 基础数据类型
Flink
支持所有的Java
和Scala
基础数据类型,Int、Double、Long、String。
- Java 和 Scala 元组(
Tuples
) - Scala 样例类(case classes)
- Java 简单对象(POJOs)
- 其他(Arrays、List、Maps、Enums)等
UDF 函数(更细粒度的控制流)
函数类(Function Classes)
Flink 暴露了所有 udf 函数的接口(实现方式为接口或抽象类)。例如MapFunction
,FilterFunction
,ProcessFunction
等等。
下面例子实现了FilterFunction
接口:
DataStream<String> filterStream = dataStream.filter(new FlinkFilter()); public static class FlinkFilter implements FilterFunction<String> { @Override public boolean filter(String value) throws Exception { return value.contains("flink"); } }
也可以将函数实现为匿名类:
DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { return value.contains("flink"); } });
匿名函数(Lambda Functions)
DataStream<String> filterStream = dataStream.filter(value -> value.contains("flink"));
富函数(Rich Functions)
“富函数”是DataStream API
提供的一个函数类的接口,所有 Flink 函数类都有其Rich
版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- …
Rich Function
有一个生命周期的概念。典型的生命周期方法有:
open()
方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter 被调用之前open
会被调用。close()
方法是生命周期中的最后一个调用的方法,做一些清理工作。getRuntimeContext()
方法提供了函数的RuntimeContext
的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态。
public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); // 从文件中读取数据 DataStream<String> inputStream = env.readTextFile("test.txt"); // 转换成 Score 类型 DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() { @Override public Score map(String value) throws Exception { String[] array = value.split(","); return new Score(array[0], array[1], Integer.parseInt(array[2])); } }); DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapper()); resultStream.print(); env.execute(); } public static class MyMapper extends RichMapFunction<Score, Tuple2<String, Integer>> { @Override public Tuple2<String, Integer> map(Score score) throws Exception { return new Tuple2<>(score.getCourse(), getRuntimeContext().getIndexOfThisSubtask()); } @Override public void open(Configuration parameters) throws Exception { // 初始化工作,一般是定义状态,或者建立数据库连接 System.out.println("open"); } @Override public void close() throws Exception { // 一般是关闭连接 System.out.println("close"); } }
数据重分区
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); // 从文件中读取数据 DataStream<String> inputStream = env.readTextFile("test.txt"); inputStream.print("input"); // 1. shuffle DataStream<String> shuffleStream = inputStream.shuffle(); shuffleStream.print("shuffleStream"); // 2. keyBy DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() { @Override public Score map(String value) throws Exception { String[] array = value.split(","); return new Score(array[0], array[1], Integer.parseInt(array[2])); } }); dataStream.keyBy("course").print("keyBy"); // 3、global inputStream.global().print("global"); env.execute(); }
4. Sink
Flink
没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。所有对外的输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。
stream.addSink(new MySink(xxx))
官方提供了一部分的框架的 sink。除此以外,需要用户自定义 sink。
kafka
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.12</artifactId> <version>1.10.1</version> </dependency>
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "consumer-group"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest"); // 从kafka中读取数据 DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer011<String>( "score_consume", new SimpleStringSchema(), properties)); // 转换成SensorReading类型 DataStream<String> dataStream = inputStream.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { String[] array = value.split(","); System.out.println(new Score(array[0], array[1], Integer.parseInt(array[2])).toString()); return new Score(array[0], array[1], Integer.parseInt(array[2])).toString(); } }); // 输出到 kafka dataStream.addSink(new FlinkKafkaProducer011<String>( "localhost:9092", "score_produce", new SimpleStringSchema() )); env.execute(); }
Redis
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从文件中读取数据 DataStream<String> inputStream = env.readTextFile("test.txt"); // 转换成 Score 类型 DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() { public Score map(String value) throws Exception { String[] array = value.split(","); return new Score(array[0], array[1], Integer.parseInt(array[2])); } }); // 定义 Jedis 连接配置 FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build(); dataStream.addSink(new RedisSink<Score>(config, new MyRedisMapper())); env.execute(); } // 自定义 RedisMapper public static class MyRedisMapper implements RedisMapper<Score> { // 定义保存数据到 Redis 的命令,存成Hash表,hset scores course student-score @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription( RedisCommand.HSET, "scores" ); } @Override public String getKeyFromData(Score score) { return score.getCourse(); } @Override public String getValueFromData(Score score) { return score.getStudent() + "-" + score.getScore(); } }
JDBC 自定义 Sink
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.29</version> </dependency>
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从文件中读取数据 DataStream<String> inputStream = env.readTextFile("test.txt"); // 转换成 Score 类型 DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() { @Override public Score map(String value) throws Exception { String[] array = value.split(","); return new Score(array[0], array[1], Integer.parseInt(array[2])); } }); dataStream.addSink(new MyJdbcSink()); env.execute(); } // 实现自定义的SinkFunction public static class MyJdbcSink extends RichSinkFunction<Score> { Connection connection = null; PreparedStatement insertStmt = null; @Override public void open(Configuration parameters) throws Exception { connection = DriverManager.getConnection( "jdbc:mysql://localhost:3306/test", "root", "111111"); insertStmt = connection.prepareStatement("insert into t_score (course, student, score) value (?, ?, ?)"); } // 每来一条数据,调用连接,执行sql @Override public void invoke(Score score, Context context) throws Exception { insertStmt.setString(1, score.getCourse()); insertStmt.setString(2, score.getStudent()); insertStmt.setInt(3, score.getScore()); insertStmt.execute(); } @Override public void close() throws Exception { insertStmt = null; connection.close(); } }
这篇关于Flink 流处理 API的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南