Raft 通常是是一个多机、多线程的库(lib),因此会涉及大量并发编程知识。 由于在语言层面内置 goroutine 和 channel,Golang 大大简化了并发编程复杂度,这也是课程选择 golang 为编程语言的主要原因。

Goroutine

goroutine 是一种轻量级线程,是 go runtime 提供的一种用户态线程,因此也被称为协程。所谓轻量,就是相比操作系统线程,每个 goroutine 耗费的资源更少、goroutine 间切换耗费也更低(不会陷入内核态),因此可以有更高的并发度。因此,我们在 golang 中,可以相对随意的创建新的 goroutine。

但我们在决定是否使用 goroutine 时,仍然有一些原则和技巧可以遵循。原则上来说,使用 goroutine 的目的主要有两种:

  1. 提升计算性能:主要针对计算密集型任务,多开 goroutine,充分利用现代计算机多核性能。
  2. 旁路 IO 负载:主要针对有 IO 型任务的负载,比如 RPC 和文件 IO,将其放到单独 goroutine 中,避免影响主干工作流性能。

对于 raft 来说,主要后面一种情况:即将发送给每个 Peer 的 RPC 放进单独的 goroutine,从而避免阻塞主干 loop(包括 electionLoop 和 replcationLoop)。

难点

即便 golang 已经十分友好,作为新手,初次接触并发编程,仍会有诸多困惑之处。原因有很多:

语句失去相对顺序。编译阶段即使发生了指令重排,也仍然能保证单个线程内语句执行顺序书写顺序结果上的一致性。但是多线程间语句的相对顺序,就没有任何保证了。举个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var a, b int

func f() {
a = 1
b = 2
}

func g() {
print(b)
print(a)
}

func main() {
go f()
g()
}

g() 最终可能会先输出 2 再输出 0 。而想让多线程间的代码执行保持某种顺序,就必须要使用一些通信手段,强化 happens-before 关系,在 golang 中,这些手段有:

  1. sync.Mutex
  2. chan
  3. sync.Cond

内存模型难理解

TODO

用锁

锁的原理

锁主要用在两种情况下,可见性原子性

可见性让一个线程对共享变量的修改,能及时为其他线程看到。这主要是因为,线程修改变量时会首先写到自己线程中的缓存中,而我们不能决定缓存数据会什么时候同步到主存中。使用锁,可以让修改立即同步到主存。

1
2
3
4
5
func (rf *Raft) GetState() (int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
return rf.currentTerm, rf.role == Leader
}

上面代码就是保证其他线程在取用 term 和 role 的时候,能及时将其他线程的修改结果同步过来。

原子性将一段代码进行打包执行,执行时,不会乱入其他代码。这样能保证一个逻辑单元正确的执行。

1
2
3
4
5
6
7
func (b *Bank) transfer() {
b.mu.Lock();
defer b.mu.Unlock()

b.account1 -= 50;
b.account2 += 50;
}

最典型的就是银行转账场景,如果还有个线程在动 b.account1 ,比如正在计算发放的利息,如果和本函数交错执行,就会产生错误结果。这种保证和数据库中的事务对原子性的保证在理念上是一样的。

用锁技巧

但在编程中使用锁时,要想用对锁,又是另外一回事。这需要一些实战技巧:

  1. 空间上:锁的保护范围
  2. 时间上:锁的释放时机

保护范围

锁的保护范围,即锁所要保护的共享变量集。只有明确了锁保护哪些共享变量,我们才能在访问、修改这些共享变量时加对锁。锁的保护范围越大,实现就越容易,但性能就越差;锁的保护粒度越细,性能就会变好(当然过犹不及,锁太多性能也并不见得好,锁本身也会消耗很多资源),但用对就越难(很容易发生死锁)。

因此,用锁的一个基本策略是——渐进式加锁。即,对于一个类来说,先用一把大锁保护所有需要共享的字段,保证逻辑正确。在实现正确之后,定位性能瓶颈,按需拆锁——让每个锁只保护少量字段。对于我们的 raft 实验来说,整个 Raft 类使用一把大锁足以。但在工业界中的 raft 实现,会用多把锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// A Go object implementing a single Raft peer.
type Raft struct {
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

mu sync.Mutex
// fields below should be persisted
currentTerm int
votedFor int
log []LogEntry

// control apply progress
commitIndex int
lastApplied int
applyCond *sync.Cond
applyCh chan ApplyMsg

// only for leaders
nextIndex []int // guess when initialize
matchIndex []int // truth from the rpc

// timer and role
electTimerStart time.Time
role Role
}

另外一个小技巧就是,在写代码时,将所有锁需要保护的字段置于锁之后,所有不需要保护的字段置于锁之前,如上图。但也有例外,比如 applyCh 就是线程安全的,本不需要锁保护,但为了保证字段间的亲和性(这是保持代码块清楚的一个重原则),还是把它放到了和其他控制 apply 的字段一块。

释放时机

最简单的加锁方式,就是在所有用到共享变量的函数中,全函数加锁。但这会性能很差,尤其是遇到一些长耗时函数,如果一直不释放锁,其他线程就不能获取锁而执行,使得使用多线程丧失了意义。

于是我们的思路可以改为,一开始全函数上锁,然后寻找函数中的长耗时区域,及时释放锁,耗时区域执行完之后再加回来。这些耗时区域包括:

  1. IO:包括本机磁盘 IO 和网络 IO
  2. Channel:尤其是阻塞 channel

具体到 raft 中有:

  1. 网络IO:发送 RequestVote、AppendEntries 的 PRC 请求时
  2. 本机IO:读写文件时,比如写日志到外存
  3. Channel:往 Apply Channel 中发送数据

当然,最主要的就是发送两个 RPC ,进行选举要票和日志同步时,一定要注意释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// ----- section 1: construct args-----
rf.mu.Lock()
nextIndex := rf.nextIndex[server]
args := &AppendEntriesArgs{
Term: leaderTerm,
LeaderId: rf.me,
PrevLogIndex: nextIndex - 1,
PrevLogTerm: rf.log[nextIndex-1].Term,
Entries: rf.log[nextIndex:],
LeaderCommit: rf.commitIndex,
}
rf.mu.Unlock()

// ----- section 2: send rpc --------
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(server, args, reply)
if !ok { // the network or peer goes wrong
return
}

// ----- section 3: handle reply------
rf.mu.Lock()
defer rf.mu.Unlock()
// handle reply

如上面代码,将发送 RPC 请求分成三个部分:

  1. 构造请求参数 AppendEntriesArgs
  2. 发送 RPC 请求
  3. 处理请求回复 AppendEntriesReply

代码块 1 和 3 分别上锁,在真正发送 RPC 请求时释放锁,因为其耗时可能会非常长。RequestVote RPC 发送时也类似,这里不再赘述。

命名技巧

如果一个函数需要在持有锁时才能调用,最好要在命名中有所体现,而不能只写在注释里。如使用 Locked 后缀:

1
2
3
func (rf *Raft) becomeFollowerLocked(term int)
func (rf *Raft) becomeCandidateLocked()
func (rf *Raft) becomeLeaderLocked(term int)

漏锁检测

golang 还在语言层面提供了对数据竞态( data race )进行检测的选项 -race,如 go test -run 2B -race 。该选项可以帮忙识别出共享变量漏在访问或者修改时漏加锁的情况。在出现数据竞态时,会报出出现竞态的相关 goroutine 的调用栈,包括创建线程、读取线程和修改线程,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
WARNING: DATA RACE
Write at 0x00c0004cb088 by goroutine 1459:
6.5840/raft.(*Raft).becomeCandidateLocked()
/Users/qtmuniao/code/6.5840/src/raft/raft.go:233 +0x27c
6.5840/raft.(*Raft).electionTicker()
/Users/qtmuniao/code/6.5840/src/raft/raft_election.go:180 +0x2c4
6.5840/raft.Make.func1()
/Users/qtmuniao/code/6.5840/src/raft/raft.go:356 +0x34

Previous read at 0x00c0004cb088 by goroutine 1484:
6.5840/raft.(*Raft).replicateTicker()
/Users/qtmuniao/code/6.5840/src/raft/raft_replication.go:209 +0xa0
6.5840/raft.(*Raft).startElection.func1.2()
/Users/qtmuniao/code/6.5840/src/raft/raft_election.go:140 +0x40

Goroutine 1459 (running) created at:
6.5840/raft.Make()
/Users/qtmuniao/code/6.5840/src/raft/raft.go:356 +0x578
6.5840/raft.(*config).start1()
/Users/qtmuniao/code/6.5840/src/raft/config.go:322 +0x778
6.5840/raft.make_config()
/Users/qtmuniao/code/6.5840/src/raft/config.go:101 +0x76c
6.5840/raft.TestFailNoAgree2B()
/Users/qtmuniao/code/6.5840/src/raft/test_test.go:305 +0x3c
testing.tRunner()
/opt/homebrew/Cellar/go/1.20.6/libexec/src/testing/testing.go:1576 +0x188
testing.(*T).Run.func1()
/opt/homebrew/Cellar/go/1.20.6/libexec/src/testing/testing.go:1629 +0x40

Goroutine 1484 (finished) created at:
6.5840/raft.(*Raft).startElection.func1()
/Users/qtmuniao/code/6.5840/src/raft/raft_election.go:140 +0x3ec
6.5840/raft.(*Raft).startElection.func3()
/Users/qtmuniao/code/6.5840/src/raft/raft_election.go:165 +0x54

通过该选项检测,我发现我容易在几个地方漏加锁:

  1. 访问共享变量的日志中
  2. 使用共享变量的判断条件里
  3. 只有一条返回共享变量语句的函数中

其他

锁是最常用的多线程同步手段。在 raft 中,除了锁之外,还会涉及其他几种同步方法,包括 channelsync.Cond

Channel

channel 本质上是一个线程安全的消息队列,将 channel 作为语言内置类型来实现,是 golang 语言的一大特点。我们知道线程间通信有两大流派:共享内存(通过内存)和消息传递(通过消息队列)。golang 推荐第二个流派,原因是更安全,因此直接将 channel 作为内置类型。

但当然,我们的 raft 实现还是主要使用共享内存的方式来通信,因为更简单。本 raft 实验中用到 channel 的地方主要是 applyChannel,即当日志被大多数节点提交时,就通过 channel 传给 raft 使用方(后面的 KVStore)。使用方再决定如何对日志进行应用。

1
2
3
4
5
6
7
for _, applyMsg := range messages {
rf.applyCh <- *applyMsg // maybe block, should release lock

rf.mu.Lock()
rf.lastApplied++
rf.mu.Unlock()
}

sync.Cond

Golang 中 Sync.Cond 就是操作系统中信号量源语 —— wait signal 的一个实现。用于多线程间同步:比如某个线程 P1 做完了准备工作,通知另外的线程 P2 做正式处理。则 P2 必须等待 P1 完成再执行,而不能随意执行。此时,就可以使用 sync.Cond

需要注意的是,sync.Cond 必须和 sync.Mutex 配合使用,即每个 sync.Cond 都要在构造的时候绑定一个 sync.Mutex

1
rf.applyCond = sync.NewCond(&rf.mu)

这是因为 Wait()Signal() 函数必须在锁保护下的临界区中执行。其语义是:

  1. Wait() 会阻塞 goroutine 执行,并释放当前持有的锁。
  2. Signal() 会唤醒阻塞在 Wait() 上的 goroutine。

由于阻塞在 Wait() 上的 goroutine 被唤醒后会自动重新获取锁,这就要求调用完 Signal() 的 goroutine 要立即释放锁,否则可能形成死锁。在 raft 中,主要用于在 rf.commitIndex 被更新后,提醒 apply goroutine 去执行 apply。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// wait in apply loop
func (rf *Raft) applyLoop() {
for !rf.killed() {
rf.mu.Lock()
rf.applyCond.Wait()

// do the apply
}
}

// signal in the leader
// in AppendEntriesReply handling
if n > rf.commitIndex && rf.log[n].Term == rf.currentTerm {
rf.commitIndex = n
rf.applyCond.Signal()
}

// singal in the follower
// in AppendEntries callback
if args.LeaderCommit > rf.commitIndex {
targetIndex := MinInt(args.LeaderCommit, len(rf.log)-1)
rf.commitIndex = targetIndex
rf.applyCond.Signal()
}

参考资料

  1. The Go Memory Model:https://go.dev/ref/mem
  2. Raft Locking Advice:https://pdos.csail.mit.edu/6.824/labs/raft-locking.txt