
Raft Consensus in Go - Part 2: Log Replication and Commit

06 May 2021

In Part 1, we covered how Raft servers elect a leader through term-based voting. Once Raft has a leader, the hard part is done. The leader’s job is simple: replicate its log to followers and tell them when entries are safe to apply. Followers accept log entries from the leader and apply them in order.

This post covers log replication - how the leader keeps followers in sync, handles failures, and decides when a log entry is “committed” (safe to apply).

The full implementation is available at github.com/navgeet/raft.


The Log

The log is the core of Raft - it’s an ordered sequence of entries that will be applied to the state machine:

type LogEntry struct {
    // The term when this entry was added
    Term uint64

    // The index of this entry in the log (1-based)
    Index uint64

    // The actual command/data
    Command []byte
}

type Log interface {
    // Add an entry to the log
    Append(entry *LogEntry) error

    // Get entry at index
    Get(index uint64) (*LogEntry, error)

    // Remove entries from index onward
    Delete(index uint64) error

    // Last index in the log
    LastIndex() uint64

    // Last term in the log
    LastTerm() uint64
}

The leader appends new entries to its log. Followers receive these entries and append them too. Once the leader knows that a majority of servers have stored an entry from its current term, that entry is “committed.”
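To make the interface concrete, here is a minimal in-memory sketch of it. This is not the implementation from the repository: MemoryLog and ErrNotFound are hypothetical names, nothing is persisted to disk, and Append is assumed to reject entries that would leave a gap in the log.

```go
package main

import (
	"errors"
	"fmt"
)

// LogEntry mirrors the struct shown above.
type LogEntry struct {
	Term    uint64
	Index   uint64
	Command []byte
}

// ErrNotFound is a hypothetical error for lookups outside the log.
var ErrNotFound = errors.New("log: entry not found")

// MemoryLog is a hypothetical in-memory log. entries[i] holds index i+1.
type MemoryLog struct {
	entries []*LogEntry
}

// Append adds an entry, requiring it to directly follow the last one.
func (l *MemoryLog) Append(e *LogEntry) error {
	if e.Index != l.LastIndex()+1 {
		return fmt.Errorf("log: expected index %d, got %d", l.LastIndex()+1, e.Index)
	}
	l.entries = append(l.entries, e)
	return nil
}

// Get returns the entry at a 1-based index.
func (l *MemoryLog) Get(index uint64) (*LogEntry, error) {
	if index == 0 || index > l.LastIndex() {
		return nil, ErrNotFound
	}
	return l.entries[index-1], nil
}

// Delete removes the entry at index and everything after it.
func (l *MemoryLog) Delete(index uint64) error {
	if index == 0 {
		return ErrNotFound
	}
	if index <= l.LastIndex() {
		l.entries = l.entries[:index-1]
	}
	return nil
}

// LastIndex is 0 for an empty log.
func (l *MemoryLog) LastIndex() uint64 { return uint64(len(l.entries)) }

// LastTerm is 0 for an empty log.
func (l *MemoryLog) LastTerm() uint64 {
	if len(l.entries) == 0 {
		return 0
	}
	return l.entries[len(l.entries)-1].Term
}

func main() {
	log := &MemoryLog{}
	log.Append(&LogEntry{Term: 1, Index: 1, Command: []byte("set x=1")})
	log.Append(&LogEntry{Term: 1, Index: 2, Command: []byte("set x=2")})
	log.Append(&LogEntry{Term: 2, Index: 3, Command: []byte("set x=3")})
	log.Delete(2) // drops entries 2 and 3
	fmt.Println(log.LastIndex(), log.LastTerm()) // prints "1 1"
}
```

A real log also has to survive restarts, so a production version would write entries to stable storage before Append returns.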


AppendEntries RPC

The leader sends AppendEntries RPCs to replicate log entries:

type AppendEntriesRequest struct {
    LeaderID string
    Term uint64

    // Previous entry info (for log consistency check)
    PrevLogIndex uint64
    PrevLogTerm uint64

    // Entries to replicate
    Entries []*LogEntry

    // Leader's commit index
    LeaderCommit uint64
}

type AppendEntriesResponse struct {
    Term uint64
    Success bool
    ConflictIndex uint64  // For faster conflict resolution
}

The key is PrevLogIndex and PrevLogTerm. Before appending new entries, the leader tells the follower what the previous entry was. If the follower doesn’t have an entry at that index with that term, it rejects the request. This ensures log consistency.
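The check itself is small enough to show in isolation. The sketch below is a simplification: the follower's log is reduced to a slice of terms (terms[i] is the term of index i+1), and prevEntryMatches is a hypothetical helper, not a function from the repository.

```go
package main

import "fmt"

// prevEntryMatches reports whether a follower whose log holds the given
// terms has an entry at prevLogIndex with term prevLogTerm.
func prevEntryMatches(terms []uint64, prevLogIndex, prevLogTerm uint64) bool {
	if prevLogIndex == 0 {
		return true // nothing precedes the first entry
	}
	if prevLogIndex > uint64(len(terms)) {
		return false // follower's log is too short
	}
	return terms[prevLogIndex-1] == prevLogTerm
}

func main() {
	follower := []uint64{1, 1, 2} // terms of entries 1, 2, 3

	fmt.Println(prevEntryMatches(follower, 3, 2)) // true: same index and term
	fmt.Println(prevEntryMatches(follower, 3, 3)) // false: term differs
	fmt.Println(prevEntryMatches(follower, 4, 2)) // false: entry 4 is missing
}
```

Because every accepted request passed this check, two logs that agree on the index and term of one entry also agree on every entry before it.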


Replicating Entries

When a client sends a command to the leader, the leader appends it to its log and replicates it:

func (r *Raft) Apply(operation []byte) (*OperationResponse, error) {
    r.mu.Lock()

    // Only leader can accept operations
    if r.state != Leader {
        r.mu.Unlock() // release the lock on every early return
        return nil, errNotLeader
    }

    // Append to our log
    entry := &LogEntry{
        Term: r.currentTerm,
        Index: r.log.LastIndex() + 1,
        Command: operation,
    }

    if err := r.log.Append(entry); err != nil {
        r.mu.Unlock()
        return nil, err
    }

    // Note the index so we can respond to client later
    index := entry.Index
    r.mu.Unlock()

    // Replicate to followers
    r.replicate()

    // Wait for entry to be committed
    return r.waitForCommit(index), nil
}

The leader waits for the entry to be replicated to a majority of servers before responding to the client. It does this by checking the commitIndex in a loop.
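The waiting step is not shown in this post, so here is one possible shape for it, using a condition variable instead of a polling loop. commitTracker, advance and waitFor are hypothetical names, and the sketch leaves out things a real waitForCommit needs, such as a timeout and giving up when leadership is lost.

```go
package main

import (
	"fmt"
	"sync"
)

// commitTracker is a hypothetical stand-in for the commitIndex field
// and the condition variable that guards it.
type commitTracker struct {
	mu          sync.Mutex
	cond        *sync.Cond
	commitIndex uint64
}

func newCommitTracker() *commitTracker {
	c := &commitTracker{}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// advance raises commitIndex (it never lowers it) and wakes all waiters.
func (c *commitTracker) advance(index uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if index > c.commitIndex {
		c.commitIndex = index
		c.cond.Broadcast()
	}
}

// waitFor blocks until commitIndex has reached index, then returns
// the commit index observed at that moment.
func (c *commitTracker) waitFor(index uint64) uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	for c.commitIndex < index {
		c.cond.Wait()
	}
	return c.commitIndex
}

func main() {
	tracker := newCommitTracker()

	// Simulate the replication path committing entries 1 and then 3.
	go func() {
		tracker.advance(1)
		tracker.advance(3)
	}()

	// A client whose entry landed at index 2 waits here.
	fmt.Println(tracker.waitFor(2)) // prints "3"
}
```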


The Replicate Loop

On each heartbeat, and whenever Apply accepts a new command, the leader sends log entries to followers:

func (r *Raft) replicate() {
    for id, peer := range r.peers {
        if id == r.id {
            continue
        }

        go r.replicateToPeer(peer)
    }
}

func (r *Raft) replicateToPeer(peer Peer) {
    r.mu.Lock()

    id := peer.ID()
    nextIndex := r.nextIndex[id]

    // The entry just before nextIndex anchors the consistency check
    prevLogIndex := nextIndex - 1
    var prevLogTerm uint64
    if prevLogIndex > 0 {
        prevEntry, err := r.log.Get(prevLogIndex)
        if err != nil {
            r.mu.Unlock()
            return
        }
        prevLogTerm = prevEntry.Term
    }

    // Get log entries starting at nextIndex
    // (GetRange is a helper beyond the Log interface shown earlier)
    entries := r.log.GetRange(nextIndex, r.log.LastIndex())

    request := AppendEntriesRequest{
        LeaderID: r.id,
        Term: r.currentTerm,
        PrevLogIndex: prevLogIndex,
        PrevLogTerm: prevLogTerm,
        Entries: entries,
        LeaderCommit: r.commitIndex,
    }

    r.mu.Unlock()

    response, err := peer.AppendEntries(request)
    if err != nil {
        return
    }

    r.mu.Lock()
    defer r.mu.Unlock()

    if response.Term > r.currentTerm {
        r.currentTerm = response.Term
        r.state = Follower
        r.persist()
        return
    }

    // Ignore responses that arrive after we lost leadership
    // or moved on to a newer term
    if r.state != Leader || r.currentTerm != request.Term {
        return
    }

    if response.Success {
        // Replication succeeded, update our tracking.
        // matchIndex only moves forward, so a late response is harmless.
        newMatchIndex := prevLogIndex + uint64(len(request.Entries))
        if newMatchIndex > r.matchIndex[id] {
            r.matchIndex[id] = newMatchIndex
        }
        r.nextIndex[id] = r.matchIndex[id] + 1
    } else {
        // Replication failed, try with earlier entries
        // This is log recovery - the follower's log diverged
        next := response.ConflictIndex
        if next >= nextIndex {
            next = nextIndex - 1 // hint is no help, step back one entry
        }
        if next < 1 {
            next = 1 // nextIndex must never reach 0
        }
        r.nextIndex[id] = next
    }
}

The nextIndex tracks, for each follower, what we should send next. If replication succeeds, we advance it. If it fails, we backtrack and try again.
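The bookkeeping for a single follower can be sketched on its own. The progress type and its methods are hypothetical, and the sketch assumes nextIndex starts at the leader's last index plus one, as in the Raft paper.

```go
package main

import "fmt"

// progress is a hypothetical per-follower record of the two indexes.
type progress struct {
	nextIndex  uint64 // index of the next entry to send
	matchIndex uint64 // highest index known to be stored on the follower
}

// onSuccess records that the follower now holds everything up to
// prevLogIndex+sent. Stale responses never move matchIndex backwards.
func (p *progress) onSuccess(prevLogIndex uint64, sent int) {
	match := prevLogIndex + uint64(sent)
	if match > p.matchIndex {
		p.matchIndex = match
	}
	p.nextIndex = p.matchIndex + 1
}

// onReject moves nextIndex back to the follower's hint. If the hint is
// unusable it steps back one entry. nextIndex never drops below
// matchIndex+1, so it also never reaches 0.
func (p *progress) onReject(hint uint64) {
	next := hint
	if next == 0 || next >= p.nextIndex {
		next = p.nextIndex - 1
	}
	if next < p.matchIndex+1 {
		next = p.matchIndex + 1
	}
	p.nextIndex = next
}

func main() {
	// The leader's last index is 7, so it starts by offering entry 8.
	p := progress{nextIndex: 8}

	p.onReject(4) // the follower asks us to retry from index 4
	fmt.Println(p.nextIndex, p.matchIndex) // prints "4 0"

	p.onSuccess(3, 4) // entries 4..7 were accepted after prevLogIndex 3
	fmt.Println(p.nextIndex, p.matchIndex) // prints "8 7"
}
```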


Handling AppendEntries on the Follower

A follower receives AppendEntries and decides whether to accept the entries:

func (r *Raft) handleAppendEntries(request AppendEntriesRequest) AppendEntriesResponse {
    r.mu.Lock()
    defer r.mu.Unlock()

    response := AppendEntriesResponse{Term: r.currentTerm}

    // If request term is lower, reject. A stale leader must not
    // reset our election timeout.
    if request.Term < r.currentTerm {
        return response
    }

    // Update term and become follower if needed
    if request.Term > r.currentTerm {
        r.currentTerm = request.Term
        r.votedFor = ""
        r.state = Follower
        r.persist()
        response.Term = r.currentTerm
    }

    // Reset election timeout - we got a message from a current leader
    r.lastContact = time.Now()

    // Check log consistency: do we have the previous entry?
    if request.PrevLogIndex > 0 {
        prevEntry, err := r.log.Get(request.PrevLogIndex)
        if err != nil || prevEntry.Term != request.PrevLogTerm {
            // Log divergence - we don't have the expected previous entry.
            // Tell the leader where to retry: just past our last entry if
            // our log is too short, otherwise at the mismatching index.
            // The hint is always <= PrevLogIndex, so the leader backs off.
            response.ConflictIndex = request.PrevLogIndex
            if last := r.log.LastIndex(); last < request.PrevLogIndex {
                response.ConflictIndex = last + 1
            }
            return response
        }
    }

    // Append new entries
    for i, entry := range request.Entries {
        index := request.PrevLogIndex + uint64(i) + 1

        // Check if entry already exists
        existing, err := r.log.Get(index)
        if err == nil {
            if existing.Term == entry.Term {
                // Already have this entry, skip
                continue
            }
            // Same index, different term - delete it and everything after
            if err := r.log.Delete(index); err != nil {
                response.ConflictIndex = index
                return response
            }
        }

        if err := r.log.Append(entry); err != nil {
            response.ConflictIndex = index
            return response
        }
    }

    // Update commit index if leader has committed further. Cap it at the
    // last entry this request verified, not at our own last index, which
    // may still include stale entries the leader has not checked.
    lastNewIndex := request.PrevLogIndex + uint64(len(request.Entries))
    if newCommit := min(request.LeaderCommit, lastNewIndex); newCommit > r.commitIndex {
        r.commitIndex = newCommit
        r.applyCond.Broadcast()  // Wake up apply loop
    }

    response.Success = true
    return response
}

The key moment is checking the previous entry. If the follower doesn’t have it, either the follower is behind or the logs have diverged, and replication fails. The leader will back off and try with earlier entries until it finds one they both have.
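The append step has one subtle rule: the follower truncates only when an incoming entry conflicts with one it already has, never just because a request is shorter than its log. A minimal sketch of that rule follows, with the log reduced to a slice of terms and appendEntries as a hypothetical helper that assumes the previous-entry check has already passed.

```go
package main

import "fmt"

// appendEntries returns the follower's log after applying the incoming
// entries that follow prevLogIndex. log[i] is the term of index i+1.
func appendEntries(log []uint64, prevLogIndex uint64, entries []uint64) []uint64 {
	log = append([]uint64(nil), log...) // work on a copy of the caller's slice

	for i, term := range entries {
		index := prevLogIndex + uint64(i) + 1
		if index <= uint64(len(log)) {
			if log[index-1] == term {
				continue // already have this entry
			}
			log = log[:index-1] // conflict: drop this entry and all after it
		}
		log = append(log, term)
	}
	return log
}

func main() {
	follower := []uint64{1, 1, 2, 2}

	// The leader's entries 3 and 4 are from term 3, so ours are replaced.
	fmt.Println(appendEntries(follower, 2, []uint64{3, 3})) // prints "[1 1 3 3]"

	// A delayed request carrying only entry 2 leaves entries 3 and 4 alone.
	fmt.Println(appendEntries(follower, 1, []uint64{1})) // prints "[1 1 2 2]"
}
```

The second call matters because RPCs can arrive late or out of order. Truncating on every request would let an old message erase entries that a newer one had already delivered.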


Log Recovery

Log recovery handles the case where the leader and a follower’s logs diverge. This happens when:

  1. A server was the leader but crashed
  2. It had uncommitted entries on its log
  3. It restarted and a new leader was elected
  4. The new leader might not have those uncommitted entries
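
// When AppendEntries fails, the response carries ConflictIndex:

// Fast recovery: jump straight to the follower's hint
r.nextIndex[id] = response.ConflictIndex

// Next attempt will have earlier prevLogIndex
// Keep backing off until we find a matching entry

Instead of backing off one entry per round trip (slow), the response includes ConflictIndex so the leader can skip many entries at once. A follower that is simply far behind can point the leader at the right starting index with a single rejected request. This is not a binary search: the hint comes from the follower, and the number of round trips depends on how far the logs have diverged rather than on the length of the log.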
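A follower can make the hint more aggressive when the terms disagree. The Raft paper suggests reporting the first index the follower stores for the conflicting term, so the leader skips a whole term per round trip. The sketch below shows that variant. It is not what the code in this post does, conflictHint is a hypothetical helper, and the log is again reduced to a slice of terms.

```go
package main

import "fmt"

// conflictHint returns the index the leader should retry from after the
// check at prevLogIndex failed. terms[i] is the term of index i+1.
func conflictHint(terms []uint64, prevLogIndex uint64) uint64 {
	last := uint64(len(terms))
	if prevLogIndex == 0 {
		return 1
	}
	if prevLogIndex > last {
		return last + 1 // log too short: retry just past our last entry
	}

	// Term mismatch: walk back to the first entry of the conflicting term.
	term := terms[prevLogIndex-1]
	index := prevLogIndex
	for index > 1 && terms[index-2] == term {
		index--
	}
	return index
}

func main() {
	// The follower has three entries from term 2 that the leader lacks.
	fmt.Println(conflictHint([]uint64{1, 1, 2, 2, 2}, 5)) // prints "3"

	// The follower has only two entries, the leader asked about index 6.
	fmt.Println(conflictHint([]uint64{1, 1}, 6)) // prints "3"
}
```

In the first case the leader retries with nextIndex 3, so one round trip discards all of term 2 instead of three.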


Committing Entries

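The tricky part: when is an entry “safe to apply”? An entry is committed when the leader knows it’s been replicated to a majority and the entry belongs to the leader’s current term. Entries from earlier terms are never committed by counting replicas. They become committed indirectly, once a later entry from the current term commits on top of them.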

The leader tracks this using matchIndex - for each follower, the highest index it has replicated:

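// Must be called with r.mu held.
func (r *Raft) updateCommitIndex() {
    if r.state != Leader {
        return
    }

    advanced := false

    // Count how many replicas have each log index
    for index := r.commitIndex + 1; index <= r.log.LastIndex(); index++ {
        count := 1  // Count ourselves

        // Assumes matchIndex holds followers only and peers includes us
        for _, matchIdx := range r.matchIndex {
            if matchIdx >= index {
                count++
            }
        }

        // If majority has this index, it's committed
        if count > len(r.peers)/2 {
            entry, err := r.log.Get(index)
            if err != nil {
                break
            }

            // Only commit entries from current term. An older entry on a
            // majority can still be overwritten by a future leader
            // (Raft paper, section 5.4.2).
            if entry.Term == r.currentTerm {
                r.commitIndex = index
                advanced = true
            }
        }
    }

    if advanced {
        r.applyCond.Broadcast()  // Wake up the leader's own apply loop
    }
}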

The leader runs this check periodically (usually on heartbeat). Once commitIndex advances, it tells followers in the next AppendEntries RPC, and they apply the entries.
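The counting loop can also be written as a sort: put every server's highest replicated index in descending order and take the middle element. The sketch below is an alternative formulation, not the code from the repository. majorityIndex and nextCommitIndex are hypothetical helpers, and the leader's own last index is passed in separately from the followers' matchIndex values.

```go
package main

import (
	"fmt"
	"sort"
)

// majorityIndex returns the highest index stored on a majority of
// servers, given the leader's last index and each follower's matchIndex.
func majorityIndex(leaderLast uint64, followerMatch []uint64) uint64 {
	all := append([]uint64{leaderLast}, followerMatch...)
	sort.Slice(all, func(i, j int) bool { return all[i] > all[j] }) // descending

	// In descending order, element n/2 is reached by n/2+1 servers.
	return all[len(all)/2]
}

// nextCommitIndex applies the current-term rule: the candidate index is
// adopted only if it is ahead and its entry is from the current term.
func nextCommitIndex(commitIndex, candidate, candidateTerm, currentTerm uint64) uint64 {
	if candidate > commitIndex && candidateTerm == currentTerm {
		return candidate
	}
	return commitIndex
}

func main() {
	// Five servers: the leader is at 7, followers are at 7, 5, 3 and 2.
	candidate := majorityIndex(7, []uint64{7, 5, 3, 2})
	fmt.Println(candidate) // prints "5"

	// Entry 5 is from term 3, which is also the current term.
	fmt.Println(nextCommitIndex(2, candidate, 3, 3)) // prints "5"

	// Had entry 5 been written in term 2, the commit index stays put.
	fmt.Println(nextCommitIndex(2, candidate, 2, 3)) // prints "2"
}
```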


The Apply Loop

Both leader and followers have an apply loop that applies committed entries to the state machine:

func (r *Raft) applyLoop() {
    // Hold the lock whenever we read shared state; whoever sets
    // Shutdown must also Broadcast on applyCond to wake us up.
    r.mu.Lock()
    defer r.mu.Unlock()

    for r.state != Shutdown {
        // Wait until there are entries to apply
        for r.lastApplied >= r.commitIndex && r.state != Shutdown {
            r.applyCond.Wait()
        }

        // Apply all committed but unapplied entries
        for r.lastApplied < r.commitIndex && r.state != Shutdown {
            r.lastApplied++

            entry, _ := r.log.Get(r.lastApplied)

            r.mu.Unlock()

            // Apply to state machine (without holding lock)
            response := r.fsm.Apply(entry.Command)

            // Send response to client, also without the lock:
            // a slow receiver must not block the rest of Raft
            r.operationResponseCh <- OperationResponse{
                Term: entry.Term,
                Index: entry.Index,
                Operation: entry.Command,
                Response: response,
            }

            r.mu.Lock()
        }
    }
}

The loop is synchronized with commitIndex updates via applyCond (a condition variable). When commitIndex advances, the apply loop wakes up and applies all committed entries.

The important detail: release the lock before applying to the state machine. Holding the Raft lock while applying is a deadlock risk if the state machine tries to call back into Raft.


What Can Go Wrong

Uncommitted entries: If you respond to a client before commitIndex reaches their entry’s index, the client is told the write succeeded, but a new leader that never received the entry can overwrite it after a crash. The client then sees an acknowledged change disappear.

Out-of-order application: If you apply entries out of order, the state machine will be inconsistent. Always apply in index order.

Lost updates: If nextIndex never advances because replication keeps failing, the leader never knows the entry is replicated and never commits it. The client times out waiting. This is why the ConflictIndex optimization matters - it lets log recovery succeed quickly.

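Split brain: If two servers think they’re leader, they could both be appending entries and committing different values. But the election logic from Part 1 prevents this. A server can only become leader with a majority vote, so at most one leader can exist per term. A deposed leader on the wrong side of a partition may still believe it is in charge, but it cannot reach a majority, so nothing it appends is ever committed, and it steps down as soon as it sees a higher term.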

The replication logic is where Raft shines. It handles log divergence, crashes mid-replication, and network partitions. The leader and followers stay in sync despite failures.
