本章我们要增加在代码组织(02.Raft 代码总览)一章中提到的三个工作流中的最后一个:日志应用工作流, applyTicker

由于 Apply 只有在 commitIndex 变大的时候才会触发,因此我们可以使用 golang 中的条件变量 sync.Cond ,使用唤醒机制,只有在 commitIndex 增大后才唤醒 applyTicker

字段补全

根据论文图 2 我们需要给 apply 逻辑补上两个字段:

  1. commitIndex:全局日志提交进度
  2. lastApplied:本 Peer 日志 apply 进度

由于我们想在实现时,使用 sync.Cond 唤醒 apply 的工作流,因此需要增加:applyCond

最后,在我们 Raft 实现的设定中,apply 的过程,就是将 applyMsg 通过构造 Peer 时传进来的 channel 返回给应用层。因此还需要保存下这个 applyCh

1
2
3
4
5
6
7
8
9
10
11
12
// A Go object implementing a single Raft peer.
type Raft struct {
// ......

// commit index and last applied
commitIndex int
lastApplied int
applyCond *sync.Cond
applyCh chan ApplyMsg

// ......
}

增加其初始化。applyCond 在初始化时,需要关联到一把锁上,这是 sync.Cond 的使用要求,之后只有在该锁临界区内才可以进行 Wait()Signal() 的调用。对于我们来说,这把锁自然就是全局的那把大锁:rf.mu。具体 sync.Cond 的工作原理可以参考并发编程一章的相关内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
// ......

rf.applyCh = applyCh
rf.commitIndex = 0
rf.lastApplied = 0
rf.applyCond = sync.NewCond(&rf.mu)

// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())

// start ticker goroutine to start elections
go rf.electionTicker()
go rf.applyTicker()

return rf
}

Apply 工作流

Apply 工作流在实现的时候,最重要的就是在给 applyCh 发送 ApplyMsg 时,不要在加锁的情况下进行。因为我们并不知道这个操作会耗时多久(即应用层多久会取走数据),因此不能让其在 apply 的时候持有锁。

于是,我们把 apply 分为三个阶段:

  1. 阶段一:构造所有待 apply 的 ApplyMsg
  2. 阶段二:遍历这些 msgs,进行 apply
  3. 阶段三:更新 lastApplied
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (rf *Raft) applyTicker() {
for !rf.killed() {
rf.mu.Lock()
rf.applyCond.Wait()

entries := make([]LogEntry, 0)
// should start from rf.lastApplied+1 instead of rf.lastApplied
for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
entries = append(entries, rf.log[i])
}
rf.mu.Unlock()

for i, entry := range entries {
rf.applyCh <- ApplyMsg{
CommandValid: entry.CommandValid,
Command: entry.Command,
CommandIndex: rf.lastApplied + 1 + i,
}
}

rf.mu.Lock()
LOG(rf.me, rf.currentTerm, DApply, "Apply log for [%d, %d]", rf.lastApplied+1, rf.lastApplied+len(entries))
rf.lastApplied += len(entries)
rf.mu.Unlock()
}
}

只要我们保证全局就只有这一个 apply 的地方,那我们这样分成三个部分问题就不大。尤其是需要注意,当后面增加 snapshot apply 的逻辑时,也要放到该函数里。

Leader CommitIndex 更新

在 Leader 给其他 Peer AppendEntries 成功后,会更新 rf.matchIndex

1
2
3
4
5
6
7
8
9
10
11
12
13
replicateToPeer := func(peer int, args *AppendEntriesArgs) {
// ......

// update the commmit index if log appended successfully
rf.matchIndex[peer] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[peer] = rf.matchIndex[peer] + 1 // important: must update
majorityMatched := rf.getMajorityIndexLocked()
if majorityMatched > rf.commitIndex {
LOG(rf.me, rf.currentTerm, DApply, "Leader update the commit index %d->%d", rf.commitIndex, majorityMatched)
rf.commitIndex = majorityMatched
rf.applyCond.Signal()
}
}

在每次更新 rf.matchIndex 后,依据此全局匹配点视图,我们可以算出多数 Peer 的匹配点,进而更新 Leader 的 CommitIndex。我们可以使用排序后找中位数的方法来计算。

这里需要说明一下,也是视频中没有说明白的地方,就是排序之后是匹配点从小到大,因此需要找左边的那个(如果 peers 是偶数的话)中位数,才是多数派的匹配点。

1
2
3
4
5
6
7
8
9
func (rf *Raft) getMajorityIndexLocked() int {
// TODO(spw): may could be avoid copying
tmpIndexes := make([]int, len(rf.matchIndex))
copy(tmpIndexes, rf.matchIndex)
sort.Ints(sort.IntSlice(tmpIndexes))
majorityIdx := (len(tmpIndexes) - 1) / 2
LOG(rf.me, rf.currentTerm, DDebug, "Match index after sort: %v, majority[%d]=%d", tmpIndexes, majorityIdx, tmpIndexes[majorityIdx])
return tmpIndexes[majorityIdx] // min -> max
}

由于排序会改变原数组,因此要把 matchIndex 复制一份再进行排序,

如果 commitIndex 更新后,则唤醒 apply 工作流,提醒可以 apply 新的日志到本地了。

Follower CommitIndex 更新

在 Leader CommitIndex 更新后,会通过下一次的 AppendEntries 的 RPC 参数发送给每个 Follower。则首先,要根据论文图 2,在 AppendEntriesArgs 中增加 LeaderCommit 参数。

1
2
3
4
5
6
7
8
9
10
type AppendEntriesArgs struct {
Term int
LeaderId int

PrevLogIndex int
PrevLogTerm int
Entries []LogEntry

LeaderCommit int
}

每个 Follower 通过 AppendEntries 的回调函数收到 Leader 发来的 LeaderCommit,来更新本地的 CommitIndex,进而驱动 Apply 工作流开始干活。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
LOG(rf.me, rf.currentTerm, DDebug, "<- S%d, Receive log, Prev=[%d]T%d, Len()=%d", args.LeaderId, args.PrevLogIndex, args.PrevLogTerm, len(args.Entries))

reply.Term = rf.currentTerm
reply.Success = false
// align the term
if args.Term < rf.currentTerm {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Higher term, T%d<T%d", args.LeaderId, args.Term, rf.currentTerm)
return
}
if args.Term >= rf.currentTerm {
rf.becomeFollowerLocked(args.Term)
}

// return failure if the previous log not matched
if args.PrevLogIndex >= len(rf.log) {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Follower log too short, Len:%d <= Prev:%d", args.LeaderId, len(rf.log), args.PrevLogIndex)
return
}
if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Prev log not match, [%d]: T%d != T%d", args.LeaderId, args.PrevLogIndex, rf.log[args.PrevLogIndex].Term, args.PrevLogTerm)
return
}

reply.Success = true
// append the leader logs to local
rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...)
LOG(rf.me, rf.currentTerm, DLog2, "Follower append logs: (%d, %d]", args.PrevLogIndex, args.PrevLogIndex+len(args.Entries))

// update the commit index if needed and indicate the apply loop to apply
if args.LeaderCommit > rf.commitIndex {
LOG(rf.me, rf.currentTerm, DApply, "Follower update the commit index %d->%d", rf.commitIndex, args.LeaderCommit)
rf.commitIndex = args.LeaderCommit
if rf.commitIndex >= len(rf.log) {
rf.commitIndex = len(rf.log) - 1
}
rf.applyCond.Signal()
}

// reset the election timer, promising not start election in some interval
rf.resetElectionTimerLocked()
}

上面 Peer 收到 Leader 发来的日志同步请求处理逻辑中,

1
2
3
if args.Term >= rf.currentTerm {
rf.becomeFollowerLocked(args.Term)
}

大家可能有个问题,对于大于的情况,容易理解,论文中就这么写的。为什么相同任期也要变为 Follower?

对于相同任期来说,可以分情况考虑:

  1. 首先他不可能是 leader,因为一个任期最多只会选出一个 leader
  2. 如果他是 Candidate,收到 Leader 的日志同步请求,得变 Follower
  3. 如果本来就是 Follower 就无所谓了