因为毕业设计要用Go来写,因此一直想找一点东西来练习一下Go。恰巧在知乎上看到了ivan的回答中MIT的6.824,分布式系统的Lab似乎是用go来写的,正好自己也想系统学习一下分布式的知识,就下定决心要把这门课看完。

第一节课就给了Lab1,啥也没学就要做一个Lab,还好并不算难,算是用来熟悉Go语言的Lab吧。第一个Lab是关于Map Reduce的,应该是写了一个非常小的Map Reduce的计算框架,能实现单机多线程的计算,而且可以拓展成分布式的。那先看看这部分代码的结构吧~

代码结构

sequential模式下

在这个模式下,只有一个线程在跑,逻辑在下面

// Run jobs sequentially.
func RunSingle(nMap int, nReduce int, file string,
	Map func(string) *list.List,
	Reduce func(string, *list.List) string) {
	mr := InitMapReduce(nMap, nReduce, file, "")
	mr.Split(mr.file)
	for i := 0; i < nMap; i++ {
		DoMap(i, mr.file, mr.nReduce, Map)
	}
	for i := 0; i < mr.nReduce; i++ {
		DoReduce(i, mr.file, mr.nMap, Reduce)
	}
	mr.Merge()
}

这段代码基本可以涵盖整个程序的执行流程。首先初始化一个mr对象,mr对象就是管理Map Reduce任务的对象。然后调用Split切分任务,Split会把原本的文件按照MapJob的个数,来将输入文件平均切分,切分之后的文件名为"mrtmp." + fileName + "-" + strconv.Itoa(MapJob)

切分完之后,开始执行Map Job,具体逻辑在DoMap中。就是打开切分好之后的文件,然后对文件执行在用户写的应用代码中的Map函数。然后呢,对于每一个Map Job,都新建多个相当于是中间文件的文件,这些文件的数目随ReduceJob的数目而定,比方说,一个Map Reduce中有5个Map Job,3个Reduce Job,那就会有15个中间文件。这些中间文件是要交给Reduce Job来进行Reduce的,那这些文件里写着什么呢~之前说到会对输入文件执行用户写的应用代码中的Map函数,这样会产生一系列键值对,然后中间文件里就写着这样一个个键值对,那怎样决定一个键值对该交给哪个Reduce Job去做呢,这个Lab里用的是Hash。哈希函数是这样的:

func ihash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

嗯,并看不懂。好吧不需要看懂,只需要保证相同Key的键值对会被一个Reduce Job来处理就OK了。而哈希就可以保证这一点。

现在DoMap做完了,那就要做DoReduce了。DoReduce做的事情就是,每个Reduce Job都会生成一个Merge Result,Merge后的文件的文件名是"mrtmp." + fileName + "-res-" + strconv.Itoa(ReduceJob),有多少个Reduce Job就有多少个res文件。跟DoMap相同,DoReduce也会执行用户写的代码Reduce方法。

最后执行Merge,Merge操作就是把多个Reduce Job的结果合并,是把所有结果读入内存里再去做的,感觉有些粗暴,不过只是个Lab而已嘛。最后的结果会写到"mrtmp." + mr.file中。

并发模式下

并发的做法其实跟串行的时候差不多,主要的逻辑是这样的:

// Run jobs in parallel, assuming a shared file system
func (mr *MapReduce) Run() {
	fmt.Printf("Run mapreduce job %s %s\n", mr.MasterAddress, mr.file)

	mr.Split(mr.file)
	mr.stats = mr.RunMaster()
	mr.Merge()
	mr.CleanupRegistration()

	fmt.Printf("%s: MapReduce done\n", mr.MasterAddress)

	mr.DoneChannel <- true
}

最初的Split和Merge还是一样,只是在中间处理的时候不大一样,是使用了Master/Slave的架构。现在不直接跑Map Reduce的Job,而是跑一个Master,然后如果有Worker开始Run了,那Worker就会注册到Master上,然后Master就可以分配Job给Worker来做。不过跟真正的线程并发不太一样的是,Master和Slave是使用RPC来通信的,而不是直接线程间通信。估计是为了之后的拓展,以及为后面的Lab挖坑。其中的mr.RunMaster()的实现就是这个Lab的第二个任务,是这个Lab里最难的地方了。这里放下代码~?

func (mr *MapReduce) RunMaster() *list.List {
	// Your code here
	mapDoneChannel := make(chan int, mr.nMap)
	reduceDoneChannel := make(chan int, mr.nReduce)
	
	for i := 0; i < mr.nMap; i++ {
		go func (jobNumber int)  {
			for {
				// get the idle  worker
				worker := <- mr.idleWorkerChannel
				
				// set the jobargs and reply
				jobArgs := &DoJobArgs{}
				jobReply := &DoJobReply{}
				jobArgs.NumOtherPhase = mr.nReduce
				jobArgs.Operation = Map
				jobArgs.File = mr.file
				jobArgs.JobNumber = jobNumber
				
				// call worker.DoJob
				ok := call(worker, "Worker.DoJob", jobArgs, jobReply)
				if ok == true {
					mr.idleWorkerChannel <- worker
					mapDoneChannel <- jobNumber
					return
				}
			}
		}(i)
	}
	
	for i := 0; i < mr.nMap; i++ {
		<- mapDoneChannel
	}
	
	for i := 0; i < mr.nReduce; i++ {
		go func (jobNumber int)  {
			for {
				// get the idle  worker
				worker := <- mr.idleWorkerChannel
				
				// set the jobargs and reply
				jobArgs := &DoJobArgs{}
				jobReply := &DoJobReply{}
				jobArgs.NumOtherPhase = mr.nMap
				jobArgs.Operation = Reduce
				jobArgs.File = mr.file
				jobArgs.JobNumber = jobNumber
				
				// call worker.DoJob
				ok := call(worker, "Worker.DoJob", jobArgs, jobReply)
				if ok == true {
					mr.idleWorkerChannel <- worker
					reduceDoneChannel <- jobNumber
					return
				}
			}
		}(i)
	}
	
	for i := 0; i < mr.nReduce; i++ {
		<- reduceDoneChannel
	}
	
	fmt.Println("Jobs are all done.")
	
	return mr.KillWorkers()
}

似乎有些长了,讲道理可以短一些的,但是确实不熟悉Go语言,就先这样吧。其实想法是非常简单的,就是把串行模式下的直接调用DoMap,DoReduce变成RPC调用而已。但是会有一点点不同,就是如何保证程序能够停下来。我一直觉得多线程里这个问题是通病,在用scala写一个爬虫框架的时候,也遇到了这样的问题,截止到发这篇文章时仍然是把坑留在了那里。而这里的处理方法比那个坑要简单,因为程序是非常容易就知道自己什么时候需要停下来的,就是当所有的Job都做完的时候,就OK了。因此就建立一个channel,每做完一个Job就往里面放一个东西,然后在外层接收跟Job数目一样多次就好了。

至于为什么每个go func里都要写一个死循环for,这是因为Lab的第三个要求是要容错,这里的错误是指worker有可能跑着跑着跑挂了,不接受RPC调用了,于是OK就有可能是返回false。false并不一定意味着worker没在run,说不定是worker正在run没办法响应请求,或者网络不好之类的,那幂等请求怎么办呢~?那这个Lab设计的足够好,Master对于Worker的幂等请求是没有副作用的,只是浪费些计算资源啊之类的。所以幂等不需要管,只需要在失败的时候再调用一次就好了,这叫”at least once”吧~?这也是为什么要有个死循环,只有调用成功才return出去的原因。没有的话可以完成Lab的第一个和第二个要求,而第三个容错的要求就没办法实现。

func InitMapReduce(nmap int, nreduce int,
	file string, master string) *MapReduce {
	mr := new(MapReduce)
	mr.nMap = nmap
	mr.nReduce = nreduce
	mr.file = file
	mr.MasterAddress = master
	mr.alive = true
	mr.registerChannel = make(chan string)
	mr.DoneChannel = make(chan bool)

	// initialize any additional state here
	mr.idleWorkerChannel = make(chan string, maxWorkerNumber)
	return mr
}

func (mr *MapReduce) Register(args *RegisterArgs, res *RegisterReply) error {
	DPrintf("Register: worker %s\n", args.Worker)
	// set the registered worker free
	mr.idleWorkerChannel <- args.Worker
	
	// comment this because it will block, registerChannel is synchronized.
	// mr.registerChannel <- args.Worker
	res.OK = true
	return nil
}

不过需要注意,这种实现需要修改一下原本的注册代码。这是因为原本的注册代码是使用registerChannel来存放有哪些worker注册了。registerChannel是同步阻塞的,所以如果不取出来,是放不下下一个的。而在我们的runMaster中完全没有出现registerChannel,而是用idleWorkerChannel取代了它,idleWorkerChannel被定义为带缓存的channel,跟registerChannel作用有些不同,是在每次新添加了worker,或者某个worker做完了Job,又变得无事可做的时候,就写进去一个worker的值。之所以这样做是让runMaster的逻辑变得简单一点,不需要区分新注册的worker和空闲的worker。

通过这个Lab大致熟悉了下Go的基本语法,并发操作之类的,但是还是了解的比较浅薄。不过这学期到了期末了,比较忙,各种大作业,还有二专的考试,之后可能没多少时间来做了,只能放到寒假回家之后再来看看了。

不过做了Lab1收获还是很大的,对channel有了比较直观的了解,之后可能会针对go的并发写一篇文章。目前就我的直观了解,觉得channel就是一个线程安全的队列而已,其实也是有很多坑的,特别容易就死锁了。之前的那个registerChannel,因为一直没太关注,发现worker执行了注册后,master就被block住了,从代码的静态分析来看,根本看不出死锁,因为只有当两个worker同时来注册,同时没有任何线程来接受registerChannel的时候,才会出现问题。这挺惹人恼火的,这个Bug找了一下午。

而且发现一个奇怪的现象,go test和重定向go test > testresult.log后,最后的完成时间会差别好大。没有重定向,大概需要30秒,重定向后,完成时间就成了15秒,也没有找到合理的解释。最后似乎发现了自己代码有bug。在不重定向的时候有时候会跑不过,找时间再看看。。

评论