大数据之Hadoop集群中MapReduce的Join操作
2022/6/17 23:28:29
本文主要是介绍大数据之Hadoop集群中MapReduce的Join操作,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
需求分析
如下两张输入表格
order
表
id | pid | amount |
---|---|---|
1001 | 01 | 1 |
1002 | 02 | 2 |
1003 | 03 | 3 |
1004 | 01 | 4 |
1005 | 02 | 5 |
1006 | 03 | 6 |
pd
表
pid | pname |
---|---|
01 | 小米 |
02 | 华为 |
03 | 格力 |
将商品信息表中数据根据商品pid合并的订单数据表中
id | pname | amount |
---|---|---|
1001 | 小米 | 1 |
1004 | 小米 | 4 |
1002 | 华为 | 2 |
1005 | 华为 | 5 |
1003 | 格力 | 3 |
1006 | 格力 | 6 |
Reduce Join
创建一个TableBean
对象,其包含两个文件的所有属性,方便在map阶段封装数据
public class TableBean implements Writable { private String id; private String pid; private Integer amount; private String pname; private String flag; public TableBean() { } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getPid() { return pid; } public void setPid(String pid) { this.pid = pid; } public Integer getAmount() { return amount; } public void setAmount(Integer amount) { this.amount = amount; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(id); dataOutput.writeUTF(pid); dataOutput.writeInt(amount); dataOutput.writeUTF(pname); dataOutput.writeUTF(flag); } @Override public void readFields(DataInput dataInput) throws IOException { this.id = dataInput.readUTF(); this.pid = dataInput.readUTF(); this.amount = dataInput.readInt(); this.pname = dataInput.readUTF(); this.flag = dataInput.readUTF(); } @Override public String toString() { return id + '\t' + pname + '\t' + amount; } }
在map阶段根据文件名来区分加载对象,setup
方法一个文件只会执行一次,在该方法中获取文件名称,在map
方法中根据文件名来执行不同的操作,值得注意的是属性不能为默认的NULL
。
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> { private String filename; private Text outK = new Text(); private TableBean outV = new TableBean(); @Override protected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException { // 初始化 FileSplit inputSplit = (FileSplit) context.getInputSplit(); filename = inputSplit.getPath().getName(); } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException { String line = value.toString(); //判断是哪个文件 if (filename.contains("order")) { String[] split = line.split("\t"); // 封装k v outK.set(split[1]); outV.setId(split[0]); outV.setPid(split[1]); outV.setAmount(Integer.parseInt(split[2])); outV.setPname(""); outV.setFlag("order"); } else { String[] split = line.split("\t"); // 封装k v outK.set(split[0]); outV.setId(""); outV.setPid(split[0]); outV.setAmount(0); outV.setPname(split[1]); outV.setFlag("pd"); } //写出 context.write(outK, outV); } }
由于使用pid
为key
,两个表中相同的pid
会进入同一个reduce
,再根据flag
判断是哪个表中的数据,如果是order
将其保存到数组中,如果是pd
则获取其pname
,循环order
数组赋值。值得注意的是,由于values
并非Java
中默认的迭代器,如果只是add(value)
赋值的是地址,无法达到预期要求。
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> { @Override protected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException { ArrayList<TableBean> orderBeans = new ArrayList<>(); TableBean pBean = new TableBean(); for (TableBean value : values) { if ("order".equals(value.getFlag())) { TableBean tempTableBean = new TableBean(); try { BeanUtils.copyProperties(tempTableBean, value); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } orderBeans.add(tempTableBean); } else { try { BeanUtils.copyProperties(pBean, value); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } // 遍历orderBeans for (TableBean orderBean : orderBeans) { orderBean.setPname(pBean.getPname()); context.write(orderBean, NullWritable.get()); } } }
总结:如果数据量非常大,所有的压力都会来到reduce
阶段,这样会导致数据倾斜。为了防止发生,可以将Join
操作放到map
阶段,因为map
阶段处理的数据都是块大小128M
。
Map Join
Map Join适用与一张十分小、一张很大的表的场景
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
采用DistributedCache的方法:
(1)在Mapper的setup阶段,将文件读取到缓存集合中
(2)在Driver驱动类中加载缓存
// 缓存普通文件到Task运行节点 job.addCacheFile(new URI("file:///e:/cache/pd.txt")); // 如果是集群运行,需要设置HDFS路径 job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
实操案例
Mapper
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> { private HashMap<String, String> pdMap = new HashMap<>(); private Text outK = new Text(); @Override protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 获取缓存文件,并把文件内容封装到集合中 pd.txt URI[] cacheFiles = context.getCacheFiles(); URI cacheFile = cacheFiles[0]; FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataInputStream fis = fs.open(new Path(cacheFile)); // 从流中读取数据 BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8")); String line; while (StringUtils.isNotEmpty(line=reader.readLine())) { // 切割 String[] fields = line.split("\t"); pdMap.put(fields[0], fields[1]); } IOUtils.closeStream(reader); } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 处理 order.txt String line = value.toString(); String[] split = line.split("\t"); String pName = pdMap.get(split[1]); outK.set(split[0] + "\t" + pName + "\t" + split[2]); context.write(outK, NullWritable.get()); } }
Driver
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException { Job job = Job.getInstance(new Configuration()); job.setMapperClass(MapJoinMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.addCacheFile(new URI("file:///D:/hadoop/input/mapjoincache/pd.txt")); // 不需要reduce阶段 job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\input\\mapjoin")); FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output\\mapjoin")); boolean b = job.waitForCompletion(true); System.exit(b?0:1); } }
下篇文章:
相关文章:
大数据之Hadoop集群中MapReduce的Join操作
大数据之Hadoop集群的HDFS压力测试
这篇关于大数据之Hadoop集群中MapReduce的Join操作的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享
- 2024-11-22ansible 的archive 参数是什么意思?-icode9专业技术文章分享
- 2024-11-22ansible 中怎么只用archive 排除某个目录?-icode9专业技术文章分享
- 2024-11-22exclude_path参数是什么作用?-icode9专业技术文章分享
- 2024-11-22微信开放平台第三方平台什么时候调用数据预拉取和数据周期性更新接口?-icode9专业技术文章分享
- 2024-11-22uniapp 实现聊天消息会话的列表功能怎么实现?-icode9专业技术文章分享
- 2024-11-22在Mac系统上将图片中的文字提取出来有哪些方法?-icode9专业技术文章分享
- 2024-11-22excel 表格中怎么固定一行显示不滚动?-icode9专业技术文章分享
- 2024-11-22怎么将 -rwxr-xr-x 修改为 drwxr-xr-x?-icode9专业技术文章分享
- 2024-11-22在Excel中怎么将小数向上取整到最接近的整数?-icode9专业技术文章分享