func(rf *Raft) applyTicker() { for !rf.killed() { rf.mu.Lock() rf.applyCond.Wait()
entries := make([]LogEntry, 0) // should start from rf.lastApplied+1 instead of rf.lastApplied for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ { entries = append(entries, rf.log[i]) } rf.mu.Unlock()
for i, entry := range entries { rf.applyCh <- ApplyMsg{ CommandValid: entry.CommandValid, Command: entry.Command, CommandIndex: rf.lastApplied + 1 + i, } }
reply.Term = rf.currentTerm reply.Success = false // align the term if args.Term < rf.currentTerm { LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Higher term, T%d<T%d", args.LeaderId, args.Term, rf.currentTerm) return } if args.Term >= rf.currentTerm { rf.becomeFollowerLocked(args.Term) }
// return failure if the previous log not matched if args.PrevLogIndex >= len(rf.log) { LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Follower log too short, Len:%d <= Prev:%d", args.LeaderId, len(rf.log), args.PrevLogIndex) return } if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm { LOG(rf.me, rf.currentTerm, DLog2, "<- S%d, Reject Log, Prev log not match, [%d]: T%d != T%d", args.LeaderId, args.PrevLogIndex, rf.log[args.PrevLogIndex].Term, args.PrevLogTerm) return }
reply.Success = true // append the leader logs to local rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...) LOG(rf.me, rf.currentTerm, DLog2, "Follower append logs: (%d, %d]", args.PrevLogIndex, args.PrevLogIndex+len(args.Entries))
// update the commit index if needed and indicate the apply loop to apply if args.LeaderCommit > rf.commitIndex { LOG(rf.me, rf.currentTerm, DApply, "Follower update the commit index %d->%d", rf.commitIndex, args.LeaderCommit) rf.commitIndex = args.LeaderCommit if rf.commitIndex >= len(rf.log) { rf.commitIndex = len(rf.log) - 1 } rf.applyCond.Signal() }
// reset the election timer, promising not start election in some interval rf.resetElectionTimerLocked() }
上面 Peer 收到 Leader 发来的日志同步请求处理逻辑中,
1 2 3
if args.Term >= rf.currentTerm { rf.becomeFollowerLocked(args.Term) }