PartC 最主要的逻辑包括两部分:

  1. 实现 Raft 的序列化和反序列化函数
  2. 将上述函数插入到代码流程中合适位置

但由于测试时加入了不同 Peer 的宕机重启,对我们之前的选主逻辑和复制逻辑的正确性性能又提出了更高的要求。也就是说,同样的代码,即使能过 PartB 的测试,也不一定能过 PartC 的测试。

因此,我们本节的大部分时间,反而会花在于对之前代码逻辑的性能提升查漏补缺上。

逻辑实现

序列化和反序列化

为了使代码模块清晰,我们将这部分也单独拆出一个文件,起名为 raft_persistence.go,主要包括对关键字段的序列化和反序列化两个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (rf *Raft) persistLocked() {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
raftstate := w.Bytes()
// leave the second parameter nil, will use it in PartD
rf.persister.Save(raftstate, nil)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 {
return
}

var currentTerm int
var votedFor int
var log []LogEntry

r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
if err := d.Decode(&currentTerm); err != nil {
LOG(rf.me, rf.currentTerm, DPersist, "Read currentTerm error: %v", err)
return
}
rf.currentTerm = currentTerm

if err := d.Decode(&votedFor); err != nil {
LOG(rf.me, rf.currentTerm, DPersist, "Read votedFor error: %v", err)
return
}
rf.votedFor = votedFor

if err := d.Decode(&log); err != nil {
LOG(rf.me, rf.currentTerm, DPersist, "Read log error: %v", err)
return
}
rf.log = log
LOG(rf.me, rf.currentTerm, DPersist, "Read Persist %v", rf.stateString())
}

根据[图 2]我们可以看出,在 Raft 结构体内,需要持久化的主要有三个字段:currentTermvotedForlog。这三者需要持久化的大致原因如下:

  1. currentTerm:重启后一定要知道自己之前任期到了哪里,因为任期是状态机中一切正确行为的基础。
  2. votedFor:如果我们在某个任期已经投过票了,重启之后显然不能再投票,否则就会一票多投,是作弊了。
  3. log:日志是自然要持久化的。否则只要集群内的多数机器重启几次,已经提交的日志也会被丢失。

另外, persist 函数加上 -Locked 后缀的原因在于,上面三个字段都是全局变量,因此需要在临界区调用,不然会出现数据竞态(data race)。而 readPersist 不加 -Locked 的原因在于,它只有在构造 Raft Peer 的时候才会被调用,而此时在一个 Raft struct 内部是不会有多线程的,因此不需要担心并发问题。

函数调用

readPersist

readPersist 函数只有在重启加载时才需要调用,也就是在构造 Raft 实例时。但需要注意的是,一定要在所有字段初始化完成后,再调用该函数。

如果是先调用 readPersist 再去初始化,就会把 readPersist 读出来的值给覆盖掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me

// initialization code
rf.role = Follower
rf.currentTerm = 1 // leave 0 to invalid
rf.votedFor = -1

// a dummy entry to aovid lots of corner checks
rf.log = append(rf.log, LogEntry{Term: InvalidTerm})

// initialize the leader's view slice
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))

// initialize the fields used for apply
rf.applyCh = applyCh
rf.applyCond = sync.NewCond(&rf.mu)
rf.commitIndex = 0
rf.lastApplied = 0

// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())

// start election ticker and apply ticker
go rf.electionTicker()
go rf.applicationTicker()

return rf
}

persistLocked

readPersist 函数的调用整体较为简单,persistLocked 函数相对较复杂,很容易遗漏一些地方。一个简单的办法就是在代码中搜索出三个成员变量:currentTermvotedForlog 被赋值修改的所有地方,然后逐个修改在其后增加 persistLocked 语句即可。

一共有五处需要增加。

raft.go 中三处:

在 Leader 接受应用层命令的接口 Start 函数中,要将 command 包裹为日志,然后追加到本地日志中,即会修改 rf.log ,因此要调用 persist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (rf *Raft) Start(command interface{}) (int, int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()

if rf.role != Leader {
return 0, 0, false
}
rf.log = append(rf.log, LogEntry{
CommandValid: true,
Command: command,
Term: rf.currentTerm,
})
rf.persistLocked()

return len(rf.log) - 1, rf.currentTerm, true
}

在状态机相关函数中,becomeFollowerbecomeCandidate 都有可能会涉及 termvotedFor 的更新,因此要加:

  1. becomeFollower:注意该函数中,term 是有可能不变的。在 term 不变时,并不需要 persist——因为 term 不变,votedFor 一定不会被重新赋值。
  2. becomeCandidate:变 Candidate,一定会自增 term,同时投自己的票。因此肯需要调用 persist
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (rf *Raft) becomeFollowerLocked(term int) {
if term < rf.currentTerm {
return
}
rf.role = Follower

shouldPersist := term != rf.currentTerm
if term > rf.currentTerm {
rf.votedFor = -1
}
rf.currentTerm = term
if shouldPersist {
rf.persistLocked()
}
}

func (rf *Raft) becomeCandidateLocked() {
if rf.role == Leader {
return
}

rf.currentTerm++
rf.role = Candidate
rf.votedFor = rf.me
rf.persistLocked()
}

raft_election.go 中一处:

在要票的回调函数 RequestVote 中,如果 Peer 投出票,则会修改 votedFor,因此需要调用 persist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (PartA, PartB).

rf.mu.Lock()
defer rf.mu.Unlock()

reply.Term = rf.currentTerm
reply.VoteGranted = false
// align the term
// ...... some code omitted

// check for votedFor
// ...... some code omitted

// check if candidate's last log is more up to date
if rf.isMoreUpToDateLocked(args.LastLogIndex, args.LastLogTerm) {
return
}

reply.VoteGranted = true
rf.votedFor = args.CandidateId
rf.persistLocked()
rf.resetElectionTimerLocked()
}

raft_replication.go 中有一处:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

reply.Term = rf.currentTerm
reply.Success = false

// align the term
// ...... some code omitted

// will be explained in the next section
defer func() {
rf.resetElectionTimerLocked()
}()

// return failure if prevLog not matched
if args.PrevLogIndex >= len(rf.log) {
return
}
if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
return
}

// append the leader log entries to local
rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...)
rf.persistLocked()
reply.Success = true

// hanle LeaderCommit
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = args.LeaderCommit
rf.applyCond.Signal()
}
}

优化及修错

我们知道 Leader 在给 Follower 同步日志时,由于采用乐观+回撤(01.日志同步)的做法,会经历一个匹配探测期,在 PartA、PartB 的测试中,并没有在这一点上为难我们。但在 PartC 中,由于加大了日志量、引入了各种随机重启,导致这个匹配探测期耗时可能会非常长。如果我们实现的效率很低,不仅会导致在规定的时间内提交不了应用层想要提交的日志,还会引发之前实现没有注意的问题。让我们一个个来说。

日志回溯优化

由于 Leader 在向 Follower 同步日志时,采用的是乐观+回撤(01.日志同步)的做法,而每次回撤后都需要下一个 RPC 才能发给 Follower,而我们的 AppendEntries RPC 是周期性的,也就是说每次回撤都需要一个 replicateInterval

如果回撤的算法不够好,在某些情况下,就会导致匹配探测期时间特别长。测试证明,在 Leader 只基于自己的日志进行回撤的情况下,一次不管是回退一个 index 还是回退一个 term,效果都不够好。那还能怎样优化呢?

让 Follower 给点信息——告诉 Leader 自己日志大致到哪里了!

于是我们给 AppendEntriesReply 增加两个额外的字段,以携带一些 Follower 和 Leader 冲突日志的信息。

1
2
3
4
5
6
7
type AppendEntriesReply struct {
Term int
Success bool

ConfilictIndex int
ConfilictTerm int
}

Follower 端大概算法如下:

  1. 如果 Follower 日志过短,则ConfilictTerm 置空, ConfilictIndex = len(rf.log)
  2. 否则,将 ConfilictTerm 设置为 Follower 在 Leader.PrevLogIndex 处日志的 term;ConfilictIndex 设置为 ConfilictTerm 的第一条日志。

第一条做法的目的在于,如果 Follower 日志过短,可以提示 Leader 迅速回退到 Follower 日志的末尾,而不用傻傻的一个个 index 或者 term 往前试探。

第二条的目的在于,如果 Follower 存在 Leader.PrevLog ,但不匹配,则将对应 term 的日志全部跳过。

1
2
3
4
5
6
7
8
9
10
11
12
13
// --- rf.AppendEntries in raft_replication.go

// return failure if prevLog not matched
if args.PrevLogIndex >= len(rf.log) {
reply.ConfilictIndex = len(rf.log)
reply.ConfilictTerm = InvalidTerm
return
}
if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.ConfilictTerm = rf.log[args.PrevLogIndex].Term
reply.ConfilictIndex = rf.firstIndexFor(reply.ConfilictTerm)
return
}

Leader 端使用上面两个新增字段的算法如下:

  1. 如果 ConfilictTerm 为空,说明 Follower 日志太短,直接将 nextIndex 赋值为 ConfilictIndex 迅速回退到 Follower 日志末尾**。**
  2. 否则,以 Leader 日志为准,跳过 ConfilictTerm 的所有日志;如果发现 Leader 日志中不存在 ConfilictTerm 的任何日志,则以 Follower 为准跳过 ConflictTerm,即使用 ConfilictIndex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// --- rf.startReplication.replicateToPeer in raft_replication.go
if !reply.Success {
prevNext := rf.nextIndex[peer]
if reply.ConfilictTerm == InvalidTerm {
rf.nextIndex[peer] = reply.ConfilictIndex
} else {
firstTermIndex := rf.firstIndexFor(reply.ConfilictTerm)
if firstTermIndex != InvalidIndex {
rf.nextIndex[peer] = firstTermIndex + 1
} else {
rf.nextIndex[peer] = reply.ConfilictIndex
}
}
// avoid the late reply move the nextIndex forward again
rf.nextIndex[peer] = MinInt(prevNext, rf.nextIndex[peer])
return
}

至于我们如何表示空 term 和空 index 呢?

  1. 空 term:可以在 make Raft 时让 term 从 1 开始,则 0 就空了出来,可以用来表示空 term,在代码里叫 InvalidTerm
  2. 空 index:还记得我们在 rf.log 起始加了一个空 entry 吗?由于这个小技巧,我们的有效日志也是永远从 1 开始,0 就可以用来标识空 index,代码中叫 InvalidIndex
1
2
3
4
5
// --- in raft.go
const (
InvalidIndex int = 0
InvalidTerm int = 0
)

为了让代码看着更易懂,我们封装了一个在日志数组中找指定 term 第一条日志的函数rf.firstLogFor

1
2
3
4
5
6
7
8
9
10
11
// --- in raft.go
func (rf *Raft) firstLogFor(term int) int {
for i, entry := range rf.log {
if entry.Term == term {
return i
} else if entry.Term > term {
break
}
}
return InvalidIndex
}

由于从前往后,日志的 term 是一段段的单调递增的,则从前往后找,找到第一个满足 term 的日志,就可以返回。如果相关 term 不存在,则返回 InvalidIndex

隐藏 bug 修复

我们在 Raft PartB 最后一章(13逻辑自查)提到过,有些问题在 PartA、PartB 测试中暴露不出来,但在 PartC 中就会暴露出来。这是因为 PartC 中的测试加大了日志量,加入了随机宕机重启,使得整个环境更加严苛,因此对 Raft 实现的正确性要求更高。这种错误,主要有两处。而这两处错误,也是因为上文提到的 PartC 测试特意引入的 Leader 日志同步时很长的匹配探测期所带来的。

其一,在收到 AppendEntries RPC 时,无论 Follower 接受还是拒绝日志,只要认可对方是 Leader 就要重置时钟。但在我们之前的实现,只有接受日志才会重置时钟。这是不对的,如果 Leader 和 Follower 匹配日志所花时间特别长,Follower 一直不重置选举时钟,就有可能错误的选举超时触发选举。

这里我们可以用一个 defer 函数来在合适位置之后来无论如何都要重置时钟:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// --- in raft_replication.go
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

reply.Term = rf.currentTerm
reply.Success = false

// align the term
// ...... some code omitted

// After we align the term, we accept the args.LeaderId as
// Leader, then we must reset election timer wheter we
// accept the log or not
defer rf.resetElectionTimerLocked()


// return failure if prevLog not matched
if args.PrevLogIndex >= len(rf.log) {
return
}
if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
// if we truncate log here, should persist
rf.log = rf.log[0:args.PrevLogIndex]
rf.persistLocked()
return
}

// append the leader log entries to local
rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...)
rf.persistLocked()
reply.Success = true

// hanle LeaderCommit
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = args.LeaderCommit
rf.applyCond.Signal()
}
}

其二,匹配探测期比较长时,会有多个探测的 RPC,如果 RPC 结果乱序回来:一个先发出去的探测 RPC 后回来了,其中所携带的 ConfilictTermConfilictIndex 就有可能造成 rf.next 的“反复横跳”。为此,我们可以强制 rf.next 单调递减:

1
2
3
4
5
6
7
8
9
10
11
12
// --- rf.startReplication in raft_replication.go
if !reply.Success {
prevIndex := rf.nextIndex[peer]

// some code omitted....

// avoid the late reply move the nextIndex forward again
if rf.nextIndex[peer] > prevIndex {
rf.nextIndex[peer] = prevIndex
}
return
}