PartB 需要定义日志格式,然后在 PartA 心跳逻辑的基础上,补全日志同步的逻辑。总体上来说,Leader 需要维护一个各个 Peer 的进度视图(nextIndexmatchIndex 数组)。其中 nextIndex 用于进行日志同步时匹配点试探matchIndex 用于日志同步成功后匹配点记录。依据全局匹配点分布,我们可以计算出当前全局的 commitIndex,然后再通过之后轮次的日志复制 RPC 下发给各个 Follower。

每个 Follower 收到 commitIndex 之后,再去 apply 本地的已提交日志到状态机。但这个 apply 的流程,我们留到之后一章来专门做,本章就暂时留待 TODO 了。

因此,本章就只实现逻辑:匹配点的试探匹配后的更新

结构体完善

AppendEntries RPC 结构体

根据 ApplyMsg 所需字段,来定义 LogEntry 。然后在此基础上,依照 Raft 论文中的[图 2]来补全 RPC 涉及到的结构体:AppendEntriesArgsAppendEntriesReply 并不需要添加额外字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// add log entries according to the ApplyMsg struct
type LogEntry struct {
Term int
CommandValid bool
Command interface{}
}

// add the fields about log:
// PrevLogIndex and PrevLogTerm is used to match the log prefix
// Entries is used to append when matched
// LeaderCommit tells the follower to update its own commitIndex
type AppendEntriesArgs struct {
Term int
LeaderId int

PrevLogIndex int
PrevLogTerm int
Entries []LogEntry
}

type AppendEntriesReply struct {
Term int
Success bool
}

Raft 结构体

定义了 LogEntry 之后,完善下 raft struct 中相关字段:

1
2
3
4
5
6
7
// log in Peer's local
log []LogEntry

// only used when it is Leader,
// log view for each peer
nextIndex []int
matchIndex []int

Make 函数也要给上述字段进行初始化:

1
2
3
4
5
6
7
8
9
10
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
// ......
rf.log = append(rf.log, LogEntry{})

rf.matchIndex = make([]int, len(rf.peers))
rf.nextIndex = make([]int, len(rf.peers))

// ......
}

RPC 接收方

RPC 接收方,即 Leader 以外的其他 Peer。在考虑日志之后,要增加以下逻辑:

  1. 如果 prevLog 不匹配,则返回 Success = False
  2. 如果 prevLog 匹配,则将参数中的 Entries 追加到本地日志,返回 Success = True

所谓日志匹配:就是相同 Index 的地方,Term 相同;即 index 和 term 能唯一确定一条日志,这是因为,Raft 算法保证一个 Term 中最多有(也可能没有)一个 Leader,然后只有该 Leader 能确定日志顺序且同步日志。这样一来,Term 单调递增,每个 Term 只有一个 Leader,则该 Leader 能唯一确定该 Term 内的日志顺序。

此外,之前纯心跳逻辑(心跳逻辑和日志复制逻辑共用一个 RPC)只负责压制其他 Peer 发起选举,因此不用给 reply 返回信息。但此时,就需要用到 reply 了。

注:之后代码块中,所有加粗代码为新增代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

// For debug
LOG(rf.me, rf.currentTerm, DDebug, "<- S%d, Receive log, Prev=[%d]T%d, Len()=%d", args.LeaderId, args.PrevLogIndex, args.PrevLogTerm, len(args.Entries))
// replay initialized
reply.Term = rf.currentTerm
reply.Success = false

// align the term
if args.Term < rf.currentTerm {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Higher term, T%d<T%d", args.LeaderId, args.Term, rf.currentTerm)
return
}
if args.Term >= rf.currentTerm {
rf.becomeFollowerLocked(args.Term)
}

// return failure if the previous log not matched
if args.PrevLogIndex >= len(rf.log) {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Follower log too short, Len:%d <= Prev:%d", args.LeaderId, len(rf.log), args.PrevLogIndex)
return
}
if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Prev log not match, [%d]: T%d != T%d", args.LeaderId, args.PrevLogIndex, rf.log[args.PrevLogIndex].Term, args.PrevLogTerm)
return
}

// append the leader logs to local
rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...)
LOG(rf.me, rf.currentTerm, DLog2, "Follower append logs: (%d, %d]", args.PrevLogIndex, args.PrevLogIndex+len(args.Entries))
reply.Success = true

// TODO: handle the args.LeaderCommit

// reset the election timer, promising not start election in some interval
rf.resetElectionTimerLocked()
}

好奇的你可能会问为什么不管 args.LeaderCommit ,这部分自然要管,但我们将其留到之后 applyLoop 一节中(TODO:需要补充一个小节的链接)。这是因为,就逻辑的亲和性上来讲,commitIndex 是为最终的日志 apply 而服务的。但为了备忘,我们可以在代码注释中加一个 TODO 注释,这也是工程实践中常用的办法。

RPC 发送方

对于日志复制 RPC 发送方来说,需要增加两部分逻辑:

  1. 每个 RPC 发送前的参数构造
  2. 每个 RPC 收到返回值后:
    1. 如果复制成功,则看看是否可以更新 Leader 的 commitIndex (也留到之后实现,TODO,补充小节链接)
    2. 如果复制失败,则需要将匹配点回退,继续试探。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func (rf *Raft) startReplication(term int) bool {
replicateToPeer := func(peer int, args *AppendEntriesArgs) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(peer, args, reply)

rf.mu.Lock()
defer rf.mu.Unlock()
if !ok {
LOG(rf.me, rf.currentTerm, DLog, "-> S%d, Lost or crashed", peer)
return
}

// align the term
if reply.Term > rf.currentTerm {
rf.becomeFollowerLocked(reply.Term)
return
}

// probe the lower index if the prev log not matched
if !reply.Success {
idx := rf.nextIndex[peer] - 1
term := rf.log[idx].Term
for idx > 0 && rf.log[idx].Term == term {
idx--
}
rf.nextIndex[peer] = idx + 1
LOG(rf.me, rf.currentTerm, DLog, "Log not matched in %d, Update next=%d", args.PrevLogIndex, rf.nextIndex[peer])
return
}

// update the match/next index if log appended successfully
rf.matchIndex[peer] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[peer] = rf.matchIndex[peer] + 1

// TODO: need compute the new commitIndex here,
// but we leave it to the other chapter
}

rf.mu.Lock()
defer rf.mu.Unlock()

if rf.contextLostLocked(Leader, term) {
LOG(rf.me, rf.currentTerm, DLog, "Lost Leader[%d] to %s[T%d]", term, rf.role, rf.currentTerm)
return false
}

for peer := 0; peer < len(rf.peers); peer++ {
if peer == rf.me {
// Don't forget to update Leader's matchIndex
rf.matchIndex[peer] = len(rf.log) - 1
rf.nextIndex[peer] = len(rf.log)
continue
}

prevIdx := rf.nextIndex[peer] - 1
prevTerm := rf.log[prevIdx].Term
args := &AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: prevIdx,
PrevLogTerm: prevTerm,
Entries: rf.log[prevIdx+1:],
LeaderCommit: rf.commitIndex,
}
LOG(rf.me, rf.currentTerm, DDebug, "-> S%d, Send log, Prev=[%d]T%d, Len()=%d", peer, args.PrevLogIndex, args.PrevLogTerm, len(args.Entries))
go replicateToPeer(peer, args)
}

return true
}

这部分的最终目的,就是要更新 matchIndex。进而依据所有 Peer 的 matchIndex 来算 commitIndex 。Leader 有了 commitIndex 之后,再将其下发给各个 Follower,指导其各自更新本地 commitIndex 进而 apply。

但细心的你可能会注意到一件事,matchIndexnextIndex 是什么时候初始化的?所以,我们要继续补上这两个字段的初始化逻辑。本质上来说,这两个字段是各个 Peer 中日志进度在 Leader 中的一个视图(view)。Leader 正是依据此视图来决定给各个 Peer 发送多少日志。也是依据此视图,Leader 可以计算全局的 commitIndex

因此,该视图只在 Leader 当选的 Term 中有用。故而,我们要在 Leader 一当选时,更新该视图,即 becomeLeaderLocked 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
func (rf *Raft) becomeLeaderLocked() {
if rf.role != Candidate {
LOG(rf.me, rf.currentTerm, DError, "Only Candidate can become Leader")
return
}

LOG(rf.me, rf.currentTerm, DLeader, "Become Leader in T%d", rf.currentTerm)
rf.role = Leader
for peer := 0; peer < len(rf.peers); peer++ {
rf.nextIndex[peer] = len(rf.log)
rf.matchIndex[peer] = 0
}
}