MIT 6.5840 Lab4:KV over Raft

顺序:Lab1 MapReduce / Lab2 KV Server / Lab3 Raft / Lab4 KV over Raft / Lab5 Sharded KV

KV over Raft 请求路径架构图
Lab4 的请求路径:Client → RPC Handler → Raft → Apply → KVDB → 回复 Client。

一句话总结

Lab4 在 Raft 之上搭建一个线性化的 KV 服务——Client 看到的效果就像操作一台机器,但背后是多台机器在同步。

为什么要做这个

Lab3 给了我们一个”日志一致”的引擎,但它只管日志顺序,不管业务。Lab4 要把它变成真正能用的 KV 服务:

  • Client 发 Get/Put/Append → 多数副本确认 → 回复 Client
  • Leader 挂了自动切换,Client 无感知
  • 日志太长了自动做快照

核心架构:两个世界的桥梁

Lab4 的本质就是在”KV 业务世界”和”Raft 共识世界”之间搭一座桥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌─────────────────────────────────────────────┐
│ KV Server │
│ │
│ RPC Handler ──→ rf.Start(op) ──→ Raft 日志 │
│ │ │ │
│ │ 等待... │ 复制 │
│ │ ▼ │
│ │ applyCh │
│ │ │ │
│ waitCh[idx] ←── Applier ←──────────┘ │
│ │ │ │
│ ▼ ▼ │
│ 回复 Client KVDB │
└─────────────────────────────────────────────┘

Channel-per-Index 模式

这是 Lab4 最精妙的设计。问题是:RPC Handler 怎么知道 Raft 什么时候把自己的命令 apply 了?

答案:给每个 log index 一个 channel,Handler 等在上面,Applier apply 后往里面发信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type KVServer struct {
KVDB map[string]string
waitChMap map[int]chan *Op // log index → 通知 channel
LastRequestMap map[int64]int64 // clientId → 最大已处理 requestId
}

// RPC Handler:提交到 Raft 并等待
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
op := Op{Key: args.Key, Value: args.Value, Op: args.Op,
ClientId: args.ClientId, RequestId: args.RequestId}

index, term, isLeader := kv.rf.Start(op)
if !isLeader {
reply.Err = ErrWrongLeader
return
}

ch := kv.getWaitCh(index) // 创建/获取这个 index 的 channel

select {
case appliedOp := <-ch: // 等待 Applier 通知
// 验证是不是自己的命令(可能被新 Leader 覆盖了)
if appliedOp.ClientId == op.ClientId && appliedOp.RequestId == op.RequestId {
reply.Err = OK
} else {
reply.Err = ErrWrongLeader
}
case <-time.After(500 * time.Millisecond): // 超时
reply.Err = ErrWrongLeader
}

kv.deleteWaitCh(index) // 清理,防止内存泄漏
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Applier:从 applyCh 读取已提交的命令,执行并通知
func (kv *KVServer) applier() {
for msg := range kv.applyCh {
op := msg.Command.(Op)

// 去重检查
if !kv.isInvalidRequest(op.ClientId, op.RequestId) {
switch op.Op {
case "Put":
kv.KVDB[op.Key] = op.Value
case "Append":
kv.KVDB[op.Key] += op.Value
}
kv.LastRequestMap[op.ClientId] = op.RequestId
}

// 通知等待的 Handler
if ch, ok := kv.waitChMap[msg.CommandIndex]; ok {
ch <- &op
}
}
}

为什么 Get 也要走 Raft?

线性化要求:读操作必须读到最新的已提交数据

如果 Get 不走 Raft 直接读本地:

  1. 旧 Leader 被网络隔离,它不知道自己已经不是 Leader 了
  2. 新 Leader 已经处理了新的写入
  3. Client 从旧 Leader 读到过期数据 → 违反线性化

解决方案简单粗暴:Get 也作为一条日志走 Raft。当它被 apply 时,说明此时此刻这个节点确实是 Leader 且数据是最新的。

去重:和 Lab2 一样的套路

1
2
3
4
5
6
// 同一个 Client 的请求 ID 单调递增
// 如果 requestId <= 已处理的最大 ID,说明是重复请求
func (kv *KVServer) isInvalidRequest(clientId, requestId int64) bool {
lastId, ok := kv.LastRequestMap[clientId]
return ok && requestId <= lastId
}

注意:去重只对 Put/Append 生效。Get 是幂等的,重复读没关系。

Lab4B:快照

当 Raft 日志太大时(persister.RaftStateSize() >= maxraftstate),做一个快照:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 每次 apply 后检查是否需要快照
func (kv *KVServer) checkSnapshot(index int) {
if kv.maxraftstate == -1 { return }
if kv.persister.RaftStateSize() >= kv.maxraftstate {
// 编码当前状态
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.KVDB)
e.Encode(kv.LastRequestMap)
// 告诉 Raft 裁剪日志
kv.rf.Snapshot(index, w.Bytes())
}
}

快照包含什么?

  • KVDB:全部 KV 数据
  • LastRequestMap:去重表

这两个加在一起就是完整的状态机状态。恢复时直接加载,不需要从头 replay 日志。

Client 的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Clerk struct {
servers []*labrpc.ClientEnd
leaderId int // 缓存当前 Leader
clientId int64 // 唯一标识
requestId int64 // 单调递增
}

func (ck *Clerk) PutAppend(key, value, op string) {
ck.requestId++
args := &PutAppendArgs{
Key: key, Value: value, Op: op,
ClientId: ck.clientId, RequestId: ck.requestId,
}
for {
reply := &PutAppendReply{}
ok := ck.servers[ck.leaderId].Call("KVServer.PutAppend", args, reply)
if ok && reply.Err == OK {
return
}
ck.leaderId = (ck.leaderId + 1) % len(ck.servers) // 切换到下一个
}
}
  • 缓存 leaderId 避免每次都轮询
  • 失败时轮转到下一个 server
  • 同一个请求的所有重试使用相同的 requestId

关键 Edge Case

1. Log 被覆盖

Leader A 在 index=5 放了 Client X 的命令,但还没 commit 就挂了。新 Leader B 在 index=5 放了 Client Y 的命令。A 恢复后收到 apply,index=5 变成了 Y 的命令。

解决:Handler 收到通知后,验证 appliedOp.ClientId == myOp.ClientId,不匹配就返回 ErrWrongLeader

2. Applier 发信号时 Handler 已经超时走了

Channel 是 buffered(size=1),所以 Applier 写入不会阻塞。Handler 超时后 channel 被删除,下次 apply 时找不到就跳过。

3. 重复 Apply

网络分区恢复后,旧 Leader 的 Raft 可能 apply 一些已经在新 Leader 上 apply 过的命令。LastRequestMap 确保不会重复执行。

整体数据流总结

1
2
3
4
5
6
7
8
9
10
11
12
┌────────┐    ┌─────────────────────────────────────────────┐
│ Client │───→│ Handler: rf.Start(op) │
│ │ │ ↓ │
│ │ │ Raft: replicate to majority │
│ │ │ ↓ │
│ │ │ applyCh → Applier: execute + notify waitCh │
│ │←───│ │
└────────┘ └─────────────────────────────────────────────┘

去重保证:at-most-once
快照保证:bounded memory
线性化保证:所有操作(含 Get)经过 Raft 排序