前面一节我们处理了单个 Group 的逻辑,其实比较简单,和我们前面在 Lab3 实现的分布式 KV 是基本类似的。
今天这一节来处理下配置变更的问题。
shardkv 需要定时从 shardctrler 这边拉取最新的配置,然后根据配置来确定哪些 shard 应该是需要进行迁移的。
上一节我们已经写了一个简单的拉取配置的后台任务,但是按照 lab 的提示,我们每次只能够拉取一个配置,并且按照顺序处理,这样做的目的主要是为了避免覆盖还未完成的配置变更任务。
1 2 3 4 5 6 7 8 9 10 11 12
| func (kv *ShardKV) fetchConfigTask() { for !kv.killed() { kv.mu.Lock() newConfig := kv.mck.Query(kv.currentConfig.Num + 1) kv.mu.Unlock()
kv.ConfigCommand(RaftCommand{ConfigChange, newConfig}, &OpReply{}) time.Sleep(FetchConfigInterval) } }
|
拉取完毕配置之后,我们需要构造一个对应的命令,然后传到 raft 模块进行同步。
这里需要做一点小的改造,因为我们之前传入到 raft 的都是客户端的操作,这里我们需要加上配置变更的操作。并且在 apply 协程中进行反解析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| var opReply *OpReply raftCommand := message.Command.(RaftCommand) if raftCommand.CmdType == ClientOpeartion { op := raftCommand.Data.(Op) if op.OpType != OpGet && kv.requestDuplicated(op.ClientId, op.SeqId) { opReply = kv.duplicateTable[op.ClientId].Reply } else { shardId := key2shard(op.Key) opReply = kv.applyToStateMachine(op, shardId) if op.OpType != OpGet { kv.duplicateTable[op.ClientId] = LastOperationInfo{ SeqId: op.SeqId, Reply: opReply, } } } } else { opReply = kv.handleConfigChangeMessage(raftCommand) }
|
根据最新状态的 Config 信息,我们能够判断出当前 Group 中负责哪些 shard,也能够判断出某个 shard 转移到当前 shard 中。