HDFS Java 编程以及 SequenceFile 读写编程
2021/6/16 20:21:07
本文主要介绍 HDFS 的 Java 编程以及 SequenceFile 的读写,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
创建目录的两种方法。第二种不会出现权限问题
个人建议用第二种
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.testng.annotations.Test; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; //已经有的文件夹不会再创建 public class 创建目录 { //第一种创建方式 @Test public static void main(String[] args) throws IOException { Configuration con = new Configuration(); //设置namenode con.set("fs.defaultFS","hdfs://node00:9000"); FileSystem fileSystem =FileSystem.get(con); boolean mkdirs = fileSystem.mkdirs(new Path("/kkb/dir1")); fileSystem.close(); } //第二种创建方式 @Test public void mkDirONHDFS2() throws URISyntaxException, IOException, InterruptedException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), configuration,"root"); //通过filesystem创建目录 boolean mkdirs = fileSystem.mkdirs(new Path("/kkb/dir2")); fileSystem.close(); } //第二种创建方式,以及设置文件权限 @Test public void mkDirONHDFS3() throws IOException, URISyntaxException, InterruptedException { Configuration con = new Configuration(); //创建FsPermission对象,为文件设置权限 FsPermission fsPermission = new FsPermission(FsAction.READ_WRITE,FsAction.READ,FsAction.READ); //创建fileSystem对象 FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), con,"root"); boolean mkdirs = fileSystem.mkdirs(new Path("/kkb/dir4"),fsPermission); fileSystem.close(); } }
上传下载文件file1-2方法
查看文件信息file3方法
IO方式上传下载文件(输入输出流)file4-5方法
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.testng.annotations.Test; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class 上传文件 { //从本地上传文件 @Test public void uploadFile1HDFS() throws IOException, URISyntaxException, InterruptedException { //获取文件系统对象 Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), configuration,"root"); //传输文件 fileSystem.copyFromLocalFile(new Path("D:/12.txt"),new Path("/kkb")); fileSystem.close(); } 从hdfs下载文件/// @Test public void downloadFile2HDFS() throws IOException, URISyntaxException, InterruptedException { //获取文件系统对象 Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), configuration,"root"); //传输文件 fileSystem.copyToLocalFile(new Path("/kkb/12.txt"),new Path("E:")); fileSystem.close(); } 查看文件信息,这只是范例,其他方法自己尝试/// @Test public void viewFile3HDFS() throws IOException, URISyntaxException, InterruptedException { //获取文件系统对象 Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), configuration,"root"); RemoteIterator<LocatedFileStatus> listFile = fileSystem.listFiles(new Path("/kkb"), true); while (listFile.hasNext()){ LocatedFileStatus FileStatus = listFile.next(); String name = FileStatus.getPath().getName();//获取路径 System.out.println("name:"+name); BlockLocation[] blockLocations = FileStatus.getBlockLocations(); //获取block块的位置 for (BlockLocation bl :blockLocations ){ String[] hosts = bl.getHosts();//获取block块所在节点,一般有三个节点 for (int i = 0; i < hosts.length; i++) { System.out.println(hosts[i]); } for (String host : hosts) { System.out.println("host:"+host); } } } fileSystem.close(); } // IO方式上传下载文件// //上传单个文件 @Test public void putFile4HDFS() throws URISyntaxException, IOException, InterruptedException { //获取文件系统对象 
Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), configuration, "root"); //创建输入流不要加file,否则会报错 FileInputStream fin = new FileInputStream(new File("D:/12.txt")); //创建输出流 FSDataOutputStream fout = fileSystem.create(new Path("/kkb/13.txt")); //调用IO完成流拷贝 IOUtils.copy(fin,fout); IOUtils.closeQuietly(fin); IOUtils.closeQuietly(fout); fileSystem.close(); } //同时上传多个文件到一个里面 @Test public void putFile5HDFS() throws URISyntaxException, IOException, InterruptedException { //获取分布式文件系统对象 FileSystem fileSystem = FileSystem.get(new URI("hdfs://node00:9000"), new Configuration(), "root"); FSDataOutputStream fout = fileSystem.create(new Path("/kkb/14.txt")); //文件输入流创建 // FSDataInputStream open = fileSystem.open(new Path("/kkb/14.txt")); //获取本地文件系统对象 Configuration configuration = new Configuration(); LocalFileSystem localfileSystem = FileSystem.getLocal(new Configuration()); FileStatus[] loacalfileStatus = localfileSystem.listStatus(new Path("D:/txt")); for (FileStatus fStatus: loacalfileStatus){ Path path = fStatus.getPath(); FSDataInputStream fin = localfileSystem.open(path); IOUtils.copy(fin,fout); IOUtils.closeQuietly(fin); } IOUtils.closeQuietly(fout); fileSystem.close(); } }
写入文件
package wl_02编程操作sequence_files; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class SequenceFiles_write { //模拟数据源;数组中一个元素表示一个文件的内容(DATA长度为5) private static final String[] DATA = { "The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.", "It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.", "Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer", "o delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.", "Hadoop Common: The common utilities that support the other Hadoop modules." 
}; public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException { //获取文件系统对象 Configuration conf = new Configuration(); FileSystem.get(new URI("hdfs://node00:9000/kkb/2.txt"),conf,"root"); System.setProperty("HADOOP_USER_NAME","root"); //创建各种option //将文件写入到那个文件--》option SequenceFile.Writer.Option file = SequenceFile.Writer.file(new Path("hdfs://node00:9000/kkb/2.txt")); 因为SequenceFile每个record是键值对的,K是optipn SequenceFile.Writer.Option keyClass = SequenceFile.Writer.keyClass(IntWritable.class); //v也是option SequenceFile.Writer.Option valueClass = SequenceFile.Writer.valueClass(Text.class); //SequenceFile压缩方式:NONE | RECORD | BLOCK三选一 //方案一:RECORD、不指定压缩算法 // SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD); // SequenceFile.Writer writer = SequenceFile.createWriter(conf, pathOption, keyOption, valueOption, compressOption); //方案二:BLOCK、不指定压缩算法 // SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK); // SequenceFile.Writer writer = SequenceFile.createWriter(conf, pathOption, keyOption, valueOption, compressOption); //方案三:使用BLOCK、压缩算法BZip2Codec;压缩耗时间 //再加压缩算法 // BZip2Codec codec = new BZip2Codec(); // codec.setConf(conf); // SequenceFile.Writer.Option compressAlgorithm = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, codec); //开始写SequenceFile文件 SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD); SequenceFile.Writer writer = SequenceFile.createWriter(conf, file, keyClass, valueClass,compressOption); IntWritable key = new IntWritable(); Text value = new Text(); for (int i = 0; i < 10000; i++) { //设置key,value写入文件 key.set(i); value.set(DATA[i % DATA.length]); System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value); writer.append(key,value); } IOUtils.closeStream(writer); } }
命令查看SequenceFile内容
hdfs dfs -text /kkb/2.txt 或者打开网页查看
读取文件
package wl_02编程操作sequence_files; import com.sun.corba.se.spi.ior.Writeable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.util.ReflectionUtils; import sun.reflect.Reflection; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class SequenceFiles_read { public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException { //获取文件系统对象 Configuration conf = new Configuration(); // //要读的SequenceFile // FileSystem.get(new URI("hdfs://node00:9000/kkb/2.txt"), conf, "root"); //指定用户 System.setProperty("HADOOP_USER_NAME", "root"); // 要读的SequenceFile文件 SequenceFile.Reader.Option file = SequenceFile.Reader.file(new Path("hdfs://node00:9000/kkb/2.txt")); // Reader对象 SequenceFile.Reader reader = new SequenceFile.Reader(conf, file); //创建key,value对象,用于存储key,value值 Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); //获取现在要读的位置 long position = reader.getPosition(); System.out.println("position"+position); while (reader.next(key, value)) { //看一下当前位置是否是同步点 String syncSeen = reader.syncSeen() ? "True" : "False"; System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value); //移动到下一个record开头的位置 position = reader.getPosition(); // beginning of next record } IOUtils.closeStream(reader); } }
这篇关于hdfs,Java编程以及SequenceFile,java编程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-06小米11i印度快充版ROM合集:极致体验,超越期待
- 2024-10-06【ROM下载】小米11i 5G 印度版系统, 疾速跃迁,定义新速度
- 2024-10-06【ROM下载】小米 11 青春活力版,青春无极限,活力全开
- 2024-10-05小米13T Pro系统合集:性能与摄影的极致融合,值得你升级的系统ROM
- 2024-10-01基于Python+Vue开发的医院门诊预约挂号系统
- 2024-10-01基于Python+Vue开发的旅游景区管理系统
- 2024-10-01RestfulAPI入门指南:打造简单易懂的API接口
- 2024-10-01初学者指南:了解和使用Server Action
- 2024-10-01Server Component入门指南:搭建与配置详解
- 2024-10-01React 中使用 useRequest 实现数据请求