[Consensus] Raft from Scratch: Leader Election

In this series, I will try to implement the Raft protocol in Golang, using RPC as the basic building block. The source code is available at https://github.com/PeterYaoNYU/mit-distributed-sys.

To be frank, I am not sure whether I will be able to finish this project, given the complexity of implementing the Raft algorithm. But we have to at least try.

The choice of Raft over Paxos is mainly about understandability: Raft is not necessarily superior to Paxos, but it is much easier to implement, because its documentation is far more complete. We are lucky that the authors already divided the algorithm into separate components, and in this part we will implement leader election.

To begin, we need to define the structure of a Raft instance. To stay close to the original, I stick to the naming conventions of Figure 2 in the Raft paper. Note that this is subject to change in subsequent parts; currently we are only focusing on leader election. We also need to define structs for the RPC requests and replies.

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
    mu        sync.Mutex          // Lock to protect shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *Persister          // Object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]
    dead      int32               // set by Kill()

    // Your data here (2A, 2B, 2C).
    // Look at the paper's Figure 2 for a description of what
    // state a Raft server must maintain.

    applyCh        chan ApplyMsg
    applyCond      *sync.Cond
    replicatorCond []*sync.Cond
    state          NodeState

    currentTerm int
    votedFor    int
    logs        []Entry

    commitIndex int
    lastApplied int
    nextIndex   []int
    matchIndex  []int

    electionTimer  *time.Timer
    heartbeatTimer *time.Timer
}

type RequestVoteArgs struct {
    // Your data here (2A, 2B).
    Term         int
    CandidateId  int
    LastLogIndex int
    LastLogTerm  int
}

//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
    // Your data here (2A).
    Term        int
    VoteGranted bool
}

type Entry struct {
    Term    int
    Command interface{}
}

type AppendEntriesArgs struct {
    Term         int
    LeaderId     int
    PrevLogIndex int
    PrevLogTerm  int
    Entries      []Entry
    LeaderCommit int
}

type AppendEntriesReply struct {
    Term    int
    Success bool
}


Some constants related to the heartbeat interval and election timeout are as follows; you can tune them according to the specific requirements of your distributed system.

// NodeState labels the role a peer is currently playing.
type NodeState int

const (
    Follower NodeState = iota
    Candidate
    Leader
)

const (
    MinElectionTimeout = 1300 * time.Millisecond
    MaxElectionTimeout = 2000 * time.Millisecond
)

const HeartbeatInterval = 100 * time.Millisecond
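
The RandomizedElectionTimeout() helper used throughout the rest of the code is left to the source code. A minimal sketch, assuming it simply picks a uniform random duration between the two bounds above (it needs the math/rand import), could look like this:

// RandomizedElectionTimeout picks a random duration between
// MinElectionTimeout and MaxElectionTimeout so that followers rarely
// time out at the same moment. (Sketch; not necessarily the exact
// implementation in the repository.)
func RandomizedElectionTimeout() time.Duration {
    return MinElectionTimeout + time.Duration(rand.Int63n(int64(MaxElectionTimeout-MinElectionTimeout)))
}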

Then we write the initialization code for the Raft instance. The main idea is simple: initialize the variables defined in the Raft struct.

func Make(peers []*labrpc.ClientEnd, me int,
    persister *Persister, applyCh chan ApplyMsg) *Raft {
    rf := &Raft{}
    rf.peers = peers
    rf.persister = persister
    rf.me = me

    // Your initialization code here (2A, 2B, 2C).
    rf.dead = 0
    rf.applyCh = applyCh
    // I am not sure about this line yet: what exactly is applyCond used for?
    rf.applyCond = sync.NewCond(&rf.mu)
    rf.replicatorCond = make([]*sync.Cond, len(peers))
    rf.state = Follower
    rf.currentTerm = 0
    rf.votedFor = -1

    rf.logs = make([]Entry, 1)
    rf.nextIndex = make([]int, len(peers))
    rf.matchIndex = make([]int, len(peers))
    rf.heartbeatTimer = time.NewTimer(HeartbeatInterval)
    rf.electionTimer = time.NewTimer(RandomizedElectionTimeout())

    // initialize from state persisted before a crash
    rf.readPersist(persister.ReadRaftState())

    fmt.Printf("{Node %v} is initialized with state {state %v,term %v,commitIndex %v,lastApplied %v}\n", rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied)

    // start ticker goroutine to start elections
    go rf.ticker()

    fmt.Print("Raft is initialized\n")

    return rf
}
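
Make also calls readPersist, but for leader election nothing is persisted yet; the stub from the lab skeleton is enough, and persistence (2C) will fill it in later. A sketch of that stub, assuming the standard 6.824 skeleton:

// readPersist restores previously persisted state. For leader election
// (2A) there is nothing to restore, so this is effectively a no-op.
func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state
        return
    }
    // Your code here (2C).
}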

After initialization, a ticker is started. Running in its own goroutine, it has two jobs:

  • if this node is the leader, send out regular heartbeats;
  • on every node, start an election when the randomized election timer fires.

I use two Go timers to implement this, with the following code:

// The ticker goroutine starts a new election if this peer hasn't received
// heartbeats recently.
func (rf *Raft) ticker() {
    for !rf.killed() {
        select {
        case <-rf.electionTimer.C:
            rf.mu.Lock()
            rf.ChangeState(Candidate)
            rf.currentTerm += 1
            rf.StartElection()
            rf.electionTimer.Reset(RandomizedElectionTimeout())
            fmt.Printf("Node %v's election timer fires, currentTerm: %v\n", rf.me, rf.currentTerm)
            rf.mu.Unlock()
        case <-rf.heartbeatTimer.C:
            rf.mu.Lock()
            if rf.state == Leader {
                rf.BroadcastHeartbeat(true)
                rf.heartbeatTimer.Reset(HeartbeatInterval)
            }
            rf.mu.Unlock()
        }
    }
}
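
The killed() check comes from the lab skeleton rather than from anything specific to leader election. Assuming the standard skeleton, it just reads the atomic dead flag that Kill() sets:

// Kill is called by the tester when this Raft instance should stop;
// killed() lets long-running goroutines such as ticker() notice that.
// (Standard lab-skeleton sketch; requires the sync/atomic import.)
func (rf *Raft) Kill() {
    atomic.StoreInt32(&rf.dead, 1)
}

func (rf *Raft) killed() bool {
    return atomic.LoadInt32(&rf.dead) == 1
}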

First, let's focus on the BroadcastHeartbeat() function, which is the easier one. It simply sends a regular AppendEntries RPC to every server except itself:

func (rf *Raft) BroadcastHeartbeat(includeLogReplication bool) {
    // rf.mu.Lock()
    // defer rf.mu.Unlock()
    // (no locking here: the caller already holds rf.mu)

    fmt.Printf("[BroadcastHeartbeat] {Node %v} broadcasts heartbeat in term %v\n", rf.me, rf.currentTerm)

    args := AppendEntriesArgs{
        Term:     rf.currentTerm,
        LeaderId: rf.me,
    }

    for peerIndex := range rf.peers {
        if peerIndex == rf.me {
            continue
        }
        go func(peerIndex int) {
            reply := AppendEntriesReply{}
            ok := rf.CallAppendEntries(peerIndex, &args, &reply)
            rf.mu.Lock()
            defer rf.mu.Unlock()

            if ok {
                if reply.Term > rf.currentTerm {
                    fmt.Printf("[Heartbeat Receive] {Node %v} receives heartbeat reply from {Node %v} with term %v and steps down in term %v\n", rf.me, peerIndex, reply.Term, rf.currentTerm)
                    rf.ChangeState(Follower)
                    rf.currentTerm = reply.Term
                    rf.votedFor = -1
                }
            } else {
                fmt.Printf("{Node %v} fails to send AppendEntriesRequest %v to {Node %v}\n", rf.me, args, peerIndex)
            }
        }(peerIndex)
    }
}

func (rf *Raft) CallAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
    ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
    return ok
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    if args.Term < rf.currentTerm {
        reply.Term, reply.Success = rf.currentTerm, false
        return
    }
    if args.Term > rf.currentTerm {
        rf.ChangeState(Follower)
        rf.currentTerm = args.Term
        rf.votedFor = -1
        fmt.Printf("[Heartbeat + Follower Change] {Node %v} receives heartbeat from {Node %v} in term %v\n", rf.me, args.LeaderId, rf.currentTerm)
    }
    fmt.Printf("[Heartbeat] {Node %v} receives heartbeat from {Node %v} in term %v\n", rf.me, args.LeaderId, rf.currentTerm)
    rf.electionTimer.Reset(RandomizedElectionTimeout())
    reply.Term, reply.Success = rf.currentTerm, true
}

Note that the AppendEntries handler is incomplete, since we haven't added log replication yet.
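
Both the heartbeat path and the election path rely on a ChangeState helper that is not listed on this page. A minimal sketch, assuming it only switches the state field and arms the appropriate timer (and that the caller holds rf.mu), might look like this:

// ChangeState switches the node's role and re-arms the timers that
// matter for that role. Sketch only; refer to the repository for the
// actual implementation.
func (rf *Raft) ChangeState(state NodeState) {
    if rf.state == state {
        return
    }
    rf.state = state
    switch state {
    case Follower:
        rf.heartbeatTimer.Stop()
        rf.electionTimer.Reset(RandomizedElectionTimeout())
    case Candidate:
        // nothing extra: the caller drives the election
    case Leader:
        rf.electionTimer.Stop()
        rf.heartbeatTimer.Reset(HeartbeatInterval)
    }
}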

Next, let's implement the voting procedure. The basic logic, once a vote request is received, is:

  • check whether the request's term is stale, or whether we have already voted for a different candidate in this term; if so, turn down the request.
  • check whether our own term is stale; if so, update our term and step down to follower.
  • check whether the candidate's log is at least as up-to-date as ours; if so, grant the vote.

An implementation of this looks as follows:

func (rf *Raft) RequestVote(request *RequestVoteArgs, response *RequestVoteReply) {
    // Your code here (2A, 2B).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // defer fmt.Printf("{Node %v}'s state is {state %v,term %v,commitIndex %v,lastApplied %v,firstLog %v,lastLog %v} before processing requestVoteRequest %v and reply requestVoteResponse %v", rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), request, response)
    defer fmt.Printf("[RequestVote]{Node %v}'s state is {state %v,term %v,commitIndex %v,lastApplied %v} before processing requestVoteRequest %v and reply requestVoteResponse %v\n", rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, request, response)

    if request.Term < rf.currentTerm || (request.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != request.CandidateId) {
        response.Term = rf.currentTerm
        response.VoteGranted = false
        return
    }
    if request.Term > rf.currentTerm {
        rf.ChangeState(Follower)
        rf.currentTerm = request.Term
        rf.votedFor = -1
    }
    if !rf.LogIsUpToDate(request.LastLogTerm, request.LastLogIndex) {
        response.Term = rf.currentTerm
        response.VoteGranted = false
        return
    }
    rf.votedFor = request.CandidateId
    rf.electionTimer.Reset(RandomizedElectionTimeout())
    response.Term, response.VoteGranted = rf.currentTerm, true
}
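
The LogIsUpToDate check is the election restriction from Section 5.4.1 of the paper: a vote is only granted if the candidate's last log entry has a higher term than ours, or the same term and an index at least as large. A sketch, assuming rf.logs always contains the dummy entry created in Make at index 0:

// LogIsUpToDate reports whether a candidate's log, identified by the
// term and index of its last entry, is at least as up-to-date as ours.
// (Sketch under the assumptions above.)
func (rf *Raft) LogIsUpToDate(lastLogTerm, lastLogIndex int) bool {
    myLastIndex := len(rf.logs) - 1
    myLastTerm := rf.logs[myLastIndex].Term
    return lastLogTerm > myLastTerm || (lastLogTerm == myLastTerm && lastLogIndex >= myLastIndex)
}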

If the election timer fires, a follower changes into a candidate and sends out requests asking for votes from a majority. The key idea of the implementation is to do this in a non-blocking manner: start a separate goroutine for each vote request, so that the calling thread is never blocked waiting on a particular peer. Meanwhile, the term number in each reply also needs to be checked to ensure that the candidate is still viable, i.e., not outdated. If, through an RPC reply, the node finds itself outdated, it simply reverts to a follower:

func (rf *Raft) StartElection() {
    fmt.Printf("StartElection: Node %v\n", rf.me)
    request := rf.generateRequestVoteRequest()
    fmt.Printf("Node %v generates RequestVoteRequest %v in term %v\n", rf.me, request, rf.currentTerm)
    grantedVotes := 1
    rf.votedFor = rf.me
    for peer := range rf.peers {
        if peer == rf.me {
            continue
        }
        go func(peer int) {
            fmt.Printf("LOCK: Node %v sending to %v\n", rf.me, peer)
            response := new(RequestVoteReply)
            if rf.sendRequestVote(peer, request, response) {
                rf.mu.Lock()
                defer rf.mu.Unlock()
                fmt.Printf("[StartElection] {Node %v} receives RequestVoteResponse %v from {Node %v} after sending RequestVoteRequest %v in term %v\n", rf.me, response, peer, request, rf.currentTerm)
                if rf.currentTerm == request.Term && rf.state == Candidate {
                    if response.VoteGranted {
                        grantedVotes += 1
                        if grantedVotes > len(rf.peers)/2 {
                            fmt.Printf("{Node %v} receives majority votes in term %v\n", rf.me, rf.currentTerm)
                            rf.ChangeState(Leader)
                            rf.BroadcastHeartbeat(true)
                        }
                    }
                } else if response.Term > rf.currentTerm {
                    fmt.Printf("{Node %v} finds a new leader {Node %v} with term %v and steps down in term %v\n", rf.me, peer, response.Term, rf.currentTerm)
                    rf.ChangeState(Follower)
                    rf.currentTerm = response.Term
                    rf.votedFor = -1
                }
            }
        }(peer)
    }
    fmt.Printf("UNLOCK: Node %v finishes StartElection\n", rf.me)
}
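
Two helpers used above, generateRequestVoteRequest() and sendRequestVote(), are also left to the source code. Hedged sketches, assuming the last log entry sits at index len(rf.logs)-1 and that sendRequestVote mirrors CallAppendEntries:

// generateRequestVoteRequest builds the RequestVoteArgs from the
// candidate's current state; the caller holds rf.mu. (Sketch.)
func (rf *Raft) generateRequestVoteRequest() *RequestVoteArgs {
    lastIndex := len(rf.logs) - 1
    return &RequestVoteArgs{
        Term:         rf.currentTerm,
        CandidateId:  rf.me,
        LastLogIndex: lastIndex,
        LastLogTerm:  rf.logs[lastIndex].Term,
    }
}

// sendRequestVote issues the RequestVote RPC to one peer. (Sketch.)
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
    return rf.peers[server].Call("Raft.RequestVote", args, reply)
}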

This concludes the leader election part of Raft. For the helper functions whose implementations are not shown on this page, please refer back to the source code.

Author: Yuncheng Yao
Posted on: 2024-01-12
Updated on: 2024-01-19
