2020 6.824 lab1解析

前置知识:

  • go
  • linux

2020 6.824 lab1解析

前言

  本系列主要是对mit课程6.824的lab进行解析,包括部分原理的讲解以及代码的实现。主要使用go来进行编写。本文讲解的是lab1,根据给定的代码框架来实现MapReduce结构。

介绍

  6.824是mit的分布式系统课程,而该课程的实验也就是lab目的为通过给定的代码框架来实现通用的分布式系统。该课程lab在2020年进行了部分更新,例如本次的lab1就进行了大改,与前几年几乎完全不同,因此以往网络上的各路教程也就过时了。详细课程资源可以到6.824官方网站进行查看。
  本文的lab1实现的是MapReduce架构,作为谷歌在2003年发表的论文,为现在的分布式计算思想提供了很明确的方向。它继承于函数式编程的思想,并且在谷歌内部真正进行了实现与运用。本文便根据给定的代码框架以及论文思想,来实现这么一个模型。

原理简介

  MapReduce如上文所说,源于函数式编程思想,简单来说Map对应“映射”,Reduce对应“归约”,前者将数据切分从而映射到键值对(key-value对)组,后者将键值对组进行整理归纳,并整合到输出文件。该算法的优势在于优秀的并行化,可以使用多台计算机来应对计算规模的增加,从而节省时间。
  但在具体原理上,我们还需要阅读谷歌发表的那篇论文从而了解更多细节。该论文写的很好,在实现细节不理解的情况下建议多读几遍。但在这里还是简述一下论文的观点。
  论文中提供了编程模型的实现:通过master机器来统筹工作,通过worker机器来进行具体的map或者reduce工作。master给worker分配map或者reduce工作,worker接收到map工作时便读取对应数据,执行map工作,将map完成后的键值对组存于磁盘,并且通知master完成工作。当master了解到所有map工作完成后,便通知worker进行reduce工作。worker接收到指令后便读取磁盘上的对应键值对。然后通常先对键值对组进行排序,然后进行reduce工作,并存于输出文件中。同样,在master了解到所有文件已经reduce完成后便可以进行退出。
  下面本文的实验也将基于上述模型来实现。

准备工作

  进入lab1官网,官网的准备工作说的相当清楚。要准备linux环境,go环境,然后下载实验文件。这里需要一些基本的linux知识,不过官网已经将步骤给的相当清楚了。再次不过多赘述。

实验目标与流程

  从官网可以看到,lab1的实验目标是完成mr文件夹中的master.go rpc.go worker.go三个文件,并且在main文件夹中通过mrmaster.go来创建一个master,通过执行多次mrworker.go来创建多个worker来运行。同时实验也给定了一个串行的结果正确的文件mrsequential.go,可以来参考以及改bug。在实验最后,我们需要对写好的文件进行测试。文件给了我们一个shell脚本test-mr.sh,该脚本会执行真正的应用程序以及给定各种实验场景,我们的目标是通过该测试。
  在mr文件夹中,rpc.go负责master与worker之间的通信。这里可能需要rpc的一些知识来理解,但在本次实验中可以简单理解为worker以rpc.go内的结构体为参数,调用master.go文件中的函数来与master进行通讯。在master和worker的文件内也给定了样例,并且在worker内部还封装好了call函数,即使不理解rpc也可以学样例编写程序。
  在worker文件中,可以看到Worker的参数为两个func,即map和reduce两个函数,这是在创建worker时候就给定的,那么我们可以想到在worker函数内部,我们需要主动找master要工作。同时在完成工作后,我们也需要找master汇报工作,同时接受下一份工作,直到master让你休假(但休假完还是要上班)或者master卷款跑路。(太资本主义了)
  在master文件中,可以看到初始化master时候给定了文件名序列以及reduce后文件的数量。毫无疑问,我们先要把这些存起来。然后根据上述worker的工作流程,master的工作也可以确认:接受worker的工作请求,查看手里有啥空闲的工作,有map就给map,map活都接完但是没干完的就让员工先休假,等到map活干完再来分配reduce工作。reduce工作都干完了就可以卷款跑路(调用Done函数return true)了。(太资本主义了)
  工作流程基本如上,虽然很形象,但接下来,我们需要具体用代码去实现它。

RPC.go

  master与worker要通过rpc进行通讯,那么我们先来规范好通讯协议,知道员工该告诉老板啥,老板该告诉员工啥,才能更好地展开工作。在这里,我们将worker发给master的数据称为请求,反过来称为相应。
  首先,无论是map工作还是reduce工作,无论是分配工作还是汇报工作,都要通过文件来进行。master需要告诉worker去读取哪个文件,worker需要告诉master哪个文件已经完成了。因此在请求和相应二者中,可以确定的一项是filename。数据类型这里选用的是string。
  其次,除了工作地点也就是文件名之外,还要汇报工作类型,是map还是reduce。所以无论是请求还是相应,都需要有表示工作类型的数据。这里我在请求中使用Status进行表示,同时也可以表示worker自己完成了工作还是在休假;响应中使用JobName进行表示。数据类型均为string。或许有些英语不准确但问题不大。
  我们可以得到以下代码:

1
2
3
4
5
6
7
8
9
10
11
type RPCArgs struct {
Status string
Filename string
}

type RPCReplys struct {
JobName string
Filename string
NReduce int
NMap int
}

  这里args为请求,replys为响应。在响应中多了两项,是为了worker更好地工作而传送的,在介绍worker时候会进行介绍。

master

  明确沟通协议后,master的工作就很明确了:存储各种工作状态,分配工作,接受汇报工作并且改变工作的状态,并在所有工作结束自己退出。我们先根据上述描述了解master需要存什么。
  首先,MakeMaster参数中的filenames和nReduce肯定是要存的,毫无疑问。
  其次,需要存储各个工作的状态。在这里可以使用数组来表示,0表示未分配,1表示工作中,2表示已完成。这样做的另一个好处是在工作时间过长的时候可以重新把该工作设置为未分配,从而达到容错的效果。这里需要两个数组,mapTasks []int 和 reduceTasks []int。
  第三,标志工作完成的标识符。这里用的mapDone和reduceDone两个int来标识。这也是确认工作完成来切换到reduce工作的标志。
于是我们可以得到以下声明:

1
2
3
4
5
6
7
8
9
10
type Master struct {
// Your definitions here.
filenames []string
mapTasks []int32
reduceTasks []int32
nReduce int
nMap int
mapDone int
reduceDone int
}

可以看到多了个nMap,也是为了worker便于工作而存放的。

MakeMaster函数便不用多说,进行初始化即可。

确定完协议后,就需要真正的函数来进行通信处理。这里使用的函数我命名为GetJob,接受参数如下:

1
func (m *Master) GetJob(args *RPCArgs, reply *RPCReplys) error 

  确定协议后就需要做事了。首先我们来接受worker的汇报工作,如果他完成的工作叫map,则找到他完成的工作,划掉工作。具体代码如下:

1
2
3
4
5
6
7
8
if args.Status == "map" {
for index, value := range m.filenames {
if args.Filename == value && atomic.LoadInt32(&m.mapTasks[index]) == 1 {
atomic.StoreInt32(&m.mapTasks[index], 2)
break
}
}
}

  横向有点长,主要是因为会有多个线程同时访问数组,要保证数组操作的原子性,从而调用函数。reduce代码基本一样,不过多赘述。

  接收汇报后便要先检测工作干完没有,干完了就可以直接结束。没干完就继续发配。这里检测是否完成与终止master进程的函数一样,调用了Done函数。

  Done函数遍历工作池,如果所有工作都做完了,则删掉MapReduce的中间产生的键值对组,返回true值。否则返回 false。这里写的麻烦了点,而且删掉中间产物也不应该由master来做,在shell测试脚本中由shell来清空了中间产物。但最终可以过测试就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (m *Master) Done() bool {
ret := false
m.mapDone = 1
m.reduceDone = 1
for _, value := range m.mapTasks {
if value != 2 {
m.mapDone = 0
break
}
}
for _, value := range m.reduceTasks {
if value != 2 {
m.reduceDone = 0
break
}
}
if m.mapDone == 1 && m.reduceDone == 1 {
ret = true
fmt.Println("Done jobs")
for i := 0; i < m.nReduce; i++ {
for j := 0; j < m.nMap; j++ {
f, err := os.OpenFile("mr-"+strconv.Itoa(i)+"-"+strconv.Itoa(j), os.O_RDWR|os.O_TRUNC, 0666)
if err != nil {
fmt.Println(err)
}
f.Close()
}
}//删掉中间产物
}
return ret
}

  在结束条件确认之后,身为master便可以尽情发配工作了。发配工作流程是,检测第一个未分配工作,如果有则分配下去并且开始计时,计时结束如果未完成说明遭受了意外完成不了,重新变为未分配。如果完成了则一切安好。如果所有工作都分配了但有的未完成,便可以先叫员工休假。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if m.mapDone == 0 {
index := m.getMapIndex()//手动封装的函数,找到第一个为0的数组索引,否则返回-1
if index != -1 {
reply.JobName = "map"
reply.Filename = m.filenames[index]
reply.NMap = index//在map时候传送index数,在reduce时候传输nmap总数,便于中间文件的命名
atomic.StoreInt32(&m.mapTasks[index], 1)
go func() {
time.Sleep(time.Duration(10) * time.Second)
if atomic.LoadInt32(&m.mapTasks[index]) != 2 {
atomic.StoreInt32(&m.mapTasks[index], 0)
}
}()//计时器,完成了则安好,未完成则改成未分配

} else {
reply.JobName = "free"//休假
reply.Filename = ""
}

}

  reduce同理,不过多赘述。至此,我们完成了master与worker所需要通信的所有东西……除了函数返回值。我们需要返回一个error类型。我选择的是干脆返回一个nil。

  不要忘了nMap和nReduce两个通信参数,在接下来的部分会用到。

  至此,master部分结束了,接下来讲解worker部分。

worker

  worker部分我们需要完成的也就是主要的Worker函数,也就是那个接收两个函数为参数的函数。我们需要完成的也很简单:循环调用GetJob来联系Master,接收到map活则干map,接收到reduce活则干reduce,接收到暂停任务则sleep一下,如果联系不到则说明Master工作已经完成,干脆自己也退出就行。

  worker主要部分也就确定了。一个大循环体,最开头是联系Master获得任务,代码如下:

1
2
3
if call("Master.GetJob", &args, &reply) == false {
return
}

  注意,go是没有while的,你需要使用for循环。

  接收到map工作后,先打开老板给的文件名,读取文件,将文件调用map函数,得到一个键值对组,存入中间文件中。中间文件的命名便有讲究:采用将key哈希成数字,然后对nReduce进行取余计算,这样可以保证相同的key计算值是一样的,同时也保证了分布式计算的正确性。将此数字作为中间文件名的一部分,同时另一部分采用map的号码,也就是传输过来的NMap号,来构成二元组。这样即保证了每个map任务的结果互不干扰,在一个map任务挂掉后重新执行该map,能够直接清空文件并再次编写。当进行reduce任务的时候,由于map数字是固定的,便于索引。

  由于在mrsequential.go中实现了串行的MapReduce,因此可以复制粘贴一部分代码到里面。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
else if reply.JobName == "map" {
filename := reply.Filename
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
kva := mapf(filename, string(content))//读取文件,调用map
var files []*os.File
for i := 0; i < reply.NReduce; i++ {
f, err := ioutil.TempFile("./", "tmp-map*")//建立临时文件,可以保证程序意外退出后能够直接清空
if err != nil {
fmt.Println(err)
}
files = append(files, f)
}
for _, kv := range kva {
name := ihash(kv.Key) % reply.NReduce//哈希操作
f := files[name]
fmt.Fprintf(f, "%v %v\n", kv.Key, kv.Value)//将key相同的放进一个文件
}
for index, value := range files {
value.Close()//保存临时文件,同时改名为正式的中间文件的名字。也就是NMAP的作用
os.Rename(value.Name(), "mr-"+strconv.Itoa(index)+"-"+strconv.Itoa(reply.NMap))
}
args.Status = "map"
args.Filename = filename
reply = RPCReplys{}
}

  代码不短,但有注释,很容易理解。

  到reduce部分,reduce部分干的活便是打开reduce号对应的所有中间文件,读取他们,并对他们进行排序。然后,同样是为了容错,根据reduce号建立输出文件的临时文件。然后,遍历排序过的键值对组,将key值相同的文件,调用reduce函数归纳整理,将结果输出到临时文件中。最后,重命名临时文件到正式文件。

  与map相同,由于在mrsequential.go中有对排序的实现以及文件名字命名的规范,文件内容的规范,我们也可以复制粘贴过来。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
else if reply.JobName == "reduce" {
var kvs []KeyValue
var kv KeyValue
filecode := reply.Filename
for i := 0; i < reply.NMap; i++ {
filename := "mr-" + filecode + "-" + strconv.Itoa(i)
ifile, err := os.OpenFile(filename, os.O_RDONLY, 0666)//根据reduce号计算中间文件名字,读取所有中间文件
if err != nil {
log.Fatalf("cannot read %v", filename)
}
for {
_, err := fmt.Fscanf(ifile, "%v %v", &kv.Key, &kv.Value)
if err != nil {
fmt.Println(err)
break
}
kvs = append(kvs, kv)
}
ifile.Close()
}
fmt.Println("len =", len(kvs))

sort.Sort(ByKey(kvs))//对键值对组排序,ByKey可以参考mrsequential.go
oname := "mr-out-" + reply.Filename
ofile, err := ioutil.TempFile("./", "out-tmp*")//临时文件,便于出错时直接丢弃
if err != nil {
fmt.Println("output error")
} else {
i := 0
for i < len(kvs) {
j := i + 1
for j < len(kvs) && kvs[j].Key == kvs[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, kvs[k].Value)
}
output := reducef(kvs[i].Key, values)

fmt.Fprintf(ofile, "%v %v\n", kvs[i].Key, output)

i = j
}
}
ofile.Close()//这一部分参考mrsequential.go
os.Rename(ofile.Name(), oname)//重命名
args.Status = "reduce"
args.Filename = reply.Filename
reply = RPCReplys{}
}

  至此,worker基本完成了。还有个休假部分,简单的sleep就行,不过多赘述。

  整个MapReduce实验便完整地完成了。

测试

  代码写完当然要测试。在lab1的官网上,给了我们一个很好的各种测试办法。具体可以参考官网。这里说说最终测试。

  最后我们要执行的,是test-mr.sh的shell脚本。测试总共五项:测试word count程序,测试indexer程序,map并行化测试,reduce并行化测试以及容错测试。前两个是应用测试。中间两个我也不知道怎么测的,我也没看Shell源码,反正直接过了。最后一个容错测试值得多说几句。在现实生活中会遇到各种各样的错误,在上面实现的容错,也就是通过master在分配工作后检测给定时间内是否完成,未完成则重新分配,只能解决worker挂掉的问题。本次测试的也是如此。但现实生活中,可能master也会挂掉。这些问题在论文中提供了解决的思路。同时在论文中也有着多种优化的技巧与思路,所以论文值得多读几遍。

  在执行完test-mr.sh最后,会有如下输出。如果你的结果和以下一样,恭喜你,你完成了本次实验。

1
2
--- crash test: PASS
*** PASSED ALL TESTS

总结

  现在是凌晨2.15,写本博客比预先构想所花的时间要更长。写6.824的计划其实半年前学完go就想写了,但由于各种原因拖到最近才写完,从下载6.824到写完lab1过掉所有test只花了两天时间,和预想差不多。写本文花了两个半多小时。半年前最开始动手的时候感觉相当的迷茫,不知道从何下手,先是对go不熟悉,怕各种错误糊一脸,然后是对项目文件的不熟悉。这个过程中,仔细看官网,读论文,搜资料着实起到了很大的帮助。也幸亏MapReduce论文看了好几遍,让我交上了信息检索课的作业。

  总之,正确早日写完6.824,不要留坑。

Author

王钦砚

Posted on

2020-08-10

Licensed under

CC BY-NC-SA 4.0

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×