各位情人节快乐0.0

今天又是非常充实的一天,读了半天的论文,写了半天的代码,晚上就来写这篇Lab的分享了。分布式的Lab在update后,架构变得更加简洁易读,代码也更加好看了。这篇文章可以结合MIT分布式系统Lab1分享一起看,能了解到Lab1到底有了哪些的变化。内容还是一样的,实现一个map reduce框架。

代码结构

相比于之前的Lab1,最大的变动应该是把调度部分抽象出来了。现在master运行的逻辑是统一的:

func (mr *Master) run(jobName string, files []string, nreduce int,
	schedule func(phase jobPhase),
	finish func(),
) {
	mr.jobName = jobName
	mr.files = files
	mr.nReduce = nreduce

	fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)

	schedule(mapPhase)
	schedule(reducePhase)
	finish()
	mr.merge()

	fmt.Printf("%s: Map/Reduce task completed\n", mr.address)

	mr.doneChannel <- true
}

可以看到,对于map和reduce过程,被抽象成了phase,然后调用schedule来执行。

sequential模式下

func Sequential(jobName string, files []string, nreduce int,
	mapF func(string, string) []KeyValue,
	reduceF func(string, []string) string,
) (mr *Master) {
	mr = newMaster("master")
	go mr.run(jobName, files, nreduce, func(phase jobPhase) {
		switch phase {
		case mapPhase:
			for i, f := range mr.files {
				doMap(mr.jobName, i, f, mr.nReduce, mapF)
			}
		case reducePhase:
			for i := 0; i < mr.nReduce; i++ {
				doReduce(mr.jobName, i, len(mr.files), reduceF)
			}
		}
	}, func() {
		mr.stats = []int{len(files) + nreduce}
	})
	return
}

可以看到,sequential模式的schedule函数,是针对不同的phase,顺序执行,其实实现跟之前的lab里是一样的,只是抽象了一层schedule函数,这样做的好处是什么呢,多一层抽象,可以使得schedule函数被两个phase复用,添加了抽象来提高代码复用,同时可以允许来写新的schedule函数来构造不同的执行逻辑,这是我的看法。

并发模式下

func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
	mr = newMaster(master)
	mr.startRPCServer()
	go mr.run(jobName, files, nreduce, mr.schedule, func() {
		mr.stats = mr.killWorkers()
		mr.stopRPCServer()
	})
	return
}

func (mr *Master) schedule(phase jobPhase) {
	var ntasks int
	var nios int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mr.files)
		nios = mr.nReduce
	case reducePhase:
		ntasks = mr.nReduce
		nios = len(mr.files)
	}

	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

	// All ntasks tasks have to be scheduled on workers, and only once all of
	// them have been completed successfully should the function return.
	// Remember that workers may fail, and that any given worker may finish
	// multiple tasks.
	//
	// TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO
	//

	doneChannel := make(chan int, ntasks)

	for i := 0; i < ntasks; i++ {
		go func(taskNumber int, nios int, phase jobPhase) {
			for {
				// get a worker
				worker := <-mr.registerChannel

				// rpc call
				taskArgs := &DoTaskArgs{}
				taskArgs.File = mr.files[taskNumber]
				taskArgs.JobName = mr.jobName
				taskArgs.NumOtherPhase = nios
				taskArgs.Phase = phase
				taskArgs.TaskNumber = taskNumber
				ok := call(worker, "Worker.DoTask", taskArgs, nil)

				// success
				if ok == true {
					go func() {
						// the order to send is important, must send taskNumber to doneChannel first
						doneChannel <- taskNumber
						mr.registerChannel <- worker
					}()
					return
				}
			}
		}(i, nios, phase)
	}

	for i := 0; i < ntasks; i++ {
		<-doneChannel
	}

	fmt.Printf("Schedule: %v phase done\n", phase)
}

在并发模式下,会使用这样一个schedule函数,这个函数的实现也是lab的一项任务,相比于之前没有update时候的lab1,update后任务更加明确了。而且值得注意的是,update后的lab不再允许往master里加新的变量,之前我的实现是废弃掉原本的registerChannel,用自己的带buffer的idleChannel来做worker的发现。现在就不能这样做了,相应地要对代码进行一点点修改。主要的改动是在call返回ok后,要用一个goroutine来发送消息,因为现在registerChannel是一个会阻塞的channel。而且正因为如此,需要先发送消息到doneChannel,再发送worker到registerChannel,否则最后两个task的doneMessage会被锁住。

Bonus-倒排索引

func mapF(document string, value string) (res []mapreduce.KeyValue) {
	// TODO: you should complete this to do the inverted index challenge

	// split
	words := strings.FieldsFunc(value, func(r rune) bool {
		return !unicode.IsLetter(r)
	})

	// emit
	var result []mapreduce.KeyValue
	for _, w := range words {
		result = append(result, mapreduce.KeyValue{w, document})
	}
	return result
}

func reduceF(key string, values []string) string {
	// TODO: you should complete this to do the inverted index challenge

	// remove duplicates
	values = removeDuplicates(values)

	// sort the slice
	sort.Strings(values)

	// construct the result
	var result = fmt.Sprintf("%d ", len(values))
	for _, v := range values {
		result += v + ","
	}

	return result[:len(result)-1]
}

update后的lab有了一个可选的任务,这个任务非常简单,就是实现一个inversed index。map函数现在有两个参数,一个是document,这个参数是专门为了这个bonus设定的吧,之前测试和写word count都完全不会用到这样一个参数。它是后面的values来自的文件的文件名,有了这个的话,简简单单就可以水水过。

总的来说,跟之前没update的时候差不多,反而因为架构的改进更加好写了。

评论