基于文件的Source定义
2021/7/2 23:26:17
本文主要是介绍基于文件的Source定义,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
上篇:并行的Source定义
一、概述
基于文件的Source,本质上就是使用指定的FileInputFormat格式读取数据,可以指定3种格式,分别是:TextInputFormat格式、CsvInputFormat格式、BinaryInputFormat格式。
基于文件的Source底层都是ContinuousFileMonitoringFunction,这个类继承了RichSourceFunction,它们都是非并行的Source
1、readFile(FileInputFormat inputFormat, String filePath) 方法
可以指定读取文件的FileInputFormat 格式,有个重载的方法readFile(FileInputFormat inputFormat, String filePath, FileProcessingMode watchType, long interval)
可以指定FileProcessingMode,有两个枚举类型
PROCESS_ONCE模式:
- Source只读取文件中的数据一次,读取完成后,程序退出
PROCESS_CONTINUOUSLY模式:
- Source会一直监听指定的文件,如果使用该模式,需要指定检测该文件是否发生变化的时间间隔
注意:使用这种模式,文件的内容发生变化后,会将以前的内容和新的内容全部都读取出来,进而造成数据重复读取
代码编程(会重复读取数据)【增量读、了解】
package cn._51doit.flink.day01; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; /** * Source-readFile的使用 * readFile创建的Source是一个多并行度的Source,而且是一个无限的数据流(但是会重复读取数据),这个也会打印4个并行度 * 场景:有新数据就会马上读进来 * */ public class ReadFileDemo { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.setInteger("rest.port", 8181); //设置web ui的端口 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); //改造,发现有新数据就读进来 //PROCESS_CONTINUOUSLY模式是一直监听指定的文件或目录,2秒钟检测一次文件是否发生变化 String path = "E:\\englin"; DataStreamSource<String> lines = env.readFile(new TextInputFormat(null), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 2000); int parallelism = lines.getParallelism(); System.out.println("fromElements创建的并行度的DataStreamSource为:"+parallelism); lines.print(); env.execute(); } }
场景:有新数据就会马上读进来,打印输出:
readTextFile(String filePath) 方法
可以从指定的目录或文件读取数据,默认使用的是TextInputFormat格式读取数据,有一个重载的方法readTextFile(String filePath, String charsetName)可以传入读取文件指定的字符集,默认是UTF-8编码
该方法是一个有限的数据源,数据读完后,程序就会退出,不能一直运行
该方法底层调用的是readFile方法,FileProcessingMode为PROCESS_ONCE
readTextFile的使用只读取文件中的数据一次,读取完成后,程序退出,它是一个有限的数据流,会读指定的目录文件下的数据
readTextFile创建的DataStream也是一个多并行的,fromElements创建的并行度的DataStreamSource为:4
代码:
package cn._51doit.flink.day01; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * Source-readTextFile的使用 *Source只读取文件中的数据一次,读取完成后,程序退出,它是一个有限的数据流,会读指定的目录文件下的数据 * readTextFile创建的DataStream也是一个多并行的 */ public class ReadTextFileDemo { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.setInteger("rest.port", 8181); //设置web ui的端口 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); //改造,发现有新数据就读进来 //PROCESS_CONTINUOUSLY模式是一直监听指定的文件或目录,2秒钟检测一次文件是否发生变化 String path = "E:\\englin"; DataStreamSource<String> lines = env.readTextFile(path); int parallelism = lines.getParallelism(); System.out.println("fromElements创建的并行度的DataStreamSource为:"+parallelism); lines.print(); env.execute(); } }
这篇关于基于文件的Source定义的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-01使用 SVN合并操作时,怎么解决冲突的情况?-icode9专业技术文章分享
- 2025-01-01告别Anaconda?试试这些替代品吧
- 2024-12-31自学记录鸿蒙API 13:实现人脸比对Core Vision Face Comparator
- 2024-12-31自学记录鸿蒙 API 13:骨骼点检测应用Core Vision Skeleton Detection
- 2024-12-31自学记录鸿蒙 API 13:实现人脸检测 Core Vision Face Detector
- 2024-12-31在C++中的双端队列是什么意思,跟消息队列有关系吗?-icode9专业技术文章分享
- 2024-12-31内存泄漏(Memory Leak)是什么,有哪些原因和优化办法?-icode9专业技术文章分享
- 2024-12-31计算机中的内存分配方式堆和栈有什么关系和特点?-icode9专业技术文章分享
- 2024-12-31QT布局器的具体使用原理和作用是什么?-icode9专业技术文章分享
- 2024-12-30用PydanticAI和Gemini 2.0构建Airflow的AI助手