PartA 只需要实现不带日志的心跳。

需要说明的是:在论文中心跳(不带日志)和日志复制是用的一个 RPC,毕竟他们在逻辑上唯一的区别就是带不带日志。但在工程实践中,为了提升性能,有的 Raft 实现会将其进行分开。我们的课程里,为了保持简洁,也就实现到一块了。因为对于初学者来说,简洁比性能重要

和选举逻辑相对,我们也分三个层次来实现 RPC 发送方

  1. 心跳 Loop:在当选 Leader 后起一个后台线程,等间隔的发送心跳/复制日志,称为 replicationTicker
  2. 单轮心跳:对除自己外的所有 Peer 发送一个心跳 RPC,称为 startReplication
  3. 单次 RPC:对某个 Peer 来发送心跳,并且处理 RPC 返回值,称为 replicateToPeer

当然,还有 RPC 接收方回调函数的逻辑。

心跳(日志复制)逻辑和选举逻辑实现层次一致、命名风格一致的好处在于,可以减少心智负担,方便调试和维护。

心跳 Loop

由于不用构造随机超时间隔,心跳 Loop 会比选举 Loop 简单很多:

1
2
3
4
5
6
7
8
9
10
func (rf *Raft) replicationTicker(term int) {
for !rf.killed() {
ok := rf.startReplication(term)
if !ok {
return
}

time.Sleep(replicateInterval)
}
}

与选举 Loop 不同的是,这里的 startReplication 有个返回值,主要是检测“上下文(05. Raft PartA 选举逻辑上下文检查)”是否还在( ContextLost )——一旦发现 Raft Peer 已经不是这个 term 的 Leader 了,就立即退出 Loop。

单轮心跳

和 Candidate 的选举逻辑类似,Leader 会给除自己外的所有其他 Peer 发送心跳。在发送前要检测“上下文(05. Raft PartA 选举逻辑上下文检查)是否还在,如果不在了,就直接返回 false ——告诉外层循环 replicationTicker 可以终止循环了。

因此 startReplication 的返回值含义为:是否成功的发起了一轮心跳。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (rf *Raft) startReplication(term int) bool {
replicateToPeer := func(peer int, args *AppendEntriesArgs) {
// send heartbeat RPC and handle the reply
}

rf.mu.Lock()
defer rf.mu.Unlock()
if rf.contextLostLocked(Leader, term) {
LOG(rf.me, rf.currentTerm, DLeader, "Leader[T%d] -> %s[T%d]", term, rf.role, rf.currentTerm)
return false
}

for peer := 0; peer < len(rf.peers); peer++ {
if peer == rf.me {
continue
}

args := &AppendEntriesArgs{
Term: term,
LeaderId: rf.me,
}

go replicateToPeer(peer, args)
}

return true
}

单次 RPC

在不关心日志时,心跳的返回值处理比较简单,只需要对齐下 term 就行。如果后续还要进行其他处理,则还要检查 context 是否丢失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
replicateToPeer := func(peer int, args *AppendEntriesArgs) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(peer, args, reply)

rf.mu.Lock()
defer rf.mu.Unlock()
if !ok {
LOG(rf.me, rf.currentTerm, DLog, "-> S%d, Lost or crashed", peer)
return
}
// align the term
if reply.Term > rf.currentTerm {
rf.becomeFollowerLocked(reply.Term)
return
}
}

回调函数

心跳接收方在收到心跳时,只要 Leader 的 term 不小于自己,就对其进行认可,变为 Follower,并重置选举时钟,承诺一段时间内不发起选举。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

reply.Term = rf.currentTerm
reply.Success = false
// align the term
if args.Term < rf.currentTerm {
LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject log", args.LeaderId)
return
}
if args.Term >= rf.currentTerm {
rf.becomeFollowerLocked(args.Term)
}

// reset the timer
rf.resetElectionTimerLocked()
reply.Success = true
}