type RaftLog struct { snapLastIdx int snapLastTerm int
// contains index [1, snapLastIdx] snapshot []byte // the first entry is `snapLastIdx`, but only contains the snapLastTerm // the entries between (snapLastIdx, snapLastIdx+len(tailLog)-1] have real data tailLog []LogEntry }
换个角度理解,就是我们在 tailLog 的开头 mock 了一个假的 entry,用它来代表 snapshot 的 lastIncludeEntry。当 snapshot 为空时,该 lastIncludeEntry 的 index 和 term 都是 0(即我们在[Raft PartC 实现和优化]中定义的边界常量)。
初始化
对 RaftLog 初始化,有两条路径:
构造函数:在通过 Make 构造 Raft 实例时使用,此时各个字段留默认值就行。
反序列化函数:读取宕机前(如果有)持久化的各个字段,对 RaftLog 进行填充。
通常,这两条路径会在 Make 函数中先后执行(先构造,再反序列化),因此构造函数其实可以留空;但为了将来的可扩展性,下面我们仍然实现了它,只是在调用时传入空值。
// called as : rf.log = NewLog(InvalidIndex, InvalidTerm, nil, nil) funcNewLog(snapLastIdx, snapLastTerm int, snapshot []byte, entries []LogEntry) *RaftLog { rl := &RaftLog{ snapLastIdx: snapLastIdx, snapLastTerm: snapLastTerm, snapshot: snapshot, } // make the len = 0, cap = 1 + len(entries) rl.tailLog := make([]LogEntry, 0, 1 + len(entries)) rl.tailLog = append(rl.tailLog, LogEntry{ Term: snapLastTerm, }) rl.tailLog = append(rl.tailLog, entries...)
return rl }
// return detailed error for the caller to log func(rl *RaftLog) readPersist(d *labgob.LabDecoder) error { var lastIdx int if err := d.Decode(&lastIdx); err != nil { return fmt.Errorf("decode last include index failed") } rl.snapLastIdx = lastIdx
var lastTerm int if err := d.Decode(&lastTerm); err != nil { return fmt.Errorf("decode last include term failed") } rl.snapLastTerm = lastTerm
var log []LogEntry if err := d.Decode(&log); err != nil { return fmt.Errorf("decode tail log failed") } rl.tailLog = log
// the dummy log is counted func(rl *RaftLog) size() int { return rl.snapLastIdx + len(rl.tailLog) }
// access the index `rl.snapLastIdx` is allowed, although it's not exist actually. func(rl *RaftLog) idx(logicIdx int) int { if logicIdx < rl.snapLastIdx || logicIdx >= rl.size() { panic(fmt.Sprintf("%d is out of [%d, %d]", logicIdx, rl.snapLastIdx+1, rl.size()-1)) } return logicIdx - rl.snapLastIdx }
// the service says it has created a snapshot that has // all info up to and including index. this means the // service no longer needs the log through (and including) // that index. Raft should now trim its log as much as possible. func(rf *Raft) Snapshot(index int, snapshot []byte) { // Your code here (PartD). rf.mu.Lock() defer rf.mu.Unlock() LOG(rf.me, rf.currentTerm, DSnap, "Snap on %d", index)
if index <= rf.log.snapLastIdx || index > rf.commitIndex { LOG(rf.me, rf.currentTerm, DSnap, "Could not snapshot beyond [%d, %d]", rf.log.snapLastIdx+1, rf.commitIndex) return }
// --- raft_log.go func(rl *RaftLog) doSnapshot(index int, snapshot []byte) { // since idx() will use rl.snapLastIdx, so we should keep it first idx := rl.idx(index)
rl.snapLastTerm = rl.tailLog[idx].Term rl.snapLastIdx = index rl.snapshot = snapshot