Uploading Files to HDFS with Java

2021/9/10 14:05:27

This article walks through uploading files to HDFS with Java. It should be a useful reference for programmers facing the same task; follow along with the examples below.

Before uploading, start HDFS and create the target directory /tmp, granting write permission so files can be placed in it (for example: start-dfs.sh, then hdfs dfs -mkdir /tmp and hdfs dfs -chmod 777 /tmp).

Check in the NameNode web UI that the directory was created and the permissions took effect:
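If you prefer to create the directory from Java rather than the shell, here is a minimal sketch against the same NameNode address (the class name CreateTmpDir is just a placeholder):

package com.njbdqn.myhdfs.services;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

import java.net.URI;

public class CreateTmpDir {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), new Configuration());
        Path tmp = new Path("/tmp");
        fs.mkdirs(tmp);
        // open the directory up for writing, mirroring "hdfs dfs -chmod 777 /tmp"
        fs.setPermission(tmp, new FsPermission((short) 0777));
        fs.close();
    }
}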

Open IDEA and write the upload program:

package com.njbdqn.myhdfs.services;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class UploadFileToHDFS {
    public static void main(String[] args) throws Exception{
        Configuration cfg = new Configuration();
        // if the hdfs:// scheme cannot be resolved, set the implementation class explicitly:
        // cfg.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        // connect to the HDFS NameNode at this address
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), cfg);
        // upload a file:
        // local source path (must include the file name)
        Path src = new Path("E:/mylog/log_20200102.log");
        // destination directory on HDFS (the /tmp created earlier)
        Path dst = new Path("/tmp");
        // perform the upload
        fs.copyFromLocalFile(src, dst);
        fs.close();
    }
}
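One common stumbling block, not covered in the original: if the upload fails with an org.apache.hadoop.security.AccessControlException, the client is acting as your local Windows username, which HDFS may not recognize. The three-argument overload of FileSystem.get lets you name an HDFS user explicitly; "root" below is an assumption, so substitute your cluster's user:

        // run all HDFS requests as the given user; "root" is an assumed user name
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"),
                new Configuration(), "root");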

Run it:

Check the upload destination:

Format conversion

A JSON file (semi-structured data) is hard to turn directly into a table (structured data).

With Java we can first convert the JSON file into an intermediate format that maps easily onto a table, and build the table from that instead: a format conversion.
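For illustration, here is a hypothetical log line matching the field names used in the classes below (all values are made up):

{"machine":{"cpuType":"i7","memory":"8G","cpuSeed":"3.0"},"actTime":"2020-01-02 10:15:00","actType":"view","goodid":"g1001","page":"home","userid":"u2001","browse":{"browseType":"Chrome","browseVersion":"79.0"}}

After conversion it becomes one comma-separated line:

i7,8G,3.0,2020-01-02 10:15:00,view,g1001,home,u2001,Chrome,79.0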

Preparation (the parsing below uses Alibaba's fastjson library, so add the com.alibaba:fastjson dependency to the project):

package com.njbdqn.myhdfs.services;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.FileReader;
import java.net.URI;


public class UploadFileToHDFS {
    public static void main(String[] args) throws Exception{
        Configuration cfg = new Configuration();
        // cfg.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        // connect to HDFS (not used in this step yet; the next step writes the converted data there)
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), cfg);

        // file conversion: read the local log line by line and parse each line as JSON
        FileReader fis = new FileReader("E:/mylog/log_20200102.log");
        BufferedReader bis = new BufferedReader(fis);
        String line = "";
        while ((line = bis.readLine()) != null) {
            Info info = JSON.parseObject(line, Info.class);
            System.out.println(info.getGoodid() + "," + info.getMachine().getMemory());
        }
        bis.close();
        fis.close();
    }
}

Run it (this prints information extracted from E:/mylog/log_20200102.log):

Step 2: upload a converted file to HDFS

Create a few classes to hold the parsed JSON:

package com.njbdqn.myhdfs.services;

public class Info {
    private Machine machine;
    private String actTime;
    private String actType;
    private String goodid;
    private String page;
    private String userid;
    private Browse browse;

    @Override
    public String toString() {
        return "Info{" +
                "machine=" + machine +
                ", actTime='" + actTime + '\'' +
                ", actType='" + actType + '\'' +
                ", goodid='" + goodid + '\'' +
                ", page='" + page + '\'' +
                ", userid='" + userid + '\'' +
                ", browse='" + browse + '\'' +
                '}';
    }

    public Machine getMachine() {
        return machine;
    }

    public void setMachine(Machine machine) {
        this.machine = machine;
    }

    public String getActTime() {
        return actTime;
    }

    public void setActTime(String actTime) {
        this.actTime = actTime;
    }

    public String getActType() {
        return actType;
    }

    public void setActType(String actType) {
        this.actType = actType;
    }

    public String getGoodid() {
        return goodid;
    }

    public void setGoodid(String goodid) {
        this.goodid = goodid;
    }

    public String getPage() {
        return page;
    }

    public void setPage(String page) {
        this.page = page;
    }

    public String getUserid() {
        return userid;
    }

    public void setUserid(String userid) {
        this.userid = userid;
    }

    public Browse getBrowse() {
        return browse;
    }

    public void setBrowse(Browse browse) {
        this.browse = browse;
    }
}

 

package com.njbdqn.myhdfs.services;


public class Browse {
    private String browseType;
    private String browseVersion;

    public String getBrowseType() {
        return browseType;
    }

    public void setBrowseType(String browseType) {
        this.browseType = browseType;
    }

    public String getBrowseVersion() {
        return browseVersion;
    }

    public void setBrowseVersion(String browseVersion) {
        this.browseVersion = browseVersion;
    }
}

package com.njbdqn.myhdfs.services;


public class Machine {
    private String cpuType;
    private String memory;
    private String cpuSeed;

    public String getCpuType() {
        return cpuType;
    }

    public void setCpuType(String cpuType) {
        this.cpuType = cpuType;
    }

    public String getMemory() {
        return memory;
    }

    public void setMemory(String memory) {
        this.memory = memory;
    }

    public String getCpuSeed() {
        return cpuSeed;
    }

    public void setCpuSeed(String cpuSeed) {
        this.cpuSeed = cpuSeed;
    }
}


Then run the following class:

package com.njbdqn.myhdfs.services;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.FileReader;
import java.net.URI;


public class UploadFileToHDFS {
    public static void main(String[] args) throws Exception{
        Configuration cfg = new Configuration();
        // cfg.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        // connect to the HDFS NameNode
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), cfg);

        // file conversion: read the local log, turn each JSON line into CSV, and write it to HDFS
        FileReader fis = new FileReader("E:/mylog/log_20200102.log");
        BufferedReader bis = new BufferedReader(fis);
        // create a file (not a directory) on HDFS to receive the converted lines
        FSDataOutputStream fos = fs.create(new Path("/tmp/lg_20200102.log"));
        String line = "";
        while ((line=bis.readLine()) != null) {
            Info info = JSON.parseObject(line, Info.class);
            String ctx = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
                    info.getMachine().getCpuType(),
                    info.getMachine().getMemory(),
                    info.getMachine().getCpuSeed(),
                    info.getActTime(),
                    info.getActType(),
                    info.getGoodid(),
                    info.getPage(),
                    info.getUserid(),
                    info.getBrowse().getBrowseType(),
                    info.getBrowse().getBrowseVersion()); // format template: one CSV line per JSON record
            fos.write(ctx.getBytes());
        }
        fos.flush();
        fos.close();
        bis.close();
        fis.close();
    }
}
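Besides the web UI, you can spot-check the result by reading the file back through the same API. A minimal sketch (CheckUploadedFile is a placeholder name; the path is the one created above):

package com.njbdqn.myhdfs.services;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

public class CheckUploadedFile {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), new Configuration());
        // open the file we just wrote and print its first few lines
        FSDataInputStream in = fs.open(new Path("/tmp/lg_20200102.log"));
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        for (int i = 0; i < 5; i++) {
            String line = reader.readLine();
            if (line == null) break;
            System.out.println(line);
        }
        reader.close();
        fs.close();
    }
}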


The result in the web UI:

The JSON has been converted into the format shown below:

Step 3: upload multiple files to HDFS (ETL: extract, transform, load)

A single HDFS file cannot be written by multiple threads at once, but separate files can be uploaded in parallel. The code below therefore uses a thread pool, with each task converting and uploading one file.

package com.njbdqn.myhdfs.services;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class UploadFileToHDFS {

/*  quick check that the input directory is listed correctly
    public static void main(String[] args) {
        File file = new File("e:/mylog");
        String[] fst = file.list();
        for (String f : fst) {
            System.out.println(f);
        }
    }
*/

    public void writeFileToHDFS(String path,String fileName) {
        FileSystem fs = null;
        FileReader fis = null;
        BufferedReader bis = null;
        FSDataOutputStream fos = null;
        try {
            fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"),new Configuration());
            fis = new FileReader(path+"/"+fileName);
            bis = new BufferedReader(fis);
            // create the target file on HDFS (fs.create also creates /logs if it does not exist)
            fos = fs.create(new Path("/logs/"+fileName));
            String line = "";
            while((line=bis.readLine())!=null) {
            Info info = JSON.parseObject(line, Info.class);
            String ctx = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
                    info.getMachine().getCpuType(),
                    info.getMachine().getMemory(),
                    info.getMachine().getCpuSeed(),
                    info.getActTime(),
                    info.getActType(),
                    info.getGoodid(),
                    info.getPage(),
                    info.getUserid(),
                    info.getBrowse().getBrowseType(),
                    info.getBrowse().getBrowseVersion()); // format template
            fos.write(ctx.getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } finally {
            try {
                // any of these may be null if an earlier exception prevented it from opening
                if (fos != null) fos.close();
                if (bis != null) bis.close();
                if (fis != null) fis.close();
                // fs is deliberately not closed: FileSystem.get returns a cached instance
                // shared across threads, and closing it here would break the other tasks
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(30);
        final  UploadFileToHDFS ufh = new UploadFileToHDFS();
        String filePath = "e:/mylog";
        // list every file in the directory and submit one upload task per file
        File file = new File(filePath);
        String [] fs = file.list();
        for (String fileName:fs) {
            es.execute(new Runnable() {
                @Override
                public void run() {
                    ufh.writeFileToHDFS(filePath,fileName);
                }
            });
        }
        es.shutdown();
    }
}
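Note that shutdown() only stops the pool from accepting new tasks; it does not wait for the submitted uploads to finish (the JVM stays alive anyway, because the pool's threads are non-daemon). If you want main to block explicitly until all files are written, a variant of the last line, assuming an extra import of java.util.concurrent.TimeUnit and that main declares throws InterruptedException:

        es.shutdown();
        // block for up to 10 minutes until every upload task has finished
        if (!es.awaitTermination(10, TimeUnit.MINUTES)) {
            System.err.println("timed out waiting for the uploads to finish");
        }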



A listing like the one below indicates that the uploads succeeded:

In the listing above, the Block Size column shows 128 MB while the Size column shows only 22.11 MB. A small file does not physically occupy a whole block on disk, but every file and block costs the NameNode a fixed amount of metadata memory, so large numbers of small files are wasteful; it is better to merge small files into larger ones.
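One simple way to do that with the API already used in this article is to write all the local logs into a single HDFS file rather than one file each. A minimal sketch (MergeLogsToHDFS and the output path are placeholders); the merge is sequential, consistent with the earlier point that a single HDFS file cannot be written by several threads:

package com.njbdqn.myhdfs.services;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.net.URI;

public class MergeLogsToHDFS {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), new Configuration());
        // one output file for all the local logs
        FSDataOutputStream fos = fs.create(new Path("/logs/merged_logs.csv"));
        // assumes e:/mylog exists and contains only log files
        for (File f : new File("e:/mylog").listFiles()) {
            BufferedReader reader = new BufferedReader(new FileReader(f));
            String line;
            while ((line = reader.readLine()) != null) {
                Info info = JSON.parseObject(line, Info.class);
                // same CSV conversion as in the classes above
                String ctx = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
                        info.getMachine().getCpuType(), info.getMachine().getMemory(),
                        info.getMachine().getCpuSeed(), info.getActTime(), info.getActType(),
                        info.getGoodid(), info.getPage(), info.getUserid(),
                        info.getBrowse().getBrowseType(), info.getBrowse().getBrowseVersion());
                fos.write(ctx.getBytes());
            }
            reader.close();
        }
        fos.close();
        fs.close();
    }
}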



That concludes this introduction to uploading files to HDFS with Java. I hope the article is a helpful reference, and thank you for your support.

