在前面的几个章节中,我们基本实现了分布式 KV 的大致逻辑,构建了一个高可用的分布式 KV 系统。

但是我们并没有使用前面在 raft 库中实现的 Snapshot 方法,这样一来,如果系统重启了,raft 需要加载全量的数据去恢复状态,如果节点中的数据量较大的话,这样会消耗较长的时间去加载。

所以我们可以利用 Snapshot 的功能,对 raft 日志进行压缩,降低日志存储的空间,减少 KVServer 集群在重启时的耗时。

在代码中,KVServer 维护了一个字段名为 maxraftstate,它会由使用者进行设置,表示的是允许的最大的持久化的 raft 的日志大小。

1
2
3
4
5
6
7
8
9
10
11
type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()

maxraftstate int // snapshot if log grows this big

// Your definitions here.
}

这个值会在 KVServer 启动的时候被设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})

kv := new(KVServer)
kv.me = me
kv.maxraftstate = maxraftstate

// You may need initialization code here.

kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)

// You may need initialization code here.
kv.dead = 0
kv.lastApplied = 0
kv.stateMachine = NewMemoryKVStateMachine()
kv.notifyChans = make(map[int]chan *OpReply)
kv.duplicateTable = make(map[int64]LastOperationInfo)

go kv.applyTask()
return kv
}

我们应该将这个值和 persister.RaftStateSize() 的值进行比较,一旦发现 maxraftstate 超过了其大小,则我们需要调用 raft 模块中的 Snapshot 方法,让 raft 模块进行日志的压缩。如果 maxraftstate 的值是 -1,则说明不需要 snapshot。

注意具体在代码中,我们需要修改三个地方:

一是在 apply 任务的后台线程中进行判断,如果必要的话则调用 Snapshot 方法。

二是对于 applyCh 中传递过来的 Snapshot 消息,我们需要从这个 Snapshot 取出对应的数据,然后恢复状态机。

三是在 KVServer 启动的时候,也需要直接从 Snapshot 中恢复状态。