Lab1:实现简单的MapReduce框架
2022/9/2 23:23:13
本文主要是介绍Lab1:实现简单的MapReduce框架,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
1.目标
Lab1提供了单机串行的MR框架,可以直接运行。需要改写为Master-Slave式的架构,lab1也提供的调用框架和RPC通信示例,我们的任务聚焦在MR框架即可,主要实现的内容为:
-
Worker不断请求Map任务,Coordinator将Map Task分发给Worker(一个原始输入文件对应一个Map Task)
-
Map阶段
a) Worker处理输入文件,Map函数输入为(filename string,content string),其中filename为输入文件名,content为该文件的内容,输出为KV数组;
b) 我们需要将相同Key的二元组聚集到一起,然后根据Lab1提供的ihash函数将ihash(Key)%ReduceN相同的二元组写到同一中间文件;其中ReduceN为Lab1设定的Reduce job数目。如果我们输入文件为M,那么总的中间文件数目应小于等于M*ReduceN -
Worker不断请求Rudce任务,Coordinator将Reduce Task分发给Worker(一个ReduceID对应一个Reduce Job,这里的Job我理解的是Master一次性发给Worker的批量数据,在下次请求Job前,Work需要先把这次的数据处理完)
-
Reduce阶段
a) Worker处理ReduceId对应的所有文件,由于一个中间文件中可能有不同的Key,我们需要先聚集相同Key的二元组,然后分别给Reduce处理
b) Reduce完成后,写入最终文件即可
2.实现
2.1 Worker端
worker.go
我们的Worker会不断的向Master请求任务,Master会将自己的状态(进行到哪一步)同步给Worker,Worker根据自己的状态决定请求Map Task还是Reduce Task
我们封装了Map函数,在调用应用层的Reduce之前,首先处理好数据;在调用之后,写入文件
func DoMap(reduceMax int, mapDone *bool, mapf func(string, string) []KeyValue) { //get map task reply := GetMapFileReply{} getMapFile(&reply, mapDone) //get content if reply.MaptaskNumber >= 0 { file, err := os.Open(reply.Filename) if err != nil { log.Fatalf("cannot open %v", reply.Filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read %v", reply.Filename) } file.Close() //call application map function kva := mapf(reply.Filename, string(content)) sort.Sort(ByKey(kva)) i := 0 mapOutFileName := []string{} mapOutTmp2Final := make(map[string]string) mapOutFileContent := make(map[int][]MapOut) //split the content by key for i < len(kva) { j := i + 1 for j < len(kva) && kva[j].Key == kva[i].Key { j++ } mapout := MapOut{} for k := i; k < j; k++ { mapout.Value = append(mapout.Value, kva[k].Value) } mapout.Key = kva[i].Key reduceId := ihash(kva[i].Key) % reduceMax _, ok := mapOutFileContent[reduceId] if !ok { filename := "mr-" + strconv.Itoa(reply.MaptaskNumber) + "-" + strconv.Itoa(reduceId) //fmt.Println("file name is " + filename) mapOutFileName = append(mapOutFileName, filename) } mapOutFileContent[reduceId] = append(mapOutFileContent[reduceId], mapout) i = j } //generate the intermediate file for _, filename := range mapOutFileName { //get reduceId according to intermediate file name suffix := strings.Split(filename, "-") reduceId, _ := strconv.Atoi(suffix[2]) //file, _ = os.Create(filename) file, _ = ioutil.TempFile("", filename+"*") mapOutTmp2Final[file.Name()] = filename enc := json.NewEncoder(file) for _, content := range mapOutFileContent[reduceId] { err := enc.Encode(&content) if err != nil { fmt.Println("encode failed " + err.Error()) } } file.Close() } //send one map file done sendMapDone(reply.MaptaskNumber, reply.Filename, mapOutTmp2Final) } else if *mapDone == false && reply.MaptaskNumber < 0 { //wait for all map finish time.Sleep(1) } }
我们封装了Reduce函数,在调用应用层的Reduce之前,首先处理好数据;在调用之后,写入文件
func DoReduce(reduceDone *bool, reducef func(string, []string) string) { reply := GetReduceFileReply{} //get reduce task getReduceFile(&reply, reduceDone) kva := []MapOut{} ReduceOutTmp2Final := make(map[string]string) if !*reduceDone && reply.ReduceId >= 0 { //create out file outFileName := "mr-out-" + strconv.Itoa(reply.ReduceId) outfile, _ := ioutil.TempFile("", outFileName+"*") ReduceOutTmp2Final[outfile.Name()] = outFileName defer outfile.Close() //read from intermediate file for _, filename := range reply.Filename { file, _ := os.Open(filename) defer file.Close() dec := json.NewDecoder(file) for { var kv MapOut if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) } } sort.Sort(MapOutByKey(kva)) //split content by key i := 0 for i < len(kva) { j := i + 1 for j < len(kva) && kva[j].Key == kva[i].Key { j++ } intermediate := []string{} for k := i; k < j; k++ { intermediate = append(intermediate, kva[k].Value...) } reduceRes := reducef(kva[i].Key, intermediate) fmt.Fprintf(outfile, "%v %v\n", kva[i].Key, reduceRes) i = j } //call application reduce sendReduceDone(reply.ReduceId, ReduceOutTmp2Final) //wait for all reduce done } else if !*reduceDone && reply.ReduceId < 0 { time.Sleep(1) } }
worker在完成任务后,会向Master发送“我搞定了”,Master此时检测是否当前阶段的所有任务都已经实现
2.2 Coordinator端
Master端需要对不同阶段、不同Task进度、超时时间等进行记录
type Coordinator struct { //map task : not started / running / finished mapTaskState map[string]int //used for timeout check mapTaskTime map[string]int64 //idicated map task mapTaskNumber int //if in map step mapDone bool //intermediate map out file mapOutFileArray map[int][]string //if in reduce step reduceDone bool //reduce task : not started / running / finished reduceTaskState map[int]int //used for timeout check reduceTaskTime map[int]int64 //given by caller,indicated the reduce job nReduce int //task state lock taskStatLock sync.Mutex //step state lock taskDone sync.Mutex }
这里Map和Reduce流程类似,如下Map流程:
首先获取Map任务,并分发给Worker
func (c *Coordinator) GetMapInFile(args *GetMapFileArgs, reply *GetMapFileReply) error { //get map file c.taskDone.Lock() if !c.mapDone { c.taskDone.Unlock() c.taskStatLock.Lock() for task, _ := range c.mapTaskState { if c.mapTaskState[task] == -1 { reply.Filename = task reply.MapDone = false reply.MaptaskNumber = c.mapTaskNumber c.mapTaskNumber++ c.mapTaskState[task] = 0 c.mapTaskTime[task] = time.Now().Unix() c.taskStatLock.Unlock() return nil } } reply.MapDone = false } else { reply.MapDone = true } reply.MaptaskNumber = -1 return nil }
在收到一个Map Task完成后,记录任务状态并检查是否所有任务完成能进入下一状态
func (c *Coordinator) MapSingleFileDone(args *MapDoneArgs, reply *MapDoneReply) error { //set this map task done c.taskStatLock.Lock() c.mapTaskState[args.Filename] = 1 c.taskStatLock.Unlock() //c.mapTaskDoneCollection = append(c.mapTaskDoneCollection, args.MaptaskNumber) //record reduceid <-> intermediate file name for tmpfile, filename := range args.MapOutTmp2Final { os.Rename(tmpfile, filename) suffix := strings.Split(filename, "-") reduceN, _ := strconv.Atoi(suffix[2]) c.mapOutFileArray[reduceN] = append(c.mapOutFileArray[reduceN], filename) } //test if all map done reply.Y = args.MaptaskNumber for _, i := range c.mapTaskState { if i != 1 { return nil } } //if all map done,set reduce task state c.taskStatLock.Lock() for key := range c.mapOutFileArray { c.reduceTaskState[key] = -1 } c.taskStatLock.Unlock() c.taskDone.Lock() c.mapDone = true c.taskDone.Unlock() return nil }
3.Lab1中提到的Tips
- 对于所有文件,可以先使用ioutil.TempFile创建临时文件,在任务结束后再改名为最终文件;避免部分中间部分Worker退出或崩溃,导致最终的文件混乱
//创建临时文件,临时文件不可见 file, _ = ioutil.TempFile("", filename+"*") //创建 临时文件 到 最终文件名的映射 mapOutTmp2Final[file.Name()] = filename ... //修改为最终文件,因为前两步在Worker中进行,这一步在Master进行,所以使用map来缓存临时文件的文件路径 os.Rename(ReduceOutTmp2Final[tmpName], filename)
-
对于每个任务,需要有超时判断,如果任务超时,将任务发给其他Worker来做,这里我另外起了一个线程来做超时判断。
-
在go run时,使用-race来判断是否有竞态,及时加锁
4.结果与改进
1.完成了所有测试
2.使用锁之后,明显性能下降很多,需要优化锁的粒度和类型
3.对于任务应该抽象为结构体,使用管道通信,会让流程更简洁
这篇关于Lab1:实现简单的MapReduce框架的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享
- 2024-11-22ansible 的archive 参数是什么意思?-icode9专业技术文章分享
- 2024-11-22ansible 中怎么只用archive 排除某个目录?-icode9专业技术文章分享
- 2024-11-22exclude_path参数是什么作用?-icode9专业技术文章分享
- 2024-11-22微信开放平台第三方平台什么时候调用数据预拉取和数据周期性更新接口?-icode9专业技术文章分享
- 2024-11-22uniapp 实现聊天消息会话的列表功能怎么实现?-icode9专业技术文章分享
- 2024-11-22在Mac系统上将图片中的文字提取出来有哪些方法?-icode9专业技术文章分享
- 2024-11-22excel 表格中怎么固定一行显示不滚动?-icode9专业技术文章分享
- 2024-11-22怎么将 -rwxr-xr-x 修改为 drwxr-xr-x?-icode9专业技术文章分享
- 2024-11-22在Excel中怎么将小数向上取整到最接近的整数?-icode9专业技术文章分享