各位情人节快乐0.0
今天又是非常充实的一天,读了半天的论文,写了半天的代码,晚上就来写这篇Lab的分享了。分布式的Lab在update后,架构变得更加简洁易读,代码也更加好看了。这篇文章可以结合MIT分布式系统Lab1分享一起看,能了解到Lab1到底有了哪些的变化。内容还是一样的,实现一个map reduce框架。
代码结构
相比于之前的Lab1,最大的变动应该是把调度部分抽象出来了。现在master运行的逻辑是统一的:
func (mr *Master) run(jobName string, files []string, nreduce int,
schedule func(phase jobPhase),
finish func(),
) {
mr.jobName = jobName
mr.files = files
mr.nReduce = nreduce
fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)
schedule(mapPhase)
schedule(reducePhase)
finish()
mr.merge()
fmt.Printf("%s: Map/Reduce task completed\n", mr.address)
mr.doneChannel <- true
}
可以看到,对于map和reduce过程,被抽象成了phase,然后调用schedule来执行。
sequential模式下
func Sequential(jobName string, files []string, nreduce int,
mapF func(string, string) []KeyValue,
reduceF func(string, []string) string,
) (mr *Master) {
mr = newMaster("master")
go mr.run(jobName, files, nreduce, func(phase jobPhase) {
switch phase {
case mapPhase:
for i, f := range mr.files {
doMap(mr.jobName, i, f, mr.nReduce, mapF)
}
case reducePhase:
for i := 0; i < mr.nReduce; i++ {
doReduce(mr.jobName, i, len(mr.files), reduceF)
}
}
}, func() {
mr.stats = []int{len(files) + nreduce}
})
return
}
可以看到,sequential模式的schedule函数,是针对不同的phase,顺序执行,其实实现跟之前的lab里是一样的,只是抽象了一层schedule函数,这样做的好处是什么呢,多一层抽象,可以使得schedule函数被两个phase复用,添加了抽象来提高代码复用,同时可以允许来写新的schedule函数来构造不同的执行逻辑,这是我的看法。
并发模式下
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
mr = newMaster(master)
mr.startRPCServer()
go mr.run(jobName, files, nreduce, mr.schedule, func() {
mr.stats = mr.killWorkers()
mr.stopRPCServer()
})
return
}
func (mr *Master) schedule(phase jobPhase) {
var ntasks int
var nios int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mr.files)
nios = mr.nReduce
case reducePhase:
ntasks = mr.nReduce
nios = len(mr.files)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)
// All ntasks tasks have to be scheduled on workers, and only once all of
// them have been completed successfully should the function return.
// Remember that workers may fail, and that any given worker may finish
// multiple tasks.
//
// TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO
//
doneChannel := make(chan int, ntasks)
for i := 0; i < ntasks; i++ {
go func(taskNumber int, nios int, phase jobPhase) {
for {
// get a worker
worker := <-mr.registerChannel
// rpc call
taskArgs := &DoTaskArgs{}
taskArgs.File = mr.files[taskNumber]
taskArgs.JobName = mr.jobName
taskArgs.NumOtherPhase = nios
taskArgs.Phase = phase
taskArgs.TaskNumber = taskNumber
ok := call(worker, "Worker.DoTask", taskArgs, nil)
// success
if ok == true {
go func() {
// the order to send is important, must send taskNumber to doneChannel first
doneChannel <- taskNumber
mr.registerChannel <- worker
}()
return
}
}
}(i, nios, phase)
}
for i := 0; i < ntasks; i++ {
<-doneChannel
}
fmt.Printf("Schedule: %v phase done\n", phase)
}
在并发模式下,会使用这样一个schedule函数,这个函数的实现也是lab的一项任务,相比于之前没有update时候的lab1,update后任务更加明确了。而且值得注意的是,update后的lab不再允许往master里加新的变量,之前我的实现是废弃掉原本的registerChannel,用自己的带buffer的idleChannel来做worker的发现。现在就不能这样做了,相应地要对代码进行一点点修改。主要的改动是在call返回ok后,要用一个goroutine来发送消息,因为现在registerChannel是一个会阻塞的channel。而且正因为如此,需要先发送消息到doneChannel,再发送worker到registerChannel,否则最后两个task的doneMessage会被锁住。
Bonus-倒排索引
func mapF(document string, value string) (res []mapreduce.KeyValue) {
// TODO: you should complete this to do the inverted index challenge
// split
words := strings.FieldsFunc(value, func(r rune) bool {
return !unicode.IsLetter(r)
})
// emit
var result []mapreduce.KeyValue
for _, w := range words {
result = append(result, mapreduce.KeyValue{w, document})
}
return result
}
func reduceF(key string, values []string) string {
// TODO: you should complete this to do the inverted index challenge
// remove duplicates
values = removeDuplicates(values)
// sort the slice
sort.Strings(values)
// construct the result
var result = fmt.Sprintf("%d ", len(values))
for _, v := range values {
result += v + ","
}
return result[:len(result)-1]
}
update后的lab有了一个可选的任务,这个任务非常简单,就是实现一个inversed index。map函数现在有两个参数,一个是document,这个参数是专门为了这个bonus设定的吧,之前测试和写word count都完全不会用到这样一个参数。它是后面的values来自的文件的文件名,有了这个的话,简简单单就可以水水过。
结
总的来说,跟之前没update的时候差不多,反而因为架构的改进更加好写了。