经过调试,我们有几个地方没有实现对:日志同步的 reply 处理前没有检查 context、Start 没有实现、AppendEntries 日志判断。

Context 检查

需要 Context 检查的主要有四个地方:

  1. startReplication 前,检查自己仍然是给定 term 的 Leader
  2. replicateToPeer 处理 reply 时,检查自己仍然是给定 term 的 Leader
  3. startElection 前,检查自己仍然是给定 term 的 Candidate
  4. askVoteFromPeer 处理 reply 时,检查自己仍然是给定 term 的 Candidate

由于我们 replication 和 election 实现的对称性,可以发现前两个和后两个是对称的,因此很好记忆。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
replicateToPeer := func(peer int, args *AppendEntriesArgs) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(peer, args, reply)

rf.mu.Lock()
defer rf.mu.Unlock()
if !ok {
LOG(rf.me, rf.currentTerm, DLog, "-> S%d, Lost or crashed", peer)
return
}

// align the term
if reply.Term > rf.currentTerm {
rf.becomeFollowerLocked(reply.Term)
return
}

// check context lost
if rf.contextLostLocked(Leader, term) {
LOG(rf.me, rf.currentTerm, DLog, "-> S%d, Context Lost, T%d:Leader->T%d:%s", peer, term, rf.currentTerm, rf.role)
return
}

// hanle the reply
// probe the lower index if the prevLog not matched
if !reply.Success {
// go back a term
idx, term := args.PrevLogIndex, args.PrevLogTerm
for idx > 0 && rf.log[idx].Term == term {
idx--
}
rf.nextIndex[peer] = idx + 1
LOG(rf.me, rf.currentTerm, DLog, "-> S%d, Not matched at %d, try next=%d", peer, args.PrevLogIndex, rf.nextIndex[peer])
return
}

// update match/next index if log appended successfully
rf.matchIndex[peer] = args.PrevLogIndex + len(args.Entries) // important
rf.nextIndex[peer] = rf.matchIndex[peer] + 1

// update the commitIndex
majorityMatched := rf.getMajorityIndexLocked()
if majorityMatched > rf.commitIndex {
LOG(rf.me, rf.currentTerm, DApply, "Leader update the commit index %d->%d", rf.commitIndex, majorityMatched)
rf.commitIndex = majorityMatched
rf.applyCond.Signal()
}
}

Start 实现

只有正确实现了 Start,tester 才能通过 Start 给 Leader 本地追加日志。进而通过 Leader 的 AppendEntries RPC 将日志分发给所有 Follower,然后在发现大多数 Peer 在本地追加该日志之后,指示每个 Peer Apply 该日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (rf *Raft) Start(command interface{}) (int, int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()

if rf.role != Leader {
return 0, 0, false
}
rf.log = append(rf.log, LogEntry{
CommandValid: true,
Command: command,
Term: rf.currentTerm,
})
LOG(rf.me, rf.currentTerm, DLeader, "Leader accept log [%d]T%d", len(rf.log)-1, rf.currentTerm)

return len(rf.log) - 1, rf.currentTerm, true
}

实现中有两点需要注意:

  1. Append 日志前一点要先检查自己是否仍然为 Leader:只有 Leader 才能直接 Append 日志到本地,也即整个 Raft Group 只有一个外界数据接收点——那就是 Leader;不遵循此原则,会出现日志冲突。
  2. 构造 LogEntry 的时候不要忘记填 Term 字段:在选领导者的时候会通过比较最后一条日志的 term 来确定谁更 up-to-date,不设置 term,会使得该比较出问题,导致不具有所有提交日志的 Candidate 当选 Leader。

AppendEntries 边界检查

在判断 args.PrevLogIndex >= len(rf.log) 的时候忘记了等号**。**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

reply.Term = rf.currentTerm
reply.Success = false

// align the term
if args.Term < rf.currentTerm {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject log, Higher term, T%d<T%d", args.LeaderId, args.Term, rf.currentTerm)
return
}
if args.Term >= rf.currentTerm {
rf.becomeFollowerLocked(args.Term)
}

// return failure if prevLog not matched
if args.PrevLogIndex >= len(rf.log) {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject log, Follower log too short, Len:%d < Prev:%d", args.LeaderId, len(rf.log), args.PrevLogIndex)
return
}
if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject log, Prev log not match, [%d]: T%d != T%d", args.LeaderId, args.PrevLogIndex, rf.log[args.PrevLogIndex].Term, args.PrevLogTerm)
return
}

// append the leader log entries to local
rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...)
reply.Success = true
LOG(rf.me, rf.currentTerm, DLog2, "Follower accept logs: (%d, %d]", args.PrevLogIndex, args.PrevLogIndex+len(args.Entries))

// hanle LeaderCommit
if args.LeaderCommit > rf.commitIndex {
LOG(rf.me, rf.currentTerm, DApply, "Follower update the commit index %d->%d", rf.commitIndex, args.LeaderCommit)
rf.commitIndex = args.LeaderCommit
rf.applyCond.Signal()
}

rf.resetElectionTimerLocked()
}

最后就是和测试速度相关的 replicateInterval 参数设置,设的越小,测试通过越快。但是不能太小,因为测试框架会卡下界,我试了下,这个下界应该是 60 ms。因此我们将其设置为 70 ms:

1
2
3
4
5
6
const (
electionTimeoutMin time.Duration = 250 * time.Millisecond
electionTimeoutMax time.Duration = 400 * time.Millisecond

replicateInterval time.Duration = 70 * time.Millisecond
)

逻辑自查

在我们进行调试之前,可以先看一个我之前总结的 Bug 检查列表,逐一进行下对照,看是否遗漏了某些实现。

  1. Start 函数是否实现。
  2. 注意特殊更新 leader 自己对应的 nextIndexmatchIndex
    1. 初始化时需要设定 matchIndex = len(rf.log) - 1,当然,由于每次 RPC 时设定了,这里也可以不管。
    2. 发 RPC 阶段可以跳过,但也要更新 matchIndex = len(rf.log) - 1,因为可能 rf.log 可能由于 Start 的调用而改变了。
  3. 依据 matchIndex 计算多数日志进度问题:
    1. 在 Sort 之后,顺序是从小到大的,因此在计算 CommitIndex 时要取中位数偏左边那个数
    2. tmpIndexes := rf.matchIndex[:] 不会复制 Slice 底层数组。得新建一个 Slice,然后使用 copy 函数才能避免 sort 对 matchIndex 的影响。
  4. 选举时钟 Election Timer 重置问题
    1. **(PartB 可过,PartC 过不了)**日志不匹配时,也要 reset timer
  5. 在 startReplication 函数内只使用传入的参数 Term:leaderTerm
    1. 每次加锁后,都要先校验 contextLost,即是否为 Leader,是否 rf.currentTerm == leaderTerm
    2. 校验后,统一使用 leaderTerm,表明 term 没有 change 过。
  6. 别忘了在 AppendEntriesArgs 中设置 leaderCommit
  7. 别忘了在 RequestVoteArgs 中设置 LastLogIndex 和 LastLogTerm
  8. More up-to-date 比较的时候 term 和 index 参数别传反了
  9. Leader 和 Follower 都要在合适的时候 apply 日志。即在更新 commitIndex 时, 设置 applyCond.Signal,来唤醒 applicationTicker
  10. 上一个 AppendEntries 的处理结果还没回来,下一个就已经发出去了,导致同样的 prevTerm/prevIndex 的 RPC 发了多次。如果 Follower 接到 RPC 就删除日志,甚至会导致两次返回去 conflict index 和 term 不一样
  11. PartB 可过,PartC 过不了)AppendEntries 的试探阶段,reply 会乱序回来。这可能会导致,本来已经找到匹配点了,但是此时回来了一个延迟到达之前非匹配的点失败回复。这种请求应该通过合理方式拒掉这种延迟回复。比如比对 len(rf.log) 和当前rf.nextIndex
  12. 两个 RPC,candidate 和 leader 处理 reply 的时候,一定要对齐 term,而不是先判断 contextLost

要点总结

  1. 初始化时,给一条空日志作为哨兵,可以减少很多边界判断:
    1. 可以无脑取最后一条日志,而不用担心是否为空
    2. 在试探后退时,可以退到 0 的位置,而非 -1
  2. Leader 的两个数组,本质上是 Leader 对全局 Peer 的 Log 的两个视图:
    1. nextIndex:试探点视图,用于寻找 Leader 和 Follower 的日志匹配点
    2. matchIndex:匹配点视图,收到成功的 AppendEntriesReply 后更新,进而计算 CommitIndex
  3. 日志同步分两个阶段(两个阶段都有心跳的作用),分别对应上面两个数组,分水岭是第一次同步成功:
    1. Backtracking:探测匹配点
    2. Appending:正常同步日志
  4. tmpIndexes := rf.matchIndex[:] 不会复制。得新建一个 Slice,然后使用 copy 函数。
  5. 如果要进行日志探测优化,最好封装两个函数:
    1. firstEntryForTermLocked
    2. lastEntryForTermLocked

测试逻辑

  1. cfg.one 由于要判断 cfg.nCommitted(index) > expectedServers,因此最少需要两轮心跳,第一轮同步日志,第二轮同步 CommitIndex。由此观之,测试的主要占时大头由 ReplicateInterval 决定。
  2. 测试逻辑:大概是混沌运行一段时间,然后最后将所有连通,看在 10s 内是否所有 peer 都能 apply 完(在 one 里)。
    1. 如果联通后, append 一条日志。此时 没有 apply
    2. 由于某个 follower 超时(比如并发过高),变更 leader 后,
    3. 由于没有新的日志 append 过来,但无法 commit 老 term 的日志,导致不能 apply 该日志
    4. 感觉只能通过 leader 上台提交一条新的日志来解决了,但这样无法过 2B
  3. 在工业界,常用 Leader 上台后 append 一个空日志,以让 leaderCommit 尽快更新
    1. 但在本实验中会让 2B 挂掉,所以不能这么干,因为 PartB 的测试依赖 tester 眼中的 index。
  4. 之前以为 TestBackupPartB 耗时是因为在试探阶段,因此着重优化 Backtracking,发现效果不大:
    1. 如果每次 nextIndex 线性 -1,该测试用例用时 50s
    2. 改成取回 Follower.LastLogTerm,从小于 LastLogTerm 的第一个 Term 开始试探,用时 30s
    3. 使用 Students’ Guide to Raft 提到的算法,最后也是 30s
  5. 后面在看了测试逻辑之后发现,因为其测试逻辑涉及几十个 Start->Check All Commit 的过程。每一次写 Leader 到日志真正提交至少需要两个 replicationInterval,一次用于分发日志,一次用于同步 LeaderCommit。因此该测试的优化并不在试探阶段,而在同步阶段。有两种方法:最简单粗暴的就是缩小 replicationInterval;另一个就是在每次 Start 之后立即发送 AppendEntries 请求,而不是等到一个 replicationInterval 之后。