要支持 Compaction,需要对日志进行诸多改造。干脆,我们将相关逻辑封装到一个 struct 中。

于是我们新创建一个 RaftLog 的结构体,以支持:

  1. 在 index 处进行 Snapshot:将 index 以及之前的日志阶段掉
  2. 基本读写操作:
    1. 读取:在 Snapshot 存在时,需要做下标转换,但在没有 Snapshot 时又不用,需要想办法进行统一。
    2. 追加:包括在末尾追加(用于应用层给 Leader 追加日志)、在给定下标处覆盖追加(用于 Leader 覆写 Follower 日志)等等。

当然,以上需求是在有一个基本想法之后,再去看其他所有使用到 RaftLog 的代码,然后总结出来的。但为了行文方便,我直接放在了开始,所以乍一看会有些突兀。

RaftLog 实现

结构体字段

我们新建一个文件,起名为 raft_log.go ,然后定义一个 RaftLog 的结构体。主要包含三部分:

  1. 前面日志截断后 compact 成的 snapshot
  2. 后面的剩余日志 tailLog
  3. 两者的分界线,也就是上一节要求中提到的:lastIncludeTerm/lastIncludeIndex,这里我们给他加上 snap 前缀,可以更直观一些,知道是 snapshot 最后的下标和任期,即 snapLastIdx / snapLastTerm

这里有个巧妙的设计点,可以避免边界判断、一致化下标转换。即,将 tailLog 中下标为 0 (对应 snapLastIdx)的日志留空,但给其的 Term 字段赋值 snapLastTerm,真正的下标从 1 (对应 snapLastIdx+1)开始。

这样做有什么好处呢?更具体来说就是:

  1. 边界判断:在进行日志复制时,需要取所发送日志的 prevLogIndexprevLogTerm 时,可以避免特殊判断。
  2. 下标转换:所有的全局下标转到 tailLog 下标时,只需要减去 snapLastIdx 即可
1
2
3
4
5
6
7
8
9
10
type RaftLog struct {
snapLastIdx int
snapLastTerm int

// contains index [1, snapLastIdx]
snapshot []byte
// the first entry is `snapLastIdx`, but only contains the snapLastTerm
// the entries between (snapLastIdx, snapLastIdx+len(tailLog)-1] have real data
tailLog []LogEntry
}

换个角度理解,就是我们在 tailLog 的开始 mock 了一个假的 snapshot 的 lastIncludeEntry。且当 snapshot 为空时,该 lastIncludeEntry 的 index 和 term 都是 0(我们在[Raft PartC 实现和优化]定义的边界常量) 。

初始化

对 RaftLog 初始化,有两条路径:

  1. 构造函数:在通过 Make 构造 Raft 实例时使用,此时各个字段留默认值就行。
  2. 反序列化函数:读取宕机前(如果有)持久化的各个字段,对 RaftLog 进行填充。

且通常,两个是在 Make 函数中先后执行,因此构造函数其实可以留空,但下面我们为了将来的可扩展性仍然实现了,仅在调用时传空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// called as : rf.log = NewLog(InvalidIndex, InvalidTerm, nil, nil)
func NewLog(snapLastIdx, snapLastTerm int, snapshot []byte, entries []LogEntry) *RaftLog {
rl := &RaftLog{
snapLastIdx: snapLastIdx,
snapLastTerm: snapLastTerm,
snapshot: snapshot,
}

// make the len = 0, cap = 1 + len(entries)
rl.tailLog := make([]LogEntry, 0, 1 + len(entries))
rl.tailLog = append(rl.tailLog, LogEntry{
Term: snapLastTerm,
})
rl.tailLog = append(rl.tailLog, entries...)

return rl
}

// return detailed error for the caller to log
func (rl *RaftLog) readPersist(d *labgob.LabDecoder) error {
var lastIdx int
if err := d.Decode(&lastIdx); err != nil {
return fmt.Errorf("decode last include index failed")
}
rl.snapLastIdx = lastIdx

var lastTerm int
if err := d.Decode(&lastTerm); err != nil {
return fmt.Errorf("decode last include term failed")
}
rl.snapLastTerm = lastTerm

var log []LogEntry
if err := d.Decode(&log); err != nil {
return fmt.Errorf("decode tail log failed")
}
rl.tailLog = log

return nil
}

func (rl *RaftLog) persist(e *labgob.LabEncoder) {
e.Encode(rl.snapLastIdx)
e.Encode(rl.snapLastTerm)
e.Encode(rl.tailLog)
}

下标操作

在实现时,最容易弄混的就是下标操作,最主要的原因,就是要做下标转换。但好在我们上面 snapLastIdx 设定就是 tailLog[0] 的位置,因此从全局下标(logicIdx),转为 tailLog 局部下标,只需要和 snapLastIdx 做差值即可。反过来转换,只需要加上 snapLastIdx 即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// the dummy log is counted
func (rl *RaftLog) size() int {
return rl.snapLastIdx + len(rl.tailLog)
}

// access the index `rl.snapLastIdx` is allowed, although it's not exist actually.
func (rl *RaftLog) idx(logicIdx int) int {
if logicIdx < rl.snapLastIdx || logicIdx >= rl.size() {
panic(fmt.Sprintf("%d is out of [%d, %d]", logicIdx, rl.snapLastIdx+1, rl.size()-1))
}
return logicIdx - rl.snapLastIdx
}

func (rl *RaftLog) at(logicIdx int) LogEntry {
return rl.tailLog[rl.idx(logicIdx)]
}

字符串函数

主要是为了打印供调试的日志使用,我们将其分成两个:

  1. 用以输出详细日志:进行深度追踪时用,频次要低,且通常用在 Debug 级别
  2. 用以输出粗略日志:较为简短,可以使用频次稍高
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// more detailed
func (rl *RaftLog) String() string {
var terms string
prevTerm := rl.snapLastTerm
prevStart := rl.snapLastIdx
for i := 0; i < len(rl.tailLog); i++ {
if rl.tailLog[i].Term != prevTerm {
terms += fmt.Sprintf(" [%d, %d]T%d", prevStart, i-1, prevTerm)
prevTerm = rl.tailLog[i].Term
prevStart = i
}
}
terms += fmt.Sprintf("[%d, %d]T%d", prevStart, len(rl.tailLog)-1, prevTerm)
return terms
}

// more simplified
func (rl *RaftLog) Str() string {
lastIdx, lastTerm := rl.last()
return fmt.Sprintf("[%d]T%d~[%d]T%d", rl.snapLastIdx, rl.snapLastTerm, lastIdx, lastTerm)
}

其他模块用到

这些函数,都是通过逐一遍历其他 Raft 实现,包括选举(raft_election.go)、日志复制(raft_replication.go)、日志应用(raft_application.go)和公共逻辑 (raft.go)来逐一增补的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (rl *RaftLog) last() (idx, term int) {
return rl.size() - 1, rl.tailLog[len(rl.tailLog)-1].Term
}

func (rl *RaftLog) tail(startIdx int) []LogEntry {
if startIdx >= rl.size() {
return nil
}
return rl.tailLog[rl.idx(startIdx):]
}

func (rl *RaftLog) firstLogFor(term int) int {
for idx, entry := range rl.tailLog {
if entry.Term == term {
return idx
} else if entry.Term > term {
break
}
}
return InvalidIndex
}

func (rl *RaftLog) append(e LogEntry) {
rl.tailLog = append(rl.tailLog, e)
}

func (rl *RaftLog) appendFrom(prevIdx int, entries []LogEntry) {
rl.tailLog = append(rl.tailLog[:rl.idx(prevIdx)+1], entries...)
}

Snapshot 基本实现

本节就只实现基本的 Snapshot(index, snapshot) 函数即可。相关 RPC 留到下一节实现,PartD 的第一个测试 TestSnapshotBasicPartD 保证不会用到 InstallSnapshot RPC 相关的逻辑。

该函数的基本含义为:应用层在 index 处做了个快照,Raft 层帮我把该快照保存下,同时,index 以及之前的日志就可以释放掉了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// --- raft.go

// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (PartD).
rf.mu.Lock()
defer rf.mu.Unlock()
LOG(rf.me, rf.currentTerm, DSnap, "Snap on %d", index)

if index <= rf.log.snapLastIdx || index > rf.commitIndex {
LOG(rf.me, rf.currentTerm, DSnap, "Could not snapshot beyond [%d, %d]", rf.log.snapLastIdx+1, rf.commitIndex)
return
}

rf.log.doSnapshot(index, snapshot)
rf.persistLocked()
}

在进行日志截断的时候,注意要新建一个数组,而不要直接使用下标切片运算。只有新建数组并切换过去,才会真正的解掉对于原来数组的引用,保证 golang GC 可以将原来数组的空间释放掉。

另外要注意的是—— rl.idx() 函数会用到rl.snapLastIdx ,而我们本函数也要修改 rl.snapLastIdx,因此一定要注意他们使用和修改的先后顺序,否则会出现不符合预期的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// --- raft_log.go
func (rl *RaftLog) doSnapshot(index int, snapshot []byte) {
// since idx() will use rl.snapLastIdx, so we should keep it first
idx := rl.idx(index)

rl.snapLastTerm = rl.tailLog[idx].Term
rl.snapLastIdx = index
rl.snapshot = snapshot

// allocate a new slice
newLog := make([]LogEntry, 0, rl.size()-rl.snapLastIdx)
newLog = append(newLog, LogEntry{
Term: rl.snapLastTerm,
})
newLog = append(newLog, rl.tailLog[idx+1:]...)
rl.tailLog = newLog
}

测试

实现完 RaftLog 之后,我们要将所有 PartA~PartC 的测试跑过,保证我们的实现没有影响之前的逻辑。

从这个角度可以看出测试的重要性,他是保证我们代码“不变性”(invariant)的重要手段,有了这些单测,你在重构时才可以放心的说,尽管我改了很多,但是我的代码功能上没有问题——当然,这也依赖于测试的完备性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
$ echo 'PartA PartB PartC' | xargs -n1 go test -run 

Test (PartA): initial election ...
... Passed -- 3.0 3 88 25020 0
Test (PartA): election after network failure ...
... Passed -- 4.5 3 190 38737 0
Test (PartA): multiple elections ...
... Passed -- 5.6 7 941 193306 0
PASS
ok course/raft 13.309s
Test (PartB): basic agreement ...
... Passed -- 0.5 3 22 5616 3
Test (PartB): RPC byte count ...
... Passed -- 1.8 3 54 115708 11
Test (PartB): test progressive failure of followers ...
... Passed -- 4.4 3 176 37379 3
Test (PartB): test failure of leaders ...
... Passed -- 4.8 3 268 60323 3
Test (PartB): agreement after follower reconnects ...
... Passed -- 5.6 3 174 47871 8
Test (PartB): no agreement if too many followers disconnect ...
... Passed -- 3.3 5 303 62214 3
Test (PartB): concurrent Start()s ...
... Passed -- 0.7 3 22 5644 6
Test (PartB): rejoin of partitioned leader ...
... Passed -- 6.2 3 262 67089 4
Test (PartB): leader backs up quickly over incorrect follower logs ...
... Passed -- 20.6 5 2498 1949990 102
Test (PartB): RPC counts aren’t too high ...
... Passed -- 2.2 3 70 20424 12
PASS
ok course/raft 50.123s
Test (PartC): basic persistence ...
... Passed -- 5.0 3 172 46581 7
Test (PartC): more persistence ...
... Passed -- 14.5 5 1258 276538 16
Test (PartC): partitioned leader and one follower crash, leader restarts ...
... Passed -- 1.2 3 46 11891 4
Test (PartC): Figure 8 ...
... Passed -- 28.5 5 1576 328781 36
Test (PartC): unreliable agreement ...
... Passed -- 3.9 5 238 85079 246
Test (PartC): Figure 8 (unreliable) ...
... Passed -- 32.5 5 4412 5485489 446
Test (PartC): churn ...
... Passed -- 16.4 5 1350 1008253 321
Test (PartC): unreliable churn ...
... Passed -- 16.5 5 1323 808135 249
PASS
ok course/raft 118.544s

同时,要跑过 PartD 的第一个测试 TestSnapshotBasicPartD ,保证我们的 Snapshot 的基本逻辑没问题。

1
VERBOSE=0 go test -run TestSnapshotBasicPartD | tee out.txt