Flink RowKind 详解
总结
- 更新时间:2026-04-29
- RowKind 是 Flink 流式处理中描述"一行数据发生了什么变化"的标记,每条流记录除字段值外都携带一个 RowKind
- 四种类型:
+I(插入)、-U(更新前旧值)、+U(更新后新值)、-D(删除) -U是流式算子实现"修改"语义的核心机制,聚合/JOIN 等算子必须依赖它来撤回旧值- 三种 Changelog 模式:insert-only(仅
+I)、upsert(无-U)、all(完整 changelog) - 写入时 RowKind 来自上游 Source(Kafka/CDC),读取时 RowKind 由 Paimon 的
changelog-producer决定,两者相互独立 input模式是唯一透传上游 RowKind 的模式,上游为 CDC 时才有意义
一、四种 RowKind 的含义
| 符号 | 英文全称 | 中文 | 含义 |
|---|---|---|---|
+I |
INSERT | 插入 | 新增一行 |
-U |
UPDATE_BEFORE | 更新前镜像 | 更新前的旧值,用于撤回 |
+U |
UPDATE_AFTER | 更新后镜像 | 更新后的新值 |
-D |
DELETE | 删除 | 删除一行 |
一次 UPDATE 操作在流上拆成两条消息:先发 -U(旧值),再发 +U(新值)。这是因为流式算子无法像数据库一样"原地修改",必须先撤回旧值再插入新值。
二、为什么需要 -U
以 COUNT 聚合为例:
初始状态:COUNT = 0
收到 +I(name=Alice, age=20) → COUNT = 1
收到 -U(name=Alice, age=20) → COUNT = 0 ← 撤回旧值
收到 +U(name=Alice, age=25) → COUNT = 1 ← 插入新值
如果没有 -U,算子收到 +U 时不知道要撤回什么,COUNT 就会变成 2(错误)。
-U 本质上是流式算子实现"修改"语义的机制。
三、三种 Changelog 模式
| 模式 | 包含的 RowKind | 说明 |
|---|---|---|
| insert-only | 仅 +I |
纯追加流,无修改删除 |
| upsert | +I +U -D |
缺少 -U,靠主键隐含旧值 |
| all(完整 changelog) | +I -U +U -D |
完整变更流 |
四、RowKind 的产生来源
4.1 数据库 CDC(最原始的来源)
MySQL Binlog、PostgreSQL WAL 等记录了数据库的每次变更,CDC 框架(Flink CDC、Debezium)将其解析为 RowKind:
MySQL INSERT → 读 Binlog → +I(新行)
MySQL UPDATE → 读 Binlog → -U(旧行) + +U(新行)
MySQL DELETE → 读 Binlog → -D(旧行)
4.2 Paimon 各 changelog-producer 模式(读取时产生)
| 模式 | 产出 RowKind | 原理 |
|---|---|---|
none(默认) |
+I +U -D |
读 delta 文件,无旧值,缺 -U |
input |
透传写入侧的 RowKind | 写入时是什么就是什么,上游是 CDC 才有意义 |
lookup |
+I -U +U -D |
写入时 lookup 旧值,现场构造 -U |
full-compaction |
+I -U +U -D |
两次 compaction 之间做 diff 对比生成 |
4.3 Flink ChangelogNormalize 算子(补偿机制)
当 source 只能产出 upsert 流(有 +U 无 -U)但下游需要完整 changelog 时,Flink planner 自动插入 ChangelogNormalize:
state: {k=1 → v=A}
收到 +U(k=1, v=B)
↓ 查 state,得到旧值 A
↓ 输出 -U(k=1, v=A) ← 补出的
↓ 输出 +U(k=1, v=B)
↓ 更新 state: {k=1 → v=B}
ChangelogNormalize 代价大的原因:必须为每个主键维护一份最新值的 state。
五、写入时 vs 读取时 RowKind(重要区分)
写入时的 RowKind 和读取时的 RowKind 是两回事,相互独立。
- 写入时:RowKind 来自上游(Kafka / CDC Source),决定 Paimon 存什么
- 读取时:RowKind 由 Paimon 的
changelog-producer决定,和上游无关(input模式除外)
| 上游来源 | changelog-producer |
读取时 RowKind 来源 |
|---|---|---|
| Kafka 普通格式 | none |
Paimon 自己产生(upsert 流,无 -U) |
| Kafka 普通格式 | lookup |
Paimon 自己产生(lookup 补出 -U) |
| CDC(完整 changelog) | none |
Paimon 自己产生(upsert 流,原始 -U 丢失) |
| CDC(完整 changelog) | input |
透传 CDC 的 RowKind,保留完整 changelog |
| CDC(完整 changelog) | lookup |
Paimon 自己产生(不依赖上游) |
结论:
- 上游是 Kafka(普通格式):写入全是
+I,读取时由 Paimon 按changelog-producer产生 - 上游是 CDC:写入有完整 RowKind,但读取时除非用
input模式,否则仍由 Paimon 自己产生 input是唯一"透传"上游 RowKind 的模式
六、完整流程图
MySQL Binlog
│
▼
Flink CDC Source (+I / -U / +U / -D) ← 完整 changelog
│
▼
Paimon 写入 (changelog-producer=none)
│ 只保存最终状态,丢弃 -U
▼
Paimon 流读 (+I / +U / -D) ← upsert 流,缺 -U
│
├─ 下游是聚合/JOIN
│ │
│ ▼
│ ChangelogNormalize ← 补出 -U,代价是维护 state
│ │
│ ▼
│ 完整 changelog (+I / -U / +U / -D)
│
└─ 下游是 SR/Paimon 主键表(upsert sink)
│ 不需要 -U,可设置 scan.remove-normalize=true 跳过
▼
直接 upsert 写入
七、实践建议
| 场景 | 推荐 changelog-producer | 是否需要 ChangelogNormalize |
|---|---|---|
| 下游写 Paimon/SR/JDBC 主键表 | none + scan.remove-normalize=true |
不需要 |
| 下游做聚合/JOIN | lookup |
不需要(源头就有完整 changelog) |
| 上游是 CDC,需透传变更 | input |
不需要 |
| 下游做聚合,且写入延迟可接受 | full-compaction |
不需要 |