MIT 6.5840 Lab5:Sharded KV

顺序:Lab1 MapReduce / Lab2 KV Server / Lab3 Raft / Lab4 KV over Raft / Lab5 Sharded KV

Sharded KV 分片架构图
Lab5 完整架构:ShardCtrler 管理分片分配,多个 Raft Group 各负责一部分 Shard,配置变更时触发迁移。

一句话总结

Lab5 把单一 Raft Group 扩展为多个 Raft Group,每个 Group 负责一部分 Key(分片),并且支持动态迁移——加减节点时数据能自动搬家。

为什么要做这个

Lab4 的 KV 服务有一个致命问题:所有数据都在一个 Raft Group 里

  • 写入瓶颈:所有写都要经过唯一的 Leader
  • 存储瓶颈:所有数据存在每台机器上
  • 扩展方式:只能加副本(提高容错),不能加容量

分片(Sharding)解决了这个问题:把 key space 分成 N 份,每份由不同的 Group 负责。

整体架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌──────────────────────────────────────────────────────────┐
│ ShardCtrler │
│ (管理 Shard → Group 的映射关系) │
│ ┌─────────────────────────────────────────┐ │
│ │ Config: Shard[0..9] → Group ID │ │
│ │ [G1, G1, G1, G2, G2, G2, G3, G3, G3, G3]│ │
│ └─────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘
↕ Query/Join/Leave
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Group 1 │ │ Group 2 │ │ Group 3 │
│ (Raft×3) │ │ (Raft×3) │ │ (Raft×3) │
│ Shard 0-2 │ │ Shard 3-5 │ │ Shard 6-9 │
└──────────┘ └──────────┘ └──────────┘
  • ShardCtrler:集中式配置管理,本身也是 Raft 复制的
  • Group:每个 Group 是一个独立的 Raft 集群,负责若干 Shard
  • Shard:key 通过 key[0] % NShards 映射到 Shard 号

Lab5A:ShardCtrler

ShardCtrler 管理配置的变迁历史:

1
2
3
4
5
type Config struct {
Num int // 配置版本号
Shards [10]int // Shard → Group ID 的映射
Groups map[int][]string // Group ID → server 列表
}

四个操作:

操作 作用 触发 Rebalance?
Join(groups) 新增 Group
Leave(gids) 移除 Group
Move(shard, gid) 手动指定某 Shard 归属
Query(num) 查询某版本配置

Rebalance 算法

目标:让每个 Group 负责的 Shard 数量尽可能均匀(差值 ≤ 1)。

1
2
3
4
5
func adjustShards(shards *[10]int, groups map[int][]string) {
// 1. 统计每个 Group 当前负责几个 Shard
// 2. 排序 Group(保证确定性!)
// 3. 循环:从最多的 Group 拿一个给最少的,直到差值 ≤ 1
}

关键细节:排序保证确定性。Go 的 map 迭代顺序是随机的,如果不排序,不同副本可能算出不同的分配结果!

Lab5B:Sharded KV

这是整个课程最复杂的 Lab。核心挑战:配置变更时,数据如何安全迁移?

分片四状态机

每个 Shard 在每个 Group 上有四种状态:

1
2
3
4
5
6
7
8
9
              Config 变更
新 Owner
Serving ──────────────→ Pulling ──────────→ GCing ──────────→ Serving
拉数据 通知旧 Owner 删除

Config 变更
旧 Owner
Serving ──────────────→ BePulling ─────────→ (数据被删除)
等待新 Owner 来拉

状态转换的含义:

  • Serving:正常服务,接受 Client 请求
  • Pulling:我是新 Owner,正在从旧 Owner 拉数据,暂不接受该 Shard 的请求
  • BePulling:我是旧 Owner,等待新 Owner 来拉我的数据
  • GCing:数据已拉到,但还没通知旧 Owner 删除

为什么用 Pull 而不是 Push?

如果旧 Owner 主动 Push:

  • 旧 Owner 挂了 → 数据卡在半路,新 Owner 不知道什么时候能收到
  • 需要额外的重试和确认机制

如果新 Owner 主动 Pull:

  • 旧 Owner 挂了 → 新 Owner 重试即可
  • 主动权在需要数据的一方,逻辑更简单

三个后台 Monitor

只在 Leader 上运行的三个 goroutine,驱动整个迁移流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Monitor 1:拉取新配置
func (kv *ShardKV) monitorRequestConfig() {
// 前提:所有 Shard 都是 Serving 状态
// 动作:向 ShardCtrler 查询 config N+1
// 结果:通过 Raft 提交 AddConfig 命令
}

// Monitor 2:拉取 Shard 数据
func (kv *ShardKV) monitorInsert() {
// 前提:存在 Pulling 状态的 Shard
// 动作:向旧 Owner 发 GetShards RPC
// 结果:通过 Raft 提交 InsertShard 命令
}

// Monitor 3:通知旧 Owner 清理
func (kv *ShardKV) monitorGC() {
// 前提:存在 GCing 状态的 Shard
// 动作:向旧 Owner 发 DeleteShards RPC
// 结果:通过 Raft 提交 AdjustShardState 命令
}

配置升级必须逐步进行

核心约束:所有 Shard 必须全部处于 Serving 状态,才能开始拉取下一个配置。

1
2
3
4
5
6
7
8
func (kv *ShardKV) allShardsServing() bool {
for _, shard := range kv.shards {
if shard.State != Serving {
return false
}
}
return true
}

为什么?假设跳过了中间配置:

  • Config 1: Shard 3 在 Group A
  • Config 2: Shard 3 从 A → B
  • Config 3: Shard 3 从 B → C

如果 C 直接从 Config 1 跳到 Config 3,它去找 A 拉数据。但 A 可能已经按 Config 2 把数据给 B 了!

迁移的完整流程

以 Shard 3 从 Group A → Group B 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
时间轴 →

Group A Group B
────────────────────────────────────────────
Serving(Shard 3) (无 Shard 3)
│ │
│ 新 Config 到来 │
▼ ▼
BePulling(Shard 3) Pulling(Shard 3)
│ │
│←── GetShards RPC ────────│ monitorInsert 发起
│──── 返回数据 ───────────→│
│ ▼
│ GCing(Shard 3) ← 数据已到位
│ │
│←── DeleteShards RPC ─────│ monitorGC 发起
▼ ▼
(清除 Shard 3) Serving(Shard 3) ← 迁移完成!

去重表也要跟着迁移

1
2
3
4
type ShardData struct {
KVData map[string]string // Shard 的 KV 数据
LastRequestMap map[int64]int64 // 该 Shard 的去重表
}

为什么?假设 Client X 的 Append 请求被 Group A 执行了。Shard 迁移到 Group B 后,如果 Client X 重试(因为没收到回复),Group B 必须知道这个请求已经执行过。

合并去重表时取 max:

1
2
3
4
5
for clientId, requestId := range incomingLastRequestMap {
if requestId > kv.LastRequestMap[clientId] {
kv.LastRequestMap[clientId] = requestId
}
}

所有内部操作走 Raft

配置变更、Shard 插入、Shard 删除、状态调整——这些全部作为命令通过 Raft 提交:

1
2
3
4
type Command struct {
Type string // "AddConfig" | "InsertShard" | "DeleteShard" | "AdjustShardState" | "Get" | "Put" | "Append"
Data interface{}
}

这保证了 Group 内所有副本看到完全相同的状态变迁序列。如果只在 Leader 本地执行这些操作,Leader 切换后新 Leader 的状态可能和 Follower 不一致。

Client 的视角

Client 完全不需要知道迁移的细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (ck *Clerk) Get(key string) string {
shard := key2shard(key)
for {
gid := ck.config.Shards[shard]
servers := ck.config.Groups[gid]
for _, srv := range servers {
reply := srv.Call("ShardKV.Get", args)
if reply.Err == OK {
return reply.Value
}
if reply.Err == ErrWrongGroup {
break // 去找新配置
}
}
// 刷新配置重试
ck.config = ck.sm.Query(-1)
}
}

收到 ErrWrongGroup 时,Client 向 ShardCtrler 查询最新配置,然后重试。整个迁移过程对 Client 几乎透明。

这个 Lab 的核心收获

  1. 分片是水平扩展的基础:把数据按 key 分到不同机器,每台只负责一部分
  2. 配置变更必须序列化:不能跳配置,否则数据可能找错 Owner
  3. Pull 优于 Push:主动权在需求方更容易实现容错
  4. 内部操作也要共识:状态变更必须经过 Raft,保证副本一致
  5. 去重表是业务状态的一部分:迁移数据时必须同步迁移去重表

这 5 个 Lab 从 MapReduce 走到 Sharded KV,构建了一个完整的分布式系统知识栈。每一层都在解决上一层遗留的问题,最终形成一个可扩展、容错、强一致的分布式 KV 服务。