Distributed Systems Projects in Go: MapReduce, Raft, KV-Storage
To be frank, I am not sure whether I will be able to finish this project, given the obvious complexity of implementing the Raft algorithm. But we have to at least try.
The choice of Raft over Paxos is mainly about understandability: Raft is not necessarily superior to Paxos, but it is much easier to implement, since the documentation is far more complete. We are lucky that the authors already divided the algorithm into separate components, and in this part we will implement Leader Election.
To begin, we need to define the structure of a Raft instance. To stay faithful to the original paper, I stick to the naming conventions of Figure 2. Note that this is subject to change in subsequent parts: for now we are only focusing on Leader Election. We also need to define some structs for the RPC requests and replies.
```go
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	applyCh        chan ApplyMsg
	applyCond      *sync.Cond
	replicatorCond []*sync.Cond
	state          NodeState

	electionTimer  *time.Timer // fires when no heartbeat has been received for a while
	heartbeatTimer *time.Timer // fires when the leader should broadcast heartbeats

	currentTerm int
	votedFor    int
	logs        []Entry

	commitIndex int
	lastApplied int
	nextIndex   []int
	matchIndex  []int
}
```
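The state field uses a NodeState type that is not part of the lab skeleton. A minimal sketch, assuming a plain iota-based enum with the three roles referenced later in the code (Follower, Candidate, Leader):

```go
// NodeState captures which of the three Raft roles this peer currently plays.
type NodeState int

const (
	Follower NodeState = iota
	Candidate
	Leader
)
```

With that in place, the RPC argument and reply structs follow Figure 2 directly: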
```go
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
	// Your data here (2A).
	Term        int
	VoteGranted bool
}

type Entry struct {
	Term    int
	Command interface{}
}

type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []Entry
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term    int
	Success bool
}
```
Some constants related to the heartbeat interval and the election timeout are also needed; you can tune them according to the specific requirements of your distributed system.
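The exact values are not shown on this page, so here is a minimal sketch. The names HeartbeatInterval and RandomizedElectionTimeout are the ones referenced by the ticker below; the concrete numbers (a 100 ms heartbeat and a randomized 300-500 ms election timeout) are my own assumption. It relies on the time and math/rand packages.

```go
// The only hard requirement is that the election timeout is randomized and
// noticeably larger than the heartbeat interval.
const HeartbeatInterval = 100 * time.Millisecond

// RandomizedElectionTimeout returns a fresh timeout in the 300-500 ms range,
// so that peers rarely time out at exactly the same moment.
func RandomizedElectionTimeout() time.Duration {
	return time.Duration(300+rand.Intn(200)) * time.Millisecond
}
```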
These fields are then initialized when the Raft instance is created:

```go
// Your initialization code here (2A, 2B, 2C).
rf.dead = 0
rf.applyCh = applyCh
// I am not sure about this line: what is the exact usage of applyCond?
rf.applyCond = sync.NewCond(&rf.mu)
rf.replicatorCond = make([]*sync.Cond, len(peers))
rf.state = Follower
rf.currentTerm = 0
rf.votedFor = -1
// the two timers below drive the ticker goroutine
rf.electionTimer = time.NewTimer(RandomizedElectionTimeout())
rf.heartbeatTimer = time.NewTimer(HeartbeatInterval)
```
With the state initialized, the ticker goroutine waits on the two timers:

```go
// The ticker go routine starts a new election if this peer hasn't received
// heartbeats recently.
func (rf *Raft) ticker() {
	for !rf.killed() {
		select {
		case <-rf.electionTimer.C:
			rf.mu.Lock()
			rf.ChangeState(Candidate)
			rf.currentTerm += 1
			rf.StartElection()
			rf.electionTimer.Reset(RandomizedElectionTimeout())
			fmt.Printf("Node %v's election timer fires, currentTerm: %v\n", rf.me, rf.currentTerm)
			rf.mu.Unlock()
		case <-rf.heartbeatTimer.C:
			rf.mu.Lock()
			if rf.state == Leader {
				rf.BroadcastHeartbeat(true)
				rf.heartbeatTimer.Reset(HeartbeatInterval)
			}
			rf.mu.Unlock()
		}
	}
}
```
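The ticker also relies on ChangeState(), which is not documented on this page. The sketch below shows one plausible shape, where becoming leader swaps the election timer for the heartbeat timer and stepping down does the reverse; the timer handling here is an assumption on my part, not the actual implementation.

```go
// ChangeState switches this peer's role and adjusts the timers accordingly.
// It must be called with rf.mu held.
func (rf *Raft) ChangeState(state NodeState) {
	if rf.state == state {
		return
	}
	rf.state = state
	switch state {
	case Follower:
		rf.heartbeatTimer.Stop()
		rf.electionTimer.Reset(RandomizedElectionTimeout())
	case Candidate:
		// the caller (the ticker) bumps the term and starts the election
	case Leader:
		rf.electionTimer.Stop()
		rf.heartbeatTimer.Reset(HeartbeatInterval)
	}
}
```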
First, let's focus on the BroadcastHeartbeat() function, which is the easier of the two. It simply sends a regular AppendEntries RPC to every server except itself.
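The function body is not reproduced on this page, so here is a minimal sketch of the idea for the leader-election stage: each heartbeat is an AppendEntries RPC with no log entries, sent from its own goroutine, and a reply carrying a higher term makes the leader step down. The isHeartbeat flag, the empty-entries simplification, and the RPC name "Raft.AppendEntries" (whose handler is not shown here) are assumptions; the real implementation also has to handle log replication.

```go
// BroadcastHeartbeat sends an empty AppendEntries RPC to every other peer.
// isHeartbeat is kept for compatibility with later parts (log replication);
// it is not needed for plain heartbeats.
func (rf *Raft) BroadcastHeartbeat(isHeartbeat bool) {
	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		go func(peer int) {
			rf.mu.Lock()
			if rf.state != Leader {
				rf.mu.Unlock()
				return
			}
			lastIndex := len(rf.logs) - 1
			prevTerm := 0
			if lastIndex >= 0 {
				prevTerm = rf.logs[lastIndex].Term
			}
			args := &AppendEntriesArgs{
				Term:         rf.currentTerm,
				LeaderId:     rf.me,
				PrevLogIndex: lastIndex,
				PrevLogTerm:  prevTerm,
				Entries:      nil, // a pure heartbeat carries no log entries
				LeaderCommit: rf.commitIndex,
			}
			rf.mu.Unlock()
			reply := new(AppendEntriesReply)
			if rf.peers[peer].Call("Raft.AppendEntries", args, reply) {
				rf.mu.Lock()
				defer rf.mu.Unlock()
				if reply.Term > rf.currentTerm {
					// a peer with a higher term exists: step down to follower
					rf.ChangeState(Follower)
					rf.currentTerm = reply.Term
					rf.votedFor = -1
				}
			}
		}(peer)
	}
}
```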
If the election timer times out, a follower changes to a candidate and sends out requests asking for votes from a majority of its peers. The key idea of the implementation is to do this in a non-blocking manner: start a separate goroutine for each vote request, so that the thread is not blocked waiting on any particular peer. Meanwhile, the term number also needs to be checked to make sure the candidate itself is still viable, i.e., it is not outdated. If, through the RPC reply, the node finds itself outdated, it simply steps back down to a follower:
```go
func (rf *Raft) StartElection() {
	fmt.Printf("StartElection: Node %v\n", rf.me)
	request := rf.generateRequestVoteRequest()
	fmt.Printf("Node %v generates RequestVoteRequest %v in term %v\n", rf.me, request, rf.currentTerm)
	grantedVotes := 1
	rf.votedFor = rf.me
	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		go func(peer int) {
			fmt.Printf("LOCK: Node %v sending to %v\n", rf.me, peer)
			response := new(RequestVoteReply)
			if rf.sendRequestVote(peer, request, response) {
				rf.mu.Lock()
				defer rf.mu.Unlock()
				fmt.Printf("[StartElection] {Node %v} receives RequestVoteResponse %v from {Node %v} after sending RequestVoteRequest %v in term %v\n", rf.me, response, peer, request, rf.currentTerm)
				if rf.currentTerm == request.Term && rf.state == Candidate {
					if response.VoteGranted {
						grantedVotes += 1
						if grantedVotes > len(rf.peers)/2 {
							fmt.Printf("{Node %v} receives majority votes in term %v\n", rf.me, rf.currentTerm)
							rf.ChangeState(Leader)
							rf.BroadcastHeartbeat(true)
						}
					} else if response.Term > rf.currentTerm {
						fmt.Printf("{Node %v} finds a new leader {Node %v} with term %v and steps down in term %v\n", rf.me, peer, response.Term, rf.currentTerm)
						rf.ChangeState(Follower)
						rf.currentTerm = response.Term
						rf.votedFor = -1
					}
				}
			}
		}(peer)
	}
	fmt.Printf("UNLOCK: Node %v finishes StartElection\n", rf.me)
}
```
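StartElection() relies on generateRequestVoteRequest() and sendRequestVote(), neither of which is documented on this page. A plausible sketch based on Figure 2 and the labrpc calling convention is below; treat it as an assumption rather than the actual code.

```go
// generateRequestVoteRequest snapshots the candidate's state into RPC arguments.
// It must be called with rf.mu held.
func (rf *Raft) generateRequestVoteRequest() *RequestVoteArgs {
	lastIndex := len(rf.logs) - 1
	lastTerm := 0
	if lastIndex >= 0 {
		lastTerm = rf.logs[lastIndex].Term
	}
	return &RequestVoteArgs{
		Term:         rf.currentTerm,
		CandidateId:  rf.me,
		LastLogIndex: lastIndex,
		LastLogTerm:  lastTerm,
	}
}

// sendRequestVote issues the RequestVote RPC to a single peer and reports
// whether the RPC itself succeeded (not whether the vote was granted).
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	return rf.peers[server].Call("Raft.RequestVote", args, reply)
}
```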
This concludes the leader election part of Raft. For specific functions whose implementation is not documented on this page, please refer back to the source code.