大数据入门--hadoop(三)--MR编程

2021/6/20 20:26:36

本文主要是介绍大数据入门--hadoop(三)--MR编程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

MR相关内容

  • InputFormat(切片和为maptask读取数据)
  • 分区(与Reducer的执行数量,自定义分区)
  • 排序
  • 合并Combiner(快速排序后、第一次归并后、不能影响执行结果,输入kv与输出kv类型一致)
  • 分组(第二次归并后,Reducer输入前,自定义分组)

切片(InputFormat)

类型切片方法getSplitskv方法createRecordReader用途说明
TextInputFormatFIF的切片方法LineRecordReader按照块大小分片,按行行读取记录。
KeyValueTextInputFormatFIF的切片方法KeyValueLineRecordReader按照块大小分片,按行读取记录。
SequenceFileInputFormatFIF的切片方法SequenceFileRecordReader按照块大小分片,专用读取上一个任务使用SequenceFileOutputFormat输出的文件。
FixedLengthInputFormatFIF的切片方法FixedLengthRecordReader按照块大小分片,定长读取记录。(使用频率低)
NLineInputFormat自定义,N行一片LineRecordReader通过指定行数进行分片,按行读取记录。
CombineFileInputFormat自定义LineRecordReader合并小文件进行分片读取,按行读取记录。

RecordReader分类

类型说明
LineRecordReader按行读取。
key:LongWritable,内容偏移量
value:Text,一行数据
KeyValueLineRecordReader按照指定分割符进行分割
key:Text,分割的前一部分
value:Text,分割的后一部分,可以通过configuration中的mapreduce.input.keyvaluelinerecordreader.key.value.separator属性指定,默认是\t
FixedLengthRecordReader读取固定长度内容(byte)
key:LongWritable,记录偏移量
value:BytesWritable,二进制数据
SequenceFileRecordReader主要串联job执行,读取上一个job的结果,作为当前job的输入,可以传递对象数据

自定义InputFormat

场景:
假设目前有一堆小文件,需要通过一个MR程序转换为一个SequenceFile文件,其中key:文件路径,value:文件内容
思路:

  1. 自定义一个Format类继承FileInputFormat,其中key:Text,value:BytesWritable(因为文件内容不一定是文本)
  2. 需要重写方法:public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context),此时我们需要定义一个自己的RecordReader
  3. 同时考虑分片问题,我们FileInputFormat默认是按照文件和块大小分片的,这里我们需要同一个文件不被切片即一个文件在一个分片内,需要重写方法protected boolean isSplitable(JobContext context, Path filename)

InputFormat.java

/**
 * 将一堆小文件,转换为一个SequenceFile文件,key:原文件路径,value:原文件内容
 * 这里的泛型即为MapTask的记录的输入类型,所以key:Text,value:BytesWritable(因为文件不一定是文本,所以用BytesWritable)
 */
public class PackageInputFormat extends FileInputFormat<Text, BytesWritable> {
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new PackageSequenceRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}

RecordReader.java

public class PackageSequenceRecordReader extends RecordReader<Text, BytesWritable> {
    //标识文件是否已经被读取过
    private boolean notRead = true;
    private FileSplit fs = null;
    private Text key = new Text();
    private BytesWritable val = new BytesWritable();
    private FSDataInputStream inputStream;
    //初始化只执行一次
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        fs = (FileSplit) split;
        Path path = fs.getPath();
        FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
        inputStream =  fileSystem.open(path);
    }

    //是否还有下一个数据,返回true则标识还有数据,否则无数据
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        //判断是否已读取过,未读取过进行读取
        if(notRead){
            //设置key
            key.set(fs.getPath().getName());
            //设置val
            long fileLength = fs.getLength();
            System.out.println("fs.getlength():"+fileLength);
            byte[] buf = new byte[(int) fileLength];
            int length = inputStream.read(buf);
            val.set(buf,0,length);
            notRead = false;
            return true;
        }else{
            return false;
        }
    }

    //获取当前key
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    //获取当前value
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return val;
    }

    //去读进度
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return notRead ? 0 : 1;
    }

    //关闭资源
    @Override
    public void close() throws IOException {
        IOUtils.closeStream(inputStream);
    }
}

Driver.java

public class PackageDriver {
    public static void main(String[] args) throws Exception {
        //1. 创建一个Job
        Configuration conf = new Configuration();
        conf.set("mapred.reduce.child.java.opts", "-Xmx512m");
        Job job = Job.getInstance(conf);

        //2. 设置类路径
        job.setJarByClass(FlowDriver.class);

        //3. 设置Mapper和Reducer:不需要MR

        //4. 设置Mapper和Reducer的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        //5. 设置输入输出数据
        FileInputFormat.setInputPaths(job,new Path("d://hadoop-study/inputformat/input"));
        FileOutputFormat.setOutputPath(job,new Path("d://hadoop-study/inputformat/output"));

        //设置InputFormat
        job.setInputFormatClass(PackageInputFormat.class);

        //设置OutputFormat
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        //6. 提交job
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}


这篇关于大数据入门--hadoop(三)--MR编程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程