基于文件的Source定义

2021/7/2 23:26:17

本文主要是介绍基于文件的Source定义,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

上篇:并行的Source定义

一、概述

基于文件的Source,本质上就是使用指定的FileInputFormat格式读取数据,可以指定3种格式,分别是:TextInputFormat格式、CsvInputFormat格式、BinaryInputFormat格式。

基于文件的Source底层都是ContinuousFileMonitoringFunction,这个类继承了RichSourceFunction,它们都是非并行的Source

1、readFile(FileInputFormat inputFormat, String filePath) 方法

可以指定读取文件的FileInputFormat 格式,有个重载的方法readFile(FileInputFormat inputFormat, String filePath, FileProcessingMode watchType, long interval) 

可以指定FileProcessingMode,有两个枚举类型

PROCESS_ONCE模式:

  • Source只读取文件中的数据一次,读取完成后,程序退出

PROCESS_CONTINUOUSLY模式:

  • Source会一直监听指定的文件,如果使用该模式,需要指定检测该文件是否发生变化的时间间隔

注意:使用这种模式,文件的内容发生变化后,会将以前的内容和新的内容全部都读取出来,进而造成数据重复读取

代码编程(会重复读取数据)【增量读、了解】

package cn._51doit.flink.day01;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

/**
 * Source-readFile的使用
 * readFile创建的Source是一个多并行度的Source,而且是一个无限的数据流(但是会重复读取数据),这个也会打印4个并行度
 * 场景:有新数据就会马上读进来
 *
 */
public class ReadFileDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);  //设置web ui的端口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        //改造,发现有新数据就读进来
        //PROCESS_CONTINUOUSLY模式是一直监听指定的文件或目录,2秒钟检测一次文件是否发生变化
        String path = "E:\\englin";

        DataStreamSource<String> lines = env.readFile(new TextInputFormat(null), path,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);



        int parallelism = lines.getParallelism();

        System.out.println("fromElements创建的并行度的DataStreamSource为:"+parallelism);

        lines.print();

        env.execute();

    }
}

场景:有新数据就会马上读进来,打印输出:

readTextFile(String filePath) 方法

可以从指定的目录或文件读取数据,默认使用的是TextInputFormat格式读取数据,有一个重载的方法readTextFile(String filePath, String charsetName)可以传入读取文件指定的字符集,默认是UTF-8编码

该方法是一个有限的数据源,数据读完后,程序就会退出,不能一直运行

该方法底层调用的是readFile方法,FileProcessingMode为PROCESS_ONCE

readTextFile的使用只读取文件中的数据一次,读取完成后,程序退出,它是一个有限的数据流,会读指定的目录文件下的数据

readTextFile创建的DataStream也是一个多并行的,fromElements创建的并行度的DataStreamSource为:4

代码:

package cn._51doit.flink.day01;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Source-readTextFile的使用
 *Source只读取文件中的数据一次,读取完成后,程序退出,它是一个有限的数据流,会读指定的目录文件下的数据
 * readTextFile创建的DataStream也是一个多并行的
 */
public class ReadTextFileDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8181);  //设置web ui的端口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        //改造,发现有新数据就读进来
        //PROCESS_CONTINUOUSLY模式是一直监听指定的文件或目录,2秒钟检测一次文件是否发生变化
        String path = "E:\\englin";

        DataStreamSource<String> lines = env.readTextFile(path);

        int parallelism = lines.getParallelism();

        System.out.println("fromElements创建的并行度的DataStreamSource为:"+parallelism);

        lines.print();

        env.execute();

    }
}



这篇关于基于文件的Source定义的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程