Distributing a Key-Value Store with Raft in Go

28 January 2023

In part 1, we built a single-node key-value store with persistence. It works, but there’s a glaring problem: if that node dies, your data is unavailable. The disk might be fine, but clients can’t reach it until someone restarts the process or provisions a new machine.

The solution is replication - keep copies of the data on multiple nodes so the system survives individual failures. But replication introduces a hard problem: how do you keep the copies consistent? If two clients write to different nodes simultaneously, which write wins?

This is where Raft comes in.


Why Raft?

Distributed consensus has been a solved problem since Paxos in 1989. But Paxos is notoriously difficult to understand and implement correctly. The original paper is dense, and real-world implementations require significant extensions that aren’t well-specified.

Raft, published in 2014, was designed specifically to be understandable. It provides guarantees equivalent to Paxos but breaks the problem into manageable pieces: leader election, log replication, and safety. The authors prioritized clarity over cleverness.

The key properties:


Raft Basics

Before diving into code, let’s understand how Raft works at a high level.

Leader Election

Nodes are in one of three states: follower, candidate, or leader. Time is divided into terms - logical clocks that increase monotonically. Each term has at most one leader.

Followers expect periodic heartbeats from the leader. If a follower’s election timeout expires without hearing from a leader, it becomes a candidate and starts an election. It votes for itself and requests votes from other nodes. If it receives votes from a majority, it becomes the leader.

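Election timeouts are randomized, so two followers rarely become candidates at the same moment, and a split vote is resolved by a new election in the next term. As a result, the cluster keeps making progress as long as a majority of nodes are up and can reach each other.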
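
To make the timeout idea concrete, here is a minimal sketch of picking a randomized election timeout. The function name and the choice of the range [base, 2*base) are assumptions for illustration, not what any particular library does internally.

```go
package main

import (
	"math/rand"
	"time"
)

// randomElectionTimeout returns a duration in [base, 2*base).
// Giving each node a different timeout makes it unlikely that two
// followers become candidates at the same instant and split the vote.
// base must be positive, because rand.Int63n panics on n <= 0.
func randomElectionTimeout(base time.Duration) time.Duration {
	jitter := time.Duration(rand.Int63n(int64(base)))
	return base + jitter
}
```

A follower would call something like randomElectionTimeout(150 * time.Millisecond) every time it resets its timer after a heartbeat.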

Log Replication

The leader maintains a log of operations. When a client sends a write, the leader appends it to its log and replicates it to followers. Once a majority of nodes have the entry, it’s committed and the leader applies it to its state machine.

Each log entry has an index and a term. The commit index tracks the highest entry known to be committed. Followers apply entries up to this index to their state machines.
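
As a rough sketch of how a leader could derive the commit index, assume it tracks, for every node including itself, the index of the last entry known to be stored there. The commit index is then the highest index present on a majority. The names here are hypothetical, and the sketch deliberately leaves out Raft's extra rule that a leader only commits entries from its own term by counting replicas.

```go
package main

import "sort"

// commitIndex returns the highest log index stored on a majority of nodes.
// matchIndex holds one value per node (leader included): the index of the
// last entry known to be stored on that node.
func commitIndex(matchIndex []uint64) uint64 {
	if len(matchIndex) == 0 {
		return 0
	}
	// Copy before sorting so the caller's slice is left untouched.
	sorted := append([]uint64(nil), matchIndex...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })

	// With indexes in descending order, the value at position quorum-1
	// is stored on at least quorum nodes.
	quorum := len(sorted)/2 + 1
	return sorted[quorum-1]
}
```

With three nodes at indexes 7, 5, and 3, entry 5 is on two of them, so the commit index is 5.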

Safety

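Raft guarantees that if a log entry is committed, all future leaders will have that entry. This is enforced during elections - a node only grants its vote if the candidate’s log is at least as up-to-date as its own, so a candidate can only win if its log is at least as up-to-date as the logs of a majority of nodes.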
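
The "at least as up-to-date" comparison is simple enough to sketch. Following the rule in the Raft paper, the log whose last entry has the later term wins, and if the terms are equal, the longer log wins. The function and parameter names are made up for this example.

```go
package main

// candidateUpToDate reports whether a candidate's log is at least as
// up-to-date as the voter's log. Each log is summarized by the term and
// index of its last entry.
func candidateUpToDate(candTerm, candIndex, voterTerm, voterIndex uint64) bool {
	if candTerm != voterTerm {
		// A later term in the last entry always wins.
		return candTerm > voterTerm
	}
	// Same term: the longer (or equally long) log wins.
	return candIndex >= voterIndex
}
```

A voter would reject the vote request whenever this returns false, which is what keeps a node that missed committed entries from becoming leader.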


Using hashicorp/raft

Implementing Raft correctly is hard. Edge cases around network partitions, timing, and log compaction are subtle. For production use, battle-tested libraries are the way to go.

HashiCorp’s raft library powers Consul, Nomad, and Vault. It handles the consensus protocol, and we just need to implement the state machine.

import "github.com/hashicorp/raft"

The key interface is FSM (Finite State Machine):

type FSM interface {
    Apply(*raft.Log) interface{}
    Snapshot() (raft.FSMSnapshot, error)
    Restore(io.ReadCloser) error
}


Implementing the FSM

Let’s adapt our KV store to work as a Raft state machine. First, we define command types:

type CommandType uint8

const (
    CommandSet CommandType = iota
    CommandDelete
)

type Command struct {
    Type  CommandType
    Key   string
    Value string
}
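
Commands travel through the Raft log as bytes, so they need a wire format. Here is a small sketch of encoding and decoding a command with encoding/json, using the same Command type. The helper names encodeCommand and decodeCommand are hypothetical.

```go
package main

import "encoding/json"

type CommandType uint8

const (
	CommandSet CommandType = iota
	CommandDelete
)

type Command struct {
	Type  CommandType
	Key   string
	Value string
}

// encodeCommand serializes a command into the bytes stored in a log entry.
func encodeCommand(cmd Command) ([]byte, error) {
	return json.Marshal(cmd)
}

// decodeCommand parses the bytes of a log entry back into a command.
func decodeCommand(data []byte) (Command, error) {
	var cmd Command
	err := json.Unmarshal(data, &cmd)
	return cmd, err
}
```

A set of "hello" to "distributed" encodes as {"Type":0,"Key":"hello","Value":"distributed"}. JSON is convenient for debugging; a more compact encoding would work just as well, as long as every node decodes it the same way.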

Now the FSM implementation:

type KVStoreFSM struct {
    mu   sync.RWMutex
    data map[string]string
}

func NewKVStoreFSM() *KVStoreFSM {
    return &KVStoreFSM{
        data: make(map[string]string),
    }
}

func (f *KVStoreFSM) Apply(log *raft.Log) interface{} {
    var cmd Command
    if err := json.Unmarshal(log.Data, &cmd); err != nil {
        return err
    }

    f.mu.Lock()
    defer f.mu.Unlock()

    switch cmd.Type {
    case CommandSet:
        f.data[cmd.Key] = cmd.Value
    case CommandDelete:
        delete(f.data, cmd.Key)
    }

    return nil
}

func (f *KVStoreFSM) Get(key string) (string, bool) {
    f.mu.RLock()
    defer f.mu.RUnlock()
    val, ok := f.data[key]
    return val, ok
}

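Note that Apply is the only path through which client commands mutate state (Restore, shown later, replaces the state wholesale from a snapshot). Raft guarantees Apply is called with the same entries in the same order on all nodes, so Apply must be deterministic.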
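
To see why ordering matters, here is a stripped-down sketch without the raft library: replaying the same commands in the same order always produces the same map, while a different order can produce a different one. applyAll is a hypothetical helper that mirrors the switch statement in Apply.

```go
package main

type CommandType uint8

const (
	CommandSet CommandType = iota
	CommandDelete
)

type Command struct {
	Type  CommandType
	Key   string
	Value string
}

// applyAll replays commands, in order, onto an empty map and returns
// the resulting state. It stands in for a replica applying its log.
func applyAll(entries []Command) map[string]string {
	state := make(map[string]string)
	for _, cmd := range entries {
		switch cmd.Type {
		case CommandSet:
			state[cmd.Key] = cmd.Value
		case CommandDelete:
			delete(state, cmd.Key)
		}
	}
	return state
}
```

Two replicas that call applyAll on identical logs end up with identical state, which is the whole point of a replicated state machine.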

Snapshots prevent the log from growing forever. When the log gets too large, Raft takes a snapshot and discards old entries:

func (f *KVStoreFSM) Snapshot() (raft.FSMSnapshot, error) {
    f.mu.RLock()
    defer f.mu.RUnlock()

    // Clone the data
    data := make(map[string]string, len(f.data))
    for k, v := range f.data {
        data[k] = v
    }

    return &kvSnapshot{data: data}, nil
}

type kvSnapshot struct {
    data map[string]string
}

func (s *kvSnapshot) Persist(sink raft.SnapshotSink) error {
    err := func() error {
        b, err := json.Marshal(s.data)
        if err != nil {
            return err
        }
        if _, err := sink.Write(b); err != nil {
            return err
        }
        return sink.Close()
    }()

    if err != nil {
        sink.Cancel()
    }
    return err
}

func (s *kvSnapshot) Release() {}

Restore rebuilds state from a snapshot:

func (f *KVStoreFSM) Restore(rc io.ReadCloser) error {
    defer rc.Close()

    var data map[string]string
    if err := json.NewDecoder(rc).Decode(&data); err != nil {
        return err
    }

    f.mu.Lock()
    f.data = data
    f.mu.Unlock()

    return nil
}
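
A quick way to gain confidence in the snapshot format is a round trip: write a map out as JSON, read it back, and compare. The sketch below does that with a bytes.Buffer standing in for the snapshot sink. The helpers persist, restore, and roundTrip are hypothetical and do not use the raft library.

```go
package main

import (
	"bytes"
	"encoding/json"
	"io"
)

// persist writes the data as JSON, like the Persist method does.
func persist(w io.Writer, data map[string]string) error {
	return json.NewEncoder(w).Encode(data)
}

// restore reads JSON back into a map, like the Restore method does.
func restore(rc io.ReadCloser) (map[string]string, error) {
	defer rc.Close()
	var data map[string]string
	if err := json.NewDecoder(rc).Decode(&data); err != nil {
		return nil, err
	}
	return data, nil
}

// roundTrip persists data into an in-memory buffer and restores it.
func roundTrip(data map[string]string) (map[string]string, error) {
	var buf bytes.Buffer
	if err := persist(&buf, data); err != nil {
		return nil, err
	}
	return restore(io.NopCloser(&buf))
}
```

If roundTrip ever returns something different from its input, snapshots would silently corrupt state on restart, so this is a cheap property to test.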


Cluster Setup

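Setting up a Raft cluster requires configuring storage, transport, and the initial cluster membership. The log and stable stores here come from the raftboltdb package (github.com/hashicorp/raft-boltdb), which persists them in BoltDB files.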

func setupRaft(nodeID string, addr string, fsm raft.FSM, bootstrap bool) (*raft.Raft, error) {
    config := raft.DefaultConfig()
    config.LocalID = raft.ServerID(nodeID)

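    // Create the data directory first; opening a BoltDB file does not create parent directories
    if err := os.MkdirAll(filepath.Join("data", nodeID), 0o755); err != nil {
        return nil, err
    }

    // Storage for logs and stable state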
    logStore, err := raftboltdb.NewBoltStore(filepath.Join("data", nodeID, "raft-log.db"))
    if err != nil {
        return nil, err
    }
    stableStore, err := raftboltdb.NewBoltStore(filepath.Join("data", nodeID, "raft-stable.db"))
    if err != nil {
        return nil, err
    }
    snapshotStore, err := raft.NewFileSnapshotStore(filepath.Join("data", nodeID), 3, os.Stderr)
    if err != nil {
        return nil, err
    }

    // TCP transport
    tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
    if err != nil {
        return nil, err
    }
    transport, err := raft.NewTCPTransport(addr, tcpAddr, 3, 10*time.Second, os.Stderr)
    if err != nil {
        return nil, err
    }

    r, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, transport)
    if err != nil {
        return nil, err
    }

    // Bootstrap the first node
    if bootstrap {
        cfg := raft.Configuration{
            Servers: []raft.Server{
                {
                    ID:      raft.ServerID(nodeID),
                    Address: raft.ServerAddress(addr),
                },
            },
        }
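        // BootstrapCluster returns a future; check its error instead of dropping it.
        // ErrCantBootstrap means this node already has state (e.g. after a restart), which is fine.
        if err := r.BootstrapCluster(cfg).Error(); err != nil && !errors.Is(err, raft.ErrCantBootstrap) {
            return nil, err
        }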
    }

    return r, nil
}

To add a node to an existing cluster, the leader must add it:

func (s *Server) addPeer(nodeID, addr string) error {
    if s.raft.State() != raft.Leader {
        return errors.New("not the leader")
    }

    f := s.raft.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0)
    return f.Error()
}


Handling Client Requests

Writes must go through the leader. If a client contacts a follower, we need to either forward the request or tell the client where the leader is.

type Server struct {
    raft *raft.Raft
    fsm  *KVStoreFSM
}

func (s *Server) Set(key, value string) error {
    if s.raft.State() != raft.Leader {
        return errors.New("not the leader")
    }

    cmd := Command{Type: CommandSet, Key: key, Value: value}
    data, err := json.Marshal(cmd)
    if err != nil {
        return err
    }

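    f := s.raft.Apply(data, 10*time.Second)
    if err := f.Error(); err != nil {
        return err
    }
    // Error() only covers Raft-level failures; whatever the FSM's Apply
    // returned (for example an unmarshal error) arrives via Response().
    if err, ok := f.Response().(error); ok {
        return err
    }
    return nil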
}

func (s *Server) Delete(key string) error {
    if s.raft.State() != raft.Leader {
        return errors.New("not the leader")
    }

    cmd := Command{Type: CommandDelete, Key: key}
    data, err := json.Marshal(cmd)
    if err != nil {
        return err
    }

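    f := s.raft.Apply(data, 10*time.Second)
    if err := f.Error(); err != nil {
        return err
    }
    // Surface any error returned by the FSM's Apply method.
    if err, ok := f.Response().(error); ok {
        return err
    }
    return nil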
}
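
On the client side, the simplest way to cope with "not the leader" is to try the next node. Here is a hedged sketch of that retry loop. It assumes the server exposes a sentinel error (errNotLeader here, which is hypothetical; the code above creates a fresh error each time) and that the caller supplies a send function that performs the actual request.

```go
package main

import "errors"

// errNotLeader is a hypothetical sentinel a node returns when it
// cannot accept writes because it is not the leader.
var errNotLeader = errors.New("not the leader")

// writeToCluster tries each node in turn until one accepts the write.
// It returns the address of the node that accepted it. Any error other
// than errNotLeader aborts the loop, since retrying elsewhere could
// hide a real failure.
func writeToCluster(nodes []string, send func(node string) error) (string, error) {
	for _, node := range nodes {
		err := send(node)
		if err == nil {
			return node, nil
		}
		if !errors.Is(err, errNotLeader) {
			return "", err
		}
	}
	return "", errNotLeader
}
```

A friendlier server would include the leader's address in its response so the client can go there directly instead of probing every node.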

Reads are trickier. Reading from the FSM directly is fast but might return stale data: a leader cut off by a network partition may not yet know it has been deposed, and a follower may lag behind the leader. For strong consistency, call the VerifyLeader method on the *raft.Raft instance before reading:

func (s *Server) Get(key string) (string, bool, error) {
    // Ensure we're still the leader
    if err := s.raft.VerifyLeader().Error(); err != nil {
        return "", false, err
    }

    val, ok := s.fsm.Get(key)
    return val, ok, nil
}

For better performance with slightly relaxed consistency, you can skip the verification and read directly from any node’s FSM. This is useful for workloads that can tolerate reading slightly stale data.


Demo: Surviving Node Failures

Let’s see the cluster in action. Start three nodes:

# Terminal 1 - Bootstrap the first node
./kvstore --id node1 --addr localhost:9001 --http :8001 --bootstrap

# Terminal 2
./kvstore --id node2 --addr localhost:9002 --http :8002 --join localhost:9001

# Terminal 3
./kvstore --id node3 --addr localhost:9003 --http :8003 --join localhost:9001

Write some data:

curl -X PUT -d "distributed" http://localhost:8001/kv/hello

Read it from any node:

curl http://localhost:8002/kv/hello
# distributed

Now kill the leader (node1). After a brief election timeout, one of the remaining nodes becomes the new leader:

curl http://localhost:8002/kv/hello
# distributed

curl -X PUT -d "still works" http://localhost:8002/kv/hello

The cluster keeps working. When node1 comes back, it rejoins as a follower and catches up on missed writes.


Tradeoffs

Distributed consensus isn’t free. Here’s what you give up:

Latency: Every write requires a round-trip from the leader to enough followers to form a majority. With 5 nodes spread across data centers, that’s potentially hundreds of milliseconds per write. Compare that to microseconds or less for an in-memory write.
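
A back-of-the-envelope model makes the latency cost easier to reason about. The leader counts itself toward the majority, so it only has to wait for the fastest followers needed to complete a quorum. The sketch below estimates commit latency from follower round-trip times alone; it ignores disk syncs and processing time, and the function name is made up for this example.

```go
package main

import (
	"sort"
	"time"
)

// commitLatency estimates how long the leader waits before an entry is
// committed, given the round-trip time to each follower. The leader
// needs acknowledgments from clusterSize/2 followers (a quorum minus
// itself), so the answer is the RTT of the slowest of those.
func commitLatency(followerRTTs ...time.Duration) time.Duration {
	clusterSize := len(followerRTTs) + 1
	needed := clusterSize / 2
	if needed == 0 {
		return 0 // single-node cluster: nothing to wait for
	}
	sorted := append([]time.Duration(nil), followerRTTs...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	return sorted[needed-1]
}
```

For a 5-node cluster with follower round trips of 2ms, 40ms, 80ms, and 150ms, the leader needs two acknowledgments, so the estimate is 40ms. One nearby follower helps, but the second-fastest one sets the pace.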

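Availability during partitions: With Raft’s strong consistency, writes are only accepted on the side of a partition that contains a majority of nodes. If no group of nodes that can still talk to each other forms a majority, the cluster stops accepting writes entirely. This is the CAP theorem in action - we chose consistency over availability.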
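
The majority arithmetic is worth spelling out, since it determines how many failures a cluster can absorb. A small sketch, with hypothetical helper names:

```go
package main

// quorum returns the number of nodes that form a majority of n.
func quorum(n int) int {
	return n/2 + 1
}

// tolerated returns how many nodes can fail while a majority remains.
func tolerated(n int) int {
	return n - quorum(n)
}

// canCommit reports whether a group of reachable nodes (counting the
// leader) is large enough to commit writes in a cluster of n nodes.
func canCommit(reachable, n int) bool {
	return reachable >= quorum(n)
}
```

A 3-node cluster tolerates 1 failure and a 5-node cluster tolerates 2. A 4-node cluster still only tolerates 1, which is why odd cluster sizes are the usual choice.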

Complexity: More moving parts means more ways to fail. Network issues, clock skew, and split-brain scenarios all need careful handling.

For many use cases, these tradeoffs are worth it. Losing a few hundred milliseconds per write is acceptable when the alternative is data loss or hours of downtime during failover.


Conclusion

We took our single-node KV store and distributed it across multiple nodes using Raft. The cluster now survives node failures while maintaining strong consistency - clients always see their writes after acknowledgment, even if the leader changes.

The hashicorp/raft library handles the hard parts: leader election, log replication, and safety guarantees. We just implemented the FSM interface to apply commands and manage snapshots.

Potential improvements:

The interface stayed the same - clients don’t need to know they’re talking to a distributed system. That’s the power of good abstractions.
