In part 1, we built a single-node key-value store with persistence. It works, but there’s a glaring problem: if that node dies, your data is unavailable. The disk might be fine, but clients can’t reach it until someone restarts the process or provisions a new machine.
The solution is replication - keep copies of the data on multiple nodes so the system survives individual failures. But replication introduces a hard problem: how do you keep the copies consistent? If two clients write to different nodes simultaneously, which write wins?
This is where Raft comes in.
Distributed consensus has been a solved problem since Paxos, which Leslie Lamport first described in 1989 (the paper wasn't published until 1998). But Paxos is notoriously difficult to understand and implement correctly. The original paper is dense, and real-world implementations require significant extensions that aren't well specified.
Raft was designed in 2014 specifically to be understandable. It provides the same guarantees as Paxos but breaks the problem into manageable pieces: leader election, log replication, and safety. The authors prioritized clarity over cleverness.
The key properties:

- Election Safety: at most one leader can be elected in a given term.
- Leader Completeness: once a log entry is committed, every future leader will have it in its log.
- Log Matching: if two logs contain an entry with the same index and term, they are identical up through that entry.
- State Machine Safety: if a node applies an entry at a given index, no other node will ever apply a different entry at that index.
Before diving into code, let’s understand how Raft works at a high level.
Leader Election
Nodes are in one of three states: follower, candidate, or leader. Time is divided into terms - logical clocks that increase monotonically. Each term has at most one leader.
Followers expect periodic heartbeats from the leader. If a follower’s election timeout expires without hearing from a leader, it becomes a candidate and starts an election. It votes for itself and requests votes from other nodes. If it receives votes from a majority, it becomes the leader.
This ensures the cluster always makes progress as long as a majority of nodes are up.
Log Replication
The leader maintains a log of operations. When a client sends a write, the leader appends it to its log and replicates it to followers. Once a majority of nodes have the entry, it’s committed and the leader applies it to its state machine.
Each log entry has an index and a term. The commit index tracks the highest entry known to be committed. Followers apply entries up to this index to their state machines.
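The leader advances the commit index by finding the highest index stored on a majority of nodes. A sketch of that calculation, using a hypothetical `matchIndex` slice (the full rule in Raft additionally requires the entry to be from the leader's current term):

```go
package main

import (
	"fmt"
	"sort"
)

// commitIndex returns the highest log index replicated on a majority.
// matchIndex[i] is the highest entry known to be stored on node i,
// including the leader's own last index.
func commitIndex(matchIndex []int) int {
	sorted := append([]int(nil), matchIndex...)
	sort.Sort(sort.Reverse(sort.IntSlice(sorted)))
	// The value at position len/2 is present on at least a majority.
	return sorted[len(sorted)/2]
}

func main() {
	// 5-node cluster: leader at index 7, followers at 7, 6, 5, 3.
	// Indexes 1-6 are on three of five nodes, so 6 is committed.
	fmt.Println(commitIndex([]int{7, 7, 6, 5, 3})) // 6
}
```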
Safety
Raft guarantees that if a log entry is committed, all future leaders will have that entry. This is enforced during elections: a node grants its vote only if the candidate's log is at least as up-to-date as its own, so a candidate can only win with a log at least as current as those of a majority.
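The "at least as up-to-date" comparison is small enough to show directly - the terms of the last entries are compared first, and log length breaks ties. A sketch with hypothetical names:

```go
package main

import "fmt"

// atLeastAsUpToDate reports whether log A is at least as up-to-date
// as log B: the log whose last entry has the higher term wins; if the
// last terms are equal, the longer log wins.
func atLeastAsUpToDate(lastTermA, lastIndexA, lastTermB, lastIndexB uint64) bool {
	if lastTermA != lastTermB {
		return lastTermA > lastTermB
	}
	return lastIndexA >= lastIndexB
}

func main() {
	fmt.Println(atLeastAsUpToDate(3, 5, 2, 9)) // true: higher last term wins despite a shorter log
	fmt.Println(atLeastAsUpToDate(3, 5, 3, 6)) // false: same term, shorter log
}
```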
Implementing Raft correctly is hard. Edge cases around network partitions, timing, and log compaction are subtle. For production use, battle-tested libraries are the way to go.
HashiCorp’s raft library powers Consul, Nomad, and Vault. It handles the consensus protocol, and we just need to implement the state machine.
```go
import "github.com/hashicorp/raft"
```
The key interface is FSM (Finite State Machine):
```go
type FSM interface {
	Apply(*raft.Log) interface{}
	Snapshot() (FSMSnapshot, error)
	Restore(io.ReadCloser) error
}
```
Let’s adapt our KV store to work as a Raft state machine. First, we define command types:
```go
type CommandType uint8

const (
	CommandSet CommandType = iota
	CommandDelete
)

type Command struct {
	Type  CommandType
	Key   string
	Value string
}
```
Now the FSM implementation:
```go
type KVStoreFSM struct {
	mu   sync.RWMutex
	data map[string]string
}

func NewKVStoreFSM() *KVStoreFSM {
	return &KVStoreFSM{
		data: make(map[string]string),
	}
}

func (f *KVStoreFSM) Apply(log *raft.Log) interface{} {
	var cmd Command
	if err := json.Unmarshal(log.Data, &cmd); err != nil {
		return err
	}

	f.mu.Lock()
	defer f.mu.Unlock()

	switch cmd.Type {
	case CommandSet:
		f.data[cmd.Key] = cmd.Value
	case CommandDelete:
		delete(f.data, cmd.Key)
	}
	return nil
}

func (f *KVStoreFSM) Get(key string) (string, bool) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	val, ok := f.data[key]
	return val, ok
}
```
Note that Apply is the only way to mutate state. Raft guarantees it’s called in the same order on all nodes.
Snapshots prevent the log from growing forever. When the log gets too large, Raft takes a snapshot and discards old entries:
```go
func (f *KVStoreFSM) Snapshot() (raft.FSMSnapshot, error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// Clone the data
	data := make(map[string]string, len(f.data))
	for k, v := range f.data {
		data[k] = v
	}
	return &kvSnapshot{data: data}, nil
}

type kvSnapshot struct {
	data map[string]string
}

func (s *kvSnapshot) Persist(sink raft.SnapshotSink) error {
	err := func() error {
		b, err := json.Marshal(s.data)
		if err != nil {
			return err
		}
		if _, err := sink.Write(b); err != nil {
			return err
		}
		return sink.Close()
	}()
	if err != nil {
		sink.Cancel()
	}
	return err
}

func (s *kvSnapshot) Release() {}
```
Restore rebuilds state from a snapshot:
```go
func (f *KVStoreFSM) Restore(rc io.ReadCloser) error {
	defer rc.Close()

	var data map[string]string
	if err := json.NewDecoder(rc).Decode(&data); err != nil {
		return err
	}

	f.mu.Lock()
	f.data = data
	f.mu.Unlock()
	return nil
}
```
Setting up a Raft cluster requires configuring storage, transport, and the initial cluster membership.
```go
import (
	"net"
	"os"
	"path/filepath"
	"time"

	"github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb"
)

func setupRaft(nodeID string, addr string, fsm raft.FSM, bootstrap bool) (*raft.Raft, error) {
	config := raft.DefaultConfig()
	config.LocalID = raft.ServerID(nodeID)

	// Storage for logs and stable state; BoltDB needs the directory to exist
	dir := filepath.Join("data", nodeID)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return nil, err
	}
	logStore, err := raftboltdb.NewBoltStore(filepath.Join(dir, "raft-log.db"))
	if err != nil {
		return nil, err
	}
	stableStore, err := raftboltdb.NewBoltStore(filepath.Join(dir, "raft-stable.db"))
	if err != nil {
		return nil, err
	}
	snapshotStore, err := raft.NewFileSnapshotStore(dir, 3, os.Stderr)
	if err != nil {
		return nil, err
	}

	// TCP transport
	tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
	if err != nil {
		return nil, err
	}
	transport, err := raft.NewTCPTransport(addr, tcpAddr, 3, 10*time.Second, os.Stderr)
	if err != nil {
		return nil, err
	}

	r, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, transport)
	if err != nil {
		return nil, err
	}

	// Bootstrap the first node into a single-member cluster
	if bootstrap {
		cfg := raft.Configuration{
			Servers: []raft.Server{
				{
					ID:      raft.ServerID(nodeID),
					Address: raft.ServerAddress(addr),
				},
			},
		}
		// ErrCantBootstrap just means the node already has state
		// (e.g. a restart), which is fine.
		if err := r.BootstrapCluster(cfg).Error(); err != nil && err != raft.ErrCantBootstrap {
			return nil, err
		}
	}
	return r, nil
}
```
To add a node to an existing cluster, the leader must add it:
```go
func (s *Server) addPeer(nodeID, addr string) error {
	if s.raft.State() != raft.Leader {
		return errors.New("not the leader")
	}
	f := s.raft.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0)
	return f.Error()
}
```
Writes must go through the leader. If a client contacts a follower, we need to either forward the request or tell the client where the leader is.
```go
type Server struct {
	raft *raft.Raft
	fsm  *KVStoreFSM
}

func (s *Server) Set(key, value string) error {
	if s.raft.State() != raft.Leader {
		return errors.New("not the leader")
	}
	cmd := Command{Type: CommandSet, Key: key, Value: value}
	data, err := json.Marshal(cmd)
	if err != nil {
		return err
	}
	f := s.raft.Apply(data, 10*time.Second)
	return f.Error()
}

func (s *Server) Delete(key string) error {
	if s.raft.State() != raft.Leader {
		return errors.New("not the leader")
	}
	cmd := Command{Type: CommandDelete, Key: key}
	data, err := json.Marshal(cmd)
	if err != nil {
		return err
	}
	f := s.raft.Apply(data, 10*time.Second)
	return f.Error()
}
```
Reads are trickier. Reading from the FSM directly is fast but might return stale data: during a network partition, a deposed leader can keep serving reads before it notices it has lost leadership. For strong consistency, call VerifyLeader() before reading - it confirms the node still holds leadership before the read is served:
```go
func (s *Server) Get(key string) (string, bool, error) {
	// Ensure we're still the leader
	if err := s.raft.VerifyLeader().Error(); err != nil {
		return "", false, err
	}
	val, ok := s.fsm.Get(key)
	return val, ok, nil
}
```
For better performance with slightly relaxed consistency, you can skip the verification and read directly from any node’s FSM. This is useful for workloads that can tolerate reading slightly stale data.
Let’s see the cluster in action. Start three nodes:
```shell
# Terminal 1 - Bootstrap the first node
./kvstore --id node1 --addr localhost:9001 --http :8001 --bootstrap

# Terminal 2
./kvstore --id node2 --addr localhost:9002 --http :8002 --join localhost:9001

# Terminal 3
./kvstore --id node3 --addr localhost:9003 --http :8003 --join localhost:9001
```
Write some data:
```shell
curl -X PUT -d "distributed" http://localhost:8001/kv/hello
```
Read it from any node:
```shell
curl http://localhost:8002/kv/hello
# distributed
```
Now kill the leader (node1). After a brief election timeout, one of the remaining nodes becomes the new leader:
```shell
curl http://localhost:8002/kv/hello
# distributed

curl -X PUT -d "still works" http://localhost:8002/kv/hello
```
The cluster keeps working. When node1 comes back, it rejoins as a follower and catches up on missed writes.
Distributed consensus isn’t free. Here’s what you give up:
Latency: Every write requires a round-trip to a majority of nodes. With 5 nodes spread across data centers, that’s potentially hundreds of milliseconds per write. Compare that to single-digit microseconds for an in-memory write.
Availability during partitions: With Raft’s strong consistency, if a majority of nodes can’t communicate, the cluster stops accepting writes. This is the CAP theorem in action - we chose consistency over availability.
Complexity: More moving parts means more ways to fail. Network issues, clock skew, and split-brain scenarios all need careful handling.
For many use cases, these tradeoffs are worth it. Losing a few hundred milliseconds per write is acceptable when the alternative is data loss or hours of downtime during failover.
We took our single-node KV store and distributed it across multiple nodes using Raft. The cluster now survives node failures while maintaining strong consistency - clients always see their writes after acknowledgment, even if the leader changes.
The hashicorp/raft library handles the hard parts: leader election, log replication, and safety guarantees. We just implemented the FSM interface to apply commands and manage snapshots.
Potential improvements:
- ReadIndex for consistent reads without leader verification overhead

The interface stayed the same - clients don't need to know they're talking to a distributed system. That's the power of good abstractions.