Flink实现单词计数并写入MySQL
2021/10/11 19:15:49
本文主要是介绍Flink实现单词计数并写入MySQL,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Flink实现单词计数并写入MySQL
依赖:
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.14.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.14.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.14.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.12</artifactId> <version>1.10.0</version> <scope>provided</scope> </dependency> </dependencies>
导包:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.FlatMapIterator; import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.types.Row; import java.util.Arrays; import java.util.Iterator;
java代码:
public class toMySQL { public static void main(String[] args) throws Exception { JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername("com.mysql.cj.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/test?user=root&password=123456") .setQuery("insert into words (word,count) values (?,?) ") //设置为每2条数据就提交一次 .setBatchInterval(2) .finish(); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<String> data = env.readTextFile("datas/1.txt"); //将读取的字符串按照空格分割成单个单词 FlatMapOperator<String, String> data1 = data.flatMap(new FlatMapIterator<String, String>() { @Override public Iterator<String> flatMap(String s) throws Exception { //先把标点符号都去除 String s1 = s.replace("?", ""); String s2 = s1.replace(".", ""); String s3 = s2.replace(",", ""); return Arrays.asList(s3.split(" ")).iterator(); } }); MapOperator<String, Tuple2<String, Integer>> data2 = data1.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return new Tuple2<>(s, 1); } }); //将Tuple对象,按第一个元素进行分区,再将第二个元素进行累加 AggregateOperator<Tuple2<String, Integer>> data3 = data2.groupBy(0).sum(1); MapOperator<Tuple2<String, Integer>, Row> data4 = data3.map(new MapFunction<Tuple2<String, Integer>, Row>() { @Override public Row map(Tuple2<String, Integer> ss) throws Exception { Row row = new Row(2); row.setField(0, ss.f0); row.setField(1, ss.f1); return row; } }); data4.print(); data4.output(jdbcOutput); env.execute(); } }
结果:
ps注意:
由于数据库字段名不区分大小写,因此不要把word设置为主键。因为存在大小写不同的单词,会发生主键冲突。
这篇关于Flink实现单词计数并写入MySQL的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-16MySQL资料:新手入门教程
- 2024-11-16MySQL资料:新手入门教程
- 2024-11-15MySQL教程:初学者必备的MySQL数据库入门指南
- 2024-11-15MySQL教程:初学者必看的MySQL入门指南
- 2024-11-04部署MySQL集群项目实战:新手入门教程
- 2024-11-04如何部署MySQL集群资料:新手入门指南
- 2024-11-02MySQL集群项目实战:新手入门指南
- 2024-11-02初学者指南:部署MySQL集群资料
- 2024-11-01部署MySQL集群教程:新手入门指南
- 2024-11-01如何部署MySQL集群:新手入门教程