我们以三个层次组织 RPC 发送方(也就是 Candidate)要票逻辑

  1. 选举 loop:但按框架默认的命名方式,我们称之为 electionTicketor
  2. 单轮选举:超时成为 Candidate 之后,针对所有 Peer(除自己外)发起一次要票过程,我们称之为 startElection
  3. 单次 RPC:针对每个 Peer 的 RequestVote 的请求和响应处理,由于要进行计票,需要用到一个局部变量 votes,因此我们使用一个startElection 中的嵌套函数来实现,称为 askVoteFromPeer

还剩一块,就是 RPC 接受方(其他 Peer)的投票逻辑

选举 Loop

基本逻辑是每次在循环时,要进行两项检查:

  1. 超时检查:看选举 Timer 是否已经超时,只有超时后才会真正发起选举。这里有个问题,就是为什么检查间隔(也就是循环中的 time.Sleep(time.Duration(ms) * time.Millisecond) )间隔也是随机的?因为只有检查间隔也随机才不会造成:超时间隔随机,但由于“等距”检查,造成同样检查间隔时,一同发起选举。
  2. 角色检查:判断是否为 Leader,如果自己已经是 Leader,则自然不用发起选举。因为发起选举的唯一目的就是当选 Leader

因此,我们要实现的第一个逻辑就是:随机超时上下界配置和超时检测函数。可以将选举超时间隔的上下界定义在最开始,后面调试的过程中如果发现该参数设置不合适,可以很方便地找到并修改。至于这两个上下界如何选,可以参考 PartA 实现要点(03.Raft PartA 领导者选举)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const (
electionTimeoutMin time.Duration = 250 * time.Millisecond
electionTimeoutMax time.Duration = 400 * time.Millisecond
)

func (rf *Raft) resetElectionTimerLocked() {
rf.electionStart = time.Now()
randRange := int64(electionTimeoutMax - electionTimeoutMin)
rf.electionTimeout = electionTimeoutMin + time.Duration(rand.Int63()%randRange)
}

func (rf *Raft) isElectionTimeoutLocked() bool {
return time.Since(rf.electionStart) > rf.electionTimeout
}

满足条件时,转变为 Candidate ,然后异步地(同步会造成主 Loop 检查延迟)发起选举。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (rf *Raft) electionTicker() {
for !rf.killed() {
// Your code here (PartA)
// Check if a leader election should be started.
rf.mu.Lock()
if rf.role != Leader && rf.isElectionTimeoutLocked() {
rf.becomeCandidateLocked()
go rf.startElection(rf.currentTerm)
}
rf.mu.Unlock()

// pause for a random amount of time between 50 and 350
// milliseconds.
ms := 50 + (rand.Int63() % 300)
time.Sleep(time.Duration(ms) * time.Millisecond)
}
}

所有用到 Raft 全局变量的地方都要加锁,但注意不要在加锁时进行同步地发送RPC。

该 Loop 的生命周期和 Raft Peer 相同,即在创建 Raft 实例时就在后台开始运行

1
2
3
4
5
6
7
8
9
10
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {

// some code are omitted here

// start ticker goroutine to start elections
go rf.electionTicker()

return rf
}

单轮选举

一轮选举包括针对除自己外所有 Peer 的一轮要票 RPC,由于需要访问全局变量,所以仍然要加锁。同样的,就不能在持有锁的时候,同步地进行 RPC。需要用 goroutine 异步地对每个 Peer 进行 RPC。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (rf *Raft) startElection(term int) bool {
votes := 0
askVoteFromPeer := func(peer int, args *RequestVoteArgs) {
// send rpc to `peer` and handle the response
}

rf.mu.Lock()
defer rf.mu.Unlock()

// every time locked
if rf.contextLostLocked(Candidate, term) {
return false
}

for peer := 0; peer < len(rf.peers); peer++ {
if peer == rf.me {
votes++
continue
}

args := &RequestVoteArgs{
Term: term,
CandidateId: rf.me,
}
go askVoteFromPeer(peer, args)
}

return true
}

上下文检查

这里面有个检查“上下文”是否丢失的关键函数:contextLostLocked 。上下文,在不同的地方有不同的指代。在我们的 Raft 的实现中,“上下文”就是指 TermRole。即在一个任期内,只要你的角色没有变化,就能放心地推进状态机

1
2
3
func (rf *Raft) contextLostLocked(role Role, term int) bool {
return !(rf.currentTerm == term && rf.role == role)
}

在多线程环境中,只有通过锁保护起来的临界区内的代码块才可以认为被原子地执行了。由于在 Raft 实现中,我们使用了大量的 goroutine,因此每当线程新进入一个临界区时,要进行 Raft 上下文的检查。如果 Raft 的上下文已经被更改,要及时终止 goroutine,避免对状态机做出错误的改动。

单次 RPC

单次 RPC 包括构造 RPC 参数、发送 RPC等待结果、对 RPC 结果进行处理三个部分。构造参数我们在 startElection 函数内完成了,因此 askVoteFromPeer 函数中就只需要包括后梁部分即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
askVoteFromPeer := func(peer int, args *RequestVoteArgs) {
// send RPC
reply := &RequestVoteReply{}
ok := rf.sendRequestVote(peer, args, reply)

// handle the response
rf.mu.Lock()
defer rf.mu.Unlock()
if !ok {
LOG(rf.me, rf.currentTerm, DDebug, "Ask vote from %d, Lost or error", peer)
return
}

// align the term
if reply.Term > rf.currentTerm {
rf.becomeFollowerLocked(reply.Term)
return
}

// check the context
if rf.contextLostLocked(Candidate, rf.currentTerm) {
LOG(rf.me, rf.currentTerm, DVote, "Lost context, abort RequestVoteReply in T%d", rf.currentTerm)
return
}

// count votes
if reply.VoteGranted {
votes++
}
if votes > len(rf.peers)/2 {
rf.becomeLeaderLocked()
go rf.replicationTicker(term)
}
}

对齐任期

在接受到 RPC 或者处理 RPC 返回值时的第一步,就是要对齐 Term。因为 Term 在 Raft 中本质上是一种“优先级”或者“权力等级”的体现。Peer 的 Term 相同,是对话展开的基础,否则就要先对齐 Term:

  1. 如果对方 Term 比自己小:无视请求,通过返回值“亮出”自己的 Term
  2. 如果对方 Term 比自己大:乖乖跟上对方 Term,变成最“菜”的 Follower

对齐 Term 之后,还要检查上下文,即处理 RPC (RPC 回调函数也是在其他线程调用的)返回值和处理多线程本质上一样:都要首先确保上下文没有丢失,才能驱动状态机。

Q:为什么要检查上下文?都需要在什么地方检查上下文?

A:你可以这么理解,如果一件事情在一把锁的保护下不中断地做,那我们并不需要检查上下文。但是很多时候我们得进行一些耗时操作(如 RPC),此时需要把锁临时断开。之后,重新上锁时,你就得考虑,当前的状态是否满足我们之前期望的状态(也即“上下文”是否被维持了)——因为你的锁中间断开了,所以有些状态可能被其他线程给修改了。

回调函数

所有 Peer 在运行时都有可能收到要票请求,RequestVote 这个回调函数,就是定义该 Peer 收到要票请求的处理逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (PartA, PartB).
rf.mu.Lock()
defer rf.mu.Unlock()

// align the term
reply.Term = rf.currentTerm
if rf.currentTerm > args.Term {
LOG(rf.me, rf.currentTerm, DVote, "-> S%d, Reject vote, higher term, T%d>T%d", args.CandidateId, rf.currentTerm, args.Term)
reply.VoteGranted = false
return
}
if rf.currentTerm < args.Term {
rf.becomeFollowerLocked(args.Term)
}

// check the votedFor
if rf.votedFor != -1 {
LOG(rf.me, rf.currentTerm, DVote, "-> S%d, Reject, Already voted S%d", args.CandidateId, rf.votedFor)
reply.VoteGranted = false
return
}

reply.VoteGranted = true
rf.votedFor = args.CandidateId
rf.resetElectionTimerLocked()
LOG(rf.me, rf.currentTerm, DVote, "-> S%d", args.CandidateId)
}

回调函数实现的一个关键点,还是要先对齐 Term。不仅是因为这是之后展开“两个 Peer 对话”的基础,还是因为在对齐 Term 的过程中,Peer 有可能重置 votedFor。这样即使本来由于已经投过票了而不能再投票,但提高任期重置后,在新的 Term 里,就又有一票可以投了。

还有一点,论文里很隐晦地提到过:只有投票给对方后,才能重置选举 timer。换句话说,在没有投出票时,是不允许重置选举 timer 的。从感性上来理解,只有“认可对方的权威”(发现对方是 Leader 或者投票给对方)时,才会重置选举 timer —— 本质上是一种“承诺”:认可对方后,短时间就不再去发起选举争抢领导权。