WarYan's Blog

记录成长历程

顺序:Lab1 MapReduce / Lab2 KV Server / Lab3 Raft / Lab4 KV over Raft / Lab5 Sharded KV

Sharded KV 分片架构图
Lab5 完整架构:ShardCtrler 管理分片分配,多个 Raft Group 各负责一部分 Shard,配置变更时触发迁移。

一句话总结

Lab5 把单一 Raft Group 扩展为多个 Raft Group,每个 Group 负责一部分 Key(分片),并且支持动态迁移——加减节点时数据能自动搬家。

为什么要做这个

Lab4 的 KV 服务有一个致命问题:所有数据都在一个 Raft Group 里

  • 写入瓶颈:所有写都要经过唯一的 Leader
  • 存储瓶颈:所有数据存在每台机器上
  • 扩展方式:只能加副本(提高容错),不能加容量

分片(Sharding)解决了这个问题:把 key space 分成 N 份,每份由不同的 Group 负责。

整体架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌──────────────────────────────────────────────────────────┐
│ ShardCtrler │
│ (管理 Shard → Group 的映射关系) │
│ ┌─────────────────────────────────────────┐ │
│ │ Config: Shard[0..9] → Group ID │ │
│ │ [G1, G1, G1, G2, G2, G2, G3, G3, G3, G3]│ │
│ └─────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘
↕ Query/Join/Leave
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Group 1 │ │ Group 2 │ │ Group 3 │
│ (Raft×3) │ │ (Raft×3) │ │ (Raft×3) │
│ Shard 0-2 │ │ Shard 3-5 │ │ Shard 6-9 │
└──────────┘ └──────────┘ └──────────┘
  • ShardCtrler:集中式配置管理,本身也是 Raft 复制的
  • Group:每个 Group 是一个独立的 Raft 集群,负责若干 Shard
  • Shard:key 通过 key[0] % NShards 映射到 Shard 号

Lab5A:ShardCtrler

ShardCtrler 管理配置的变迁历史:

1
2
3
4
5
type Config struct {
Num int // 配置版本号
Shards [10]int // Shard → Group ID 的映射
Groups map[int][]string // Group ID → server 列表
}

四个操作:

操作 作用 触发 Rebalance?
Join(groups) 新增 Group
Leave(gids) 移除 Group
Move(shard, gid) 手动指定某 Shard 归属
Query(num) 查询某版本配置

Rebalance 算法

目标:让每个 Group 负责的 Shard 数量尽可能均匀(差值 ≤ 1)。

1
2
3
4
5
func adjustShards(shards *[10]int, groups map[int][]string) {
// 1. 统计每个 Group 当前负责几个 Shard
// 2. 排序 Group(保证确定性!)
// 3. 循环:从最多的 Group 拿一个给最少的,直到差值 ≤ 1
}

关键细节:排序保证确定性。Go 的 map 迭代顺序是随机的,如果不排序,不同副本可能算出不同的分配结果!

Lab5B:Sharded KV

这是整个课程最复杂的 Lab。核心挑战:配置变更时,数据如何安全迁移?

分片四状态机

每个 Shard 在每个 Group 上有四种状态:

1
2
3
4
5
6
7
8
9
              Config 变更
新 Owner
Serving ──────────────→ Pulling ──────────→ GCing ──────────→ Serving
拉数据 通知旧 Owner 删除

Config 变更
旧 Owner
Serving ──────────────→ BePulling ─────────→ (数据被删除)
等待新 Owner 来拉

状态转换的含义:

  • Serving:正常服务,接受 Client 请求
  • Pulling:我是新 Owner,正在从旧 Owner 拉数据,暂不接受该 Shard 的请求
  • BePulling:我是旧 Owner,等待新 Owner 来拉我的数据
  • GCing:数据已拉到,但还没通知旧 Owner 删除

为什么用 Pull 而不是 Push?

如果旧 Owner 主动 Push:

  • 旧 Owner 挂了 → 数据卡在半路,新 Owner 不知道什么时候能收到
  • 需要额外的重试和确认机制

如果新 Owner 主动 Pull:

  • 旧 Owner 挂了 → 新 Owner 重试即可
  • 主动权在需要数据的一方,逻辑更简单

三个后台 Monitor

只在 Leader 上运行的三个 goroutine,驱动整个迁移流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Monitor 1:拉取新配置
func (kv *ShardKV) monitorRequestConfig() {
// 前提:所有 Shard 都是 Serving 状态
// 动作:向 ShardCtrler 查询 config N+1
// 结果:通过 Raft 提交 AddConfig 命令
}

// Monitor 2:拉取 Shard 数据
func (kv *ShardKV) monitorInsert() {
// 前提:存在 Pulling 状态的 Shard
// 动作:向旧 Owner 发 GetShards RPC
// 结果:通过 Raft 提交 InsertShard 命令
}

// Monitor 3:通知旧 Owner 清理
func (kv *ShardKV) monitorGC() {
// 前提:存在 GCing 状态的 Shard
// 动作:向旧 Owner 发 DeleteShards RPC
// 结果:通过 Raft 提交 AdjustShardState 命令
}

配置升级必须逐步进行

核心约束:所有 Shard 必须全部处于 Serving 状态,才能开始拉取下一个配置。

1
2
3
4
5
6
7
8
func (kv *ShardKV) allShardsServing() bool {
for _, shard := range kv.shards {
if shard.State != Serving {
return false
}
}
return true
}

为什么?假设跳过了中间配置:

  • Config 1: Shard 3 在 Group A
  • Config 2: Shard 3 从 A → B
  • Config 3: Shard 3 从 B → C

如果 C 直接从 Config 1 跳到 Config 3,它去找 A 拉数据。但 A 可能已经按 Config 2 把数据给 B 了!

迁移的完整流程

以 Shard 3 从 Group A → Group B 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
时间轴 →

Group A Group B
────────────────────────────────────────────
Serving(Shard 3) (无 Shard 3)
│ │
│ 新 Config 到来 │
▼ ▼
BePulling(Shard 3) Pulling(Shard 3)
│ │
│←── GetShards RPC ────────│ monitorInsert 发起
│──── 返回数据 ───────────→│
│ ▼
│ GCing(Shard 3) ← 数据已到位
│ │
│←── DeleteShards RPC ─────│ monitorGC 发起
▼ ▼
(清除 Shard 3) Serving(Shard 3) ← 迁移完成!

去重表也要跟着迁移

1
2
3
4
type ShardData struct {
KVData map[string]string // Shard 的 KV 数据
LastRequestMap map[int64]int64 // 该 Shard 的去重表
}

为什么?假设 Client X 的 Append 请求被 Group A 执行了。Shard 迁移到 Group B 后,如果 Client X 重试(因为没收到回复),Group B 必须知道这个请求已经执行过。

合并去重表时取 max:

1
2
3
4
5
for clientId, requestId := range incomingLastRequestMap {
if requestId > kv.LastRequestMap[clientId] {
kv.LastRequestMap[clientId] = requestId
}
}

所有内部操作走 Raft

配置变更、Shard 插入、Shard 删除、状态调整——这些全部作为命令通过 Raft 提交:

1
2
3
4
type Command struct {
Type string // "AddConfig" | "InsertShard" | "DeleteShard" | "AdjustShardState" | "Get" | "Put" | "Append"
Data interface{}
}

这保证了 Group 内所有副本看到完全相同的状态变迁序列。如果只在 Leader 本地执行这些操作,Leader 切换后新 Leader 的状态可能和 Follower 不一致。

Client 的视角

Client 完全不需要知道迁移的细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (ck *Clerk) Get(key string) string {
shard := key2shard(key)
for {
gid := ck.config.Shards[shard]
servers := ck.config.Groups[gid]
for _, srv := range servers {
reply := srv.Call("ShardKV.Get", args)
if reply.Err == OK {
return reply.Value
}
if reply.Err == ErrWrongGroup {
break // 去找新配置
}
}
// 刷新配置重试
ck.config = ck.sm.Query(-1)
}
}

收到 ErrWrongGroup 时,Client 向 ShardCtrler 查询最新配置,然后重试。整个迁移过程对 Client 几乎透明。

这个 Lab 的核心收获

  1. 分片是水平扩展的基础:把数据按 key 分到不同机器,每台只负责一部分
  2. 配置变更必须序列化:不能跳配置,否则数据可能找错 Owner
  3. Pull 优于 Push:主动权在需求方更容易实现容错
  4. 内部操作也要共识:状态变更必须经过 Raft,保证副本一致
  5. 去重表是业务状态的一部分:迁移数据时必须同步迁移去重表

这 5 个 Lab 从 MapReduce 走到 Sharded KV,构建了一个完整的分布式系统知识栈。每一层都在解决上一层遗留的问题,最终形成一个可扩展、容错、强一致的分布式 KV 服务。

顺序:Lab1 MapReduce / Lab2 KV Server / Lab3 Raft / Lab4 KV over Raft / Lab5 Sharded KV

KV over Raft 请求路径架构图
Lab4 的请求路径:Client → RPC Handler → Raft → Apply → KVDB → 回复 Client。

一句话总结

Lab4 在 Raft 之上搭建一个线性化的 KV 服务——Client 看到的效果就像操作一台机器,但背后是多台机器在同步。

为什么要做这个

Lab3 给了我们一个”日志一致”的引擎,但它只管日志顺序,不管业务。Lab4 要把它变成真正能用的 KV 服务:

  • Client 发 Get/Put/Append → 多数副本确认 → 回复 Client
  • Leader 挂了自动切换,Client 无感知
  • 日志太长了自动做快照

核心架构:两个世界的桥梁

Lab4 的本质就是在”KV 业务世界”和”Raft 共识世界”之间搭一座桥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌─────────────────────────────────────────────┐
│ KV Server │
│ │
│ RPC Handler ──→ rf.Start(op) ──→ Raft 日志 │
│ │ │ │
│ │ 等待... │ 复制 │
│ │ ▼ │
│ │ applyCh │
│ │ │ │
│ waitCh[idx] ←── Applier ←──────────┘ │
│ │ │ │
│ ▼ ▼ │
│ 回复 Client KVDB │
└─────────────────────────────────────────────┘

Channel-per-Index 模式

这是 Lab4 最精妙的设计。问题是:RPC Handler 怎么知道 Raft 什么时候把自己的命令 apply 了?

答案:给每个 log index 一个 channel,Handler 等在上面,Applier apply 后往里面发信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type KVServer struct {
KVDB map[string]string
waitChMap map[int]chan *Op // log index → 通知 channel
LastRequestMap map[int64]int64 // clientId → 最大已处理 requestId
}

// RPC Handler:提交到 Raft 并等待
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
op := Op{Key: args.Key, Value: args.Value, Op: args.Op,
ClientId: args.ClientId, RequestId: args.RequestId}

index, term, isLeader := kv.rf.Start(op)
if !isLeader {
reply.Err = ErrWrongLeader
return
}

ch := kv.getWaitCh(index) // 创建/获取这个 index 的 channel

select {
case appliedOp := <-ch: // 等待 Applier 通知
// 验证是不是自己的命令(可能被新 Leader 覆盖了)
if appliedOp.ClientId == op.ClientId && appliedOp.RequestId == op.RequestId {
reply.Err = OK
} else {
reply.Err = ErrWrongLeader
}
case <-time.After(500 * time.Millisecond): // 超时
reply.Err = ErrWrongLeader
}

kv.deleteWaitCh(index) // 清理,防止内存泄漏
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Applier:从 applyCh 读取已提交的命令,执行并通知
func (kv *KVServer) applier() {
for msg := range kv.applyCh {
op := msg.Command.(Op)

// 去重检查
if !kv.isInvalidRequest(op.ClientId, op.RequestId) {
switch op.Op {
case "Put":
kv.KVDB[op.Key] = op.Value
case "Append":
kv.KVDB[op.Key] += op.Value
}
kv.LastRequestMap[op.ClientId] = op.RequestId
}

// 通知等待的 Handler
if ch, ok := kv.waitChMap[msg.CommandIndex]; ok {
ch <- &op
}
}
}

为什么 Get 也要走 Raft?

线性化要求:读操作必须读到最新的已提交数据

如果 Get 不走 Raft 直接读本地:

  1. 旧 Leader 被网络隔离,它不知道自己已经不是 Leader 了
  2. 新 Leader 已经处理了新的写入
  3. Client 从旧 Leader 读到过期数据 → 违反线性化

解决方案简单粗暴:Get 也作为一条日志走 Raft。当它被 apply 时,说明此时此刻这个节点确实是 Leader 且数据是最新的。

去重:和 Lab2 一样的套路

1
2
3
4
5
6
// 同一个 Client 的请求 ID 单调递增
// 如果 requestId <= 已处理的最大 ID,说明是重复请求
func (kv *KVServer) isInvalidRequest(clientId, requestId int64) bool {
lastId, ok := kv.LastRequestMap[clientId]
return ok && requestId <= lastId
}

注意:去重只对 Put/Append 生效。Get 是幂等的,重复读没关系。

Lab4B:快照

当 Raft 日志太大时(persister.RaftStateSize() >= maxraftstate),做一个快照:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 每次 apply 后检查是否需要快照
func (kv *KVServer) checkSnapshot(index int) {
if kv.maxraftstate == -1 { return }
if kv.persister.RaftStateSize() >= kv.maxraftstate {
// 编码当前状态
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.KVDB)
e.Encode(kv.LastRequestMap)
// 告诉 Raft 裁剪日志
kv.rf.Snapshot(index, w.Bytes())
}
}

快照包含什么?

  • KVDB:全部 KV 数据
  • LastRequestMap:去重表

这两个加在一起就是完整的状态机状态。恢复时直接加载,不需要从头 replay 日志。

Client 的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Clerk struct {
servers []*labrpc.ClientEnd
leaderId int // 缓存当前 Leader
clientId int64 // 唯一标识
requestId int64 // 单调递增
}

func (ck *Clerk) PutAppend(key, value, op string) {
ck.requestId++
args := &PutAppendArgs{
Key: key, Value: value, Op: op,
ClientId: ck.clientId, RequestId: ck.requestId,
}
for {
reply := &PutAppendReply{}
ok := ck.servers[ck.leaderId].Call("KVServer.PutAppend", args, reply)
if ok && reply.Err == OK {
return
}
ck.leaderId = (ck.leaderId + 1) % len(ck.servers) // 切换到下一个
}
}
  • 缓存 leaderId 避免每次都轮询
  • 失败时轮转到下一个 server
  • 同一个请求的所有重试使用相同的 requestId

关键 Edge Case

1. Log 被覆盖

Leader A 在 index=5 放了 Client X 的命令,但还没 commit 就挂了。新 Leader B 在 index=5 放了 Client Y 的命令。A 恢复后收到 apply,index=5 变成了 Y 的命令。

解决:Handler 收到通知后,验证 appliedOp.ClientId == myOp.ClientId,不匹配就返回 ErrWrongLeader

2. Applier 发信号时 Handler 已经超时走了

Channel 是 buffered(size=1),所以 Applier 写入不会阻塞。Handler 超时后 channel 被删除,下次 apply 时找不到就跳过。

3. 重复 Apply

网络分区恢复后,旧 Leader 的 Raft 可能 apply 一些已经在新 Leader 上 apply 过的命令。LastRequestMap 确保不会重复执行。

整体数据流总结

1
2
3
4
5
6
7
8
9
10
11
12
┌────────┐    ┌─────────────────────────────────────────────┐
│ Client │───→│ Handler: rf.Start(op) │
│ │ │ ↓ │
│ │ │ Raft: replicate to majority │
│ │ │ ↓ │
│ │ │ applyCh → Applier: execute + notify waitCh │
│ │←───│ │
└────────┘ └─────────────────────────────────────────────┘

去重保证:at-most-once
快照保证:bounded memory
线性化保证:所有操作(含 Get)经过 Raft 排序

顺序:Lab1 MapReduce / Lab2 KV Server / Lab3 Raft / Lab4 KV over Raft / Lab5 Sharded KV

Raft 共识算法架构
Raft 的核心:Leader 选举 + 日志复制 + 安全性保证。分 4 个子 Lab 逐步实现。

一句话总结

Lab3 实现完整的 Raft 共识算法——让多台机器对一组操作的顺序达成一致,即使部分机器挂掉也能继续工作。

为什么要做这个

单机 KV(Lab2)挂了就全部丢了。我们需要复制:把数据复制到多台机器上,挂掉少数几台也不影响服务。

但复制带来新问题:多台机器怎么保证对操作顺序的一致性?Raft 就是解决这个问题的算法。

一个比喻理解 Raft

想象一个公司:

  • Leader:CEO,所有决策由它做出
  • Follower:员工,听 CEO 的指挥
  • Candidate:CEO 挂了后竞选新 CEO 的人

规则:

  1. 正常情况下只有一个 CEO,所有命令由它发出
  2. CEO 定期发心跳,证明自己还活着
  3. 员工超时没收到心跳,就开始选举新 CEO
  4. 得到多数票才能当选

Lab3A:Leader 选举

三种角色转换

1
2
3
4
5
6
7
8
                超时
Follower ──────────────→ Candidate
↑ │
│ 发现更高 term │ 获得多数票
│ ▼
└──────────────────── Leader
心跳超时
Leader ─────→ Follower

选举超时

1
2
3
4
// 随机超时 [250ms, 400ms],避免所有节点同时发起选举(split vote)
func (rf *Raft) electionTimeout() time.Duration {
return time.Duration(250+rand.Intn(150)) * time.Millisecond
}

为什么要随机?如果所有节点超时时间相同,它们会同时发起选举,互相投票给自己,谁也选不上。

投票规则

不是谁先来就投给谁。要比较日志的新旧:

1
2
3
4
5
6
7
// 候选人的日志至少要和我一样新,我才投票给它
func isMoreUpToDate(candidateLastTerm, candidateLastIndex, myLastTerm, myLastIndex int) bool {
if candidateLastTerm != myLastTerm {
return candidateLastTerm > myLastTerm // term 大的更新
}
return candidateLastIndex >= myLastIndex // term 相同,日志长的更新
}

这保证了:新 Leader 一定包含所有已提交的日志

Term(任期)

Term 是 Raft 的”逻辑时钟”。规则很简单:

  • 每次选举,Term +1
  • 收到更高 Term 的消息 → 立即变成 Follower
  • 收到更低 Term 的消息 → 拒绝

Lab3B:日志复制

Leader 选出来了,现在它要把客户端的命令复制给所有 Follower。

复制流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Client: Start(command)


Leader 追加到本地 log

▼ 并行发送 AppendEntries RPC
┌────┼────┐
▼ ▼ ▼
F1 F2 F3 ← 各 Follower 追加到自己的 log
│ │ │
▼ ▼ ▼
回复成功给 Leader

▼ 多数回复成功
commitIndex 推进


apply 到状态机

AppendEntries 的核心参数

1
2
3
4
5
6
7
8
type AppendEntriesArgs struct {
Term int // Leader 当前 term
LeaderId int
PrevLogIndex int // 紧接新日志前的那条日志的 index
PrevLogTerm int // 紧接新日志前的那条日志的 term
Entries []LogEntry // 要追加的新日志
LeaderCommit int // Leader 的 commitIndex
}

关键是 PrevLogIndexPrevLogTerm——它们形成一个一致性检查

“我要追加的新日志,接在 index=5, term=2 的后面。你那儿 index=5 也是 term=2 吗?”

如果不匹配,说明日志有分歧,需要回退重试。

冲突优化:按 Term 跳跃

朴素做法:nextIndex 每次减 1,可能要回退很多次。优化做法:

1
2
3
4
5
6
// Follower 告诉 Leader 冲突 term 对应的第一条日志
type AppendEntriesReply struct {
Success bool
ConflictTerm int // 冲突位置的 term
ConflictIndex int // 该 term 的第一条日志 index
}

Leader 直接跳到冲突 term 的起始位置,大大减少回退次数。

Commit 规则

Leader 只能提交当前 term 的日志(Figure 8 安全性):

1
2
3
4
5
6
7
8
9
10
11
12
13
// 只有当 log[N].Term == currentTerm 时才能提交 index N
for N := rf.commitIndex + 1; N <= rf.getLastIndex(); N++ {
if rf.log[N-firstIndex].Term != rf.currentTerm {
continue
}
count := 1
for _, matchIdx := range rf.matchIndex {
if matchIdx >= N { count++ }
}
if count > len(rf.peers)/2 {
rf.commitIndex = N
}
}

Lab3C:持久化

哪些状态需要在 crash 后恢复?

状态 持久化? 原因
currentTerm 防止在同一 term 投两次票
votedFor 同上
log[] 日志是数据的唯一来源
commitIndex 可以从日志和多数派重新推导
lastApplied 重启后从头 apply
1
2
3
4
5
6
7
8
func (rf *Raft) persist() {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.voteFor)
e.Encode(rf.log)
rf.persister.Save(w.Bytes(), rf.persister.ReadSnapshot())
}

规则:任何修改 currentTerm/votedFor/log 的地方都必须调用 persist()。遗漏一处就可能导致数据不一致。

Lab3D:快照

问题:日志无限增长,内存会爆。解决:定期把状态机快照保存,然后裁剪已快照的日志。

日志裁剪

1
2
3
4
裁剪前:[1] [2] [3] [4] [5] [6] [7] [8]
↑ 快照到这里
裁剪后:[5] [6] [7] [8]
↑ log[0] 存快照元信息(哨兵)

log[0] 永远是哨兵,存储 LastIncludedIndexLastIncludedTerm,真正的日志从 log[1] 开始。

InstallSnapshot

当 Follower 落后太远(需要的日志已被 Leader 裁剪),Leader 直接发送整个快照:

1
2
3
4
5
6
7
if rf.nextIndex[server] <= rf.getFirstIndex() {
// Follower 需要的日志已经被裁掉了,发快照
go rf.sendInstallSnapshot(server)
} else {
// 正常发 AppendEntries
go rf.sendAppendEntries(server)
}

Follower 收到快照后:

  1. 替换整个日志前缀
  2. 保存快照到 persister
  3. 通过 applyCh 通知上层应用快照

实现中的坑

1. Apply 的顺序保证

sync.Cond 通知 apply goroutine,保证 commitIndex 推进后立即 apply:

1
rf.commitCond.Signal()  // commitIndex 更新后

如果用 sleep 轮询,会引入延迟,测试过不了。

2. Start() 后立即心跳

测试有速度要求。Client 调用 Start() 提交命令后,必须立即触发一轮心跳,不能等到下一个 100ms 心跳周期:

1
2
3
4
5
func (rf *Raft) Start(command interface{}) (int, int, bool) {
// ... 追加日志 ...
go rf.runHeartBeats() // 立即广播
return index, term, true
}

3. Figure 8 陷阱

新 Leader 不能通过计数来提交旧 term 的日志。必须先提交一条自己 term 的日志(哪怕是空的 noop),才能间接”带动”旧日志被提交。

这个 Lab 的核心收获

Raft 本质上就做一件事:让一组机器对日志序列达成一致

  • Leader 负责决定顺序
  • 投票机制保证 Leader 有全部已提交数据
  • Term 机制保证不会出现”脑裂”
  • 持久化保证 crash 后不会”忘事”
  • 快照保证日志不会无限增长

理解了这些,Lab4 就是在 Raft 之上搭一个 KV 服务而已。

顺序:Lab1 MapReduce / Lab2 KV Server / Lab3 Raft / Lab4 KV over Raft / Lab5 Sharded KV

KV Server 请求处理流程
Lab2 的核心:Client 请求 → Server 去重检查 → 执行/返回缓存 → Client 确认清理。

一句话总结

Lab2 在单个服务器上实现一个 KV 存储,核心挑战是在不可靠网络下保证每个请求最多执行一次(at-most-once)。

为什么要做这个

网络是不可靠的。Client 发了一个 Append("balance", "+100") 请求:

  • Server 执行了,回复丢了 → Client 重试 → 余额多加了 100!
  • 这就是”exactly-once”问题。

Lab2 教你如何用请求去重解决这个问题,这个模式会贯穿后面所有 Lab。

核心问题:重复请求

1
2
3
4
5
6
Client                    Server
│── Put("x", "1") ────→ │ ✓ 执行,x="1"
│ │
│←── OK ─────────────── │ ← 这个回复丢了!
│ │
│── Put("x", "1") ────→ │ ← Client 重试,Server 怎么知道这是重复的?

如果 Server 不做任何处理,第二次请求会被当作新请求执行。对于 Put 还好(幂等),但对于 Append 就会重复追加。

解决方案:去重表

每个请求带一个全局唯一的 ID。Server 维护一张表记录已处理的请求:

1
2
3
4
5
type KVServer struct {
mu sync.Mutex
data map[string]string // KV 存储
request map[int64]string // 去重表:requestId -> 上次结果
}

处理流程:

  1. 收到请求,先查 request[id]
  2. 如果存在 → 直接返回缓存结果,不再执行
  3. 如果不存在 → 执行操作,缓存结果

但是——内存会爆炸

去重表会无限增长!每个成功的请求都缓存着。解决方案是 Client 确认机制

1
2
3
4
5
6
7
// Client 端:收到成功响应后,显式告诉 Server "我收到了,你可以清缓存了"
func (ck *Clerk) PutAppend(key, value, op string) string {
id := atomic.AddInt64(&globalId, 1)
// ... 重试直到成功 ...
ck.Finish(id) // 关键:通知 Server 删除缓存
return result
}
1
2
3
4
5
6
// Server 端
func (kv *KVServer) Finish(args *FinishArgs, reply *FinishReply) {
kv.mu.Lock()
delete(kv.request, args.Id) // 释放内存
kv.mu.Unlock()
}

三种操作的处理

操作 幂等? 需要去重? 说明
Get 读操作天然幂等,重复执行无影响
Put 否(但实际做了) 覆盖写,结果相同
Append 追加写,重复执行会多加一次

严格来说只有 Append 需要去重,但实现中对 Put/Append 统一去重简化了逻辑。

Client 的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Clerk struct {
server *labrpc.ClientEnd
}

func (ck *Clerk) PutAppend(key, value, op string) string {
id := atomic.AddInt64(&id, 1) // 全局唯一 ID
args := &PutAppendArgs{Key: key, Value: value, Op: op, Id: id}

for { // 无限重试直到成功
reply := &PutAppendReply{}
ok := ck.server.Call("KVServer.PutAppend", args, reply)
if ok {
ck.Finish(id) // 确认收到
return reply.Value
}
// 网络失败,带着相同 ID 重试
}
}

关键点:同一个请求的所有重试使用相同的 ID。这样 Server 就能识别重复。

时序图:完整的请求生命周期

1
2
3
4
5
6
7
8
9
Client                         Server
│ │
│── PutAppend(id=42) ────────→ │ 查 request[42]:不存在
│ │ 执行 Append,缓存 request[42]=result
│←── OK(result) ──────────── │
│ │
│── Finish(id=42) ──────────→ │ delete(request[42])
│←── OK ─────────────────── │
│ │

如果第一步的回复丢了:

1
2
3
4
5
6
7
8
9
Client                         Server
│ │
│── PutAppend(id=42) ────────→ │ 执行,缓存 request[42]=result
│←── OK(result) ──── ✗ 丢了 │
│ │
│── PutAppend(id=42) ────────→ │ 查 request[42]:已存在!
│←── result(缓存值)──────── │ 不重复执行
│ │
│── Finish(id=42) ──────────→ │ 清除缓存

这个 Lab 建立的思维模式

  1. 不可靠网络是常态:任何 RPC 都可能丢失、延迟、重复
  2. 去重是幂等的基础:非幂等操作必须有去重机制
  3. Client 和 Server 要协作:只靠一方无法解决问题

后面的 Lab3/4/5 都会复用这套模式:ClientId + RequestId + 去重表。区别只是 Server 从单机变成了分布式。

顺序:Lab1 MapReduce / Lab2 KV Server / Lab3 Raft / Lab4 KV over Raft / Lab5 Sharded KV

MapReduce 系统架构图
Lab1 MapReduce 整体架构:Coordinator 分发任务,Worker 执行 Map/Reduce,通过文件系统传递中间数据。

一句话总结

Lab1 实现一个分布式 MapReduce 框架:一个 Coordinator 协调多个 Worker,把大任务拆成小任务并行执行,还要处理 Worker 挂掉的情况。

为什么要做这个

MapReduce 是理解分布式计算的第一步。它回答一个核心问题:如何把一个大的计算任务分给多台机器去做,还能容忍某些机器挂掉?

你学到的思维模式——任务分发、超时检测、原子写入——在后面每个 Lab 都会反复出现。

核心角色

Coordinator(协调器)

Coordinator 是”老板”,它不干活,只管分配和追踪:

1
2
3
4
5
6
7
8
type Master struct {
files []string // 输入文件列表
nReduce int // Reduce 任务数
mapfinished int // 已完成的 Map 数
reducefinished int // 已完成的 Reduce 数
maptasklog []int // Map 任务状态:0=未分配, 1=等待中, 2=已完成
reducetasklog []int // Reduce 任务状态
}

它暴露 RPC 接口让 Worker 调用:

  • AllocateTask:Worker 来要任务,返回任务类型和文件信息
  • ReceiveFinishedMap/Reduce:Worker 报告完成

Worker(工作节点)

Worker 是”打工人”,它的生命周期就是一个循环:

1
请求任务 → 执行 → 报告完成 → 请求下一个任务 → ...

执行流程详解

Map 阶段

1
2
3
4
5
6
7
8
9
10
11
12
输入文件 pg-*.txt


┌─────────────┐
│ Map Worker │ ← 读取一个输入文件
│ mapf(k, v) │ ← 用户提供的 Map 函数
└─────────────┘

▼ 按 hash(key) % nReduce 分桶
┌──┬──┬──┬──┐
│ 0│ 1│ 2│...│ ← 中间文件 mr-X-Y
└──┴──┴──┴──┘

每个 Map Worker:

  1. 读取分配到的输入文件
  2. 调用用户的 mapf 函数产生 key-value 对
  3. hash(key) % nReduce 把结果分成 nReduce 个桶
  4. 写入中间文件 mr-X-Y(X 是 Map 任务号,Y 是桶号)

Reduce 阶段

1
2
3
4
5
6
7
8
9
10
中间文件 mr-*-Y(所有属于桶 Y 的文件)


┌───────────────┐
│ Reduce Worker │ ← 收集同一桶的所有中间文件
│ reducef(k, vs) │ ← 用户提供的 Reduce 函数
└───────────────┘


输出文件 mr-out-Y

每个 Reduce Worker:

  1. 收集所有 mr-*-Y 文件(同一桶号的所有 Map 输出)
  2. 按 key 排序
  3. 对相同 key 的所有 value 调用用户的 reducef 函数
  4. 写入最终输出 mr-out-Y

关键设计决策

1. 容错:10 秒超时重分配

Worker 随时可能挂掉。Coordinator 的策略很简单:

1
2
3
4
5
6
7
8
go func(taskId int) {
time.Sleep(10 * time.Second)
mu.Lock()
if maptasklog[taskId] == 1 { // 还在等待中
maptasklog[taskId] = 0 // 重置为未分配
}
mu.Unlock()
}()

分配任务时启动一个 goroutine,10 秒后如果任务还没完成,就标记为未分配,下次有 Worker 来要任务时会重新分配。

2. 原子写入:防止部分输出

如果 Worker 写到一半挂了,文件里只有半截数据怎么办?答案是先写临时文件,再原子重命名

1
2
3
4
tmpFile, _ := os.CreateTemp("", "mr-tmp-*")
// ... 写入所有数据 ...
tmpFile.Close()
os.Rename(tmpFile.Name(), finalName) // 原子操作

os.Rename 在 Unix 上是原子的——要么整个文件出现,要么什么都没有。不存在”半个文件”的状态。

3. 阶段隔离:Map 全部完成后才开始 Reduce

Coordinator 维护阶段状态:

  • mapfinished < len(files) → 只分配 Map 任务
  • mapfinished == len(files)reducefinished < nReduce → 分配 Reduce 任务
  • 全部完成 → 返回 Done

这确保 Reduce Worker 不会读到不完整的中间数据。

我的实现要点

回看 src/mr/ 目录下的代码:

文件 职责
coordinator.go Coordinator 逻辑:任务分配、超时检测、完成追踪
worker.go Worker 循环:请求→执行→报告
rpc.go RPC 请求/响应结构体定义

整个 Lab 代码量不大(~200 行核心逻辑),但建立了后续所有 Lab 的基础思维:无状态 Worker + 有状态 Coordinator + 幂等操作 + 超时容错

面试常问

  • MapReduce 如何容错? 超时重分配 + 原子写入 + 幂等 Reduce
  • 为什么 Map 和 Reduce 不能并行? Reduce 依赖 Map 的全部中间输出
  • 中间文件命名 mr-X-Y 的含义? X=Map任务号, Y=Reduce分区号
  • 瓶颈在哪? 单 Coordinator 是瓶颈,所有调度决策都经过它

来源:本地 README.mdtinykv-understanding/README.mddoc/reading_list.md,以及我自己做 MIT 6.5840 和 TinyKV 后的对比笔记。官方 Project 文档只做引用,不整段搬运。

最近把 MIT 6.5840 和 TinyKV 都重新梳理了一遍。它们看起来都在写 KV、Raft、分片和容错,但真正做下来会发现,二者的训练目标很不一样。

MIT 6.5840 更像分布式系统正确性的训练营。它关心的是:在 RPC 丢包、乱序、重试、网络分区、节点重启这些条件下,系统怎样还能给出正确结果。TinyKV 更像分布式数据库存储层的缩小版。它关心的是:数据如何落到本地存储,Raft 如何接入真实的 raftstore,Region 如何分裂和调度,事务如何通过 MVCC 和 Percolator 协议实现。

一句话概括:

1
2
MIT 6.5840 训练的是分布式系统底层正确性。
TinyKV 训练的是把这些正确性能力放进数据库存储层。

先用一张图把两条路线摆在一起:

MIT 6.5840 和 TinyKV 的整体对比

也可以用 Mermaid 把它抽象成两条学习路径:



flowchart LR
    ROOT["两条路线都在训练 KV + Raft,但落点不同"]

    ROOT --> MIT0["MIT 6.5840<br/>系统正确性"]
    MIT0 --> RAFT["Raft / KVRaft<br/>复制状态机"]
    RAFT --> SHARD["ShardKV<br/>分片迁移"]

    ROOT --> TINY0["TinyKV<br/>数据库存储层"]
    TINY0 --> RK["RaftKV / Multi-Raft<br/>Region + raftstore"]
    RK --> TXN["Transactions<br/>MVCC / Percolator"]

    RAFT -. "共识底座" .-> RK
    SHARD -. "分片思想" .-> RK

阅读全文 »

来源:本地 TinyKV 项目文件:tinykv-understanding/labs/lab1-standalonekv.md
顺序:MIT 6.5840 和 TinyKV / Lab1 / Lab2 / Lab3 / Lab3B / Lab4 / 面经索引

TinyKV Lab1 请求路径
Lab1 把 Raw KV 请求直接映射到底层 BadgerDB。

Lab1 看起来轻,但它决定了后面几层怎么和本地存储说话。先把 Raw API、Storage.Reader/Write 和 CF 这几个点讲清楚,后面的 Raft apply 和 MVCC 才有地方落。

官方页面:https://yunpengn.github.io/tinykv/doc/project1-StandaloneKV.html

一句话

Lab1 要做一个单机版 KV 服务。

它还没有 Raft、没有多副本、没有事务。客户端发 RawGetRawPutRawDeleteRawScan 请求,服务端直接读写本地的 Badger 数据库。

阅读全文 »

来源:本地 TinyKV 项目文件:tinykv-understanding/labs/lab2-raftkv.md
顺序:MIT 6.5840 和 TinyKV / Lab1 / Lab2 / Lab3 / Lab3B / Lab4 / 面经索引

TinyKV Lab2 RaftKV 写入流程
写请求先进入 Raft 日志,commit 之后再 apply 到 BadgerDB。

Lab2 的重点不是“多套一层 Raft”这么简单。真正要想明白的是:共识算法只负责排出一个确定顺序,上层还要把日志持久化、发消息、apply 和 callback 串起来。顺序错了,崩溃恢复时就会露馅。

官方页面:https://yunpengn.github.io/tinykv/doc/project2-RaftKV.html

一句话

Lab2 要把 Lab1 的单机 KV,升级成基于 Raft 的多副本 KV。

Lab1 是:

1
请求 -> 直接写本地 Badger

Lab2 变成:

1
请求 -> 先写 Raft 日志 -> 多数副本确认 -> 再写 Badger

更完整一点,写请求会走这条路:

1
2
3
4
5
6
7
8
客户端请求
-> leader
-> 变成一条 Raft log
-> 复制到多数副本
-> commit
-> apply 到状态机
-> 写入 Badger
-> 返回结果

这样做的目的很简单:只要大多数节点还活着,服务就能继续工作,而且多个副本的数据不会乱。

更直观地说,Lab2 解决的是 Lab1 的两个问题:

1
2
3
4
5
问题 1:只有一台机器,挂了就没服务。
解决:复制到多台机器。

问题 2:多台机器各写各的,数据会乱。
解决:用 Raft 规定所有机器执行同一串操作。

Raft log 可以先理解成“操作流水账”:

1
2
3
log[1] = Put name Tom
log[2] = Put age 18
log[3] = Delete city

BadgerDB 是执行流水账后的最终结果:

1
2
3
name = Tom
age = 18
city 不存在

所以 Lab2 的关键不是“怎么把 value 写进硬盘”,而是:

1
2
所有副本怎样先同意这条操作排在第几位,
然后再按同样顺序写进各自的硬盘。
阅读全文 »

来源:本地 TinyKV 项目文件:tinykv-understanding/labs/lab3-multiraftkv.md
顺序:MIT 6.5840 和 TinyKV / Lab1 / Lab2 / Lab3 / Lab3B / Lab4 / 面经索引

TinyKV Lab3 Store Peer Region 关系
Store、Peer、Region 和 Scheduler 的关系。

Lab3 开始才真正像一个分布式存储系统。单个 Raft group 只能让数据更可靠,不能让容量和吞吐横向扩展;Region 和 Multi-Raft 解决的是“把不同 key range 分给不同 Raft group”这件事。

官方页面:https://yunpengn.github.io/tinykv/doc/project3-MultiRaftKV.html

一句话

Lab3 要把“一个 Raft 组管理全部 key”,升级成“多个 Raft 组分别管理不同 key 范围”。

Lab2 是:

1
全部 key -> 一个 Region -> 一个 Raft 组

Lab3 变成:

1
2
3
[a, h) -> Region 1 -> Raft 组 1
[h, p) -> Region 2 -> Raft 组 2
[p, z) -> Region 3 -> Raft 组 3

这样数据量大了以后,就可以把不同范围的数据分摊到不同节点上。

从 Lab2 到 Lab3,可以先看这张图:



graph LR
    subgraph Lab2["Lab2:单 Raft 组"]
        AllKeys["全部 key<br/>[空, 空)"] --> OneRegion["Region 1"]
        OneRegion --> OneRaft["一个 Raft 组"]
    end

    subgraph Lab3["Lab3:Multi-Raft"]
        R1["Region 1<br/>[空, h)"] --> G1["Raft 组 1"]
        R2["Region 2<br/>[h, p)"] --> G2["Raft 组 2"]
        R3["Region 3<br/>[p, 空)"] --> G3["Raft 组 3"]
    end

    OneRaft --> R1
    OneRaft --> R2
    OneRaft --> R3

Lab3 的目标不是“把 Raft 换掉”,而是让系统里同时跑很多个 Raft 组。每个 Raft 组只负责一段 key。

阅读全文 »

来源:本地 TinyKV 项目文件:tinykv-understanding/labs/lab3b-split-heartbeat-difficulty.md
顺序:MIT 6.5840 和 TinyKV / Lab1 / Lab2 / Lab3 / Lab3B / Lab4 / 面经索引

TinyKV Lab3B split 状态收敛
Region split 需要先经 Raft commit,再更新本地元信息。

Lab3B 难在状态收敛。配置变更、用户请求、split 后的路由、scheduler heartbeat 都在改同一批 Region 元信息;如果顺序没有讲清楚,很多 bug 看起来像偶现,实际上是状态机边界没守住。

这份记录整理的是我们在 Lab3B 实现 Region SplitChangePeer、snapshot recovery 相关逻辑时遇到的几个问题。它们表面上出现在不同测试里,但本质都和 raftstore 在 split/conf change 之后的状态收敛有关。

问题目录

编号 问题 根因总结 典型现象 修复位置
1 split 后 scheduler 短暂找不到右半边 region split apply 后只立即上报了 left region,right region 依赖后续异步 heartbeat,导致 scheduler 暂时出现 range gap panic: find no region for "3 00000000" applySplit 中同时上报 left/right
2 被移除的 peer 继续 apply 后续 committed entries RemoveNode 删除自己后 peer 已经 destroy,但同一个 Ready 中剩余 committed entries 仍被继续 apply,可能破坏 raft/apply 状态一致性 unexpected raft log index: lastIndex 0 < appliedIndex ... HandleRaftReady 每次 applyEntry 后检查 d.stopped
3 用 left region 访问 right key 时没有稳定返回 KeyNotInRegion 普通 KV 请求只在 apply 阶段检查 key range,请求已经进入 Raft 后才发现越界,错误返回受 commit/apply 时序影响 TestOneSplit3B 中 expected KeyNotInRegion,但 header error 为 nil preProposeRaftCommand 对普通 KV 请求提前检查 key range
阅读全文 »
0%