18. Raft PartD 日志重构
要支持 Compaction,需要对日志进行诸多改造。干脆,我们将相关逻辑封装到一个 struct 中。
于是我们新创建一个 RaftLog 的结构体,以支持:
在 index 处进行 Snapshot:将 index 以及之前的日志阶段掉
基本读写操作:
读取:在 Snapshot 存在时,需要做下标转换,但在没有 Snapshot 时又不用,需要想办法进行统一。
追加:包括在末尾追加(用于应用层给 Leader 追加日志)、在给定下标处覆盖追加(用于 Leader 覆写 Follower 日志)等等。
当然,以上需求是在有一个基本想法之后,再去看其他所有使用到 RaftLog 的代码,然后总结出来的。但为了行文方便,我直接放在了开始,所以乍一看会有些突兀。
RaftLog 实现
结构体字段
我们新建一个文件,起名为 raft_log.go ,然后定义一个 RaftLog 的结构体。主要包含三部分:
前面日志截断后 compact 成的 snapshot
后面的剩余日志 tailLog
两者的分界线,也就是上一节要求中提到的:lastIncludeTerm/lastInc ...
19. Raft PartD 快照数据流
本节我们要实现 InstallSnapshot 相关 RPC 。为了避免丢三落四,我们遵循一条 snapshot 数据流主线,来分四个步骤,一步步实现:
Leader 的应用层调用 raft.Snapshot(index, snapshot) 函数
保存 snapshot
截断日志
持久化
Leader 在需要时使用该 snapshot 构造参数发送 RPC 给 Follower
Follower 接收到 Leader RPC 后替换本地日志,并将其持久化
Follower 通过 ApplyMsg 将 snapshot 传给 Follower 应用层
和之前一样,为了保持模块的内聚性,我们新建一个文件raft_compaction.go 将 Snapshot() 接口和 InstallSnapshot 相关 RPC 都放在一块。
调用 Snapshot() 接口
Leader 的应用层调用 Snapshot(index, snapshot) 接口后,我们会在 index 处截断现有日志,将 snapshot 保存在 rf.log 中,同时将其持久化,以应对可能得宕机重启。
...
21. Raft 的总结和优化
看到这里,恭喜你,已经实现了一个“麻雀虽小、五脏俱全”的基本版本的 Raft(当然,我们没有实现成员变更)。如果你是跟着课程一步步代码敲过来的,想必你会有很多经验、也有很多困惑,欢迎留言跟大家分享。
整体测试
针对多线程并发访问的数据竞态测试:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859➜ raft git:(raft) ✗ go test -run Part -race Test (PartA): initial election ... ... Passed -- 3.1 3 94 26978 0Test (PartA): election after network failure ... ... Passed -- 4.5 3 184 38101 0Test (PartA): multiple elections ... ... Passed -- 5.4 ...
20. Raft PartD 调试和小结
由于事先做了数据流图,因此总体来说 PartD 没有什么大问题。剩下有些问题,上一节中提到过一些;其他的集中在两个部分。
问题修复
问题包括两部分:
在 Snapshot 时对 index 的检查
使用 rf.log.at 前对传入下标的检查
这两个问题单独在 Raft 的测试中是检测不出来的,要通过之后分布式 KV 小结才能检查出来,但根据主题的亲和性,还是要放在这里讲(视频中没有提到)
Snapshot 下标检查
之前只检查了下界 rf.log.snapLastIdx (不要重复进行 snapshot),但也要加上检查上界:rf.commitIndex。即,不能对没有提交的日志进行 snapshot。
12345678910111213141516// --- raft_compaction.gofunc (rf *Raft) Snapshot(index int, snapshot []byte) { // Your code here (PartD). rf.mu.Lock() defer rf.mu.Unlock() ...
22 基于 raft 的分布式 KV 概述
在前面的第一部分的学习中,大家已经对分布式大致理论、分布式共识算法有了基本的认识,并且已经完整的实现了 raft 的各个重要组成部分,学习到了很多关于分布式系统的基本概念以及代码实践。
接下来,就要在实践中更进一步,基于前面实现的 raft 算法,去构建一个高可用的分布式 Key/Value 服务,通过这一部分的学习,你将会真正的将 raft 使用起来,这不仅可以帮助你在更深的维度去理解分布式理论、raft 共识算法,并且让你对生产环境中的高可用、容错的项目有具体的实践经验。
大致架构
我们需要构建的分布式 KV 服务是什么样的架构?
我们的分布式 KV 服务将会是一个复制状态机,由几个使用 raft 进行状态复制的 kv 服务节点组成。分布式 KV 服务需要保证在集群大多数节点正常的情况下依然能够正常提供服务,即使有一些其他的错误或者网络分区。
大致的流程是客户端向后端的 servers 发起请求,后端的服务是由多个节点组成的,每个节点之间使用 raft 进行状态复制,客户端会选择将请求发送到 Leader 节点,然后由 Leader 节点进行状态复制,即发送日志,当收到多数的节点 ...
24 kvraft Server 端处理
前面客户端的逻辑比较简单,接下来我们专注于分布式 KV 的服务端的处理,也就是说当客户端的请求发送过来之后,我们的后端的分布式 kv server 应该怎样处理这个请求。
按照我们前面梳理的大致交互逻辑,客户端的请求到达之后,我们需要首先通过 raft 模块将其存储到 raft 日志中,回想一下我们在前面实现的 raft 库中,提供了一个 Start 入口方法,这个方法是 raft 接收外部请求的,我们会将请求通过这个方法传递过去。
1234567891011121314151617func (rf *Raft) Start(command interface{}) (int, int, bool) { rf.mu.Lock() defer rf.mu.Unlock() if rf.role != Leader { return 0, 0, false } rf.log = append(rf.log, LogEntry{ CommandValid: true, Comman ...
23 kvraft Client 端处理
前面一节主要了解了我们基于 raft 实现的分布式 KV 的大致架构和代码框架,从这一节开始就需要开始具体的代码逻辑了,我们首先需要实现的是客户端的逻辑。
上一节中提到了客户端的结构体 Clerk:
1234type Clerk struct { servers []*labrpc.ClientEnd // You will have to modify this struct.}
可以看到其中维护了 servers 列表,表示的是后端分布式 KV 服务的所有节点信息,我们可以通过这个信息去向指定的节点发送数据读写的请求。
前面提到了三种类型的请求,主要是:
Get
Put
Append
每个 kv 服务的节点都是一个 raft 的 peer,客户端发送请求到 kv 服务的 Leader 节点,然后 Leader 节点会存储请求日志在本地,然后将日志通过 raft 发送给其他的节点进行状态同步。所以 raft 日志其实存储的是一连串客户端请求,然后 server 节点会按照顺序执行请求,并将结果存储到状态机中。
这里 Clerk 发送请求的时候,由于事先 ...
25 kvraft 的节点故障与重复请求
前面一节我们处理了 server 端的大致逻辑,我们知道 kv 服务是由多个 raft 节点组成的,而由于分布式系统中不可避免的节点故障等缘故,我们需要去处理这样的 failure 问题,这样才能够保证我们的分布式系统是始终可用、可靠的,即在大多数节点正常下依然能够正常响应请求,容忍少数节点发生各种未知错误。
例如,如果我们的客户端向集群中的 Leader 发送了一个请求,Leader 接收到请求之后,因为某种原因出现了故障,并且降级为 Follower,这时候客户端便一直等不到回复。直到客户端请求超时,那么它就会向下一个节点重试请求,直到正确得到了 Leader 的回应。
因此在客户端的 Clerk 代码中,如果节点发送故障,需要轮询下一个节点,重试客户端请求,直到得到了正确的响应,才结束请求。
123456789101112131415161718func (ck *Clerk) Get(key string) string { // You will have to modify this function. args := GetArgs{ ...
27 基于 multi raft 的 shardkv 概述
前面的一个部分,我们完整实现了一个基于 raft,并且满足高可用、线性一致性的分布式 KV 存储系统。
这个新的部分会更进一步,我们会基于 raft 构建一个分片的分布式 KV。
一个分片(shard)指的是一个 Key/Value 数据集的一部分数据,比如,对一个有很多数据的 KV 存储系统中,所有以 ‘a’ 开头的 key 可以是一个分片,所有以 ‘b’ 开头的 key 可以是一个分片,当然这里只是简单举一个 shard 划分的例子,实际上划分数据的办法还有很多,最常见的是 Hash 和 Range。
我们为什么需要对数据进行分区?一个非常重要的原因是因为性能,在不分区的情况下,所有数据的读写请求都会在一个分片中,这在并发量较大的情况下可能存在一定的瓶颈。
如果对数据做了分区,那么不同分区之间的数据读写请求是可以并行的,这能够较大的提升 KV 系统的并发能力。
大致架构
我们这个部分需要完成的分片的分布式 KV 存储系统由两个主要的部分组成。首先是复制组(Replica Group),它指的是处理一个或多个 shard 的 KV 服务,通常是由一个 Raft 集群组成的,所以一个完 ...
26 带snapshot 的 kvraft 实现
在前面的几个章节中,我们基本实现了分布式 KV 的大致逻辑,构建了一个高可用的分布式 KV 系统。
但是我们并没有使用前面在 raft 库中实现的 Snapshot 方法,这样一来,如果系统重启了,raft 需要加载全量的数据去恢复状态,如果节点中的数据量较大的话,这样会消耗较长的时间去加载。
所以我们可以利用 Snapshot 的功能,对 raft 日志进行压缩,降低日志存储的空间,减少 KVServer 集群在重启时的耗时。
在代码中,KVServer 维护了一个字段名为 maxraftstate,它会由使用者进行设置,表示的是允许的最大的持久化的 raft 的日志大小。
1234567891011type KVServer struct { mu sync.Mutex me int rf *raft.Raft applyCh chan raft.ApplyMsg dead int32 // set by Kill() maxraftstate int // snapshot if log grows this ...