Implementing a MapReduce Framework with Golang RPC from scratch

I just finished a small piece of work this morning: a MapReduce framework implemented with Golang RPC. Before I forget everything, I will document the design and code here. The original source code is available at https://github.com/PeterYaoNYU/mit-distributed-sys .


I have been coding a lot recently, but not much time has been devoted to writing about what I have done. This is perilous, because I will quickly forget it. Replication is super important for finite state machines as well as for learning computer science.

MapReduce is a programming abstraction for distributed workflows. The original paper published by Google is still widely influential today, though two decades have gone by. A job is divided into two phases: a map phase and a reduce phase. I cannot think of a better description than the diagram in the original paper:

In other words, MR manages and hides all aspects of distribution.

An Abstract view of a MapReduce job – word count

Input1 -> Map -> a,1 b,1
Input2 -> Map ->     b,1
Input3 -> Map -> a,1     c,1
                  |   |   |
                  |   |   -> Reduce -> c,1
                  |   -----> Reduce -> b,2
                  ---------> Reduce -> a,2

  • input is (already) split into M files
  • MR calls Map() for each input file, produces list of k,v pairs
    “intermediate” data
    each Map() call is a “task”
  • when Maps are done,
    MR gathers all intermediate v’s for each k,
    and passes each key + values to a Reduce call
  • final output is set of <k,v> pairs from Reduce()s
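
To make the abstraction concrete, here is a minimal sketch of a word-count application written against the signatures the worker below expects. It is a sketch, assuming the framework's KeyValue type is a struct with string Key and Value fields (and that the strings, unicode, and strconv packages are imported); the real application code lives separately from the framework.

// Map emits (word, "1") for every word in the input file.
func Map(filename string, contents string) []KeyValue {
    // Treat anything that is not a letter as a word separator.
    words := strings.FieldsFunc(contents, func(r rune) bool {
        return !unicode.IsLetter(r)
    })
    kva := []KeyValue{}
    for _, w := range words {
        kva = append(kva, KeyValue{Key: w, Value: "1"})
    }
    return kva
}

// Reduce receives a word and all the "1"s emitted for it, and returns the count.
func Reduce(key string, values []string) string {
    return strconv.Itoa(len(values))
}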

Implementation of the Worker

The logic of the worker is the more straightforward part, and it serves as a good starting point. The worker first asks the coordinator how many reduce tasks there are (this decides how many intermediate output files each Map should produce), then repeatedly requests a job from the coordinator, waits for the job to arrive, and, based on the RPC reply, performs either a map job or a reduce job. After finishing the job, it reports back to the coordinator that the assigned job is done, so that it can be given further tasks.

func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {

    args := GetNReduceArgs{}
    reply := GetNReduceReply{}
    call("Coordinator.GetNReduce", &args, &reply)
    nReduce := reply.NReduce

    for {
        // 1. Request a task from the coordinator
        reply, succ := requestTask()

        if !succ {
            // fmt.Println("Request task failed")
            return
        }

        if reply.TaskType == ExitTask {
            // fmt.Println("No task left")
            return
        }

        if reply.TaskType == MapTask {
            doMap(mapf, reply.File, reply.TaskId, nReduce)
            // 3. Report the task is done to the coordinator
            reportMapDone(reply.TaskId)
        } else {
            // 2. Do the reduce task
            doReduce(reducef, reply.TaskId)
            // 3. Report the task is done to the coordinator
            reportReduceDone(reply.TaskId)
        }
    }
}

This is the overall skeleton of the Worker. All we need to do next is fill in the details. We need to implement:

  • the actual map code and reduce code
  • reporting back to the coordinator that a job has been done
  • a heartbeat function that periodically asks the coordinator for more tasks
  • a writer function that outputs the intermediate values from the Map operation to a shared storage system (like the Google File System)

Let's fill in the details. The first piece is the function that requests a task from the coordinator; the worker calls it in every iteration of its main loop:

func requestTask() (reply *RequestTaskReply, succ bool) {
    args := RequestTaskArgs{WorkerID: os.Getpid()}
    reply = &RequestTaskReply{}
    succ = call("Coordinator.RequestTask", &args, reply)
    return reply, succ
}
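
The call helper used throughout these snippets is not shown. It is a thin wrapper around Go's net/rpc client; a minimal sketch looks like the following, where the Unix-domain socket path returned by coordinatorSock() is an assumption of this sketch:

// call sends an RPC to the coordinator over a Unix-domain socket and
// reports whether the call succeeded.
func call(rpcname string, args interface{}, reply interface{}) bool {
    sockname := coordinatorSock() // assumed helper returning the socket path
    c, err := rpc.DialHTTP("unix", sockname)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    defer c.Close()

    err = c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }

    fmt.Println(err)
    return false
}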

If the task is a map task, we run the map function, write the intermediate output to disk, and then report to the coordinator that the map job is done.

One detail: when writing to the shared storage, we do not write the final file directly. To be fault tolerant and avoid readers observing partially written files, we first write to a temporary file and then perform an atomic rename.

Another Detail

When reporting back to the coordinator that a task is finished, we also need to include the worker's identity (host name and PID), because the coordinator needs to be sure that the output comes from the worker that is currently responsible for the job, not from one that was responsible, got timed out, and then came back alive. I made this mistake before, and it was hard to debug, because at first glance it seems like we don't need a worker PID in the report struct. Without it, the MapReduce implementation fails the crash unit test. Distributed applications are very hard to debug!

Each intermediate key/value pair is hashed into one of nReduce partitions, for later consumption by the reduce tasks.
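
The ihash helper that picks the partition is also not shown in these snippets; a common implementation (and the one I follow) is a 32-bit FNV-1a hash from hash/fnv with the sign bit masked off so the result is non-negative:

// ihash maps a key to a non-negative int; the caller takes it modulo
// nReduce to choose the intermediate partition.
func ihash(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() & 0x7fffffff)
}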

func doMap(mapf func(string, string) []KeyValue, filename string, mapID int, nReduce int) {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatalf("cannot open %v", filename)
    }
    content, err := ioutil.ReadAll(file)
    if err != nil {
        log.Fatalf("cannot read %v", filename)
    }
    file.Close()
    kva := mapf(filename, string(content))

    writeMapOutput(kva, mapID, nReduce)
}

func reportMapDone(TaskId int) {
    args := ReportTaskDoneArgs{TaskType: MapTask, TaskId: TaskId, WorkerID: os.Getpid()}
    reply := ReportTaskDoneReply{}
    call("Coordinator.ReportTaskDone", &args, &reply)
}

func writeMapOutput(kva []KeyValue, mapID int, nReduce int) {
    tempFiles := make([]*os.File, nReduce)
    encoders := make([]*json.Encoder, nReduce)

    for i := 0; i < nReduce; i++ {
        tempFile, err := ioutil.TempFile(TempDir, "intermediate-")
        if err != nil {
            log.Fatalf("cannot create temp file")
        }
        tempFiles[i] = tempFile
        encoders[i] = json.NewEncoder(tempFiles[i])
    }

    for _, kv := range kva {
        reduceID := ihash(kv.Key) % nReduce
        err := encoders[reduceID].Encode(&kv)
        if err != nil {
            log.Fatalf("cannot encode kv")
        }
    }

    for i, tempFile := range tempFiles {
        tempFile.Close()
        filename := fmt.Sprintf("%v/mr-%d-%d", TempDir, mapID, i)
        os.Rename(tempFile.Name(), filename)
    }
}

A similar procedure applies to the Reduce part of the worker code:

  • get assigned a job
  • read in the corresponding partition based on the RPC reply
  • do the reduce call
  • write the output file back, using an atomic rename to be fault tolerant
func reportReduceDone(TaskId int) {
    args := ReportTaskDoneArgs{TaskType: ReduceTask, TaskId: TaskId, WorkerID: os.Getpid()}
    reply := ReportTaskDoneReply{}
    call("Coordinator.ReportTaskDone", &args, &reply)
}

func doReduce(reducef func(string, []string) string, reduceID int) {
    // 1. Find all the intermediate files for this reduce partition
    files, err := filepath.Glob(fmt.Sprintf("%v/mr-*-%d", TempDir, reduceID))
    if err != nil {
        log.Fatalf("cannot read temp files")
    }

    // 2. Group the values by key
    intermediate := make(map[string][]string)

    for _, fname := range files {
        file, err := os.Open(fname)
        if err != nil {
            log.Fatalf("cannot open temp file")
        }

        dec := json.NewDecoder(file)
        for {
            var kv KeyValue
            err := dec.Decode(&kv)
            if err != nil {
                break
            }
            intermediate[kv.Key] = append(intermediate[kv.Key], kv.Value)
        }

        file.Close()
    }

    // 3. Call reducef on each group and write to a temporary file
    outputFile, err := ioutil.TempFile(TempDir, "mr-out-")
    if err != nil {
        log.Fatalf("cannot create temp file")
    }
    defer outputFile.Close()

    for key, values := range intermediate {
        output := reducef(key, values)
        fmt.Fprintf(outputFile, "%v %v\n", key, output)
    }

    // 4. Atomically rename the temp file to the final output name
    finalFilename := fmt.Sprintf("mr-out-%d", reduceID)
    os.Rename(outputFile.Name(), finalFilename)
}

Implementation of the Coordinator

When a task request arrives at the coordinator, we need to assign a task.

Since the coordinator holds shared data structures, we need extensive locking to avoid race conditions. Just lock everything that is shared, and you will be fine.

The idea is simple: when a request comes in, assign either a map task or a reduce task, update the task's status, and record which worker the task was assigned to (needed for handling timeouts). We also start a goroutine, waitTask, to check for a timeout. If a timeout happens, that process is no longer responsible for the task; even if we later get a result back from the timed-out worker, we discard it (this is why we do the extra PID check on the report we receive).
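
The coordinator code below relies on a few shared data structures whose definitions are not shown. The following is a sketch of what they might look like; the field names match how they are used below, but the exact definitions in my repo may differ slightly:

type TaskState int

const (
    NotStarted TaskState = iota
    Running
    Finished
)

type TaskType int

const (
    MapTask TaskType = iota
    ReduceTask
    NoTask
    ExitTask
)

type Stage int

const (
    MapStage Stage = iota
    ReduceStage
    ExitStage
)

type Task struct {
    Type      TaskType
    State     TaskState
    TaskId    int
    File      string
    WorkerID  int // PID of the worker currently assigned, or -1
    StartTime time.Time
}

type Coordinator struct {
    mu          sync.Mutex
    stage       Stage
    nMap        int // number of map tasks not yet finished
    nReduce     int // number of reduce tasks not yet finished
    mapTasks    []Task
    reduceTasks []Task
}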

func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.stage == ExitStage {
        reply.TaskType = ExitTask
        return nil
    }

    if c.stage == MapStage {
        for i, task := range c.mapTasks {
            if task.State == NotStarted {
                c.mapTasks[i].State = Running
                c.mapTasks[i].StartTime = time.Now()
                c.mapTasks[i].WorkerID = args.WorkerID
                reply.TaskType = MapTask
                reply.File = task.File
                reply.TaskId = task.TaskId
                // fmt.Printf("Assign map task %v to worker %v, filename:%s\n", task.TaskId, args.WorkerID, reply.File)
                go c.waitTask(&c.mapTasks[i])
                return nil
            }
        }
    }

    if c.stage == ReduceStage {
        for i, task := range c.reduceTasks {
            if task.State == NotStarted {
                c.reduceTasks[i].State = Running
                c.reduceTasks[i].StartTime = time.Now()
                c.reduceTasks[i].WorkerID = args.WorkerID
                reply.TaskType = ReduceTask
                reply.TaskId = task.TaskId
                reply.File = task.File
                go c.waitTask(&c.reduceTasks[i])
                return nil
            }
        }
    }

    reply.TaskType = NoTask
    return nil
}

Since we are talking about timeouts, here is the implementation:

func (c *Coordinator) waitTask(task *Task) {
    time.Sleep(10 * time.Second)

    c.mu.Lock()
    defer c.mu.Unlock()

    if task.State == Running {
        if task.Type == MapTask {
            fmt.Printf("Map task %v timeout, reassigning\n", task.TaskId)
            c.mapTasks[task.TaskId].State = NotStarted
            c.mapTasks[task.TaskId].WorkerID = -1
        } else {
            fmt.Printf("Reduce task %v timeout, reassigning\n", task.TaskId)
            c.reduceTasks[task.TaskId].State = NotStarted
            c.reduceTasks[task.TaskId].WorkerID = -1
        }
    }
}

If a timeout happens (which we monitor with a goroutine), the state of the task, as well as its assigned worker, is reset accordingly. Note how we protect the shared data structures with a lock.

When we receive a message saying that a task has been done, we need to do the following:

  • Check the task type, the state of the task, and whether the result comes from the worker that is actually responsible for the job.
  • Change the task status.
  • If all tasks (map and reduce) have finished, tell the worker that all jobs are done and it can exit.

Translating this logic into code, we get:

func (c *Coordinator) ReportTaskDone(args *ReportTaskDoneArgs, reply *ReportTaskDoneReply) error {
    c.mu.Lock()
    defer c.mu.Unlock()

    if args.TaskType == MapTask {
        if c.mapTasks[args.TaskId].State == Running && c.mapTasks[args.TaskId].WorkerID != -1 && c.mapTasks[args.TaskId].WorkerID == args.WorkerID {
            c.mapTasks[args.TaskId].State = Finished
            c.mapTasks[args.TaskId].WorkerID = -1
            c.nMap -= 1
            fmt.Printf("Map task %v finished, nMap: %v, nReduce: %v\n", args.TaskId, c.nMap, c.nReduce)
        }
    } else if args.TaskType == ReduceTask {
        if c.reduceTasks[args.TaskId].State == Running && c.reduceTasks[args.TaskId].WorkerID != -1 && c.reduceTasks[args.TaskId].WorkerID == args.WorkerID {
            c.reduceTasks[args.TaskId].State = Finished
            c.reduceTasks[args.TaskId].WorkerID = -1
            c.nReduce -= 1
            fmt.Printf("Reduce task %v finished, nMap: %v, nReduce: %v\n", args.TaskId, c.nMap, c.nReduce)
        }
    }

    if c.nMap == 0 && c.nReduce == 0 {
        // fmt.Printf("All tasks finished\n")
        c.stage = ExitStage
        reply.CanExit = true
    } else if c.nMap == 0 {
        c.stage = ReduceStage
        reply.CanExit = false
    }
    return nil
}
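
For completeness, the worker's very first RPC, Coordinator.GetNReduce, also needs a handler. It only has to report how many reduce partitions there are; a minimal sketch (assuming the Coordinator fields sketched earlier, where reduceTasks has one entry per partition) is:

// GetNReduce tells a worker how many reduce partitions to hash
// its intermediate output into.
func (c *Coordinator) GetNReduce(args *GetNReduceArgs, reply *GetNReduceReply) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    reply.NReduce = len(c.reduceTasks)
    return nil
}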

There are some additional details of the code that I haven't covered, but the main idea is here. For the full details, refer to my source code: https://github.com/PeterYaoNYU/mit-distributed-sys.
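
One such detail is how the coordinator actually exposes these handlers over Go RPC, and how the main program knows when to exit. Here is a minimal sketch of both, using net/rpc over a Unix-domain socket; coordinatorSock() is the same assumed helper as on the worker side, and the exact bookkeeping in my repo may differ:

// server registers the Coordinator's methods with net/rpc and starts
// serving requests on a Unix-domain socket in the background.
func (c *Coordinator) server() {
    rpc.Register(c)
    rpc.HandleHTTP()
    sockname := coordinatorSock() // assumed helper returning the socket path
    os.Remove(sockname)
    l, err := net.Listen("unix", sockname)
    if err != nil {
        log.Fatal("listen error:", err)
    }
    go http.Serve(l, nil)
}

// Done is polled periodically by the main program; the job is over once
// the coordinator has entered the exit stage.
func (c *Coordinator) Done() bool {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.stage == ExitStage
}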
