由于事先做了数据流图,因此总体来说 PartD 没有什么大问题。剩下有些问题,上一节中提到过一些;其他的集中在两个部分。

问题修复

问题包括两部分:

  1. 在 Snapshot 时对 index 的检查
  2. 使用 rf.log.at 前对传入下标的检查

这两个问题单独在 Raft 的测试中是检测不出来的,要通过之后分布式 KV 小结才能检查出来,但根据主题的亲和性,还是要放在这里讲(视频中没有提到)

Snapshot 下标检查

之前只检查了下界 rf.log.snapLastIdx (不要重复进行 snapshot),但也要加上检查上界:rf.commitIndex。即,不能对没有提交的日志进行 snapshot。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// --- raft_compaction.go
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (PartD).
rf.mu.Lock()
defer rf.mu.Unlock()
if index > rf.commitIndex {
LOG(rf.me, rf.currentTerm, DSnap, "Couldn't snapshot before CommitIdx: %d>%d", index, rf.commitIndex)
return
}
if index <= rf.log.snapLastIdx {
LOG(rf.me, rf.currentTerm, DSnap, "Already snapshot in %d<=%d", index, rf.log.snapLastIdx)
return
}
rf.log.doSnapshot(index, snapshot)
rf.persistLocked()
}

LogRaft.at 下标检查

每次使用 RaftLog.at 函数时,都要保证其传入参数落入 [RaftLog.snapLastIdx, RaftLog.size()-1] 的区间内,有的地方可以通过逻辑来隐式保证,但有些地方由于并发的问题,必须要进行显式地检查。

主要包括 applicationTicker 的 Loop 中逻辑。Apply Log 之前会把所有 (lastAppiled, commitIndex] 的日志取出来,但有可能这个区间会超出 rf.log 中所保存的 tailLog 日志区间,因此要做剪裁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (rf *Raft) applicationTicker() {
for !rf.killed() {
rf.mu.Lock()
rf.applyCond.Wait()
entries := make([]LogEntry, 0)
snapPendingApply := rf.snapPending

if !snapPendingApply {
if rf.lastApplied < rf.log.snapLastIdx {
rf.lastApplied = rf.log.snapLastIdx
}

// make sure that the rf.log have all the entries
start := rf.lastApplied + 1
end := rf.commitIndex
if end >= rf.log.size() {
end = rf.log.size() - 1
}
for i := start; i <= end; i++ {
entries = append(entries, rf.log.at(i))
}
}
rf.mu.Unlock()

// --- some code ommited here
}
}

通过测试

如果我们在写代码前能把工程的数据流图弄得门清,就可以最大限度的减少大方面的纰漏。剩下些小的问题,通过单测就可以纠正。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
➜  raft git:(raft) ✗ go test -run PartD
Test (PartD): snapshots basic ...
... Passed -- 5.0 3 148 54742 235
Test (PartD): install snapshots (disconnect) ...
... Passed -- 48.6 3 1536 583368 318
Test (PartD): install snapshots (disconnect+unreliable) ...
... Passed -- 68.4 3 2168 831179 345
Test (PartD): install snapshots (crash) ...
... Passed -- 71.4 3 2052 771805 364
Test (PartD): install snapshots (unreliable+crash) ...
... Passed -- 75.5 3 2116 770524 365
Test (PartD): crash and restart all servers ...
... Passed -- 8.8 3 292 83682 58
Test (PartD): snapshot initialization after crash ...
... Passed -- 2.4 3 86 23088 14
PASS
ok course/raft 280.304s
➜ raft git:(raft) ✗ dstest PartD -p 30 -n 100
Verbosity level set to 0
Tests ━━━━━━━━━━━━━━━━━━━━╺━━━━━━━━━━━━ 0:00:53 60% 0:10:00
╭─────────────────────────────────────────────╮
│ PartD ⠹ ━━━━━━━━━━━━━━━━━━━━━╺━━━━━━━━━━━━ 60/100 │
╰─────────────────────────────────────────────╯

➜ raft git:(raft) ✗ dstest PartD -p 30 -n 100
Verbosity level set to 0
┏━━━━━━━┳━━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Test. ┃. Failed ┃ Total ┃ Time ┃
┡━━━━━━━╇━━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━━━━┩
│ PartD │ 0 │ 100 │ 278.18 ± 7.05 │
└───────┴────────┴───────┴───────────────┘

注意要点

由于课程录屏是在我实现过多遍后总结出的规律,因此很多大家第一遍拿到 PartD 时可能会遇到很多问题,在视频中并没有讲。这里就一次性的将我实现过程中思考过的问题罗列下。大家如果有其他问题,也可以在文档下面评论,我会选一些有共性的,追加到这里。

  1. snapshot 以什么形式存储和持久化,Raft 层需不需要理解 snapshot 内部内容?
    1. 不需要,只有应用层需要感知其具体内容。 Raft 层在收到应用层 snapshot 时,只需要在内存和外存(使用 persistor 持久化)各保存一份即可。
    2. 之后由 Follower 的 Raft 层给到应用层进行 apply 的时候,直接给 snapshot 就可以,不用重新识别出一条条 log(也是不可能的)。
  2. 最终要实现成切片多次发 snapshot(像论文一样),还是简单的实现成发一次就够了?
    1. 测试框架要求一次发送就行,因为 snapshot 最大也不会太大。因为测试框架主要是针对功能进行测试的,并没有进行压力测试。
    2. 但实践中,们是没有办法控制 snapshot 的大小的,如果其过大,我们肯定要对其进行拆分。
  3. Follower 收到 snapshot apply 的时候完全 apply?那应用层如何进行去重?Snapshot 中的 kv 带 index 吗?
    1. 应用层 Apply snapshot 时,是覆盖式(全量式,因为 Snapshot 下标肯定从 0 开始) Apply;在 Apply log 时,是增量式的 Apply。两者不同。
    2. 因此在 Apply snapshot 时,并不需要去重(因为不是增量式的),直接替换掉当前状态机即可。
  4. 不仅是 Leader 会被做 snapshot?
    1. 所有 Peer 都会,因为这是一个应用层决定的行为。
    2. 但除 Leader 外的其他 Peer 可能会同时收到 Leader 来的 Snapshot。
    3. 即,Snapshot 分两种:由应用层主动地 Snapshot;从 Leader 接收来的被动的 Snapshot。
  5. 应用层调用 **snapshot(index, snapshot)**意味着 什么?
    1. 对应用层意味着:
      1. 应用已经做好了一个快照,但编解码方式之后应用层自己知道,Raft 层不感知。
      2. 应用层自己也会保存该快照,之后宕机重启后会先加载该快照。此时(宕机重启后) Raft 层也要记得更新自己的 commitIndexlastApplied
    2. 对 Raft 层意味着:
      1. 应用层告诉 Raft 层,你可以把 index 及以前的日志给释放掉了。
      2. Raft 层要保存下 Snapshot,万一其变为 Leader 之后需要给 Follower 发。
      3. 保存分为在内存中保存和持久化到外存。
  6. 需要对原流程修改的地方?
    1. Leader 发送 entries 的时候,要先检查要发送的 PrevLogEntry 还在不在,如果不在了,需要先发 snapshot。
  7. 要实现一个基于逻辑 idx 的取 log 相关的子数据结构,不然会存在大量下标翻译的过程?
    1. 这也就是我们为什么要在第一小节重新实现 RaftLog 的原因。
  8. 宕机重启后, lastApplied 怎么初始化?
    1. 如果有 snapshot 存在:则需要初始化为 snapshot 的 lastIncludedIndex。因为应用层肯定也是从自己 snapshot 中来恢复的。
      1. 在 Make 阶段同步的再 apply 一个 snapshot 到 appplyCh 中有可能会直接 block 住。
      2. 即使不阻塞,有时候也会遇到 snapshot decode error 报错。
    2. 如果没有 snapshot 存在:那就初始化为 0 。
    3. 如果初始化为 snapLastIndex,那前面的需要 apply 吗?
      1. 不需要,因为应用层自己会从自己保存的 snapshot 中恢复。
  9. 我之前遇到一个 snapshot 持久化的问题
    1. 每次调用 PersistLocked 的时候把通过设置 snapshot=nil 在不需要持久化 snapshot 时把 snapshot 清空了。这样在重启时,就得不到 snapshot。
    2. 因此:
      1. 收到 snapshot 先保存到内存中(也就是我们的 RaftLog 中)
      2. 每次持久化时,一定要读取内存中的最新 snapshot 进行持久化。
  10. 注意处理 installSnapshot 和 applyLoop 对 applyCh 的并发问题
    1. 尽量将所有 apply (包括 apply log 和 apply snapshot)都放到 apply 线程里
  11. 将 snapshot 相关的也收到 log 中去
    1. 这也是我们为什么单独实现 RaftLog 的原因。
    2. 因为 RaftLog 本质上就包括 snapshottailLog 紧密相关的两部分。
    3. 也可以看出,一个合理的抽象,可以让实现理解丝滑很多。
  12. 在 read 不出 snapshot 或者 log 时,也要对 rf.log 初始化?
    1. 是先要对rf.log初始化,然后通过 Persist 读取之前持久化的内容,如果有的话,就对 rf.log 再更新。

小结

总结来说,在拿到 PartD 的需求时,开始最困惑的就是,snapshot 在应用层和 Raft 层的关系以及和普通日志一块进行 apply 的顺序和含义问题。

弄清了上述两点,再理清了“快照数据流”,并且用图画出来之后,一切就很清晰了。这样也就自然的导出——我们似乎要把日志进行重构,如是就有了自然地想法将 snapshot 和尾部的日志,封装到一块。

所以可以看出,思考顺序和我们的课程行文顺序是相反的。这也是我们常说的,看别人的做法,看起来很合理,但不知道怎么来的——因为大部分都是以思考过、碰壁过后,逆序的呈现的。