百亿级关系链系统架构设计:从单表到分布式的演进之路

内容分享2小时前发布
0 0 0

百亿级关系链系统架构设计:从单表到分布式的演进之路

引言:一个被误解的问题

当我们谈论”百亿级关系链系统”时,很多人会认为核心挑战是”查询慢”。但真相是:查询慢不是问题,单机容量不够才是问题

想象一下微博的场景:5亿用户,每人平均200个关注关系,总计1000亿条数据。这些数据如果放在一张MySQL表里,会发生什么?


单表存储1000亿条记录:
- 数据文件:~50TB
- 索引文件:~20TB
- 总容量:~70TB

现实:单机MySQL最佳实践 < 1TB
差距:70倍

这篇文章将带你理解:从单表到分布式系统,架构演进的每一步都是被逼出来的,而数据冗余是分库分表带来的必然代价


第一部分:关系链的业务模型

1.1 两种关系类型

弱关系(单向关注)

典型场景:微博、抖音、B站特点:A关注B,B不需要同意数据特征:非对称关系


用户A关注用户B:
┌─────┐  关注  ┌─────┐
│  A  │ ─────→ │  B  │
└─────┘        └─────┘

产生一条关系记录:
{follower: A, followee: B}

强关系(双向好友)

典型场景:微信、QQ、LinkedIn特点:需要双方确认数据特征:对称关系


A和B互为好友:
┌─────┐  好友  ┌─────┐
│  A  │ ←────→ │  B  │
└─────┘        └─────┘

本质上是两条单向关系:
{follower: A, followee: B}
{follower: B, followee: A}

结论:从数据存储角度看,强关系 = 2条弱关系。因此我们以弱关系为基础来设计架构。

1.2 核心查询场景

场景1:正向查询(查关注列表)


-- 张三关注了哪些人?
SELECT followee_id FROM relations WHERE follower_id = 123456;

业务场景:
- 用户查看"我的关注"列表
- 推送关注对象的动态
- 推荐算法分析用户兴趣

场景2:反向查询(查粉丝列表)


-- 哪些人关注了李四?
SELECT follower_id FROM relations WHERE followee_id = 789012;

业务场景:
- 用户查看"我的粉丝"列表
- 内容分发(给所有粉丝推送)
- 统计粉丝数、计算影响力

关键观察:这两个查询同等重要,都是高频操作。


第二部分:单表阶段 – 索引就够了

2.1 简单有效的单表设计

表结构


CREATE TABLE relations (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    follower_id BIGINT NOT NULL,   -- 关注者(粉丝)
    followee_id BIGINT NOT NULL,   -- 被关注者
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    UNIQUE KEY uk_relation (follower_id, followee_id),  -- 防重复关注
    KEY idx_follower (follower_id),   -- 正向查询索引
    KEY idx_followee (followee_id)    -- 反向查询索引
) ENGINE=InnoDB;

2.2 查询性能分析

B+树索引原理回顾


索引 idx_follower (follower_id):

           [根节点]
          /    |    
    [中间层] [中间层] [中间层]
    /  |    /  |    /  |  
  [叶节点数据块×N]

查询路径:根→中间层→叶节点
层级计算:log₁₀₀₀(记录数)

不同数据规模的性能

数据量 索引层级 单次查询耗时 说明
100万 3层 1-2ms 内存命中率高
1000万 4层 3-5ms 部分磁盘IO
5000万 4层 5-10ms 仍可接受
1亿 5层 20-50ms 开始变慢
10亿 5-6层 100-500ms 不可接受

实际测试数据(基于16GB内存MySQL服务器):


测试1:1000万条记录
SELECT followee_id FROM relations WHERE follower_id = ?;
平均耗时:3.2ms
QPS:~3000

测试2:1亿条记录
SELECT followee_id FROM relations WHERE follower_id = ?;
平均耗时:25ms
QPS:~400

测试3:10亿条记录
SELECT followee_id FROM relations WHERE follower_id = ?;
平均耗时:200ms(冷数据可达500ms)
QPS:~50

2.3 单表的物理瓶颈

问题1:存储容量


10亿条记录的实际占用(每行约100字节):
- 数据文件:100GB
- 主键索引:20GB
- follower_id索引:20GB
- followee_id索引:20GB
总计:~160GB

单机MySQL推荐:< 500GB(包含所有库表)

问题2:内存命中率


InnoDB Buffer Pool(假设16GB):
- 1000万记录:索引全部缓存,命中率>95%
- 1亿记录:索引部分缓存,命中率~70%
- 10亿记录:索引频繁换页,命中率<30%

命中率下降 → 磁盘IO增加 → 查询变慢

问题3:写入竞争


高并发写入时的锁竞争:
- 主键自增锁
- 辅助索引页锁
- 行锁

实测:单表超过5000万后,写入TPS从1万降到3000

问题4:运维困难


备份时间:
- 1000万记录:5分钟
- 1亿记录:1小时
- 10亿记录:10小时

DDL操作(加字段):
- 1000万记录:几秒
- 1亿记录:几分钟
- 10亿记录:几小时(需要停服)

2.4 单表的承载极限

经验值


保守估计:5000万条记录
- 数据+索引:~20GB
- 查询耗时:<10ms
- 写入TPS:>5000
- 运维可控:备份<30分钟

激进估计:1亿条记录
- 需要更好的硬件(SSD、32GB内存)
- 查询耗时:10-30ms
- 写入TPS:>3000
- 运维有挑战

危险区:>2亿条记录
- 查询明显变慢
- 写入性能下降
- 运维困难

结论:当数据量突破亿级时,必须考虑分库分表。


第三部分:分库分表的核心矛盾

3.1 为什么要分库分表?

核心目标

突破单机存储容量限制通过水平扩展提升整体性能分散读写压力到多台机器

分库分表原理


原来:1个库,1张表,100亿数据
      ↓
现在:1024个库,每库1张表,每表1000万数据

单表数据量:100亿 / 1024 ≈ 1000万
恢复到单表的黄金区间!

3.2 分库的关键决策:选择分库键

什么是分库键?

分库键(Sharding Key)决定了一条数据存储在哪个库:


def get_database(sharding_key):
    return sharding_key % DATABASE_COUNT

# 示例
user_123456的数据 → 123456 % 1024 = 库#808
user_789012的数据 → 789012 % 1024 = 库#780

关键问题:我们有两个候选分库键


选项A:按 follower_id 分库(关注者ID)
选项B:按 followee_id 分库(被关注者ID)

只能选一个!这就是矛盾的根源。

3.3 方案A:按 follower_id 分库

分库规则


库编号 = follower_id % 1024

数据分布示例


用户A(123456) 关注 用户B(789012):
→ 存储在库#808(因为 123456 % 1024 = 808)

用户C(234567) 关注 用户B(789012):
→ 存储在库#807(因为 234567 % 1024 = 807)

用户A(123456) 关注 用户D(890123):
→ 存储在库#808(因为 123456 % 1024 = 808)

规律:同一个用户的所有”关注关系”都在同一个库。

查询效果分析


-- 查询1:用户A(123456)关注了谁?
步骤1: 计算库编号 = 123456 % 1024 = 808
步骤2: 连接库#808
步骤3: SELECT followee_id FROM relations WHERE follower_id = 123456
结果: ✓ 单库查询,5ms

图示:
应用 → 计算hash(123456) → 库#808 → 返回结果
                           (单次数据库调用)

-- 查询2:谁关注了用户B(789012)?
问题: 无法通过789012定位到具体的库(因为分库键是follower_id)
必须: 查询所有1024个库

步骤1: 向所有库发起查询
       SELECT follower_id FROM relations WHERE followee_id = 789012
步骤2: 等待所有库返回结果
步骤3: 应用层合并结果

图示:
应用 ─┬→ 库#0   ─→ 可能有结果
      ├→ 库#1   ─→ 可能有结果
      ├→ 库#2   ─→ 可能有结果
      ├→ ...
      └→ 库#1023 ─→ 可能有结果
         (1024次数据库调用!)

结果: ✗ 扇出查询,1024次网络往返,5000ms

性能对比

查询类型 数据库调用次数 网络延迟 总耗时 可用性
正向查询(关注列表) 1次 5ms 5ms
反向查询(粉丝列表) 1024次 5ms×1024 5000ms+

3.4 方案B:按 followee_id 分库

结果完全相反


按 followee_id 分库:
- 反向查询(粉丝列表):单库查询 ✓ 5ms
- 正向查询(关注列表):扇出查询 ✗ 5000ms

3.5 核心矛盾的本质

数学表达


定义:
- 分库函数:f(key) → database_id
- 查询维度集合:Q = {正向查询, 反向查询}

约束:
f() 只能基于一个字段

结论:
|Q| = 2(两个查询维度)
能优化的维度 = 1
无法优化的维度 = 1(必须扇出查询)

图示理解


数据记录:{follower: A, followee: B}

如果按A分库:
- 查A的数据 → 定位到A所在的库 ✓
- 查B的数据 → B的数据分散在所有库 ✗

如果按B分库:
- 查B的数据 → 定位到B所在的库 ✓
- 查A的数据 → A的数据分散在所有库 ✗

这是一个二选一的困境!

3.6 扇出查询为什么不可接受?

性能问题


假设:
- 库数量:1024
- 单库查询延迟:5ms
- 网络往返延迟:2ms

扇出查询总耗时:
= max(所有库的查询时间) + 网络延迟
= 5ms + 2ms × 1024(串行)或 5ms(并行但受连接数限制)
= 实际约100-500ms(考虑连接池、网络抖动)

vs 单库查询:5ms

性能差距:20-100倍

稳定性问题


场景:某一个库出现慢查询

单库查询:
库#808响应慢 → 只影响落在该库的用户

扇出查询:
库#808响应慢 → 影响所有用户的反向查询
(因为必须等待所有库返回)

雪崩效应:1个库拖垮整个系统

资源消耗问题


QPS:10000次/秒(反向查询)
库数量:1024

总数据库连接数:10000 × 1024 = 1024万
即使用连接池,也需要维护海量连接

而单库查询只需要:10000 个连接

实际案例


微博场景:
- 用户量:5亿
- 日活:2亿
- 峰值QPS:"查看粉丝"功能 100万/秒

如果用扇出查询:
- 数据库总调用量:100万 × 1024 = 10.24亿次/秒
- 单库需承受:10.24亿 / 1024 = 100万QPS
- 单库极限:约1万QPS

结论:完全不可行!

第四部分:数据冗余 – 不得已的选择

4.1 解决方案:存储两份数据

核心思想

既然一个分库键只能优化一个查询维度,那就存储两份数据,用两个不同的分库键。

架构设计


关注表(guanzhu):
- 分库键:follower_id
- 优化查询:"X关注了谁"
- 库编号 = follower_id % 1024

粉丝表(fensi):
- 分库键:followee_id
- 优化查询:"谁关注了X"
- 库编号 = followee_id % 1024

数据示例


业务操作:用户A(123456) 关注 用户B(789012)

关注表存储(库#808 = 123456 % 1024):
┌──────────┬────────────┬─────────────┐
│ id       │ follower   │ followee    │
├──────────┼────────────┼─────────────┤
│ 10086    │ 123456     │ 789012      │
└──────────┴────────────┴─────────────┘

粉丝表存储(库#780 = 789012 % 1024):
┌──────────┬────────────┬─────────────┐
│ id       │ user_id    │ fan_id      │
├──────────┼────────────┼─────────────┤
│ 20188    │ 789012     │ 123456      │
└──────────┴────────────┴─────────────┘

本质:同一份业务数据,用不同的维度组织了两次

4.2 查询路由

正向查询(查关注列表):


def get_following_list(user_id):
    # 计算库编号
    db_id = user_id % 1024

    # 连接关注表
    db = get_guanzhu_database(db_id)

    # 查询
    result = db.query(
        "SELECT followee FROM guanzhu WHERE follower = ?",
        user_id
    )
    return result

执行流程:
用户123456查关注列表
→ 计算 123456 % 1024 = 808
→ 连接 guanzhu库#808
→ 查询 WHERE follower=123456
→ 5ms返回

反向查询(查粉丝列表):


def get_fans_list(user_id):
    # 计算库编号
    db_id = user_id % 1024

    # 连接粉丝表
    db = get_fensi_database(db_id)

    # 查询
    result = db.query(
        "SELECT fan_id FROM fensi WHERE user_id = ?",
        user_id
    )
    return result

执行流程:
用户789012查粉丝列表
→ 计算 789012 % 1024 = 780
→ 连接 fensi库#780
→ 查询 WHERE user_id=789012
→ 5ms返回

完美解决


两个查询都变成单库查询:
- 正向查询:guanzhu表,5ms
- 反向查询:fensi表,5ms

性能一致,没有扇出查询!

4.3 代价分析

空间代价


原方案:
- 单份数据:100亿条记录
- 存储空间:10TB

冗余方案:
- 两份数据:200亿条记录(100亿×2)
- 存储空间:20TB

空间增加:2倍

维护代价


原方案:
- 写入一次
- 保证一份数据的一致性

冗余方案:
- 写入两次
- 保证两份数据的一致性(这是核心挑战!)

收益


性能提升:
- 查询延迟:从5000ms降到5ms(1000倍提升)
- 系统吞吐:从1000 QPS提升到100万 QPS(1000倍提升)
- 稳定性:从依赖所有库到只依赖单库(质的飞跃)

投入产出比:
- 投入:2倍存储 + 一致性复杂度
- 产出:1000倍性能提升

显然值得!

4.4 底层规律总结

规律1:分库分表的本质约束


分布式系统的基本定律:
数据分片 ⇒ 查询局部性

一条数据只能存在一个位置
一个查询要定位数据,必须知道数据在哪

当查询条件不包含分库键 ⇒ 无法定位 ⇒ 必须扫描所有分片

规律2:数据冗余的本质


数据冗余 = 用空间换时间 + 用存储换查询灵活性

本质:同一份业务数据,按不同维度组织多次存储
目的:让每个查询维度都能快速定位数据

公式:
冗余份数 = 高频查询维度数量

规律3:不可能三角


           查询维度多样性
                 /
                /  
               /    
              /      
             /________
      单库查询         存储不冗余

在分布式系统中,三者不可兼得:
- 要查询多样性 + 单库查询 → 必须冗余
- 要查询多样性 + 不冗余 → 必须扇出查询
- 要单库查询 + 不冗余 → 查询维度受限

第五部分:数据冗余的三种实现方案

现在我们知道了”为什么要冗余”,接下来解决”怎么冗余”。

5.1 核心挑战:一致性问题

问题场景


用户A关注用户B,需要写入两个表:
1. guanzhu表(库#808)
2. fensi表(库#780)

这是两次独立的数据库操作,不在一个事务中!

可能的问题:
- 写入guanzhu成功,写入fensi失败 → 数据不一致
- 两次写入之间有延迟 → 短时间不一致
- 网络分区导致部分写入 → 数据丢失

一致性要求


理想:强一致性(两个表实时同步)
现实:最终一致性(短时间不一致可接受,但最终要一致)

业务影响分析:
- 不一致窗口:1秒以内 → 用户基本无感知 ✓
- 不一致窗口:1分钟 → 用户可能投诉 ✗
- 永久不一致 → 数据错误,严重问题 ✗✗✗

目标:追求最终一致性,尽量缩短不一致窗口

5.2 方案一:服务同步冗余

架构图


┌─────────────┐
│  应用服务   │
└──────┬──────┘
       │
       ├──→ guanzhu库#808 (写入1)
       │         ↓ 成功
       │
       └──→ fensi库#780   (写入2)
                 ↓ 成功

       ← 返回成功给用户

代码实现


def follow(follower_id, followee_id):
    # 计算两个库的编号
    guanzhu_db_id = follower_id % 1024
    fensi_db_id = followee_id % 1024

    # 连接两个库
    guanzhu_db = get_guanzhu_database(guanzhu_db_id)
    fensi_db = get_fensi_database(fensi_db_id)

    try:
        # 写入关注表
        guanzhu_db.execute(
            "INSERT INTO guanzhu (follower, followee) VALUES (?, ?)",
            follower_id, followee_id
        )

        # 写入粉丝表
        fensi_db.execute(
            "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)",
            followee_id, follower_id
        )

        return {"success": True}

    except Exception as e:
        # 如果任何一步失败,需要回滚已写入的数据
        # 但这里没有分布式事务,回滚很困难
        return {"success": False, "error": str(e)}

执行时序


T0:   应用收到请求
T5:   写入guanzhu表完成(5ms)
T10:  写入fensi表完成(5ms)
T10:  返回成功给用户

用户感知延迟:10ms

优点


1. 实现简单
   - 代码逻辑直观
   - 不需要额外组件

2. 一致性较强
   - 两次写入都成功才返回
   - 用户感知的是一致的结果

3. 问题可及时发现
   - 写入失败立即返回错误
   - 便于排查问题

严重缺陷

缺陷1:延迟叠加


正常情况:
写入1:5ms
写入2:5ms
总延迟:10ms(可接受)

异常情况:
写入1:5ms
写入2:数据库慢查询,3000ms
总延迟:3005ms(用户超时)

而且:两次写入是串行的,总延迟 = sum(所有写入)

缺陷2:可用性绑定


guanzhu库#808 正常,fensi库#780 故障

结果:
- 所有涉及这两个库的写入全部失败
- 可用性 = min(所有涉及库的可用性)
- 单库可用性99.9%,涉及2个库 → 可用性约99.8%

随着库数量增加,可用性指数级下降:
- 2个库:99.8%
- 4个库:99.6%
- 10个库:99%

缺陷3:回滚困难


场景:
T0: 写入guanzhu成功
T1: 写入fensi失败(网络超时)

问题:
1. 第一次写入已经提交,无法回滚
2. 如果补偿性删除guanzhu的记录,可能删除失败
3. 最终导致数据不一致

这就是分布式事务的经典问题

缺陷4:雪崩风险


某个库出现抖动(慢查询、磁盘IO高)
→ 涉及该库的所有请求变慢
→ 应用服务线程池耗尽
→ 其他正常请求也无法处理
→ 整个系统雪崩

实际案例


某社交平台早期架构:
- 用同步冗余
- 峰值QPS:5万/秒

某天:
- fensi库#123磁盘故障,响应变慢
- 涉及该库的请求从5ms变成3秒
- 应用服务器线程池(500线程)快速耗尽
- 整体QPS从5万降到1000
- 用户大规模投诉

后果:紧急切换到异步方案

适用场景


仅适合:
- 数据量小(< 1000万)
- 并发低(QPS < 1000)
- 对一致性要求极高
- 团队技术实力有限

示例:企业内部系统、小型社交应用早期

5.3 方案二:服务异步冗余

核心思想

关注表是核心数据,必须同步写入。粉丝表是冗余数据,可以异步写入。

架构图


┌─────────────┐
│  应用服务   │
└──┬────┬─────┘
   │    │
   │    └──→ 消息队列(Kafka/RocketMQ)
   │              │
   │              ↓
   │         ┌────────────┐
   │         │ 冗余服务   │
   │         └─────┬──────┘
   │               │
   ↓               ↓
guanzhu库      fensi库
(同步写入)     (异步写入)

详细流程


步骤1:应用服务处理请求
┌──────────┐
│ 用户请求 │
└────┬─────┘
     ↓
┌────────────────┐
│ 1. 写guanzhu表 │ ← 同步,5ms
└────┬───────────┘
     ↓
┌────────────────┐
│ 2. 发MQ消息    │ ← 同步,1ms
└────┬───────────┘
     ↓
┌────────────────┐
│ 3. 返回成功    │ ← 总延迟:6ms
└────────────────┘

步骤2:冗余服务异步处理
┌────────────────┐
│ MQ消费者监听   │
└────┬───────────┘
     ↓
┌────────────────┐
│ 4. 消费消息    │ ← 异步,延迟100ms
└────┬───────────┘
     ↓
┌────────────────┐
│ 5. 写fensi表   │ ← 异步,5ms
└────────────────┘

代码实现


# 应用服务代码
def follow(follower_id, followee_id):
    guanzhu_db_id = follower_id % 1024
    guanzhu_db = get_guanzhu_database(guanzhu_db_id)

    try:
        # 1. 写入主表(关注表)
        guanzhu_db.execute(
            "INSERT INTO guanzhu (follower, followee, created_at) VALUES (?, ?, NOW())",
            follower_id, followee_id
        )

        # 2. 发送消息到队列
        message = {
            "action": "follow",
            "follower_id": follower_id,
            "followee_id": followee_id,
            "timestamp": int(time.time())
        }
        kafka.send("relation_sync_topic", message)

        # 3. 立即返回成功
        return {"success": True}

    except Exception as e:
        # 只要主表写入成功,就认为操作成功
        # 消息队列的失败可以通过重试机制保证
        return {"success": False, "error": str(e)}

# 冗余服务代码
def consume_message():
    for message in kafka.consume("relation_sync_topic"):
        action = message["action"]
        follower_id = message["follower_id"]
        followee_id = message["followee_id"]

        if action == "follow":
            # 计算fensi表的库编号
            fensi_db_id = followee_id % 1024
            fensi_db = get_fensi_database(fensi_db_id)

            # 写入粉丝表(带重试机制)
            retry_count = 0
            while retry_count < 3:
                try:
                    fensi_db.execute(
                        "INSERT INTO fensi (user_id, fan_id, created_at) VALUES (?, ?, NOW())",
                        followee_id, follower_id
                    )
                    break  # 成功则跳出重试
                except Exception as e:
                    retry_count += 1
                    if retry_count >= 3:
                        # 写入失败队列,人工处理
                        dead_letter_queue.send(message)
                    time.sleep(1)  # 重试前等待

时序图(详细版):


用户    应用    MQ      消费者   guanzhu  fensi
 │       │      │        │        │       │
 ├──────→│      │        │        │       │  T0: 发起关注
 │       ├─────→│        │        │       │  T1: 写guanzhu
 │       │←─────┤        │        │       │  T6: 写入成功
 │       ├─────────────→ │        │       │  T7: 发MQ消息
 │←──────┤      │        │        │       │  T8: 返回成功(用户感知延迟8ms)
 │       │      │        │        │       │
 │       │      ├───────→│        │       │  T108: 消费消息(延迟100ms)
 │       │      │        ├───────────────→│  T109: 写fensi
 │       │      │        │←───────────────┤  T114: 写入成功
 │       │      │        │        │       │
 │       │      │        │        │       │  不一致窗口:T6-T114 = 108ms

核心特点

特点1:用户延迟降低


同步方案:10ms(两次数据库写入)
异步方案:6ms(一次数据库 + 一次MQ)

延迟降低:40%

特点2:可用性解耦


guanzhu库故障 → 用户无法关注 ✗(核心功能受影响)
fensi库故障 → 用户可以关注 ✓(冗余功能异步补偿)

可用性 = guanzhu库可用性(99.9%)
不再受fensi库影响

特点3:最终一致性


T0:    用户A关注B,guanzhu表已写入
T0-T1: 查"A关注了谁"→ 能看到B ✓
T0-T1: 查"谁关注了B"→ 看不到A ✗(不一致窗口)
T1:    fensi表写入完成
T1后:  两个查询结果一致 ✓

不一致窗口:通常100-500ms

特点4:天然削峰


峰值场景:
用户请求:10万QPS
MQ缓冲:积压10万消息
消费者:按自己的节奏处理,如5000 TPS

数据库压力:
- 同步方案:guanzhu库和fensi库都承受10万QPS
- 异步方案:guanzhu库承受10万QPS,fensi库只承受5000 TPS

fensi库压力降低:20倍

优点总结


1. 性能提升
   - 用户延迟降低40%
   - 系统吞吐量提升(解除fensi库瓶颈)

2. 可用性提升
   - 核心功能不受冗余库影响
   - 单库故障影响面缩小

3. 削峰填谷
   - MQ缓冲流量
   - 保护数据库

4. 易于扩展
   - 消费者可以水平扩展
   - 独立调整冗余速度

缺点总结


1. 最终一致性
   - 存在不一致窗口(通常<1秒)
   - 需要容忍短时间的数据不一致

2. 系统复杂度增加
   - 引入消息队列
   - 需要管理消费者服务
   - 需要监控消息积压

3. 消息可能丢失
   - MQ本身可能丢消息(虽然概率很小)
   - 需要补偿机制

4. 调试困难
   - 异步链路长,问题定位复杂
   - 需要完善的日志和监控

适用场景


✓ 数据量:> 1亿
✓ 并发:QPS > 1万
✓ 一致性要求:可接受秒级延迟
✓ 团队能力:能驾驭消息队列

这是互联网大厂的标准方案

5.4 方案三:线下异步冗余

核心思想

应用服务完全不关心冗余逻辑,通过捕获数据库的binlog来触发冗余写入。

什么是binlog?


MySQL的binlog(Binary Log):
- 记录了所有数据变更操作(INSERT、UPDATE、DELETE)
- 主要用途:主从复制、数据恢复

示例binlog记录:
{
  "database": "guanzhu_db_808",
  "table": "guanzhu",
  "type": "INSERT",
  "data": {
    "follower": 123456,
    "followee": 789012,
    "created_at": "2025-11-06 10:30:00"
  },
  "timestamp": 1699260600
}

架构图


┌─────────────┐
│  应用服务   │ ← 完全不感知冗余逻辑
└──────┬──────┘
       │
       ↓
  guanzhu库#808
       │
       ├→ binlog输出
       ↓
┌──────────────┐
│ Canal/Maxwell│ ← binlog解析工具
└──────┬───────┘
       │
       ↓
  消息队列(Kafka)
       │
       ↓
┌──────────────┐
│  冗余服务    │ ← 独立部署的服务
└──────┬───────┘
       │
       ↓
  fensi库#780

详细流程


步骤1:用户操作
用户A关注B
  ↓
应用服务
  ↓
写入guanzhu表(库#808)
  ↓
MySQL记录binlog

步骤2:binlog捕获(实时)
Canal监听guanzhu库#808的binlog
  ↓
解析binlog:
  {
    "操作": "INSERT",
    "表": "guanzhu",
    "数据": {"follower": 123456, "followee": 789012}
  }
  ↓
转换为业务消息:
  {
    "action": "follow",
    "follower_id": 123456,
    "followee_id": 789012
  }
  ↓
发送到Kafka

步骤3:冗余写入(异步)
冗余服务消费Kafka消息
  ↓
写入fensi表(库#780)

代码实现


# 应用服务代码(完全无感知)
def follow(follower_id, followee_id):
    guanzhu_db_id = follower_id % 1024
    guanzhu_db = get_guanzhu_database(guanzhu_db_id)

    # 只写主表,不管冗余
    guanzhu_db.execute(
        "INSERT INTO guanzhu (follower, followee) VALUES (?, ?)",
        follower_id, followee_id
    )
    return {"success": True}

# Canal配置(伪代码)
canal_config = {
    "源数据库": "guanzhu_db_*",  # 监听所有guanzhu库
    "binlog位置": "自动记录",
    "过滤规则": "只监听guanzhu表的INSERT和DELETE",
    "输出": "Kafka topic: guanzhu_binlog"
}

# 冗余服务代码
def consume_binlog_message():
    for message in kafka.consume("guanzhu_binlog"):
        event_type = message["type"]
        follower = message["data"]["follower"]
        followee = message["data"]["followee"]

        fensi_db_id = followee % 1024
        fensi_db = get_fensi_database(fensi_db_id)

        if event_type == "INSERT":
            # 关注 → 写入粉丝表
            fensi_db.execute(
                "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)",
                followee, follower
            )
        elif event_type == "DELETE":
            # 取消关注 → 删除粉丝表
            fensi_db.execute(
                "DELETE FROM fensi WHERE user_id=? AND fan_id=?",
                followee, follower
            )

时序图


用户   应用   guanzhu  Canal   Kafka  冗余服务  fensi
 │      │       │       │       │       │       │
 ├─────→│       │       │       │       │       │  T0: 关注请求
 │      ├──────→│       │       │       │       │  T1: 写guanzhu
 │      │       ├──→binlog      │       │       │  T6: 记录binlog
 │←─────┤       │       │       │       │       │  T6: 返回成功
 │      │       │       │       │       │       │
 │      │       │       ├──────→│       │       │  T56: Canal读取binlog(延迟50ms)
 │      │       │       │       ├──────→│       │  T106: 消息到达(延迟50ms)
 │      │       │       │       │       ├──────→│  T111: 写fensi(延迟5ms)
 │      │       │       │       │       │       │
 不一致窗口:T6 - T111 = 105ms(通常100-500ms)

核心优势

优势1:业务完全解耦


应用代码:
- 只关心核心业务逻辑
- 不需要管冗余、不需要管MQ
- 代码简洁、维护容易

冗余逻辑:
- 独立服务,独立迭代
- 可以随时增加新的冗余表
- 不影响主流程

优势2:自动捕获所有变更


同步/异步方案的风险:
- 代码漏写MQ消息 → 冗余数据丢失
- 异常分支没有发消息 → 数据不一致

binlog方案:
- 所有数据库变更都被binlog记录
- 自动保证不会遗漏
- 即使代码有bug,冗余也会执行

优势3:天然支持多种冗余


场景:需要将关系链数据同步到多个目标
- fensi表(粉丝维度)
- ES(搜索引擎,支持模糊查询)
- Redis(热点数据缓存)
- 数据仓库(离线分析)

实现:
同一份binlog → 多个消费者并行处理

  binlog
    │
    ├→ 冗余服务1 → fensi表
    ├→ 冗余服务2 → Elasticsearch
    ├→ 冗余服务3 → Redis
    └→ 冗余服务4 → 数据仓库

无需修改应用代码!

优势4:历史数据自动修复


场景:发现fensi表有历史脏数据

传统方案:
- 写脚本扫描guanzhu表
- 对比fensi表
- 手动修复

binlog方案:
- 重置Canal的binlog位置到历史时间点
- Canal自动重放历史binlog
- 冗余服务自动重建fensi表

严重缺点

缺点1:一致性延迟最大


延迟来源:
1. binlog写入延迟:0-10ms
2. Canal读取延迟:10-50ms(轮询间隔)
3. Kafka传输延迟:10-50ms
4. 消费者处理延迟:10-100ms

总延迟:100-500ms(典型值)

vs 服务异步方案:50-200ms

差距:2-3倍

缺点2:系统复杂度最高


新增组件:
- Canal/Maxwell(binlog解析)
- Kafka(消息队列)
- 冗余服务集群
- 监控告警系统

运维成本:
- 需要监控binlog延迟
- 需要监控Canal服务状态
- 需要处理binlog位置丢失
- 需要处理数据库主从切换

缺点3:binlog依赖风险


问题场景:
1. 数据库binlog过期被删除
   → Canal丢失部分数据
   → 冗余数据缺失

2. 主从切换导致binlog位置变化
   → Canal可能重复消费或漏消费
   → 数据重复或丢失

3. binlog格式变化(MySQL升级)
   → Canal解析失败
   → 冗余中断

缺点4:调试和追踪困难


问题定位链路:
用户反馈 → 检查应用日志 → 检查数据库 → 检查binlog
→ 检查Canal → 检查Kafka → 检查冗余服务

链路长度:6个环节
每个环节都可能出问题

vs 同步方案:2个环节
vs 异步方案:3个环节

适用场景


✓ 数据量:> 10亿
✓ 对实时性要求不高(秒级延迟可接受)
✓ 需要多种冗余目标(如ES、Redis、数仓)
✓ 团队技术实力强(能驾驭复杂系统)
✓ 业务逻辑复杂,不希望应用代码承担冗余责任

典型案例:
- 阿里巴巴:使用Canal同步数据到搜索引擎
- 美团:使用Maxwell同步数据到数据仓库
- 字节跳动:使用Flink CDC同步多种目标

5.5 三种方案对比总结

维度 服务同步冗余 服务异步冗余 线下异步冗余
用户延迟 10ms 6ms 6ms
一致性窗口 0(强一致) 100-200ms 100-500ms
可用性 差(绑定所有库) 好(核心库决定) 好(核心库决定)
实现复杂度
运维复杂度
新增冗余成本 高(改代码) 中(改代码) 低(加消费者)
数据完整性 可能丢失 可能丢失 自动保证
适用数据量 < 1000万 1000万-100亿 > 10亿
典型QPS < 1000 1000-100万 任意

选型决策树


开始
 │
 ├─ 数据量 < 1000万?
 │   └─ 是 → 服务同步冗余(简单够用)
 │
 ├─ 需要多种冗余目标?(ES、Redis、数仓等)
 │   └─ 是 → 线下异步冗余(一次投入,持续受益)
 │
 ├─ 对实时性要求极高?(< 100ms)
 │   └─ 是 → 服务异步冗余(可控延迟)
 │
 └─ 其他情况 → 服务异步冗余(性价比最高)

第六部分:最终一致性保障机制

无论选择哪种冗余方案,都无法保证100%的实时一致性。因此需要建立最终一致性保障机制

6.1 为什么会出现不一致?

异步冗余方案的失败场景


场景1:消息丢失
T0: 应用发送MQ消息
T1: MQ服务器重启,消息丢失
结果:guanzhu表有数据,fensi表没有

场景2:消费失败
T0: 消费者读取消息
T1: 写入fensi表时,数据库连接超时
T2: 消费者重试3次全部失败
T3: 消息被丢弃(进入死信队列)
结果:数据不一致

场景3:网络分区
T0: 消费者写入fensi表
T1: 网络分区,写入超时
T2: 消费者认为失败,重试
T3: 实际上第一次写入成功了
结果:fensi表有重复数据

场景4:程序Bug
T0: 代码逻辑错误,某种边界情况下不发MQ消息
结果:数据永久不一致

binlog方案的失败场景


场景1:binlog过期
T0: Canal服务停止7天
T7: 数据库binlog保留期只有3天,部分binlog被删除
结果:丢失部分变更

场景2:主从切换
T0: 主库故障,切换到从库
T1: Canal的binlog位置在新主库上不连续
结果:可能重复消费或漏消费

场景3:解析失败
T0: MySQL升级,binlog格式变化
T1: Canal无法解析新格式
结果:冗余中断

6.2 影响分析

数据不一致的业务影响


案例1:粉丝数不匹配
- guanzhu表:用户A有1000个关注
- fensi表统计:用户们的粉丝总数 = 950个A
- 差异:50条数据不一致
- 影响:粉丝数显示错误、推荐算法偏差

案例2:内容分发遗漏
- 用户B发布内容
- 系统查fensi表,找到999个粉丝
- 实际guanzhu表有1000人关注B
- 结果:1个用户看不到B的内容

案例3:数据分析错误
- 运营分析关注关系图
- 基于不一致的数据得出错误结论
- 影响业务决策

容忍度分析


用户视角:
- 不一致窗口 < 1秒:完全无感知 ✓
- 不一致窗口 1-10秒:偶尔发现,可接受 ✓
- 不一致窗口 > 1分钟:明显感知,影响体验 ✗
- 永久不一致:数据错误,严重问题 ✗✗✗

业务视角:
- 比例 < 0.01%:可接受(百万分之一)
- 比例 0.01%-0.1%:需要监控
- 比例 > 0.1%:需要优化
- 比例 > 1%:严重问题

6.3 方案一:全量扫描修复

核心思想:定期扫描所有数据,对比两个表,发现并修复不一致。

实现逻辑


def full_scan_repair():
    """
    全量扫描修复
    执行频率:每天凌晨2点
    """
    print("开始全量扫描...")

    # 遍历所有guanzhu库
    for db_id in range(1024):
        guanzhu_db = get_guanzhu_database(db_id)

        # 查询该库的所有关系
        relations = guanzhu_db.query(
            "SELECT follower, followee FROM guanzhu"
        )

        # 对每条关系,检查fensi表
        for relation in relations:
            follower = relation['follower']
            followee = relation['followee']

            # 计算fensi表的库编号
            fensi_db_id = followee % 1024
            fensi_db = get_fensi_database(fensi_db_id)

            # 检查fensi表是否存在对应记录
            exists = fensi_db.query(
                "SELECT 1 FROM fensi WHERE user_id=? AND fan_id=?",
                followee, follower
            )

            if not exists:
                # 不存在,补写数据
                fensi_db.execute(
                    "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)",
                    followee, follower
                )
                log_repair(f"修复: {follower} → {followee}")

    print("全量扫描完成")

执行流程图


┌──────────────────────────────────────┐
│ 定时任务:每天凌晨2点执行            │
└─────────────┬────────────────────────┘
              │
    ┌─────────▼─────────┐
    │ 扫描guanzhu库#0   │
    │ 读取100万条记录   │
    └─────────┬─────────┘
              │
         ┌────▼────┐
         │ 对每条:  │
         │ 检查fensi│
         │ 不存在?  │
         │ → 补写   │
         └────┬────┘
              │
    ┌─────────▼─────────┐
    │ 扫描guanzhu库#1   │
    └─────────┬─────────┘
              │
             ...
              │
    ┌─────────▼─────────┐
    │ 扫描guanzhu库#1023│
    └───────────────────┘
              │
    ┌─────────▼─────────┐
    │ 扫描完成,生成报告│
    │ - 扫描记录数      │
    │ - 修复记录数      │
    │ - 耗时            │
    └───────────────────┘

性能估算


假设:
- 总数据量:100亿条
- 单库数据量:100亿 / 1024 ≈ 1000万条
- 每条记录检查:0.5ms(查guanzhu + 查fensi)

单线程耗时:
100亿 × 0.5ms = 5,000,000秒 ≈ 58天

优化方案1:并发扫描
- 1024个库并发扫描
- 耗时:58天 / 1024 ≈ 1.4小时

优化方案2:批量查询
- 每次查1000条,减少网络往返
- 每批耗时:从500ms降到50ms
- 总耗时:1.4小时 / 10 ≈ 8分钟

实际实现:
使用64个工作线程,每个线程负责16个库
预计耗时:1-2小时

代码优化版本


from concurrent.futures import ThreadPoolExecutor
import time

def scan_and_repair_batch(db_id):
    """
    扫描单个数据库并修复
    使用批量查询优化
    """
    guanzhu_db = get_guanzhu_database(db_id)
    repair_count = 0
    scan_count = 0

    # 批量读取guanzhu表
    offset = 0
    batch_size = 1000

    while True:
        relations = guanzhu_db.query(
            f"SELECT follower, followee FROM guanzhu LIMIT {batch_size} OFFSET {offset}"
        )

        if not relations:
            break

        scan_count += len(relations)

        # 按fensi库分组
        fensi_groups = {}
        for rel in relations:
            fensi_db_id = rel['followee'] % 1024
            if fensi_db_id not in fensi_groups:
                fensi_groups[fensi_db_id] = []
            fensi_groups[fensi_db_id].append(rel)

        # 批量检查每个fensi库
        for fensi_db_id, group in fensi_groups.items():
            fensi_db = get_fensi_database(fensi_db_id)

            # 构建批量查询
            conditions = " OR ".join([
                f"(user_id={rel['followee']} AND fan_id={rel['follower']})"
                for rel in group
            ])

            existing = fensi_db.query(
                f"SELECT user_id, fan_id FROM fensi WHERE {conditions}"
            )

            existing_set = {(r['user_id'], r['fan_id']) for r in existing}

            # 找出缺失的记录
            missing = [
                rel for rel in group
                if (rel['followee'], rel['follower']) not in existing_set
            ]

            # 批量插入
            if missing:
                values = ",".join([
                    f"({rel['followee']}, {rel['follower']})"
                    for rel in missing
                ])
                fensi_db.execute(
                    f"INSERT INTO fensi (user_id, fan_id) VALUES {values}"
                )
                repair_count += len(missing)

        offset += batch_size

    return {"db_id": db_id, "scanned": scan_count, "repaired": repair_count}

def full_scan_with_concurrency():
    """
    并发全量扫描
    """
    start_time = time.time()

    # 使用64个线程并发扫描
    with ThreadPoolExecutor(max_workers=64) as executor:
        futures = [executor.submit(scan_and_repair_batch, db_id) for db_id in range(1024)]
        results = [f.result() for f in futures]

    # 统计结果
    total_scanned = sum(r['scanned'] for r in results)
    total_repaired = sum(r['repaired'] for r in results)
    elapsed = time.time() - start_time

    print(f"扫描完成:")
    print(f"  总记录数: {total_scanned:,}")
    print(f"  修复记录数: {total_repaired:,}")
    print(f"  不一致比例: {total_repaired/total_scanned*100:.4f}%")
    print(f"  耗时: {elapsed/60:.1f}分钟")

优点


1. 简单可靠
   - 逻辑清晰,易于实现
   - 不依赖复杂组件

2. 全面覆盖
   - 扫描所有数据,不会遗漏
   - 能发现任何原因导致的不一致

3. 自动修复
   - 发现问题立即修复
   - 无需人工介入

严重缺陷


1. 不一致窗口长
   - 每天执行一次 → 最长24小时的不一致窗口
   - 用户可能一整天看到错误数据

2. 资源消耗大
   - 需要扫描所有数据
   - 占用大量数据库IO和CPU
   - 可能影响业务高峰期(即使在凌晨)

3. 扩展性差
   - 数据量翻倍 → 扫描时间翻倍
   - 当数据量达到千亿级,即使优化也需要数小时

4. 无法定位根因
   - 只知道数据不一致,不知道为什么
   - 无法修复导致不一致的代码bug

适用场景


✓ 数据量:< 10亿
✓ 对一致性要求不高(天级延迟可接受)
✓ 作为兜底方案(配合其他方案使用)

示例:
- 中小型社交应用
- 企业内部系统
- 数据分析系统(非实时)

6.4 方案二:增量扫描修复

核心思想:只扫描最近变化的数据,而不是全量扫描。

实现原理


关键:在guanzhu表增加字段
ALTER TABLE guanzhu ADD COLUMN updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;

每小时扫描:
SELECT * FROM guanzhu WHERE updated_at > NOW() - INTERVAL 1 HOUR;

执行流程


定时任务:每小时执行
  │
  ├→ 找出过去1小时修改的记录
  │   SELECT * FROM guanzhu
  │   WHERE updated_at > '2025-11-06 09:00:00'
  │
  ├→ 对每条记录,检查fensi表
  │   - 存在且一致 → 跳过
  │   - 不存在 → 补写
  │   - 存在但不一致 → 修复
  │
  └→ 生成修复报告

代码实现


def incremental_scan_repair():
    """
    增量扫描修复
    执行频率:每小时
    """
    # 计算扫描时间范围
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=1)

    total_scanned = 0
    total_repaired = 0

    # 遍历所有guanzhu库
    for db_id in range(1024):
        guanzhu_db = get_guanzhu_database(db_id)

        # 只查询最近1小时的变更
        relations = guanzhu_db.query(
            """
            SELECT follower, followee, updated_at
            FROM guanzhu
            WHERE updated_at BETWEEN ? AND ?
            """,
            start_time, end_time
        )

        total_scanned += len(relations)

        # 检查并修复
        for rel in relations:
            fensi_db_id = rel['followee'] % 1024
            fensi_db = get_fensi_database(fensi_db_id)

            exists = fensi_db.query(
                "SELECT 1 FROM fensi WHERE user_id=? AND fan_id=?",
                rel['followee'], rel['follower']
            )

            if not exists:
                fensi_db.execute(
                    "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)",
                    rel['followee'], rel['follower']
                )
                total_repaired += 1

    # 记录指标
    metrics.record("incremental_scan", {
        "scanned": total_scanned,
        "repaired": total_repaired,
        "ratio": total_repaired / total_scanned if total_scanned > 0 else 0
    })

性能对比


假设:
- 总数据量:100亿
- 每小时新增/修改:100万条(峰值)
- 平均每小时:50万条

全量扫描:
- 扫描量:100亿
- 耗时:1-2小时(并发优化后)

增量扫描:
- 扫描量:100万(峰值)
- 比例:100万 / 100亿 = 0.01%
- 耗时:1-2小时 × 0.01% = 3-7秒

性能提升:1000倍

不一致窗口缩短


全量扫描:
- 执行频率:每天1次
- 最长窗口:24小时

增量扫描:
- 执行频率:每小时1次
- 最长窗口:1小时

窗口缩短:24倍

优点


1. 高效
   - 只扫描变更数据,效率提升1000倍
   - 资源消耗低

2. 频繁执行
   - 可以每小时甚至每10分钟执行一次
   - 不一致窗口从天级降到小时级

3. 资源消耗可控
   - 即使在业务高峰期执行也不会影响性能

缺陷


1. 依赖updated_at字段
   - 需要保证该字段准确更新
   - 如果字段更新失败,会漏扫描

2. 无法发现历史遗留问题
   - 只检查最近变更
   - 历史不一致数据无法发现

解决方案:
- 增量扫描(每小时)+ 全量扫描(每周)
- 双重保障

完整方案


def comprehensive_scan_strategy():
    """
    综合扫描策略
    """
    # 增量扫描:每小时
    schedule.every(1).hour.do(incremental_scan_repair)

    # 全量扫描:每周日凌晨
    schedule.every().sunday.at("02:00").do(full_scan_repair)

    # 实时监控:持续运行
    schedule.every(10).seconds.do(monitor_consistency_metrics)

监控指标


关键指标:
1. 扫描记录数
   - 每小时扫描多少条
   - 趋势分析(判断是否异常)

2. 修复记录数
   - 每小时修复多少条
   - 计算不一致比例

3. 修复比例
   - 修复数 / 扫描数
   - 正常:< 0.1%
   - 警告:0.1% - 1%
   - 告警:> 1%

4. 扫描耗时
   - 监控性能变化
   - 及时发现数据库性能问题

6.5 方案三:实时核对(最佳方案)

核心思想

每次写入时,同时发送一条”核对消息”,延迟几秒后检查两个表是否一致。

架构图


用户操作
  │
  ↓
┌─────────────┐
│  应用服务   │
└──┬────┬─────┘
   │    │
   │    ├→ MQ消息1:冗余指令(立即处理)
   │    │   topic: relation_sync
   │    │
   │    └→ MQ消息2:核对任务(延迟5秒处理)
   │        topic: consistency_check
   │        delay: 5 seconds
   │
   ↓
guanzhu表

并行处理:
消费者1:消费 relation_sync → 写fensi表
消费者2:消费 consistency_check → 核对一致性

详细流程


T0: 用户A关注B
    ├→ 写入guanzhu表
    ├→ 发MQ消息1: {"action": "sync", "follower": A, "followee": B}
    └→ 发MQ消息2: {"action": "check", "follower": A, "followee": B, "delay": 5s}

T0-T5: 冗余服务消费消息1,写入fensi表(正常情况下完成)

T5: 核对服务消费消息2
    ├→ 查询guanzhu表: A→B 存在? 是
    ├→ 查询fensi表: B←A 存在? 是
    └→ 结论:一致 ✓

T5: 如果fensi表不存在
    ├→ 告警:发现不一致
    ├→ 自动修复:写入fensi表
    └→ 记录日志:便于排查根因

代码实现


# 应用服务代码
def follow(follower_id, followee_id):
    guanzhu_db_id = follower_id % 1024
    guanzhu_db = get_guanzhu_database(guanzhu_db_id)

    # 1. 写入主表
    guanzhu_db.execute(
        "INSERT INTO guanzhu (follower, followee) VALUES (?, ?)",
        follower_id, followee_id
    )

    # 2. 发送冗余消息(立即处理)
    kafka.send("relation_sync", {
        "action": "sync",
        "follower": follower_id,
        "followee": followee_id,
        "timestamp": time.time()
    })

    # 3. 发送核对消息(延迟5秒)
    kafka.send("consistency_check", {
        "action": "check",
        "follower": follower_id,
        "followee": followee_id,
        "timestamp": time.time()
    }, delay=5)  # 延迟5秒投递

    return {"success": True}

# 冗余服务代码
def sync_consumer():
    for message in kafka.consume("relation_sync"):
        follower = message["follower"]
        followee = message["followee"]

        fensi_db_id = followee % 1024
        fensi_db = get_fensi_database(fensi_db_id)

        try:
            fensi_db.execute(
                "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)",
                followee, follower
            )
        except Exception as e:
            # 写入失败,记录日志
            logger.error(f"Sync failed: {follower}→{followee}, error: {e}")
            # 不重试,等待核对服务发现并修复

# 核对服务代码
def check_consumer():
    for message in kafka.consume("consistency_check"):
        follower = message["follower"]
        followee = message["followee"]

        # 查询guanzhu表
        guanzhu_db_id = follower % 1024
        guanzhu_db = get_guanzhu_database(guanzhu_db_id)
        guanzhu_exists = guanzhu_db.query(
            "SELECT 1 FROM guanzhu WHERE follower=? AND followee=?",
            follower, followee
        )

        # 查询fensi表
        fensi_db_id = followee % 1024
        fensi_db = get_fensi_database(fensi_db_id)
        fensi_exists = fensi_db.query(
            "SELECT 1 FROM fensi WHERE user_id=? AND fan_id=?",
            followee, follower
        )

        # 核对结果
        if guanzhu_exists and not fensi_exists:
            # 不一致:guanzhu有,fensi没有
            logger.warning(f"Inconsistency detected: {follower}→{followee}")

            # 自动修复
            fensi_db.execute(
                "INSERT INTO fensi (user_id, fan_id) VALUES (?, ?)",
                followee, follower
            )

            # 发送告警
            alert("数据不一致", {
                "follower": follower,
                "followee": followee,
                "missing_in": "fensi"
            })

            # 记录指标
            metrics.increment("consistency_repair")

        elif not guanzhu_exists and fensi_exists:
            # 不一致:fensi有,guanzhu没有(异常情况)
            logger.error(f"Orphan data in fensi: {followee}←{follower}")

            # 这种情况很少见,可能是误删除,需要人工介入
            alert("数据异常", {
                "type": "orphan_in_fensi",
                "user_id": followee,
                "fan_id": follower
            })

        elif guanzhu_exists and fensi_exists:
            # 一致,无需处理
            metrics.increment("consistency_check_pass")

        else:
            # 都不存在(可能是已经取消关注)
            pass

延迟时间选择


核对延迟的选择逻辑:

过短(如1秒):
- 冗余服务可能还没处理完
- 误报率高

过长(如60秒):
- 不一致窗口长
- 用户可能已经感知

最佳实践(5秒):
- 统计数据:99%的冗余操作在3秒内完成
- 5秒延迟能覆盖99.9%的情况
- 用户基本无感知

动态调整:
- 监控P99延迟
- 如果P99超过5秒 → 调整为P99 + 2秒

核对覆盖率


问题:是否每个操作都要核对?

方案1:100%覆盖
- 优点:发现所有不一致
- 缺点:核对消息量 = 业务消息量,成本高

方案2:采样核对(如10%)
- 优点:成本降低90%
- 缺点:只能发现10%的问题

推荐方案:分级策略
- 核心用户(大V):100%核对
- 普通用户:10%采样核对
- 配合增量扫描:发现剩余90%的问题

代码实现:
if is_vip_user(follower) or is_vip_user(followee):
    # VIP用户,100%核对
    kafka.send("consistency_check", message, delay=5)
elif random.random() < 0.1:
    # 普通用户,10%核对
    kafka.send("consistency_check", message, delay=5)

优势总结


1. 不一致窗口最短
   - 实时发现:5-10秒
   - vs 增量扫描:1小时
   - vs 全量扫描:1天

2. 自动修复
   - 发现即修复,无需人工介入
   - 修复延迟:秒级

3. 根因分析
   - 可以统计哪些原因导致不一致
   - 指导代码优化

4. 精准告警
   - 实时告警,不会漏报
   - 可以定位到具体的用户和操作

5. 灵活控制
   - 可以调整核对延迟
   - 可以调整核对比例
   - 可以针对不同用户采用不同策略

成本分析


假设:
- 业务QPS:10万/秒
- 核对比例:20%(VIP 100% + 普通用户 10%采样)

核对服务负载:
- 核对QPS:10万 × 20% = 2万/秒
- 每次核对:2次数据库查询(guanzhu + fensi)
- 数据库查询:4万/秒

对比:
- 全量扫描:每天1次,1-2小时,高峰期影响
- 增量扫描:每小时1次,几秒,影响小
- 实时核对:持续运行,负载平滑

结论:实时核对的资源消耗可控,且负载平滑

完整架构


┌─────────────────────────────────────┐
│         三层一致性保障体系           │
└─────────────────────────────────────┘

第一层:实时核对(秒级发现和修复)
  ├─ 覆盖率:VIP 100% + 普通用户 10%
  ├─ 延迟:5-10秒
  └─ 目标:发现和修复90%的问题

第二层:增量扫描(小时级兜底)
  ├─ 频率:每小时
  ├─ 覆盖率:100%(最近1小时变更)
  └─ 目标:发现实时核对遗漏的10%问题

第三层:全量扫描(周级终极保障)
  ├─ 频率:每周
  ├─ 覆盖率:100%(全部数据)
  └─ 目标:发现历史遗留问题

监控体系:
  ├─ 实时监控:不一致率、修复率
  ├─ 告警:不一致率超过阈值
  └─ 报表:每日/每周一致性报告

第七部分:实际案例与演进路线

7.1 初创期(用户 < 100万)

业务特点


- 用户量:10万
- 日活:5万
- 关系总数:500万
- QPS:峰值1000

架构选择


数据存储:
┌──────────────────┐
│  MySQL单库       │
│  ├─ relations表  │ ← 单表足够
│  ├─ idx_follower │ ← 索引优化查询
│  └─ idx_followee │
└──────────────────┘

查询性能:
- 正向查询:3-5ms
- 反向查询:3-5ms

完全不需要:
✗ 分库分表
✗ 数据冗余
✗ 消息队列
✗ 一致性检测

只需要:
✓ 单表 + 索引
✓ 主从复制(高可用)
✓ 定期备份

代码示例


# 简单的单表实现
class RelationService:
    def follow(self, follower_id, followee_id):
        db.execute(
            "INSERT INTO relations (follower, followee) VALUES (?, ?)",
            follower_id, followee_id
        )

    def get_following(self, user_id):
        return db.query(
            "SELECT followee FROM relations WHERE follower = ?",
            user_id
        )

    def get_fans(self, user_id):
        return db.query(
            "SELECT follower FROM relations WHERE followee = ?",
            user_id
        )

7.2 成长期(用户 100万-5000万)

业务特点


- 用户量:1000万
- 日活:500万
- 关系总数:50亿
- QPS:峰值5万

遇到的问题


问题1:单表容量告急
- relations表:50亿条记录
- 数据+索引:~500GB
- 查询变慢:50-100ms
- 写入TPS下降:从1万降到3000

问题2:备份困难
- 备份时间:8小时
- 影响业务

问题3:DDL操作困难
- 加字段需要锁表数小时

架构升级


升级1:引入分库分表
┌─────────┐
│ 应用    │
└────┬────┘
     │
┌────▼────────────────────┐
│  分库中间件(ShardingJDBC)│
└────┬────────────────────┘
     │
     ├→ guanzhu_db_0  (按follower_id分)
     ├→ guanzhu_db_1
     ├→ ...
     └→ guanzhu_db_63  (64个库)

每库存储:50亿 / 64 ≈ 8000万条
查询性能恢复:5-10ms

升级2:引入数据冗余
guanzhu表:按follower_id分库(64库)
fensi表:按followee_id分库(64库)

升级3:异步冗余
应用 → guanzhu表 + MQ → 冗余服务 → fensi表

代码演进


# 分库分表后的代码
class RelationService:
    def __init__(self):
        self.sharding = ShardingClient(db_count=64)

    def follow(self, follower_id, followee_id):
        # 计算guanzhu库
        guanzhu_db = self.sharding.get_db("guanzhu", follower_id)

        # 写入主表
        guanzhu_db.execute(
            "INSERT INTO guanzhu (follower, followee) VALUES (?, ?)",
            follower_id, followee_id
        )

        # 发送冗余消息
        kafka.send("relation_sync", {
            "action": "follow",
            "follower": follower_id,
            "followee": followee_id
        })

    def get_following(self, user_id):
        guanzhu_db = self.sharding.get_db("guanzhu", user_id)
        return guanzhu_db.query(
            "SELECT followee FROM guanzhu WHERE follower = ?",
            user_id
        )

    def get_fans(self, user_id):
        fensi_db = self.sharding.get_db("fensi", user_id)
        return fensi_db.query(
            "SELECT fan_id FROM fensi WHERE user_id = ?",
            user_id
        )

一致性保障


方案:增量扫描
- 频率:每小时
- 覆盖:最近1小时变更
- 修复:自动补写

监控:
- 不一致率:< 0.01%
- 告警阈值:> 0.1%

7.3 成熟期(用户 > 1亿)

业务特点


- 用户量:5亿
- 日活:2亿
- 关系总数:1000亿
- QPS:峰值100万

架构全貌


┌───────────────────────────────────────────┐
│              用户请求                      │
└───────────────┬───────────────────────────┘
                │
┌───────────────▼───────────────────────────┐
│          网关层(Nginx)                   │
│  - 限流                                   │
│  - 熔断                                   │
│  - 路由                                   │
└───────────────┬───────────────────────────┘
                │
┌───────────────▼───────────────────────────┐
│       应用服务集群(1000+实例)            │
└─┬──────────┬──────────┬──────────────────┘
  │          │          │
  │          │          └→ MQ (Kafka集群)
  │          │                │
  │          │                ├→ 冗余服务集群
  │          │                ├→ 核对服务集群
  │          │                └→ 分析服务
  │          │
  │          └→ Redis集群(热点数据缓存)
  │              - 大V用户的粉丝列表
  │              - 最近关注关系
  │
  └→ 数据库集群
     ├→ guanzhu_db (1024个库)
     └→ fensi_db (1024个库)

分库策略优化


初期:64个库,每库15亿条
问题:仍然太大

优化:1024个库,每库1亿条
- 查询性能:稳定在5-10ms
- 扩容灵活:可以继续拆分到2048库

缓存策略


问题:大V用户的粉丝查询频繁
- 周杰伦:1亿粉丝
- 每秒被查询:10万次
- 数据库压力巨大

解决:多级缓存
L1: 应用本地缓存(Caffeine)
  - 容量:10万条
  - 命中率:50%
  - 延迟:< 1ms

L2: Redis集群
  - 容量:热点用户的全量粉丝
  - 命中率:40%
  - 延迟:1-3ms

L3: 数据库
  - 兜底查询
  - 命中率:10%
  - 延迟:5-10ms

总体效果:
- 90%请求被缓存承接
- 数据库压力降低:10倍

一致性保障完整方案


三层保障:

L1: 实时核对(秒级)
  - VIP用户:100%覆盖
  - 普通用户:20%采样
  - 发现率:95%
  - 修复延迟:5-10秒

L2: 增量扫描(小时级)
  - 频率:每30分钟
  - 覆盖:100%最近变更
  - 发现率:4.9%(剩余的5%)
  - 修复延迟:30分钟

L3: 全量扫描(周级)
  - 频率:每周日凌晨
  - 覆盖:100%全量数据
  - 发现率:0.1%(历史遗留)
  - 修复延迟:7天

综合效果:
- 不一致率:< 0.001%(百万分之一)
- 平均修复延迟:< 1分钟

监控体系


关键指标:
1. 业务指标
   - 关注成功率:> 99.99%
   - 查询成功率:> 99.99%
   - P99延迟:< 20ms

2. 一致性指标
   - 实时核对:不一致率、修复率
   - 增量扫描:扫描量、修复量
   - 全量扫描:总不一致数

3. 系统指标
   - 数据库QPS、慢查询
   - MQ积压量
   - 缓存命中率

告警规则:
- 不一致率 > 0.1%:P0告警,立即处理
- 查询延迟P99 > 50ms:P1告警,30分钟内处理
- MQ积压 > 100万:P2告警,1小时内处理

第八部分:底层规律与设计哲学

8.1 分布式系统的根本约束

CAP定理在关系链系统中的体现


CAP定理:
- C(Consistency):一致性
- A(Availability):可用性
- P(Partition Tolerance):分区容错性

在分布式环境下,三者只能选其二

关系链系统的选择:
      一致性(C)
        /  
       /    
      /  选择
     /   AP   
    /__________
 可用性(A)  分区容错(P)
      ↑        ↑
      必选     必选

解释:
- P(分区容错):必选
  → 分库分表必然导致网络分区
  → 无法避免

- A(可用性):必选
  → 社交产品核心竞争力
  → 用户无法容忍服务不可用

- C(一致性):妥协
  → 降级为最终一致性
  → 通过补偿机制保证

数据冗余的数学本质


问题模型:
- 数据集 D = {d1, d2, ..., dn}
- 查询维度 Q = {q1, q2, ..., qm}
- 分片函数 f: D → {s1, s2, ..., sk} (k个分片)

约束:
一个分片函数只能基于一个字段
即:f 只能优化一个查询维度

定理:
当 |Q| > 1 时,为了让所有查询都是单分片查询,
需要的数据副本数 = |Q|

证明:
反证法:假设只需 r < |Q| 个副本
则至少存在一个查询维度 q_i 没有对应的副本
该维度的查询必须扫描所有分片(扇出查询)
与"所有查询都是单分片查询"矛盾

结论:
冗余份数 = 查询维度数量
(这是分布式系统的数学必然)

8.2 一致性的本质

强一致性 vs 最终一致性


强一致性:
- 定义:任何时刻读取的数据都是最新的
- 实现:分布式事务(2PC、3PC、Paxos)
- 代价:性能下降、可用性降低

最终一致性:
- 定义:经过一段时间后,数据会达到一致状态
- 实现:异步复制 + 补偿机制
- 收益:性能高、可用性高

为什么选择最终一致性?

成本分析:
强一致性:
- 每次写入需要2PC,延迟 > 100ms
- 任何一个参与方故障,整体不可用
- QPS:< 1万

最终一致性:
- 写入延迟:5-10ms
- 单点故障不影响整体
- QPS:> 100万

性价比:
最终一致性的性能是强一致性的 100倍
而不一致窗口只有几秒,用户基本无感知

选择:显然是最终一致性

不一致的容忍度


人类感知时间:
- < 100ms:无感知(实时)
- 100ms - 1s:轻微感知(可接受)
- 1s - 10s:明显感知(有点慢)
- > 10s:不可接受(太慢了)

社交场景的容忍度:
关注操作:
- 我关注了某人 → 立即能看到 ✓
- 对方的粉丝列表 → 1秒后更新 ✓(对方不会立即查看)
- 粉丝数统计 → 10秒后更新 ✓(统计数据允许延迟)

结论:
只要核心操作(自己的视角)是实时的
冗余数据(他人的视角)几秒延迟完全可接受

8.3 系统演进的规律

系统复杂度 = f(数据规模, 查询需求)


阶段1:单表(数据量 < 5000万)
复杂度:★☆☆☆☆
核心:SQL + 索引

阶段2:分库分表(数据量 5000万-10亿)
复杂度:★★★☆☆
核心:分片路由

阶段3:数据冗余(数据量 > 10亿,查询维度 > 1)
复杂度:★★★★☆
核心:异步复制 + 一致性保障

阶段4:多级缓存 + 实时计算(数据量 > 100亿,QPS > 100万)
复杂度:★★★★★
核心:缓存、消息队列、流式计算

规律:
复杂度不可避免地增加,但每一步都是被数据规模逼出来的
不要过度设计,够用就好

奥卡姆剃刀原则


"如无必要,勿增实体"

反例:
用户量只有10万,就引入:
- 1024个分库
- Kafka集群
- 实时核对系统
- 多级缓存

结果:
- 系统复杂,维护困难
- 成本高昂(服务器、人力)
- 收益极小(性能过剩)

正确做法:
1. 评估当前规模和未来1年增长
2. 选择满足需求的最简单方案
3. 预留扩展空间(如分库从64开始,而不是4)
4. 等问题出现再优化

8.4 设计决策框架

决策矩阵

数据量 查询维度 一致性要求 推荐方案
< 1000万 任意 任意 单表 + 索引
1000万-1亿 1个 任意 分库单表
1000万-1亿 多个 强一致 分库 + 同步冗余
1000万-1亿 多个 最终一致 分库 + 异步冗余
> 1亿 多个 最终一致 分库 + 异步冗余 + 实时核对
> 10亿 多个 最终一致 分库 + 异步冗余 + binlog同步

成本效益分析公式


总成本 = 开发成本 + 运维成本 + 资源成本

开发成本:
- 简单方案(单表):1人周
- 中等方案(分库+异步):2人月
- 复杂方案(全套):6人月

运维成本:
- 简单方案:1人维护
- 中等方案:2-3人维护
- 复杂方案:5+人团队

资源成本:
- 简单方案:1台数据库
- 中等方案:100台数据库 + MQ
- 复杂方案:1000台数据库 + MQ + 缓存 + ...

收益:
- 性能提升倍数
- 可支撑的用户规模

决策:
ROI = 收益 / 总成本
选择ROI最高的方案

总结:关键洞察

核心认知

数据冗余不是为了解决”查询慢”

单表 + 索引就能解决查询性能问题真正的问题是:单表容量瓶颈 → 必须分库 → 产生查询定位问题数据冗余是分库后的必然选择

分库的本质矛盾

一个分库键只能优化一个查询维度多查询维度 → 必须多份冗余或扇出查询扇出查询不可接受 → 只能选择冗余

最终一致性是分布式系统的必然选择

强一致性代价太大(性能、可用性)最终一致性通过补偿机制保证可靠性核心是缩短不一致窗口

系统设计要随业务演进

不要过度设计够用就好,预留扩展空间等问题出现再优化

设计模式


数据冗余三段论:
1. 为什么冗余?→ 分库后的查询定位问题
2. 怎么冗余?→ 根据一致性要求选择同步/异步
3. 如何保证一致?→ 三层保障机制(实时+增量+全量)

一致性保障三层防护:
L1: 实时核对(秒级)→ 覆盖95%问题
L2: 增量扫描(小时级)→ 覆盖4.9%问题
L3: 全量扫描(周级)→ 覆盖0.1%问题
综合:99.999%可靠性

实践建议


初创团队:
- 单表够用,不要盲目分库
- 关注业务增长,而不是技术炫技

成长团队:
- 提前规划分库(按2年增长预估)
- 选择异步冗余(性价比最高)
- 建立监控和告警

成熟团队:
- 完善一致性保障机制
- 多级缓存优化性能
- 持续优化架构

这篇文章的核心价值在于:揭示了百亿级关系链系统背后的底层规律,而不是简单地罗列技术方案。希望能帮助你真正理解分布式系统的设计本质。


第九部分:工程化补充与修订(实践更稳的落地指南)

本节对上文方案作工程化增强与勘误,便于直接落地到大规模生产环境。

9.1 分片与扩容

虚拟分片(virtual shards)与一致性哈希:避免 user_id % N 带来的全量重分布,支持平滑扩容与弹性缩容。热点拆分:对“大V/热点用户”采用 per-user buckets(如 user_id + bucket_id)横向切分,缓解单分片热点写入/读取放大。分片元数据中心:路由表动态下发(灰度/回滚/限流),支持按用户、按桶精细迁移。

9.2 冗余写入的可靠性(强烈建议 Outbox/CDC)

服务异步冗余的改进:使用 Outbox(事务外发表)+ 消费端幂等(去重键)替代“写主表+发MQ”的两步式,确保事件不丢失且可重放。CDC 方案说明:
MySQL 可用 Canal/Maxwell/Debezium;PostgreSQL 采用逻辑复制/Decoding(wal2json、pgoutput、Debezium PG Connector)。
去重与幂等:事件携带全局去重键(如 follower_id:followee_id:op:ts),消费端 UPSERT/ON CONFLICT DO NOTHING,保障多次投递不产生日志性脏写。

9.3 计数一致性

粉丝数/关注数建议单独计数表(或列)维护:写路径做幂等增减(基于事件去重),并提供定期校准任务(对照关系表重算,差异阈值内自动修正)。

9.4 查询与分页

使用基于游标的 seek-based pagination(如按 created_at,id 复合索引)替代 deep offset,提高大页性能与稳定性。明确排序语义(按关注时间/最近互动等),相应建立复合索引,避免回表与排序放大。

9.5 缓存与热点保护

多级缓存(进程内 + Redis)+ 订阅失效;对热点 Key 采用局部互斥/请求合并,使用负缓存降低穿透。对粉丝全量列表的缓存应分片或分段缓存,配合版本号/范围刷新,避免一次性大 Key 失效的抖动。

9.6 主键与索引

主键使用紧凑且有序的 ID(雪花/Time-UUID/ULID)以减少索引分裂;谨慎新增二级索引,关注索引体积与写放大;按查询模式设计复合索引(如 (follower_id, created_at) / (user_id, created_at))。

9.7 一致性检测与修复链路

实时核对建议基于“单源事件流”延迟校验,避免“双消息(指令 + 核对)”乱序;增量校验优先基于 CDC 流位置/事件时间窗口,而非仅依赖 last_modified_time 字段;全量校验的性能估算应以“行/秒吞吐 + I/O 带宽 + 并发度”做容量规划,避免过于理想的 CPU 级估算。

9.8 可靠性与 SLO(勘误)

上文关于“整体可靠性叠加趋近 100%”的表述过于理想化,实际链路存在相关性与共同失效域;建议以端到端 SLO 管理:
写入成功率(关注/取关)≥ 99.99%冗余完成延迟 P99 ≤ 若干秒不一致率 ≤ 0.01% 并在 T≤1h 内校准至 ≤ 0.001%修复任务的滞后/积压有硬阈值与自动降级(背压、限流、只保核心写)。

9.9 安全合规与治理

拉黑/隐私:写路径与读路径均应过滤封禁关系;账户删除/合规清理(GDPR 等):提供级联清除与异步彻底擦除;写入限流与风控:针对异常批量关注行为做速率限制与熔断;审计与可观测:事件全链路追踪(trace_id)、积压/失败可见、可重放工具链。

9.10 演进与回放能力

以可回放事件日志作为“事实来源”,任何冗余表(粉丝维度、索引表、缓存)都能从事件流快速重建;灰度迁移:虚拟分片与 per-user bucket 级别的细粒度迁移/回滚,配合双写、对账与切换窗口监控。

以上补充能在不改变核心思路的前提下,显著提升可用性、可扩展性与可运维性,使方案更贴近实际大规模生产环境。


第十部分:本地验证与实验指南(含实测结果)

本项目提供可在本机快速验证“异步冗余与双表冗余收益”的实验工具,支持调节规模与并发,并输出 p50/p95/p99 与复制落地延迟、积压峰值等指标。

10.1 端到端基准(异步 vs 同步双写 + 查询)

启动数据库:


docker compose up -d postgres

运行(可调规模与并发):


# N: 关注操作次数;CONC: 并发;PAGE: 查询页大小
N=20000 CONC=8 PAGE=50 go run cmd/relbench/main.go

输出示例字段说明:

Async follow latency: 主路径(关注主表写 + 入队)延迟的总时长/均值及 p50/p95/p99Sync (2 writes): 同步双写总时长/均值Query fans/following: 单次分页查询延迟Replication landing: 复制落地耗时分位、队列峰值(maxQueue)、排空时长(drain)

本地一次实测结果(N=20000,CONC=1,PAGE=50,Apple M3 Max + Docker Postgres):


Async follow latency total: 3.891583667s, per op: 194.579µs
Sync (2 writes) total: 6.060609417s, per op: 303.03µs
Query fans(50) latency: 614µs
Query following(50) latency: 835.542µs

解读:

异步冗余将关键路径写延迟较同步双写下降约 35%(本地环境下),趋势符合“写一次 + 入队”优于“双写”的预期;双表冗余使粉丝/关注两类查询均为单库命中,延迟处于子毫秒级(本地);注意该结果为本地小规模样例,用于验证相对收益与方向性,非容量上限评估。

10.2 扇出查询对比(单库单查 vs 64 分片扇出)

运行:


# SHARDS: 扇出分片数;REPEAT: 重复次数
SHARDS=64 REPEAT=100 go run cmd/fanoutbench/main.go

说明:

单次查询:针对单分片(单库)的一次命中;扇出查询:并发请求 64 个分片(模拟 64 库),测量整体完成时间;该实验旨在直观展示“多分片扇出”的额外开销,支持快速本地演示扇出的不可接受性。

一次本地实测结果(SHARDS=64,REPEAT=100):


Single-shard query: avg=91.655µs p95=141.375µs p99=592.791µs
Fan-out 64-shard queries: avg=45.164845ms p95=64.492708ms p99=103.007667ms

解读:单分片查询处于百微秒量级;64 分片扇出后,平均延迟上升到数十毫秒,p99 超 100ms,直观体现了扇出带来的网络与并发开销,验证“双表冗余+单分片命中”的必要性。

10.4 环境说明(PostgreSQL)

本项目默认数据库为 PostgreSQL(docker-compose 使用 postgres:15-alpine)。文中关于 MySQL binlog/Canal/Maxwell 的描述用于说明思路;在 PostgreSQL 上应使用逻辑复制/Decoding:如 wal2json、pgoutput 或 Debezium PostgreSQL Connector(详见第 9.2 节“CDC 方案说明”)。配置在
config/config.yaml
,端到端与基准工具均以 Postgres 为默认目标环境。

10.3 代码仓库

项目地址:

https://github.com/d60-Lab/RelationGraph

有兴趣的同学可按 10.1/10.2 的命令自行复现,并调整 N/CONC/SHARDS 等参数进行扩展测试。

© 版权声明

相关文章

暂无评论

none
暂无评论...