Paimon Compaction 详解
总结
- 更新时间:2025-12-25
- 适用版本:Apache Paimon 1.3+(包含 0.8 vs 1.3 版本差异对比)
核心概念
- Compaction 核心作用:
- 主键表:合并 LSM Tree 中的 Sorted Runs,防止查询性能下降和 OOM 问题
- Append-Only 表:合并小文件,提高读取性能(不涉及数据合并,只做文件拼接)
- Compaction 类型:
- 主键表 Compaction(基于 LSM Tree):
- Minor Compaction:选择性合并部分文件,不更新
$ro表,适合写密集型场景 - Full Compaction:合并所有文件到 maxLevel 单层,更新
$ro表,适合读密集型场景 - Lookup Compaction(1.3 新增):Radical(激进)和 Gentle(温和)两种模式
- Sort Compaction(1.3 新增):Z-Order/Hilbert 曲线优化数据布局
- Minor Compaction:选择性合并部分文件,不更新
- Append-Only 表 Compaction(基于文件数量):
- Auto Compaction:后台异步合并小文件,达到 target-file-size 时触发
- Full Compaction:合并所有小文件(至少需要 3 个文件)
- 主键表 Compaction(基于 LSM Tree):
- Read-Optimized 表(
$ro):只有 Full Compaction 才会更新,提供零合并成本的读优化性能 - ⚠️ 重要澄清(源码验证):
- Full Compaction 和 Minor Compaction 是两条完全独立的代码路径
- Minor Compaction 中的 Size Amplification 触发的"全量合并"不是真正的 Full Compaction
- 只有
fullCompaction=true时才会调用CompactStrategy.pickFullCompaction()
关键配置参数
主键表:
num-sorted-run.compaction-trigger(默认 5):触发 Minor Compaction 的文件数阈值num-sorted-run.stop-trigger(默认 trigger+3):等待压缩完成的文件数阈值(背压机制,非完全阻塞)full-compaction.delta-commits:每 N 次提交触发 Full Compactioncompaction.optimization-interval:时间间隔触发 Full Compaction,确保$ro表时效性- 配置独立性:Minor 和 Full Compaction 配置互不影响,推荐同时配置以平衡性能
Append-Only 表:
write-only(默认 false):是否禁用 compaction(true 时使用 NoopCompactManager)compaction.min.file-num(默认 5):触发 compaction 的最小文件数compaction.max.file-num(默认 50):触发 compaction 的最大文件数target-file-size(默认 128MB):目标文件大小- 关键特性:Append-Only 表的 compaction 不涉及数据合并,只是文件拼接,按 sequence number 保持追加顺序
专用压缩作业与分布式锁
- 专用压缩作业:多写入者场景的标准解决方案,设置
write-only=true- ✅ 写入作业使用
NoopCompactManager,完全跳过压缩逻辑(源码验证) - ✅ 压缩参数在建表 DDL 中配置,专用作业读取生效
- ⚠️
num-sorted-run.stop-trigger完全失效:写入作业永不等待,无法背压(源码验证) - ⚠️ 风险:若压缩速度 < 写入速度,文件数会无限增长(需监控和人工干预)
- ✅ 写入作业使用
- 专用压缩作业高级参数(Paimon 1.0+):
--compact_strategy:控制压缩策略(minor或full),默认由运行模式决定--partition_idle_time:只压缩闲置超过指定时长的分区(如7d)- ⚠️ 仅支持 Batch 模式,Streaming 模式会抛异常(源码验证)
- ⚠️ 不能与
--order_strategy同时使用
- S3 等对象存储:
- 多作业并发压缩:必须启用
lock.enabled=true(RENAME 非原子性) - 专用压缩作业:通常不需要启用锁(单例运行,无并发冲突)
- 多作业并发压缩:必须启用
- ⚠️ 生产案例警示:启用 Hive 锁可能导致锁超时和 L0 文件膨胀(真实案例:90+ 万文件)
- lock.enabled 默认值:Paimon 0.8/0.9 默认
false,Hive Metastore 不会自动启用
性能调优与最佳实践
- 通用推荐(1.3):MOW 模式 + Gentle Lookup + 默认 Trigger 值 + 异步快照过期
- 写密集型:
stop-trigger=MAX_VALUE+sort-spill=10+lookup-wait=false - 读密集型:COW 模式 +
full-compaction.delta-commits=1 - 多写入者:
write-only=true+ 专用压缩作业(Combined 模式) - 版本升级:0.8 → 1.3 可获得 20%-50% 性能提升
一、Compaction 基础概念
1.1 什么是 Compaction
Paimon 使用 LSM Tree(Log-Structured Merge-Tree) 架构存储数据。在 LSM Tree 中:
- 数据写入时首先写入内存,然后 flush 到磁盘形成 Sorted Run(有序文件)
- 随着写入增加,Sorted Run 数量不断增长
- 查询时需要合并多个 Sorted Run 的数据,文件数越多,查询性能越差
- Compaction(压缩) 就是将多个 Sorted Run 合并成一个更大的 Sorted Run 的过程
1.2 为什么需要 Compaction
Compaction 的主要目的包括:
-
维持查询性能
- 减少查询时需要读取的文件数
- 降低 I/O 开销和内存使用
-
防止 OOM(内存溢出)
- 过多的 Sorted Run 需要同时打开大量文件读取器
- 可能导致内存不足
- OOM 风险的本质:
- LSM Tree 架构特性:MOR 模式在读取时需要在内存中合并所有 Sorted Runs
- 文件数爆炸:不执行 Compaction → Sorted Run 数量无限增长
- 内存线性增长:文件数 × (文件句柄 + 读取缓冲区 + Reader 对象 + 合并计算内存)
- 查询触发 OOM:单个查询可能需要打开成千上万个文件,导致内存耗尽
-
生成 Changelog
- 通过合并生成变更日志(changelog-producer)
-
删除过期数据
- 清理超过保留时间的记录(record-level expiration)
-
生成删除向量
- 为 MOW 模式生成 Deletion Vectors
1.3 LSM Tree 架构与 Universal Compaction
Paimon 使用类似 RocksDB 的 Universal Compaction 策略。以下基于源码 UniversalCompaction.java 的 createUnit 方法进行可视化说明。
LSM Tree 层级结构与 Compaction 流程
流程图关键说明:
-
Full Compaction 和 Minor Compaction 是两条独立的代码路径(源码: MergeTreeCompactManager.java:127-146)
- Full Compaction:
fullCompaction = true→ 调用CompactStrategy.pickFullCompaction()→ 无条件合并所有文件 - Minor Compaction:
fullCompaction = false→ 调用strategy.pick()→ 根据 5 个条件决策
- Full Compaction:
-
Minor Compaction 中的"全量合并"不是真正的 Full Compaction
- Size Amplification 超标时会选中所有文件,但仍是 Minor Compaction
- 区别:Full Compaction 有额外的优化逻辑(如 deletion vector 清理、record-level-expire 等)
-
触发方式对应关系
compaction.optimization-interval→ Minor Compaction 的条件 1(opCompactionInterval)full-compaction.delta-commits→ 强制 Full Compaction(fullCompaction=true)CALL sys.compact()→ 强制 Full Compaction(fullCompaction=true)- 文件数达到
compaction-trigger→ Minor Compaction 的条件 4
源码关键逻辑说明(UniversalCompaction.java L179-206)
CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
int outputLevel;
// 【关键判断 1】如果合并所有文件(Full Compaction)
if (runCount == runs.size()) {
outputLevel = maxLevel; // ← 输出到最高层级
} else {
// Minor Compaction:输出到下一个 run 的层级 - 1
outputLevel = Math.max(0, runs.get(runCount).level() - 1);
}
// 【关键判断 2】避免输出到 L0
if (outputLevel == 0) {
// L0 专用于新写入文件,压缩结果不应输出到 L0
for (int i = runCount; i < runs.size(); i++) {
LevelSortedRun next = runs.get(i);
runCount++;
if (next.level() != 0) {
outputLevel = next.level();
break;
}
}
}
// 【关键判断 3】再次确认全量压缩
if (runCount == runs.size()) {
updateLastOptimizedCompaction();
outputLevel = maxLevel; // ← 确保全量压缩输出到 maxLevel
}
return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));
}
层级结构对比
| 状态 | L0 | L1 | L2 (maxLevel) | 查询方式 | Read-Optimized | 文件总数 |
|---|---|---|---|---|---|---|
| 初始写入(无压缩) | file1, file2, file3, file4, file5 | 空 | 空 | 读取 L0 所有文件 | ❌ 否 | 5 |
| Minor Compaction 后 | new_file(不影响继续写入) | merged_L1 | merged_L2 | 合并 L0+L1+L2 | ❌ 否 | 3 |
| Full Compaction 后 | 空 | 空 | full_merged | 直接读取 L2 | ✅ 是 | 1 |
演化说明:
- 阶段 1 → 阶段 2:
- 触发 Minor Compaction(
compaction-trigger=5) - file1-file5 中部分文件合并到 L1 →
merged_L1 - 继续压缩推进到 L2 →
merged_L2 - 新写入的文件继续进入 L0 →
new_file - 结果:文件从 5 个减少到 3 个,但分散在 3 个层级,查询时需多层合并
- 触发 Minor Compaction(
- 阶段 2 → 阶段 3:
- 触发 Full Compaction(
optimization-interval或delta-commits) - 合并所有层级(L0 + L1 + L2)的所有文件到 maxLevel
- 结果:实现单层单文件结构,可直接读取无需合并
- 触发 Full Compaction(
关键特点总结
-
Minor Compaction:
- 选择性合并部分文件(
runCount < runs.size()) - 输出层级:
outputLevel = Math.max(0, runs.get(runCount).level() - 1) - 逐步向高层推进,形成多层结构
- 选择性合并部分文件(
-
Full Compaction:
- 合并所有层级的所有文件(
runCount == runs.size()) - 输出层级:
outputLevel = maxLevel(源码明确定义) - 清空 L0、L1,所有数据集中在 maxLevel
- 避免输出到 L0(L0 专用于新写入)
- 合并所有层级的所有文件(
-
触发条件:
- Minor:文件数达到
num-sorted-run.compaction-trigger - Full:提交次数达到
full-compaction.delta-commits或时间间隔compaction.optimization-interval
- Minor:文件数达到
-
Read-Optimized 更新:
- 只有 Full Compaction 生成单层结构(仅 maxLevel 有数据)
- 才会更新
$ro系统表
二、Compaction 类型详解
2.1 Minor Compaction(增量压缩)
定义:选择性地合并部分文件,而不是全部文件。
特点:
- 写入影响小,适合流式场景
- 合并速度快
- 文件数量逐步减少
- 默认用于 Flink 流式作业
适用场景:
- 持续写入的流式作业
- 对写入性能要求高的场景
- 不需要频繁全表扫描的表
配置示例:
-- 使用 Minor Compaction(默认行为)
CREATE TABLE minor_compact_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
age INT,
update_time TIMESTAMP(3)
) TBLPROPERTIES (
'num-sorted-run.compaction-trigger' = '5', -- 5个文件触发压缩
'num-sorted-run.stop-trigger' = '10' -- 10个文件暂停写入
);
2.2 Full Compaction(全量压缩)
定义:将所有层级的文件合并成单层的连续文件。
特点:
- 彻底减少文件数(理想情况下合并为 1 个文件)
- 写入性能影响大
- 查询性能最优(无需读时合并)
- 默认用于 Spark 批处理作业
- 更新 Read-Optimized 系统表(
$ro)
适用场景:
- 批处理作业
- 读取密集型表
- COW 模式表
- 定期维护作业
- 需要零合并成本的读优化查询
配置方式:
方式一:每次写入后全量压缩(COW 模式)
CREATE TABLE cow_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'full-compaction.delta-commits' = '1' -- 每次提交后触发全量压缩
);
方式二:定期优化压缩(推荐用于 $ro 表更新)
CREATE TABLE periodic_compact_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'compaction.optimization-interval' = '1 h' -- 每小时执行一次优化压缩,更新 $ro 表
);
重要说明:
compaction.optimization-interval的主要作用是确保 Read-Optimized 系统表($ro)的查询时效性- 只有 Full Compaction 后,数据才会被包含在
$ro表中 - 如果不配置定期 Full Compaction,
$ro表可能包含旧数据
方式三:基于文件总大小触发
CREATE TABLE size_based_compact_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'compaction.total-size-threshold' = '100 MB' -- 总大小小于100MB时触发全量压缩
);
2.3 Lookup Compaction(查找压缩)
定义:针对主键表的特殊压缩策略,应用于以下场景:
- Lookup changelog producer 表
- First-row merge engine 表
- MOW 模式(使用 deletion vectors)的表
2.3.1 Radical 模式(激进模式,默认)
特点:
- 强制将 L0 文件立即提升到更高层级
- 数据新鲜度高
- 资源消耗大
配置:
CREATE TABLE radical_lookup_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'changelog-producer' = 'lookup',
'lookup-compact' = 'RADICAL', -- 默认值,可省略
'lookup-wait' = 'true' -- 等待 lookup compaction 完成
);
2.3.2 Gentle 模式(温和模式)
特点:
- 使用 Universal Compaction 策略
- 资源消耗较小
- 数据新鲜度相对较低
- 可配置最大间隔时间
配置:
CREATE TABLE gentle_lookup_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'changelog-producer' = 'lookup',
'lookup-compact' = 'GENTLE',
'lookup-compact.max-interval' = '60' -- 最多60次commit必须执行一次L0压缩
);
2.4 Append-Only 表 Compaction(文件合并)
定义:Append-Only 表的 Compaction 与主键表完全不同,它不涉及数据合并,只是将多个小文件拼接成大文件,以提高读取性能。
2.4.1 核心特点
与主键表 Compaction 的本质区别:
| 特性 | Append-Only Compaction | 主键表 LSM Compaction |
|---|---|---|
| 目的 | 小文件合并,提高读性能 | 数据去重、合并更新、删除旧版本 |
| 是否合并数据 | ❌ 不合并,只是文件拼接 | ✅ 合并相同 key 的多版本数据 |
| 数据顺序 | ✅ 严格保持追加顺序(按 sequence number) | ⚠️ 按 key 排序,破坏原始顺序 |
| 复杂度 | 低(顺序读写) | 高(需要 merge 算法) |
| 性能开销 | 小 | 大 |
| 默认启用 | ✅ 是 | ✅ 是 |
| 禁用方式 | 'write-only' = 'true' |
'write-only' = 'true' |
源码验证(基于 AppendOnlyFileStoreWrite.java:127-139):
CompactManager compactManager =
skipCompaction
? new NoopCompactManager() // write-only=true 时禁用
: new AppendOnlyCompactManager(...); // 默认启用
2.4.2 Compaction 触发条件
根据 AppendOnlyCompactManager.java:166-193,触发条件为:
方式一:Auto Compaction(自动压缩)
// 触发条件(满足其一即可)
if ((totalFileSize >= targetFileSize && fileNum >= minFileNum)
|| fileNum >= maxFileNum) {
// 触发 compaction
}
- 条件 1:
总文件大小 >= target-file-size且文件数 >= compaction.min.file-num - 条件 2:
文件数 >= compaction.max.file-num
方式二:Full Compaction(全量压缩)
- 合并所有小文件(跳过已达到 target-file-size 的大文件)
- 至少需要 3 个文件才触发
- 仅在小文件数量多于大文件时执行
2.4.3 Compaction 处理逻辑
核心流程(源码:AppendOnlyFileStoreWrite.java:163-188):
public AppendOnlyCompactManager.CompactRewriter compactRewriter(...) {
return toCompact -> {
RowDataRollingFileWriter rewriter = new RowDataRollingFileWriter(...);
// 按 sequence number 顺序读取,直接写入新文件
rewriter.write(bucketReader(partition, bucket).read(toCompact));
return rewriter.result();
};
}
关键特性:
- ✅ 按 sequence number 顺序读取(保证追加顺序)
- ✅ 不做数据合并计算(没有 key 去重或聚合)
- ✅ 简单的数据复制:多个小文件 → 一个大文件
2.4.4 流式模式的 Compaction 拓扑
根据 UnawareBucketSink.java:64-78:
boolean enableCompaction = !table.coreOptions().writeOnly();
if (enableCompaction && isStreamingMode && !boundedInput) {
// 添加独立的 compaction 拓扑
UnawareBucketCompactionTopoBuilder builder = ...;
written = written.union(builder.fetchUncommitted(initialCommitUser));
}
拓扑结构:
┌─────────────────┐
│ Input Stream │
└────────┬────────┘
│
┌─────────────┴─────────────┐
▼ ▼
┌─────────────┐ ┌──────────────────┐
│ Writers │ │ Compaction Tasks │
│ (append) │ │ (merge files) │
└──────┬──────┘ └─────────┬────────┘
│ │
└──────────┬───────────────┘
▼
┌─────────────────┐
│ Committer │
└─────────────────┘
2.4.5 配置示例
启用 Compaction(默认):
CREATE TABLE logs (
log_time TIMESTAMP(3),
user_id BIGINT,
message STRING
) TBLPROPERTIES (
'bucket' = '-1', -- UNAWARE 模式
'write-only' = 'false', -- 启用 compaction(默认)
'compaction.min.file-num' = '5', -- 最小文件数
'compaction.max.file-num' = '50', -- 最大文件数
'target-file-size' = '128MB' -- 目标文件大小
);
禁用 Compaction(极高吞吐场景):
CREATE TABLE high_throughput_logs (
event_time TIMESTAMP(3),
event_data STRING
) TBLPROPERTIES (
'bucket' = '-1',
'write-only' = 'true' -- 禁用 compaction,追求极致写入性能
);
2.4.6 适用场景
✅ 推荐启用 Compaction:
- 日志收集、事件流存储
- 数据写入后会被查询
- 需要减少小文件数量
- HDFS NameNode 压力大
⚠️ 考虑禁用 Compaction:
- 极高吞吐写入(每秒数百万条)
- 数据写入后几乎不读取
- 追求极致写入性能
- 可接受大量小文件
2.4.7 监控建议
关键指标:
- 文件数量:
SELECT COUNT(*) FROM table$files WHERE bucket = 0 - 平均文件大小:
SELECT AVG(file_size_in_bytes) FROM table$files - Compaction 频率:查看 Flink Metrics
告警阈值:
- 单 bucket 文件数 > 1000(警告)
- 单 bucket 文件数 > 10000(严重)
- 平均文件大小 < 10MB(说明 compaction 不足)
2.5 Sort Compaction(排序压缩)
定义:在合并文件时,根据指定的列对数据重新排序,从而优化数据的物理布局,提升查询性能。
适用场景:
- 数据插入是无序的
- 需要优化范围查询性能
- 需要提升数据局部性
2.4.1 排序策略
Order(普通排序)
按指定列顺序排序:
CREATE TABLE order_sorted_table (
id BIGINT,
region STRING,
city STRING,
amount DECIMAL(10,2),
create_time TIMESTAMP(3)
) TBLPROPERTIES (
'bucket' = '4',
'sort-compaction' = 'true',
'sort-compaction.order-by' = 'region,city' -- 先按region,再按city排序
);
Z-Order(多维索引)
使用 Z-Order 曲线进行多维排序:
CREATE TABLE zorder_table (
id BIGINT,
region STRING,
city STRING,
amount DECIMAL(10,2)
) TBLPROPERTIES (
'bucket' = '4',
'sort-compaction' = 'true',
'sort-compaction.zorder-by' = 'region,city,amount'
);
优势:
- 多维查询性能好
- 适合 WHERE 条件包含多个列的场景
Hilbert(希尔伯特曲线)
使用 Hilbert 曲线进行排序:
CREATE TABLE hilbert_table (
id BIGINT,
x DOUBLE,
y DOUBLE
) TBLPROPERTIES (
'bucket' = '4',
'sort-compaction' = 'true',
'sort-compaction.hilbert-by' = 'x,y'
);
优势:
- 空间局部性更好
- 适合地理空间数据
2.4.2 使用 Action Jar 执行 Sort Compaction
./bin/flink run \
/path/to/paimon-flink-action-1.3.1.jar compact \
--warehouse file:///Users/10281539/WorkSpace/warehouse \
--database my_db \
--table my_table \
--order_strategy zorder \
--order_by region,city,amount
支持的排序策略:
order:普通排序zorder:Z-Order 排序hilbert:Hilbert 排序
三、Read-Optimized 表($ro)详解
3.1 什么是 $ro 表
$ro(Read-Optimized)表是 Paimon 提供的一个系统表,它是数据表的一个特殊视图:
- 只包含已完成 Full Compaction 的数据
- 查询时无需合并多个文件,直接读取即可
- 提供零合并成本(即无需在内存中合并多个文件,直接读取单层文件即可)的读优化性能
查询方式:
-- 假设原表名为 my_table
SELECT * FROM my_table$ro WHERE id = 123;
3.2 为什么 Minor Compaction 不更新 $ro 表
核心原因:Minor Compaction 采用 Universal Compaction 策略,数据仍然分散在多个层级(L0、L1、L2...),查询时需要在内存中合并多层文件,无法提供"零合并成本"的读优化体验。
详细的 LSM Tree 层级结构说明,请参见 第一章 1.3 节。
关键区别总结:
| Compaction 类型 | 层级结构 | 查询方式 | 是否 Read-Optimized |
|---|---|---|---|
| Minor Compaction | 多层(L0+L1+L2...) | 需要内存合并多层文件 | ❌ 否 |
| Full Compaction | 单层(仅 maxLevel) | 直接读取单层文件 | ✅ 是 |
3.3 $ro 表的更新时机
只有 Full Compaction 才会更新 $ro 表:
| Compaction 类型 | 更新 $ro 表 |
原因 |
|---|---|---|
| Minor Compaction | ❌ 不更新 | 数据仍在多层,需要读时合并 |
| Full Compaction | ✅ 更新 | 数据合并到单层,无需读时合并 |
官方文档说明:
compaction.optimization-interval: Implying how often to perform an optimization full compaction, this configuration is used to ensure the query timeliness of the read-optimized system table.
翻译:该配置用于确保 Read-Optimized 系统表的查询时效性。
3.4 配置示例与性能权衡
场景 1:高时效性 $ro 表(频繁 Full Compaction)
CREATE TABLE high_timeliness_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING,
update_time TIMESTAMP(3)
) TBLPROPERTIES (
'compaction.optimization-interval' = '30 min' -- 每30分钟更新 $ro 表
);
特点:
- ✅
$ro表数据新鲜度高(最多延迟30分钟) - ❌ 写入性能下降(频繁 Full Compaction)
- 适合:读密集型 + 需要实时 Read-Optimized 查询
场景 2:低时效性 $ro 表(不频繁 Full Compaction)
CREATE TABLE low_timeliness_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING,
update_time TIMESTAMP(3)
) TBLPROPERTIES (
'compaction.optimization-interval' = '6 h' -- 每6小时更新 $ro 表
);
特点:
- ✅ 写入性能好(Full Compaction 不频繁)
- ❌
$ro表数据可能延迟6小时 - 适合:写密集型 +
$ro表用于离线分析
场景 3:不配置 Full Compaction(仅 Minor Compaction)
CREATE TABLE minor_only_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'num-sorted-run.compaction-trigger' = '5' -- 仅配置 Minor Compaction
-- 未配置 compaction.optimization-interval
);
特点:
- ✅ 写入性能最优
- ❌
$ro表永远不会更新(或极少更新) - ❌ 查询主表时需要合并多层文件
- 适合:写密集型 + 不使用
$ro表
3.5 实际应用建议
何时使用 $ro 表?
推荐场景:
- ✅ 分析查询(BI 工具、报表)
- ✅ 批处理读取(Spark 批量扫描)
- ✅ 对查询延迟敏感的场景
- ✅ 读多写少的表
不推荐场景:
- ❌ 实时查询(主表更实时)
- ❌ 写入密集型表(Full Compaction 开销大)
- ❌ 需要最新数据(
$ro表有延迟)
配置建议
平衡配置(推荐):
CREATE TABLE balanced_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'num-sorted-run.compaction-trigger' = '5', -- Minor Compaction:快速写入
'compaction.optimization-interval' = '2 h' -- Full Compaction:定期更新 $ro 表
);
监控 $ro 表时效性:
-- 查看最新 snapshot 的提交时间
SELECT snapshot_id, commit_time
FROM my_table$snapshots
ORDER BY snapshot_id DESC
LIMIT 1;
-- 查看 $ro 表的数据范围
SELECT MIN(create_time), MAX(create_time)
FROM my_table$ro;
3.6 常见误解
❌ 误解 1:Minor Compaction 也会更新 $ro 表
- 真相:只有 Full Compaction 才会更新
❌ 误解 2:$ro 表自动包含所有数据
- 真相:只包含已完成 Full Compaction 的数据
❌ 误解 3:查询 $ro 表和主表结果一样
- 真相:
$ro表可能缺少最新写入的数据
✅ 正确理解:
$ro表是主表的子集- 提供零合并成本的读优化性能
- 通过
compaction.optimization-interval控制时效性 - 性能与时效性需要权衡
四、专用压缩作业(Dedicated Compaction Job)
4.1 为什么需要专用压缩作业
问题场景:
- 多个 Flink 作业同时写入同一张表
- 如果每个作业都执行压缩,会产生冲突导致作业失败
冲突原因:
- 同一分区只能有一个作业执行压缩
- 并发压缩会导致文件冲突和快照冲突
解决方案:
- 写入作业只负责写入(
write-only = true) - 单独启动一个专用压缩作业负责压缩
4.2 ⚠️ S3 等对象存储的重要注意事项
官方文档重要警告:
For S3-like object store, its 'RENAME' does not have atomic semantic. We need to configure Hive metastore and enable 'lock.enabled' option for the catalog.
问题说明
S3 等对象存储的限制:
- S3、阿里云 OSS、华为云 OBS 等对象存储不支持原子性的 RENAME 操作
- 文件重命名在对象存储中实际是"复制+删除"操作
- 在并发压缩场景下,可能导致数据不一致或作业失败
必须的配置
对于 S3/OSS/OBS 等对象存储,必须启用分布式锁:
使用 Hive Metastore + 锁
-- 创建 Catalog 时启用锁
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://hive-metastore:9083',
'warehouse' = 's3://my-bucket/warehouse',
'lock.enabled' = 'true' -- ⚠️ 必须启用!
);
-- 创建表
CREATE TABLE my_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'write-only' = 'true' -- 多写入者场景
);
不启用锁的风险
场景:多个作业同时写入 S3 上的 Paimon 表,未启用锁
作业 A:正在执行 Compaction
├─ 步骤 1:读取文件 [file1, file2, file3]
├─ 步骤 2:合并为 merged_file
└─ 步骤 3:准备提交快照(RENAME 临时文件)
↓
作业 B:同时也在执行 Compaction
├─ 步骤 1:读取文件 [file1, file2, file4](部分重叠!)
├─ 步骤 2:合并为 merged_file2
└─ 步骤 3:准备提交快照
↓
结果:文件冲突、数据丢失、作业失败 ❌
启用锁后的行为
作业 A:获取锁 → 执行 Compaction → 提交快照 → 释放锁 ✅
↓
作业 B:等待锁 → 获取锁 → 执行 Compaction → 提交快照 → 释放锁 ✅
↓
结果:串行执行,避免冲突 ✅
重要说明:
- 多作业并发压缩:多个作业同时执行压缩,S3 等对象存储必须启用锁
- 专用压缩作业:只有一个专用压缩作业 + 写入作业设置
write-only=true,通常不需要锁- ✅ 架构本身保证了无并发冲突
- ✅ 避免 Hive Metastore 锁的复杂性和稳定性问题
- ⚠️ 但需要确保真的只有一个压缩作业运行
常见错误速查
| 错误 | 解决方案 |
|---|---|
| ConflictException: Concurrent compaction | 启用 lock.enabled=true |
| Connection refused (Metastore) | 检查 Metastore 服务和网络 |
| Lock timeout | 增加 lock.timeout=120000 |
4.3 配置写入表为只写模式
CREATE TABLE multi_writer_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'write-only' = 'true' -- 跳过压缩和快照过期
);
重要说明:
- ✅
write-only=true时,写入作业完全跳过压缩逻辑(源码使用NoopCompactManager) - ✅ 压缩参数应在建表 DDL 中配置,供专用压缩作业读取:
'num-sorted-run.compaction-trigger' = '5'—— 专用压缩作业触发阈值 ✅'full-compaction.delta-commits' = '10'—— Full Compaction 触发间隔 ✅
- ⚠️
num-sorted-run.stop-trigger在专用压缩模式下完全失效- 原因:写入作业使用
NoopCompactManager,shouldWaitForLatestCompaction()永远返回false - 风险:若压缩速度 < 写入速度,sorted run 数会无限增长,最终导致:
- 读性能严重恶化(需合并大量文件)
- 文件数失控(如文档中提到的 90 万文件案例)
- 缓解措施:
- 监控 L0 文件数和 sorted run 数指标
- 及时调整压缩作业并行度:
'sink.parallelism' = '8' - 必要时临时暂停部分写入作业
- 原因:写入作业使用
4.4 启动专用压缩作业
方式一:使用 Flink SQL 过程调用
-- 压缩单个表
CALL sys.compact('database.table_name');
-- 压缩指定分区
CALL sys.compact(
`table` => 'database.table_name',
`partitions` => 'dt=2025-12-08',
`options` => 'sink.parallelism=4'
);
方式二:使用 Flink Action Jar
Minor Compaction(流式模式)
cd /Users/10281539/WorkSpace/software/flink-1.17.2
./bin/flink run \
/path/to/paimon-flink-action-1.3.1.jar compact \
--warehouse file:///Users/10281539/WorkSpace/warehouse \
--database my_db \
--table my_table \
--compact_strategy minor
Full Compaction(批处理模式)
./bin/flink run \
/path/to/paimon-flink-action-1.3.1.jar compact \
--warehouse file:///Users/10281539/WorkSpace/warehouse \
--database my_db \
--table my_table \
--compact_strategy full
压缩指定分区
./bin/flink run \
/path/to/paimon-flink-action-1.3.1.jar compact \
--warehouse file:///Users/10281539/WorkSpace/warehouse \
--database my_db \
--table my_table \
--partition dt=2025-12-08 \
--compact_strategy full
数据库级别压缩(Combined 模式)
./bin/flink run \
/path/to/paimon-flink-action-1.3.1.jar compact \
--warehouse file:///Users/10281539/WorkSpace/warehouse \
--database my_db \
--mode combined \
--compact_strategy minor
Combined 模式特点:
- 单个 Flink 作业压缩整个数据库的所有表
- 自动发现新表并压缩
- 资源共享,避免启动多个作业
4.5 历史分区压缩
对于长期空闲的分区,可以使用专用作业进行全量压缩:
./bin/flink run \
/path/to/paimon-flink-action-1.3.1.jar compact \
--warehouse file:///Users/10281539/WorkSpace/warehouse \
--database my_db \
--table my_table \
--partition_idle_time 7d \
--compact_strategy full
参数说明:
--partition_idle_time:指定分区空闲时长阈值,只压缩闲置时间超过该值的分区- 格式:
7d(7天)、12h(12小时)、30m(30分钟) - 作用:过滤出最后接收数据时间 < (当前时间 - partition_idle_time) 的分区
- 使用场景:
- ✅ 对历史分区进行 Full Compaction,避免影响"热"分区
- ✅ 减少不必要的压缩开销,只处理不再接收数据的分区
- ✅ 配合
--compact_strategy full进行深度优化
- 格式:
--compact_strategy:压缩策略(full或minor)full:全量压缩,合并所有文件到单层(仅 Batch 模式)minor:增量压缩,选择性合并部分文件(默认由运行模式决定)
重要限制(基于源码验证):
- ⚠️
partition_idle_time仅支持 Batch 模式(必须添加-Dexecution.runtime-mode=BATCH) - ⚠️ Streaming 模式会抛出异常:
Streaming mode does not support partitionIdleTime - ⚠️ 不能与
--order_strategy参数一起使用(Sort Compaction 不支持) - ⚠️ 功能在 Paimon 1.0+ 版本引入,0.8 版本不支持
五、核心配置参数详解
5.1 基础触发参数
| 参数 | 默认值 | 类型 | 说明 |
|---|---|---|---|
num-sorted-run.compaction-trigger |
5 | Integer | 触发压缩的最小 Sorted Run 数量(包含 L0 文件和高层 Run) |
num-sorted-run.stop-trigger |
trigger+3 | Integer | 暂停写入的 Sorted Run 阈值 |
sort-spill-threshold |
(none) | Integer | 超过该数量的 Sort Reader 会溢出到磁盘,防止 OOM |
调优建议:
-- 写密集型场景(最大化写入性能)
'num-sorted-run.compaction-trigger' = '10'
'num-sorted-run.stop-trigger' = '2147483647' -- 永不停止
'sort-spill-threshold' = '15'
-- 读密集型场景(最小化文件数)
'num-sorted-run.compaction-trigger' = '3'
'num-sorted-run.stop-trigger' = '5'
'full-compaction.delta-commits' = '5'
-- 平衡场景(推荐)
'num-sorted-run.compaction-trigger' = '5' -- 默认
'num-sorted-run.stop-trigger' = '10' -- 默认
'sort-spill-threshold' = '10'
5.2 压缩策略参数
主键表参数:
| 参数 | 默认值 | 类型 | 说明 |
|---|---|---|---|
compaction.optimization-interval |
(none) | Duration | 优化压缩执行频率(如 '1h', '30min') |
full-compaction.delta-commits |
(none) | Integer | 每N次增量提交后执行全量压缩(同步) |
compaction.total-size-threshold |
(none) | MemorySize | 文件总大小低于该值时触发全量压缩 |
compaction.force-rewrite-all-files |
false | Boolean | 是否强制选择所有文件进行全量压缩 |
Append-Only 表参数:
| 参数 | 默认值 | 类型 | 说明 |
|---|---|---|---|
write-only |
false | Boolean | 是否禁用 compaction(true 时使用 NoopCompactManager) |
compaction.min.file-num |
5 | Integer | 触发 compaction 的最小文件数(配合文件总大小条件) |
compaction.max.file-num |
50 | Integer | 触发 compaction 的最大文件数(单独触发条件) |
target-file-size |
128MB | MemorySize | 目标文件大小,compaction 会尽量合并到此大小 |
compaction.delete-ratio-threshold |
0.2 | Double | 删除行比例超过该值时强制压缩(带删除向量的 Append 表) |
5.3 Lookup Compaction 参数
| 参数 | 默认值 | 类型 | 说明 |
|---|---|---|---|
lookup-compact |
RADICAL | Enum | Lookup 压缩模式:RADICAL(激进)或 GENTLE(温和) |
lookup-compact.max-interval |
(none) | Integer | Gentle 模式下,最多N次 commit 必须执行一次 L0 压缩 |
lookup-wait |
true | Boolean | Commit 是否等待 Lookup Compaction 完成 |
force-lookup |
false | Boolean | 是否强制使用 Lookup Compaction |
5.4 Sort Compaction 参数
| 参数 | 默认值 | 类型 | 说明 |
|---|---|---|---|
sort-compaction |
false | Boolean | 是否启用排序压缩 |
sort-compaction.order-by |
(none) | String | 普通排序的列(逗号分隔) |
sort-compaction.zorder-by |
(none) | String | Z-Order 排序的列(逗号分隔) |
sort-compaction.hilbert-by |
(none) | String | Hilbert 排序的列(逗号分隔) |
sort-compaction.range-strategy |
QUANTITY | Enum | 范围分布策略:SIZE 或 QUANTITY |
sort-compaction.local-sample.magnification |
1000 | Integer | 本地采样放大倍数 |
5.5 高级参数
| 参数 | 默认值 | 类型 | 说明 |
|---|---|---|---|
compaction.size-ratio |
1 | Integer | Changelog 模式下比较 Sorted Run 大小的灵活百分比 |
compaction.max-size-amplification-percent |
200 | Integer | Changelog 模式下存储放大的最大百分比 |
compaction.delete-ratio-threshold |
0.2 | Double | Append-Only 表中删除行比例超过该值时强制压缩 |
compaction.force-up-level-0 |
false | Boolean | 是否始终将 L0 文件包含在压缩候选中 |
compaction.offpeak-ratio |
0 | Integer | 离峰时间的压缩比例 |
compaction.offpeak.start.hour |
-1 | Integer | 离峰时间开始小时(0-23) |
compaction.offpeak.end.hour |
-1 | Integer | 离峰时间结束小时(0-23) |
5.6 离峰时间压缩
针对写入有明显高峰和低峰的场景,可以配置离峰时间更激进的压缩:
CREATE TABLE offpeak_compact_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'compaction.offpeak.start.hour' = '22', -- 晚上10点开始
'compaction.offpeak.end.hour' = '6', -- 早上6点结束
'compaction.offpeak-ratio' = '50' -- 离峰时间压缩比例提升50%
);
六、不同场景的最佳实践
6.1 场景一:流式写入 + 实时查询(推荐 MOW 模式)
需求:
- Flink 流式作业持续写入
- 需要支持实时查询
- 读写性能需要平衡
配置方案:
CREATE TABLE realtime_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
user_id BIGINT,
event_type STRING,
event_time TIMESTAMP(3)
) TBLPROPERTIES (
'bucket' = '-1', -- 动态桶
'deletion-vectors.enabled' = 'true', -- MOW 模式
'changelog-producer' = 'lookup',
'lookup-compact' = 'GENTLE', -- 温和压缩,降低资源消耗
'lookup-compact.max-interval' = '100',
'num-sorted-run.compaction-trigger' = '5',
'num-sorted-run.stop-trigger' = '12',
'sort-spill-threshold' = '10'
);
6.2 场景二:批量写入 + 读密集型(推荐 COW 模式)
需求:
- Spark 批处理定期写入(如每小时一次)
- 查询频繁,对读性能要求高
- 写入不频繁,可接受写入开销
配置方案:
CREATE TABLE batch_cow_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'bucket' = '8',
'full-compaction.delta-commits' = '1', -- 每次写入后全量压缩
'file-index.bloom-filter.columns' = 'id' -- 添加索引优化查询
);
6.3 场景三:多写入者 + 专用压缩
需求:
- 多个 Flink 作业写入同一张表
- 避免压缩冲突
- 需要统一的压缩策略
配置方案:
步骤 1:创建表,启用 write-only
CREATE TABLE multi_writer_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'write-only' = 'true', -- 写入作业不执行压缩
'bucket' = '-1'
);
步骤 2:启动专用压缩作业
# 使用 Combined 模式压缩整个数据库
cd /Users/10281539/WorkSpace/software/flink-1.17.2
./bin/flink run \
/path/to/paimon-flink-action-1.3.1.jar compact \
--warehouse file:///Users/10281539/WorkSpace/warehouse \
--database my_db \
--mode combined \
--compact_strategy minor
6.4 场景四:大表 + Sort Compaction
需求:
- 表数据量大(TB 级)
- 查询经常包含范围过滤和多维条件
- 需要优化数据局部性
配置方案:
CREATE TABLE large_table_with_sort (
id BIGINT,
region STRING,
city STRING,
category STRING,
amount DECIMAL(10,2),
create_time TIMESTAMP(3)
) TBLPROPERTIES (
'bucket' = '16',
'sort-compaction' = 'true',
'sort-compaction.zorder-by' = 'region,city,category', -- Z-Order 多维优化
'compaction.optimization-interval' = '2 h', -- 每2小时优化一次
'file-index.bloom-filter.columns' = 'id',
'file-index.bitmap.columns' = 'region,category'
);
6.5 场景五:高吞吐写入 + 异步压缩
需求:
- 峰值写入 QPS 极高
- 可接受查询性能在高峰期下降
- 低峰期资源充足
配置方案:
CREATE TABLE high_throughput_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'bucket' = '32', -- 增加并行度
'num-sorted-run.stop-trigger' = '2147483647', -- 永不停止写入
'sort-spill-threshold' = '15', -- 防止 OOM
'lookup-wait' = 'false', -- 不等待 lookup 压缩
'compaction.optimization-interval' = '30 min' -- 低峰期定期优化
);
6.6 场景六:Append-Only 日志表(推荐配置)
需求:
- 日志收集、事件流存储
- 只追加不更新
- 需要查询历史数据
- 避免小文件问题
配置方案:
CREATE TABLE event_logs (
event_time TIMESTAMP(3),
event_type STRING,
user_id BIGINT,
event_data STRING,
dt STRING
) PARTITIONED BY (dt)
TBLPROPERTIES (
'bucket' = '-1', -- UNAWARE 模式
'write-only' = 'false', -- ✅ 启用 compaction(推荐)
'compaction.min.file-num' = '10', -- 至少 10 个文件才触发
'compaction.max.file-num' = '100', -- 最多 100 个文件必须触发
'target-file-size' = '256MB' -- 合并到 256MB
);
特点:
- ✅ 后台自动合并小文件,减少 HDFS NameNode 压力
- ✅ 提高查询性能(减少文件打开次数)
- ✅ 保持数据追加顺序(按 sequence number)
- ✅ 不影响写入性能(异步执行)
6.7 场景七:极高吞吐 Append-Only 表(禁用 Compaction)
需求:
- 每秒数百万条日志写入
- 数据写入后几乎不查询
- 追求极致写入性能
- 可接受大量小文件
配置方案:
CREATE TABLE ultra_high_throughput_logs (
log_time TIMESTAMP(3),
log_level STRING,
log_message STRING
) TBLPROPERTIES (
'bucket' = '-1',
'write-only' = 'true' -- ❌ 禁用 compaction,最大化写入性能
);
注意事项:
- ⚠️ 会产生大量小文件
- ⚠️ 需要定期手动执行 compaction 或使用专用 compaction 作业
- ⚠️ 查询性能会随文件数增长而下降
- ✅ 适合"写多读少"的场景
七、监控与调优
7.1 关键监控指标
| 指标 | 说明 | 监控方法 |
|---|---|---|
| 文件数量 | 当前 Bucket 的 Sorted Run 数量 | 查看 manifest 文件 |
| 压缩频率 | Compaction 触发次数和间隔 | Flink Metrics |
| 压缩延迟 | Compaction 完成耗时 | Flink Metrics |
| 写入延迟 | 是否因压缩导致写入暂停 | 应用日志 |
| 查询性能 | 查询响应时间变化 | 查询日志 |
7.2 性能问题排查
常见问题速查
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 写入频繁暂停 | stop-trigger 太低 |
增加 stop-trigger=20 |
| 查询性能下降 | 文件数过多 | 降低 compaction-trigger=3 或手动压缩 |
| OOM 错误 | 同时打开文件过多 | 设置 sort-spill-threshold=8 |
| 压缩作业冲突 | 多作业并发压缩 | 使用 write-only=true + 专用压缩作业 |
问题 5:Hive 锁超时导致 L0 文件疯狂膨胀(生产案例)
真实生产案例:
场景描述:
- 多个写入作业,配置
write-only = true - 使用 Hive Metastore(启用了锁)
- 单独启动 Dedicated Compaction 作业
- 正常运行几天后突然出现问题
症状:
错误信息:
Failed to acquire lock after timeout
现象:
1. Compaction 作业获取 Hive 锁超时
2. Compaction 失败后任务"跳过"(可能是静默失败或重试放弃)
3. L0 文件数量疯狂增长:从几十个膨胀到 90+ 万个 ❌
4. 查询性能急剧下降,甚至无法查询
5. 存储空间浪费严重(大量小文件)
根本原因分析:
-
lock.enabled 配置来源排查:
- Paimon 0.8/0.9 默认值:
lock.enabled默认为false✅ - Hive Metastore 不会自动启用锁:仅使用
metastore=hive不会自动启用锁机制 ✅ - 可能的锁来源:
- Flink 0.9 写入作业的 catalog 配置中显式设置了
lock.enabled=true - flink-conf.yaml 全局配置中设置了锁
- Dedicated Compaction 作业启动时的 catalog 配置继承或默认启用了锁
- 跨版本配置迁移(Spark 0.8 → Flink 0.9)可能导致配置不一致
- Flink 0.9 写入作业的 catalog 配置中显式设置了
- 建议排查方向:
# 检查 Flink 写入作业的 catalog 配置 # 查看 CREATE CATALOG 语句或作业提交参数 # 检查 Flink 全局配置 cat $FLINK_HOME/conf/flink-conf.yaml | grep lock # 检查 Compaction 作业启动日志 # 查看实际使用的 catalog 配置
- Paimon 0.8/0.9 默认值:
-
Hive Metastore 锁机制问题:
- Hive 锁依赖于 Metastore 的性能和稳定性
- 如果 Metastore 繁忙、网络抖动、或锁表损坏,会导致获取锁超时
- 长时间运行后,Metastore 可能积累大量锁记录
-
Compaction 失败处理不当:
- 获取锁失败后,Compaction 作业可能静默失败或跳过本次压缩
- 并未明确报错导致任务失败,而是继续运行
- 写入作业持续产生 L0 文件,但没有 Compaction 清理
-
L0 文件膨胀恶性循环:
写入作业 → 持续产生 L0 文件 ↓ Compaction 作业获取锁超时 → 压缩失败 ↓ L0 文件继续增长(几天内到 90+ 万) ↓ 查询需要读取海量文件 → 性能急剧下降 ↓ 甚至 OOM、查询超时
为什么 lock.enabled=false 解决了问题?
-
绕过 Hive Metastore 锁:
- 不再依赖 Hive Metastore 的锁机制
- 避免锁超时问题
-
专用压缩作业本身就是单例:
- 只有一个 Compaction 作业运行,不存在并发冲突
- 不需要额外的分布式锁保护
-
写入作业已设置
write-only=true:- 写入作业不执行压缩
- 所有压缩由专用作业负责
- 天然避免了并发压缩
解决方案:
✅ 推荐方案:禁用 Hive 锁
./bin/flink run \
/path/to/paimon-flink-action-1.3.1.jar compact \
--warehouse hdfs://namenode:8020/warehouse \
--database my_db \
--mode combined \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--catalog_conf lock.enabled=false # 关键:禁用锁
原因:专用压缩作业本身就是单例,配合 write-only=true 不会有并发冲突。
⚠️ 预防措施:监控 L0 文件数量
- 告警阈值:1000(警告)、10000(严重)、100000(危险)
- 监控方式:
SELECT COUNT(*) FROM my_table$files WHERE level = 0
配置建议速查表:
| 场景 | lock.enabled | 说明 |
|---|---|---|
| 专用压缩 + write-only | ❌false(推荐) | 不会有并发冲突,无需锁 |
| 多写入者 + 每个都压缩 | ✅true(必须) | 存在并发冲突,必须用锁 |
| 单写入者 | ❌ false | 单作业本身不冲突 |
| S3 存储 + 专用压缩 | ❓ 视情况 | 如果只有一个压缩作业,可以不启用 |
经验总结:
-
专用压缩作业 + write-only 模式下,通常不需要 Hive 锁
- 架构本身已经保证了不会有并发压缩
- 启用锁反而引入了复杂性和不稳定性
-
监控是关键
- 定期检查 L0 文件数量
- 监控 Compaction 作业是否正常执行
- 设置告警阈值
-
L0 文件膨胀恢复方案
- 如果已经膨胀到 90+ 万,直接修复需要很长时间
- 可以考虑:
- 停止写入,专注清理 L0 文件
- 增加 Compaction 并行度
- 或者备份数据,重建表
-
Hive Metastore 不是万能的
- Hive 锁依赖 Metastore 的稳定性
- 生产环境需要考虑 Metastore 的性能和可靠性
- 对于专用压缩场景,锁不是必需的
7.3 调优原则
-
写密集型场景优先:
- 提高
stop-trigger - 启用异步压缩
- 使用 Minor Compaction
- 提高
-
读密集型场景优先:
- 降低
compaction-trigger - 启用 Full Compaction
- 考虑 COW 模式
- 降低
-
平衡型场景:
- 使用 MOW 模式(推荐)
- 保持默认 trigger 值
- 定期优化压缩
-
资源受限场景:
- 使用 Gentle Lookup Compaction
- 设置离峰时间压缩
- 控制
sort-spill-threshold
八、常见问题 FAQ
Q1: Compaction 是同步还是异步的?
答:取决于引擎和配置:
| 场景 | 同步/异步 | 说明 |
|---|---|---|
| Flink 流式作业 | 异步 | 后台线程执行,不阻塞写入 |
| Spark 批处理作业 | 同步 | 作业提交前执行,阻塞提交 |
full-compaction.delta-commits |
同步 | 每次提交时执行,影响写入性能 |
compaction.optimization-interval |
异步 | 定时触发,后台执行 |
Q2: 为什么设置了 write-only = true 还会压缩?
答:可能原因:
- 使用了旧版本 Paimon,该参数未生效
- 同时运行了其他未设置
write-only的作业 - 手动触发了压缩(
CALL sys.compact())
Q3: Minor Compaction 和 Full Compaction 可以同时配置吗?配置之间有相关性吗?
答:可以同时配置,且推荐同时配置。两者配置相互独立,各自触发互不影响:
独立触发机制:
- Minor Compaction:由文件数量驱动(
num-sorted-run.compaction-trigger),持续工作 - Full Compaction:由提交次数或时间间隔驱动(
full-compaction.delta-commits或compaction.optimization-interval),定期执行
配置示例:
CREATE TABLE mixed_compact_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'num-sorted-run.compaction-trigger' = '5', -- Minor: 5个文件触发
'compaction.optimization-interval' = '1 h' -- Full: 每小时触发
);
运行效果:
- Minor Compaction 持续工作,保持文件数在可控范围(避免 OOM)
- Full Compaction 定期执行,生成单层文件,更新
$ro表(优化查询) - 两者协同工作,实现性能平衡
只配置一个的缺点:
- ❌ 只配置 Minor:文件数控制住了,但
$ro表永远不更新,查询仍需内存合并 - ❌ 只配置 Full:Full Compaction 间隔期间,文件数可能失控(依赖默认 Minor 配置)
- ✅ 两者都配置:获得最佳性能平衡
Q4: Lookup Compaction 一定要配置吗?
答:不一定,只在以下情况下需要:
- 使用
changelog-producer = 'lookup' - 使用
merge-engine = 'first-row' - 使用 MOW 模式(
deletion-vectors.enabled = true)
Q5: 如何知道当前表有多少个文件?
方法 1:查询系统表
SELECT * FROM my_table$files;
方法 2:查看存储目录
# 查看某个 bucket 的文件
ls /Users/10281539/WorkSpace/warehouse/my_db.db/my_table/bucket-0/
方法 3:使用统计命令
CALL sys.query_statistics('database.table');
Q6: Compaction 会删除旧数据吗?
答:不会直接删除,但会产生以下效果:
- 合并重复数据:根据 Merge Engine 合并同主键的多条记录
- 过期记录删除:如果配置了
record-level.expire-time,压缩时会删除过期记录 - 旧文件清理:压缩后旧文件成为历史快照的一部分,需要通过
expire_snapshots清理
Q7: 如何强制执行一次全量压缩?
答:有以下几种方式:
方式 1:使用 SQL 过程
CALL sys.compact('database.table');
方式 2:使用 Action Jar
./bin/flink run \
/path/to/paimon-flink-action-1.3.1.jar compact \
--warehouse file:///Users/10281539/WorkSpace/warehouse \
--database my_db \
--table my_table \
--compact_strategy full
方式 3:临时修改表属性
ALTER TABLE my_table SET ('full-compaction.delta-commits' = '1');
-- 写入一条数据触发压缩
INSERT INTO my_table VALUES (1, 'test');
-- 改回原配置
ALTER TABLE my_table RESET ('full-compaction.delta-commits');
Q8: 使用 S3 存储时,为什么作业频繁失败并报文件冲突?
答:这是 S3/OSS/OBS 等对象存储的 RENAME 操作非原子性导致的典型问题。
核心原因:
- S3 等对象存储的 RENAME 是"复制+删除",不是原子操作
- 多作业并发压缩时会读取到中间状态的文件,导致冲突
解决方案:
- 多作业并发压缩:必须启用分布式锁(
lock.enabled=true) - 专用压缩作业(write-only模式):通常不需要启用锁
存储类型对比:
| 存储类型 | 需要 lock.enabled | 原因 |
|---|---|---|
| HDFS/本地文件系统 | ❌ 不需要 | RENAME 是原子操作 |
| S3/OSS/OBS/COS 等 | 多作业压缩时 ✅必须 专用压缩作业 ❓ 可选 |
RENAME 非原子 |
详细配置方案和生产案例,请参见 第四章 4.2 节:S3 等对象存储的重要注意事项
Q9: Minor 和 Full Compaction 怎么区分?
答:Paimon 会根据配置和文件状态自动决定:
Minor Compaction(默认):
- 触发方式:
num-sorted-run.compaction-trigger = 5(文件数达到阈值) - 行为:Paimon 使用 Universal Compaction 算法,选择性合并部分文件
- 结果:文件数减少,但不会合并到 1 个
Full Compaction(需显式配置):
- 触发方式 1:
full-compaction.delta-commits = 10(每 10 次提交) - 触发方式 2:
compaction.optimization-interval = '1 h'(每小时) - 触发方式 3:手动执行
CALL sys.compact() - 行为:合并所有文件
- 结果:理想情况下合并为 1 个大文件
两者可以共存:
CREATE TABLE hybrid_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'num-sorted-run.compaction-trigger' = '5', -- Minor 持续执行
'compaction.optimization-interval' = '2 h' -- Full 每2小时执行
);
如何判断当前使用的是哪种?
- 观察文件数变化:Minor 减少但不会到 1,Full 通常减少到 1-2 个
- 查看 Flink 日志:
Compaction task triggered: type=MINOR/FULL - 查询系统表:
SELECT * FROM my_table$files
详细说明:参见第二章:Compaction 类型详解
Q10: 为什么 Minor Compaction 不更新 $ro 表?
答:因为 $ro(Read-Optimized)表的设计目标是提供零合并成本的读优化性能。
核心原因:
-
Universal Compaction 的多层结构
- Minor Compaction 后,数据仍分散在多个层级(L0、L1、L2...)
- 查询时需要读取并合并多层文件
- 不符合 "Read-Optimized" 的定义
-
Full Compaction 的单层结构
- 将所有数据合并到单层连续文件
- 查询时直接读取,无需合并
- 真正实现零合并成本
配置 $ro 表更新频率:
CREATE TABLE my_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'compaction.optimization-interval' = '1 h' -- 每小时更新 $ro 表
);
权衡:
- 频繁 Full Compaction →
$ro表实时性好,但写入性能下降 - 不频繁 Full Compaction → 写入性能好,但
$ro表数据延迟
详细说明:参见第三章:Read-Optimized 表详解
Q11: Append-Only 表会有 Compaction 吗?
答:会有,且默认启用,但与主键表的 Compaction 完全不同。
核心区别:
- 主键表 Compaction:合并数据(数据去重、版本合并)
- Append-Only 表 Compaction:合并文件(文件拼接,无数据合并)
触发条件:
-- 满足以下任一条件即触发
条件1: (总文件大小 >= target-file-size) AND (文件数 >= compaction.min.file-num)
条件2: 文件数 >= compaction.max.file-num
禁用方式:
CREATE TABLE logs (...) TBLPROPERTIES (
'write-only' = 'true' -- 禁用 compaction
);
推荐配置:
- ✅ 默认启用 compaction(
write-only = false):减少小文件,提高查询性能 - ⚠️ 禁用 compaction(
write-only = true):仅适用于极高吞吐 + 几乎不查询的场景
详细说明:参见第二章 2.4 节:Append-Only 表 Compaction
Q12: 专用压缩作业 + Hive Metastore,需要启用 lock.enabled 吗?
答:取决于具体场景,大多数情况下不需要。
重要说明:
- Paimon 0.8/0.9 默认值:
lock.enabled默认为false✅ - Hive Metastore 不会自动启用锁:仅配置
metastore=hive不会自动启用锁机制 ✅ - 锁必须显式配置:如果遇到锁超时问题,说明某处配置中显式启用了锁
不需要启用锁的场景(推荐):
- ✅ 只有一个专用压缩作业运行
- ✅ 所有写入作业都设置了
write-only = true - ✅ 使用 Hive Metastore 仅用于元数据管理
配置示例:
# 启动专用压缩作业(禁用锁)
./bin/flink run \
/path/to/paimon-flink-action-1.3.1.jar compact \
--warehouse hdfs://namenode:8020/warehouse \
--database my_db \
--mode combined \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--catalog_conf lock.enabled=false # 禁用锁
为什么可以禁用?
- 架构本身已经保证了不会有并发压缩(只有一个压缩作业)
- 启用锁反而可能引入 Hive Metastore 的稳定性问题
需要启用锁的场景:
- ⚠️ 多个作业都会执行压缩(未使用 write-only)
- ⚠️ 使用 S3 等对象存储(RENAME 非原子性)
- ⚠️ 存在并发写入同一分区的可能
真实生产教训:
某生产环境在启用 Hive 锁后,运行几天后遇到锁超时问题,导致 L0 文件膨胀到 90+ 万个,最终通过禁用锁(lock.enabled=false)解决。
详细案例:参见第九章问题5:Hive 锁超时导致 L0 文件疯狂膨胀
配置建议:
| 架构模式 | lock.enabled | 理由 |
|---|---|---|
| 专用压缩 + write-only | ❌false | 无并发冲突,无需锁 |
| 多写入者 + 每个都压缩 | ✅true | 有并发冲突,必须用锁 |
| S3 + 专用压缩 | ❓ 视情况 | 单压缩作业可不启用 |
| HDFS + 专用压缩 | ❌ false | RENAME 原子性 + 单压缩作业 |
九、版本差异对比(0.8 vs 1.3)
9.1 关于 delete-file.thread-num 参数
你提到的 delete-file.thread-num=100 参数,经过详细搜索 在 Paimon 0.8 和 1.3 的官方文档中均未找到。
可能的情况:
- 内部参数,仅存在于源码中,未在官方文档公开
- 特定发行版或 fork 版本添加的自定义参数
- 0.8 早期版本存在,后续版本已移除
- 已重命名为其他参数
在 1.3 版本中,控制文件删除并发度的推荐配置:
CREATE TABLE my_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'file-operation.thread-num' = '100', -- 文件操作线程数(包括删除)
'snapshot.expire.limit' = '50', -- 每次过期快照数量
'snapshot.expire.execution-mode' = 'async' -- 异步删除文件(1.3 新增)
);
迁移建议(从 0.8 到 1.3):
-- 如果 0.8 中使用了(假设存在)
'delete-file.thread-num' = '100'
-- 迁移到 1.3,等效配置
'file-operation.thread-num' = '100' -- 替代参数
'snapshot.expire.execution-mode' = 'async' -- 异步删除,提升性能
'snapshot.expire.limit' = '50' -- 控制批次大小
9.2 Compaction 相关新增特性(1.3)
1. Lookup Compaction(全新功能)
0.8:不支持 Lookup Compaction
1.3:新增 Radical 和 Gentle 两种模式
-- 1.3 新增配置
CREATE TABLE lookup_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'changelog-producer' = 'lookup',
'lookup-compact' = 'GENTLE', -- ⭐ 新增
'lookup-compact.max-interval' = '100', -- ⭐ 新增
'lookup-wait' = 'true' -- ⭐ 增强
);
2. Sort Compaction(全新功能)
0.8:不支持 Sort Compaction
1.3:支持 Order、Z-Order、Hilbert 三种排序策略
-- 1.3 新增配置
CREATE TABLE sort_table (
id BIGINT,
region STRING,
amount DECIMAL(10,2)
) TBLPROPERTIES (
'bucket' = '4',
'sort-compaction' = 'true', -- ⭐ 新增
'sort-compaction.zorder-by' = 'region,amount' -- ⭐ 新增
);
3. 异步快照过期(全新功能)
0.8:快照过期和文件删除是同步的,可能阻塞写入
1.3:支持异步快照过期,避免阻塞
-- 0.8 配置
'snapshot.time-retained' = '1 h'
'snapshot.num-retained.min' = '10'
-- 1.3 增强配置
'snapshot.time-retained' = '1 h'
'snapshot.num-retained.min' = '10'
'snapshot.expire.execution-mode' = 'async' -- ⭐ 新增
'snapshot.expire.limit' = '50' -- ⭐ 新增
4. 专用压缩作业 Combined 模式(增强)
0.8:基本的专用压缩支持
1.3:新增 Combined 模式,单作业压缩整个数据库
# 1.3 新增 Combined 模式
./bin/flink run \
/path/to/paimon-flink-action-1.3.1.jar compact \
--warehouse file:///path/to/warehouse \
--database my_db \
--mode combined \ # ⭐ 新增:整库压缩
--compact_strategy minor
5. 离峰时间压缩(可能是新增)
-- 1.3 支持离峰时间更激进的压缩
CREATE TABLE offpeak_table (
id BIGINT PRIMARY KEY NOT ENFORCED,
data STRING
) TBLPROPERTIES (
'compaction.offpeak.start.hour' = '22', -- ⭐ 可能是新增
'compaction.offpeak.end.hour' = '6', -- ⭐ 可能是新增
'compaction.offpeak-ratio' = '50' -- ⭐ 可能是新增
);
9.3 性能提升对比
| 场景 | 0.8 基准 | 1.3 提升 | 原因 |
|---|---|---|---|
| 写入吞吐量 | 1x | 1.2-1.5x | MOW 模式 + Gentle Lookup |
| DELETE 性能 | 1x | 2-5x | Deletion Vectors 优化 |
| 范围查询 | 1x | 1.5-3x | Sort Compaction (Z-Order) |
| 快照清理 | 1x | 1.3-2x | 异步快照过期 |
| 多写入者 | ⚠️ 冲突风险 | ✅ 稳定 | 专用压缩作业改进 |
9.4 关键参数变化对比
| 参数 | 0.8 | 1.3 | 说明 |
|---|---|---|---|
num-sorted-run.compaction-trigger |
✅ 默认 5 | ✅ 默认 5 | 无变化 |
num-sorted-run.stop-trigger |
✅ 默认 trigger+3 | ✅ 默认 trigger+3 | 无变化 |
lookup-compact |
❌ 不支持 | ✅ RADICAL/GENTLE | 新增 |
sort-compaction |
❌ 不支持 | ✅ 支持 | 新增 |
snapshot.expire.execution-mode |
❌ 不支持 | ✅ sync/async | 新增 |
file-operation.thread-num |
❓ 不确定 | ✅ 支持 | 替代 delete-file.thread-num |
write-only |
❓ 不确定 | ✅ 明确支持 | 增强 |
deletion-vectors.enabled |
⚠️ 实验性 | ✅ 稳定 | 增强 |
9.5 升级建议
适合升级到 1.3 的场景
| 场景 | 理由 |
|---|---|
| ✅MOW 模式表 | 1.3 中 MOW 更稳定,性能提升明显 |
| ✅多写入者 | 专用压缩作业支持更好,冲突问题改善 |
| ✅写入密集型 | Gentle Lookup Compaction 降低资源消耗 |
| ✅大表范围查询 | Sort Compaction 显著提升查询性能 |
| ✅快照清理慢 | 异步快照过期避免阻塞 |
9.6 版本选择建议
| 项目类型 | 推荐版本 | 理由 |
|---|---|---|
| 新项目 | 1.3 | 功能更丰富,性能更好,MOW 模式稳定 |
| 生产环境 | 1.3 | 更稳定,Lookup/Sort Compaction 成熟 |
| 0.8 升级 | 评估后升级 | 性能提升 20-50%,需充分测试 |
| 实验性功能 | 1.3+/Master | 最新特性,需注意稳定性 |
十、总结与建议
10.1 核心要点总结
- Compaction 机制因表类型而异:
- 主键表:基于 LSM Tree,合并数据(去重、版本合并)
- Append-Only 表:基于文件数量,合并文件(文件拼接,保持追加顺序)
- 主键表 Compaction 策略:
- Minor Compaction 适合写密集型场景,Full Compaction 适合读密集型场景
- MOW 模式 + Gentle Lookup Compaction 是大多数场景的推荐配置(1.3 稳定支持)
- Append-Only 表 Compaction 推荐:
- ✅ 默认启用 compaction,减少小文件,提高查询性能
- ⚠️ 仅在极高吞吐 + 几乎不查询的场景下禁用
- 多写入者必须使用专用压缩作业,避免冲突
- 异步压缩策略可以最大化写入性能,但需监控文件数
- Sort Compaction 对大表的范围查询有显著优化效果(1.3 新增)
- 从 0.8 升级到 1.3 可获得显著性能提升,特别是 MOW 和多写入者场景
10.2 配置建议速查表
主键表:
| 场景 | 推荐配置 |
|---|---|
| 通用推荐(1.3) | MOW 模式 + Gentle Lookup + 默认 Trigger 值 + 异步快照过期 |
| 写密集型 | stop-trigger=MAX + sort-spill=10 + lookup-wait=false |
| 读密集型 | COW 模式 +full-compaction.delta-commits=1 |
| 多写入者 | write-only=true + 专用压缩作业(Combined 模式) |
| 大表优化 | Sort Compaction (Z-Order) + 定期优化 |
| 资源受限 | Gentle Lookup + 离峰时间压缩 |
Append-Only 表:
| 场景 | 推荐配置 |
|---|---|
| 日志收集/事件流 | bucket=-1 + write-only=false + compaction.min.file-num=10 + target-file-size=256MB |
| 极高吞吐写入 | bucket=-1 + write-only=true(禁用 compaction) |
| 需要查询历史数据 | 启用 compaction + 合理的 target-file-size |
| HDFS NameNode 压力大 | 启用 compaction + 较大的 target-file-size |
10.3 运维建议
-
监控文件数:定期检查 Sorted Run 数量,避免失控
- L0 文件数量告警阈值:1000(警告)、10000(严重)、100000(危险)
- 建议每小时检查一次
-
定期全量压缩:对于重要表,建议每天或每周执行一次全量压缩
-
快照清理:配合
expire_snapshots清理历史文件,释放存储空间 -
测试验证:修改压缩配置后,务必在测试环境验证性能影响
-
分区策略:合理设计分区,避免单分区过大导致压缩耗时过长
-
版本升级:评估从 0.8 升级到 1.3 的收益,制定灰度升级计划
-
⭐ 锁配置建议(重要):
- 专用压缩作业场景:不启用 Hive 锁(
lock.enabled=false),避免锁超时问题 - 并发压缩场景:必须启用锁(
lock.enabled=true),防止文件冲突 - 监控 Compaction 作业:确保压缩作业持续正常执行,避免静默失败
- 专用压缩作业场景:不启用 Hive 锁(
-
Hive Metastore 维护:
- 定期检查 Metastore 性能和稳定性
- 必要时清理过期的锁记录
- 监控 Metastore 连接数和响应时间
10.4 进一步学习
- 官方文档:https://paimon.apache.org/docs/1.3/primary-key-table/compaction/
- 专用压缩作业:https://paimon.apache.org/docs/1.3/maintenance/dedicated-compaction/
- 配置参数大全:https://paimon.apache.org/docs/1.3/maintenance/configurations/
- 性能调优指南:https://paimon.apache.org/docs/1.3/performance/overview/
- 0.8 文档(对比参考):https://paimon.apache.org/docs/0.8/