Paimon Compaction 详解

Paimon Compaction 详解

Paimon Compaction 详解

总结

  • 更新时间:2025-12-25
  • 适用版本:Apache Paimon 1.3+(包含 0.8 vs 1.3 版本差异对比)

核心概念

  • Compaction 核心作用
    • 主键表:合并 LSM Tree 中的 Sorted Runs,防止查询性能下降和 OOM 问题
    • Append-Only 表:合并小文件,提高读取性能(不涉及数据合并,只做文件拼接)
  • Compaction 类型
    • 主键表 Compaction(基于 LSM Tree):
      • Minor Compaction:选择性合并部分文件,不更新 $ro 表,适合写密集型场景
      • Full Compaction:合并所有文件到 maxLevel 单层,更新 $ro 表,适合读密集型场景
      • Lookup Compaction(1.3 新增):Radical(激进)和 Gentle(温和)两种模式
      • Sort Compaction(1.3 新增):Z-Order/Hilbert 曲线优化数据布局
    • Append-Only 表 Compaction(基于文件数量):
      • Auto Compaction:后台异步合并小文件,达到 target-file-size 时触发
      • Full Compaction:合并所有小文件(至少需要 3 个文件)
  • Read-Optimized 表($ro:只有 Full Compaction 才会更新,提供零合并成本的读优化性能
  • ⚠️ 重要澄清(源码验证):
    • Full Compaction 和 Minor Compaction 是两条完全独立的代码路径
    • Minor Compaction 中的 Size Amplification 触发的"全量合并"不是真正的 Full Compaction
    • 只有 fullCompaction=true 时才会调用 CompactStrategy.pickFullCompaction()

关键配置参数

主键表

  • num-sorted-run.compaction-trigger(默认 5):触发 Minor Compaction 的文件数阈值
  • num-sorted-run.stop-trigger(默认 trigger+3):等待压缩完成的文件数阈值(背压机制,非完全阻塞)
  • full-compaction.delta-commits:每 N 次提交触发 Full Compaction
  • compaction.optimization-interval:时间间隔触发 Full Compaction,确保 $ro 表时效性
  • 配置独立性:Minor 和 Full Compaction 配置互不影响,推荐同时配置以平衡性能

Append-Only 表

  • write-only(默认 false):是否禁用 compaction(true 时使用 NoopCompactManager)
  • compaction.min.file-num(默认 5):触发 compaction 的最小文件数
  • compaction.max.file-num(默认 50):触发 compaction 的最大文件数
  • target-file-size(默认 128MB):目标文件大小
  • 关键特性:Append-Only 表的 compaction 不涉及数据合并,只是文件拼接,按 sequence number 保持追加顺序

专用压缩作业与分布式锁

  • 专用压缩作业:多写入者场景的标准解决方案,设置 write-only=true
    • ✅ 写入作业使用 NoopCompactManager,完全跳过压缩逻辑(源码验证)
    • ✅ 压缩参数在建表 DDL 中配置,专用作业读取生效
    • ⚠️ num-sorted-run.stop-trigger 完全失效:写入作业永不等待,无法背压(源码验证)
    • ⚠️ 风险:若压缩速度 < 写入速度,文件数会无限增长(需监控和人工干预)
  • 专用压缩作业高级参数(Paimon 1.0+):
    • --compact_strategy:控制压缩策略(minorfull),默认由运行模式决定
    • --partition_idle_time:只压缩闲置超过指定时长的分区(如 7d
      • ⚠️ 仅支持 Batch 模式,Streaming 模式会抛异常(源码验证)
      • ⚠️ 不能与 --order_strategy 同时使用
  • S3 等对象存储
    • 多作业并发压缩:必须启用 lock.enabled=true(RENAME 非原子性)
    • 专用压缩作业:通常不需要启用锁(单例运行,无并发冲突)
  • ⚠️ 生产案例警示:启用 Hive 锁可能导致锁超时和 L0 文件膨胀(真实案例:90+ 万文件)
  • lock.enabled 默认值:Paimon 0.8/0.9 默认 false,Hive Metastore 不会自动启用

性能调优与最佳实践

  • 通用推荐(1.3):MOW 模式 + Gentle Lookup + 默认 Trigger 值 + 异步快照过期
  • 写密集型stop-trigger=MAX_VALUE + sort-spill=10 + lookup-wait=false
  • 读密集型:COW 模式 + full-compaction.delta-commits=1
  • 多写入者write-only=true + 专用压缩作业(Combined 模式)
  • 版本升级:0.8 → 1.3 可获得 20%-50% 性能提升

一、Compaction 基础概念

1.1 什么是 Compaction

Paimon 使用 LSM Tree(Log-Structured Merge-Tree) 架构存储数据。在 LSM Tree 中:

  • 数据写入时首先写入内存,然后 flush 到磁盘形成 Sorted Run(有序文件)
  • 随着写入增加,Sorted Run 数量不断增长
  • 查询时需要合并多个 Sorted Run 的数据,文件数越多,查询性能越差
  • Compaction(压缩) 就是将多个 Sorted Run 合并成一个更大的 Sorted Run 的过程

1.2 为什么需要 Compaction

Compaction 的主要目的包括:

  1. 维持查询性能

    • 减少查询时需要读取的文件数
    • 降低 I/O 开销和内存使用
  2. 防止 OOM(内存溢出)

    • 过多的 Sorted Run 需要同时打开大量文件读取器
    • 可能导致内存不足
    • OOM 风险的本质
      • LSM Tree 架构特性:MOR 模式在读取时需要在内存中合并所有 Sorted Runs
      • 文件数爆炸:不执行 Compaction → Sorted Run 数量无限增长
      • 内存线性增长:文件数 × (文件句柄 + 读取缓冲区 + Reader 对象 + 合并计算内存)
      • 查询触发 OOM:单个查询可能需要打开成千上万个文件,导致内存耗尽
  3. 生成 Changelog

    • 通过合并生成变更日志(changelog-producer)
  4. 删除过期数据

    • 清理超过保留时间的记录(record-level expiration)
  5. 生成删除向量

    • 为 MOW 模式生成 Deletion Vectors

1.3 LSM Tree 架构与 Universal Compaction

Paimon 使用类似 RocksDB 的 Universal Compaction 策略。以下基于源码 UniversalCompaction.javacreateUnit 方法进行可视化说明。

LSM Tree 层级结构与 Compaction 流程

graph TB Start[压缩触发] --> CheckType{压缩类型判断<br/>源码: MergeTreeCompactManager.triggerCompaction} CheckType -->|fullCompaction=true<br/>强制 Full Compaction| FullPath[CompactStrategy.pickFullCompaction<br/>源码 L141] CheckType -->|fullCompaction=false<br/>普通 Minor Compaction| MinorPath[strategy.pick<br/>源码 L155] subgraph "Full Compaction 路径(独立)" FullPath --> FullSelect[无条件选择所有文件<br/>outputLevel = maxLevel] FullSelect --> FullExecute[提交压缩任务] FullExecute --> FullResult[结果: 单层结构<br/>L0/L1空, L2有数据] end subgraph "Minor Compaction 路径(Universal Compaction 算法)" MinorPath --> Check1{1. 检查 opCompactionInterval<br/>源码: UniversalCompaction L89-96} Check1 -->|超时| SelectAll1[选择所有文件<br/>outputLevel = maxLevel] Check1 -->|未超时| Check2{2. 检查 Size Amplification<br/>源码 L98-105} Check2 -->|空间放大超标| SelectAll2[选择所有文件<br/>触发全量合并<br/>仍是 Minor Compaction] Check2 -->|正常| Check3{3. 检查 Size Ratio<br/>源码 L107-114} Check3 -->|满足 size-ratio 条件| SelectPartial1[选择部分文件<br/>按 size-ratio 贪心算法] Check3 -->|不满足| Check4{4. 检查文件数<br/>源码 L116-124} Check4 -->|文件数超过 trigger| SelectPartial2[选择 candidateCount 个文件] Check4 -->|未超过| Check5{5. 检查 Lookup Interval<br/>源码 L126-144} Check5 -->|达到间隔| SelectL0[强制选择所有 L0 文件<br/>forcePickL0] Check5 -->|未达到| NoCompact[不压缩] SelectAll1 --> MinorExecute[提交压缩任务] SelectAll2 --> MinorExecute SelectPartial1 --> MinorExecute SelectPartial2 --> MinorExecute SelectL0 --> MinorExecute MinorExecute --> MinorResult[结果: 多层结构<br/>部分文件合并] NoCompact --> End[结束] end subgraph "触发方式" Trigger1[时间间隔触发<br/>compaction.optimization-interval] -.-> CheckType Trigger2[提交次数触发<br/>full-compaction.delta-commits] -.-> CheckType Trigger3[手动触发<br/>CALL sys.compact] -.-> CheckType Trigger4[自动触发<br/>文件数达到 compaction-trigger] -.-> CheckType end FullResult --> End MinorResult --> End style FullPath fill:#ff6b6b style MinorPath fill:#4ecdc4 style SelectAll1 fill:#ffe66d style SelectAll2 fill:#ffe66d style FullSelect fill:#ff6b6b style FullExecute fill:#ff6b6b style CheckType fill:#95e1d3 style Trigger1 fill:#e7f3ff style Trigger2 fill:#e7f3ff style Trigger3 fill:#e7f3ff style Trigger4 fill:#e7f3ff

流程图关键说明

  1. Full Compaction 和 Minor Compaction 是两条独立的代码路径(源码: MergeTreeCompactManager.java:127-146

    • Full CompactionfullCompaction = true → 调用 CompactStrategy.pickFullCompaction() → 无条件合并所有文件
    • Minor CompactionfullCompaction = false → 调用 strategy.pick() → 根据 5 个条件决策
  2. Minor Compaction 中的"全量合并"不是真正的 Full Compaction

    • Size Amplification 超标时会选中所有文件,但仍是 Minor Compaction
    • 区别:Full Compaction 有额外的优化逻辑(如 deletion vector 清理、record-level-expire 等)
  3. 触发方式对应关系

    • compaction.optimization-interval → Minor Compaction 的条件 1(opCompactionInterval)
    • full-compaction.delta-commits → 强制 Full Compaction(fullCompaction=true)
    • CALL sys.compact() → 强制 Full Compaction(fullCompaction=true)
    • 文件数达到 compaction-trigger → Minor Compaction 的条件 4

源码关键逻辑说明(UniversalCompaction.java L179-206)

CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
    int outputLevel;

    // 【关键判断 1】如果合并所有文件(Full Compaction)
    if (runCount == runs.size()) {
        outputLevel = maxLevel;  // ← 输出到最高层级
    } else {
        // Minor Compaction:输出到下一个 run 的层级 - 1
        outputLevel = Math.max(0, runs.get(runCount).level() - 1);
    }

    // 【关键判断 2】避免输出到 L0
    if (outputLevel == 0) {
        // L0 专用于新写入文件,压缩结果不应输出到 L0
        for (int i = runCount; i < runs.size(); i++) {
            LevelSortedRun next = runs.get(i);
            runCount++;
            if (next.level() != 0) {
                outputLevel = next.level();
                break;
            }
        }
    }

    // 【关键判断 3】再次确认全量压缩
    if (runCount == runs.size()) {
        updateLastOptimizedCompaction();
        outputLevel = maxLevel;  // ← 确保全量压缩输出到 maxLevel
    }

    return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));
}

层级结构对比

状态 L0 L1 L2 (maxLevel) 查询方式 Read-Optimized 文件总数
初始写入(无压缩) file1, file2, file3, file4, file5 读取 L0 所有文件 ❌ 否 5
Minor Compaction 后 new_file(不影响继续写入) merged_L1 merged_L2 合并 L0+L1+L2 ❌ 否 3
Full Compaction 后 full_merged 直接读取 L2 ✅ 是 1

演化说明

  • 阶段 1 → 阶段 2
    • 触发 Minor Compaction(compaction-trigger=5
    • file1-file5 中部分文件合并到 L1 → merged_L1
    • 继续压缩推进到 L2 → merged_L2
    • 新写入的文件继续进入 L0 → new_file
    • 结果:文件从 5 个减少到 3 个,但分散在 3 个层级,查询时需多层合并
  • 阶段 2 → 阶段 3
    • 触发 Full Compaction(optimization-intervaldelta-commits
    • 合并所有层级(L0 + L1 + L2)的所有文件到 maxLevel
    • 结果:实现单层单文件结构,可直接读取无需合并

关键特点总结

  • Minor Compaction

    • 选择性合并部分文件(runCount < runs.size()
    • 输出层级:outputLevel = Math.max(0, runs.get(runCount).level() - 1)
    • 逐步向高层推进,形成多层结构
  • Full Compaction

    • 合并所有层级的所有文件(runCount == runs.size()
    • 输出层级:outputLevel = maxLevel(源码明确定义)
    • 清空 L0、L1,所有数据集中在 maxLevel
    • 避免输出到 L0(L0 专用于新写入)
  • 触发条件

    • Minor:文件数达到 num-sorted-run.compaction-trigger
    • Full:提交次数达到 full-compaction.delta-commits 或时间间隔 compaction.optimization-interval
  • Read-Optimized 更新

    • 只有 Full Compaction 生成单层结构(仅 maxLevel 有数据)
    • 才会更新 $ro 系统表

二、Compaction 类型详解

2.1 Minor Compaction(增量压缩)

定义:选择性地合并部分文件,而不是全部文件。

特点

  • 写入影响小,适合流式场景
  • 合并速度快
  • 文件数量逐步减少
  • 默认用于 Flink 流式作业

适用场景

  • 持续写入的流式作业
  • 对写入性能要求高的场景
  • 不需要频繁全表扫描的表

配置示例

-- 使用 Minor Compaction(默认行为)
CREATE TABLE minor_compact_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    age INT,
    update_time TIMESTAMP(3)
) TBLPROPERTIES (
    'num-sorted-run.compaction-trigger' = '5',  -- 5个文件触发压缩
    'num-sorted-run.stop-trigger' = '10'        -- 10个文件暂停写入
);

2.2 Full Compaction(全量压缩)

定义:将所有层级的文件合并成单层的连续文件。

特点

  • 彻底减少文件数(理想情况下合并为 1 个文件)
  • 写入性能影响大
  • 查询性能最优(无需读时合并)
  • 默认用于 Spark 批处理作业
  • 更新 Read-Optimized 系统表($ro

适用场景

  • 批处理作业
  • 读取密集型表
  • COW 模式表
  • 定期维护作业
  • 需要零合并成本的读优化查询

配置方式

方式一:每次写入后全量压缩(COW 模式)

CREATE TABLE cow_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'full-compaction.delta-commits' = '1'  -- 每次提交后触发全量压缩
);

方式二:定期优化压缩(推荐用于 $ro 表更新)

CREATE TABLE periodic_compact_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'compaction.optimization-interval' = '1 h'  -- 每小时执行一次优化压缩,更新 $ro 表
);

重要说明

  • compaction.optimization-interval 的主要作用是确保 Read-Optimized 系统表($ro)的查询时效性
  • 只有 Full Compaction 后,数据才会被包含在 $ro 表中
  • 如果不配置定期 Full Compaction,$ro 表可能包含旧数据

方式三:基于文件总大小触发

CREATE TABLE size_based_compact_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'compaction.total-size-threshold' = '100 MB'  -- 总大小小于100MB时触发全量压缩
);

2.3 Lookup Compaction(查找压缩)

定义:针对主键表的特殊压缩策略,应用于以下场景:

  • Lookup changelog producer 表
  • First-row merge engine 表
  • MOW 模式(使用 deletion vectors)的表

2.3.1 Radical 模式(激进模式,默认)

特点

  • 强制将 L0 文件立即提升到更高层级
  • 数据新鲜度高
  • 资源消耗大

配置

CREATE TABLE radical_lookup_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'changelog-producer' = 'lookup',
    'lookup-compact' = 'RADICAL',  -- 默认值,可省略
    'lookup-wait' = 'true'          -- 等待 lookup compaction 完成
);

2.3.2 Gentle 模式(温和模式)

特点

  • 使用 Universal Compaction 策略
  • 资源消耗较小
  • 数据新鲜度相对较低
  • 可配置最大间隔时间

配置

CREATE TABLE gentle_lookup_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'changelog-producer' = 'lookup',
    'lookup-compact' = 'GENTLE',
    'lookup-compact.max-interval' = '60'  -- 最多60次commit必须执行一次L0压缩
);

2.4 Append-Only 表 Compaction(文件合并)

定义:Append-Only 表的 Compaction 与主键表完全不同,它不涉及数据合并,只是将多个小文件拼接成大文件,以提高读取性能。

2.4.1 核心特点

与主键表 Compaction 的本质区别

特性 Append-Only Compaction 主键表 LSM Compaction
目的 小文件合并,提高读性能 数据去重、合并更新、删除旧版本
是否合并数据 ❌ 不合并,只是文件拼接 ✅ 合并相同 key 的多版本数据
数据顺序 ✅ 严格保持追加顺序(按 sequence number) ⚠️ 按 key 排序,破坏原始顺序
复杂度 低(顺序读写) 高(需要 merge 算法)
性能开销
默认启用 ✅ 是 ✅ 是
禁用方式 'write-only' = 'true' 'write-only' = 'true'

源码验证(基于 AppendOnlyFileStoreWrite.java:127-139):

CompactManager compactManager =
    skipCompaction
        ? new NoopCompactManager()  // write-only=true 时禁用
        : new AppendOnlyCompactManager(...);  // 默认启用

2.4.2 Compaction 触发条件

根据 AppendOnlyCompactManager.java:166-193,触发条件为:

方式一:Auto Compaction(自动压缩)

// 触发条件(满足其一即可)
if ((totalFileSize >= targetFileSize && fileNum >= minFileNum)
        || fileNum >= maxFileNum) {
    // 触发 compaction
}
  • 条件 1:总文件大小 >= target-file-size 文件数 >= compaction.min.file-num
  • 条件 2:文件数 >= compaction.max.file-num

方式二:Full Compaction(全量压缩)

  • 合并所有小文件(跳过已达到 target-file-size 的大文件)
  • 至少需要 3 个文件才触发
  • 仅在小文件数量多于大文件时执行

2.4.3 Compaction 处理逻辑

核心流程(源码:AppendOnlyFileStoreWrite.java:163-188):

public AppendOnlyCompactManager.CompactRewriter compactRewriter(...) {
    return toCompact -> {
        RowDataRollingFileWriter rewriter = new RowDataRollingFileWriter(...);
        // 按 sequence number 顺序读取,直接写入新文件
        rewriter.write(bucketReader(partition, bucket).read(toCompact));
        return rewriter.result();
    };
}

关键特性

  • 按 sequence number 顺序读取(保证追加顺序)
  • 不做数据合并计算(没有 key 去重或聚合)
  • 简单的数据复制:多个小文件 → 一个大文件

2.4.4 流式模式的 Compaction 拓扑

根据 UnawareBucketSink.java:64-78

boolean enableCompaction = !table.coreOptions().writeOnly();

if (enableCompaction && isStreamingMode && !boundedInput) {
    // 添加独立的 compaction 拓扑
    UnawareBucketCompactionTopoBuilder builder = ...;
    written = written.union(builder.fetchUncommitted(initialCommitUser));
}

拓扑结构

              ┌─────────────────┐
              │  Input Stream   │
              └────────┬────────┘
                       │
         ┌─────────────┴─────────────┐
         ▼                           ▼
  ┌─────────────┐         ┌──────────────────┐
  │   Writers   │         │ Compaction Tasks │
  │  (append)   │         │ (merge files)    │
  └──────┬──────┘         └─────────┬────────┘
         │                          │
         └──────────┬───────────────┘
                    ▼
           ┌─────────────────┐
           │   Committer     │
           └─────────────────┘

2.4.5 配置示例

启用 Compaction(默认)

CREATE TABLE logs (
    log_time TIMESTAMP(3),
    user_id BIGINT,
    message STRING
) TBLPROPERTIES (
    'bucket' = '-1',                    -- UNAWARE 模式
    'write-only' = 'false',             -- 启用 compaction(默认)
    'compaction.min.file-num' = '5',    -- 最小文件数
    'compaction.max.file-num' = '50',   -- 最大文件数
    'target-file-size' = '128MB'        -- 目标文件大小
);

禁用 Compaction(极高吞吐场景)

CREATE TABLE high_throughput_logs (
    event_time TIMESTAMP(3),
    event_data STRING
) TBLPROPERTIES (
    'bucket' = '-1',
    'write-only' = 'true'  -- 禁用 compaction,追求极致写入性能
);

2.4.6 适用场景

✅ 推荐启用 Compaction

  • 日志收集、事件流存储
  • 数据写入后会被查询
  • 需要减少小文件数量
  • HDFS NameNode 压力大

⚠️ 考虑禁用 Compaction

  • 极高吞吐写入(每秒数百万条)
  • 数据写入后几乎不读取
  • 追求极致写入性能
  • 可接受大量小文件

2.4.7 监控建议

关键指标

  • 文件数量:SELECT COUNT(*) FROM table$files WHERE bucket = 0
  • 平均文件大小:SELECT AVG(file_size_in_bytes) FROM table$files
  • Compaction 频率:查看 Flink Metrics

告警阈值

  • 单 bucket 文件数 > 1000(警告)
  • 单 bucket 文件数 > 10000(严重)
  • 平均文件大小 < 10MB(说明 compaction 不足)

2.5 Sort Compaction(排序压缩)

定义:在合并文件时,根据指定的列对数据重新排序,从而优化数据的物理布局,提升查询性能。

适用场景

  • 数据插入是无序的
  • 需要优化范围查询性能
  • 需要提升数据局部性

2.4.1 排序策略

Order(普通排序)

按指定列顺序排序:

CREATE TABLE order_sorted_table (
    id BIGINT,
    region STRING,
    city STRING,
    amount DECIMAL(10,2),
    create_time TIMESTAMP(3)
) TBLPROPERTIES (
    'bucket' = '4',
    'sort-compaction' = 'true',
    'sort-compaction.order-by' = 'region,city'  -- 先按region,再按city排序
);

Z-Order(多维索引)

使用 Z-Order 曲线进行多维排序:

CREATE TABLE zorder_table (
    id BIGINT,
    region STRING,
    city STRING,
    amount DECIMAL(10,2)
) TBLPROPERTIES (
    'bucket' = '4',
    'sort-compaction' = 'true',
    'sort-compaction.zorder-by' = 'region,city,amount'
);

优势

  • 多维查询性能好
  • 适合 WHERE 条件包含多个列的场景

Hilbert(希尔伯特曲线)

使用 Hilbert 曲线进行排序:

CREATE TABLE hilbert_table (
    id BIGINT,
    x DOUBLE,
    y DOUBLE
) TBLPROPERTIES (
    'bucket' = '4',
    'sort-compaction' = 'true',
    'sort-compaction.hilbert-by' = 'x,y'
);

优势

  • 空间局部性更好
  • 适合地理空间数据

2.4.2 使用 Action Jar 执行 Sort Compaction

./bin/flink run \
  /path/to/paimon-flink-action-1.3.1.jar compact \
  --warehouse file:///Users/10281539/WorkSpace/warehouse \
  --database my_db \
  --table my_table \
  --order_strategy zorder \
  --order_by region,city,amount

支持的排序策略

  • order:普通排序
  • zorder:Z-Order 排序
  • hilbert:Hilbert 排序

三、Read-Optimized 表($ro)详解

3.1 什么是 $ro

$ro(Read-Optimized)表是 Paimon 提供的一个系统表,它是数据表的一个特殊视图:

  • 只包含已完成 Full Compaction 的数据
  • 查询时无需合并多个文件,直接读取即可
  • 提供零合并成本(即无需在内存中合并多个文件,直接读取单层文件即可)的读优化性能

查询方式

-- 假设原表名为 my_table
SELECT * FROM my_table$ro WHERE id = 123;

3.2 为什么 Minor Compaction 不更新 $ro

核心原因:Minor Compaction 采用 Universal Compaction 策略,数据仍然分散在多个层级(L0、L1、L2...),查询时需要在内存中合并多层文件,无法提供"零合并成本"的读优化体验。

详细的 LSM Tree 层级结构说明,请参见 第一章 1.3 节

关键区别总结

Compaction 类型 层级结构 查询方式 是否 Read-Optimized
Minor Compaction 多层(L0+L1+L2...) 需要内存合并多层文件 ❌ 否
Full Compaction 单层(仅 maxLevel) 直接读取单层文件 ✅ 是

3.3 $ro 表的更新时机

只有 Full Compaction 才会更新 $ro

Compaction 类型 更新 $ro 原因
Minor Compaction ❌ 不更新 数据仍在多层,需要读时合并
Full Compaction ✅ 更新 数据合并到单层,无需读时合并

官方文档说明

compaction.optimization-interval: Implying how often to perform an optimization full compaction, this configuration is used to ensure the query timeliness of the read-optimized system table.

翻译:该配置用于确保 Read-Optimized 系统表的查询时效性

3.4 配置示例与性能权衡

场景 1:高时效性 $ro 表(频繁 Full Compaction)

CREATE TABLE high_timeliness_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING,
    update_time TIMESTAMP(3)
) TBLPROPERTIES (
    'compaction.optimization-interval' = '30 min'  -- 每30分钟更新 $ro 表
);

特点

  • $ro 表数据新鲜度高(最多延迟30分钟)
  • ❌ 写入性能下降(频繁 Full Compaction)
  • 适合:读密集型 + 需要实时 Read-Optimized 查询

场景 2:低时效性 $ro 表(不频繁 Full Compaction)

CREATE TABLE low_timeliness_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING,
    update_time TIMESTAMP(3)
) TBLPROPERTIES (
    'compaction.optimization-interval' = '6 h'  -- 每6小时更新 $ro 表
);

特点

  • ✅ 写入性能好(Full Compaction 不频繁)
  • $ro 表数据可能延迟6小时
  • 适合:写密集型 + $ro 表用于离线分析

场景 3:不配置 Full Compaction(仅 Minor Compaction)

CREATE TABLE minor_only_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'num-sorted-run.compaction-trigger' = '5'  -- 仅配置 Minor Compaction
    -- 未配置 compaction.optimization-interval
);

特点

  • ✅ 写入性能最优
  • $ro永远不会更新(或极少更新)
  • ❌ 查询主表时需要合并多层文件
  • 适合:写密集型 + 不使用 $ro

3.5 实际应用建议

何时使用 $ro 表?

推荐场景

  • ✅ 分析查询(BI 工具、报表)
  • ✅ 批处理读取(Spark 批量扫描)
  • ✅ 对查询延迟敏感的场景
  • ✅ 读多写少的表

不推荐场景

  • ❌ 实时查询(主表更实时)
  • ❌ 写入密集型表(Full Compaction 开销大)
  • ❌ 需要最新数据($ro 表有延迟)

配置建议

平衡配置(推荐):

CREATE TABLE balanced_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'num-sorted-run.compaction-trigger' = '5',        -- Minor Compaction:快速写入
    'compaction.optimization-interval' = '2 h'        -- Full Compaction:定期更新 $ro 表
);

监控 $ro 表时效性

-- 查看最新 snapshot 的提交时间
SELECT snapshot_id, commit_time
FROM my_table$snapshots
ORDER BY snapshot_id DESC
LIMIT 1;

-- 查看 $ro 表的数据范围
SELECT MIN(create_time), MAX(create_time)
FROM my_table$ro;

3.6 常见误解

误解 1:Minor Compaction 也会更新 $ro

  • 真相:只有 Full Compaction 才会更新

误解 2$ro 表自动包含所有数据

  • 真相:只包含已完成 Full Compaction 的数据

误解 3:查询 $ro 表和主表结果一样

  • 真相$ro 表可能缺少最新写入的数据

正确理解

  • $ro 表是主表的子集
  • 提供零合并成本的读优化性能
  • 通过 compaction.optimization-interval 控制时效性
  • 性能与时效性需要权衡

四、专用压缩作业(Dedicated Compaction Job)

4.1 为什么需要专用压缩作业

问题场景

  • 多个 Flink 作业同时写入同一张表
  • 如果每个作业都执行压缩,会产生冲突导致作业失败

冲突原因

  • 同一分区只能有一个作业执行压缩
  • 并发压缩会导致文件冲突和快照冲突

解决方案

  • 写入作业只负责写入(write-only = true
  • 单独启动一个专用压缩作业负责压缩

4.2 ⚠️ S3 等对象存储的重要注意事项

官方文档重要警告

For S3-like object store, its 'RENAME' does not have atomic semantic. We need to configure Hive metastore and enable 'lock.enabled' option for the catalog.

问题说明

S3 等对象存储的限制

  • S3、阿里云 OSS、华为云 OBS 等对象存储不支持原子性的 RENAME 操作
  • 文件重命名在对象存储中实际是"复制+删除"操作
  • 在并发压缩场景下,可能导致数据不一致作业失败

必须的配置

对于 S3/OSS/OBS 等对象存储,必须启用分布式锁

使用 Hive Metastore + 锁
-- 创建 Catalog 时启用锁
CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://hive-metastore:9083',
    'warehouse' = 's3://my-bucket/warehouse',
    'lock.enabled' = 'true'  -- ⚠️ 必须启用!
);

-- 创建表
CREATE TABLE my_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'write-only' = 'true'  -- 多写入者场景
);

不启用锁的风险

场景:多个作业同时写入 S3 上的 Paimon 表,未启用锁

作业 A:正在执行 Compaction
  ├─ 步骤 1:读取文件 [file1, file2, file3]
  ├─ 步骤 2:合并为 merged_file
  └─ 步骤 3:准备提交快照(RENAME 临时文件)
       ↓
作业 B:同时也在执行 Compaction
  ├─ 步骤 1:读取文件 [file1, file2, file4](部分重叠!)
  ├─ 步骤 2:合并为 merged_file2
  └─ 步骤 3:准备提交快照
       ↓
结果:文件冲突、数据丢失、作业失败 ❌

启用锁后的行为

作业 A:获取锁 → 执行 Compaction → 提交快照 → 释放锁 ✅
       ↓
作业 B:等待锁 → 获取锁 → 执行 Compaction → 提交快照 → 释放锁 ✅
       ↓
结果:串行执行,避免冲突 ✅

重要说明

  • 多作业并发压缩:多个作业同时执行压缩,S3 等对象存储必须启用锁
  • 专用压缩作业:只有一个专用压缩作业 + 写入作业设置 write-only=true,通常不需要锁
    • ✅ 架构本身保证了无并发冲突
    • ✅ 避免 Hive Metastore 锁的复杂性和稳定性问题
    • ⚠️ 但需要确保真的只有一个压缩作业运行

常见错误速查

错误 解决方案
ConflictException: Concurrent compaction 启用 lock.enabled=true
Connection refused (Metastore) 检查 Metastore 服务和网络
Lock timeout 增加 lock.timeout=120000

4.3 配置写入表为只写模式

CREATE TABLE multi_writer_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'write-only' = 'true'  -- 跳过压缩和快照过期
);

重要说明

  • write-only=true 时,写入作业完全跳过压缩逻辑(源码使用 NoopCompactManager
  • ✅ 压缩参数应在建表 DDL 中配置,供专用压缩作业读取:
    • 'num-sorted-run.compaction-trigger' = '5' —— 专用压缩作业触发阈值 ✅
    • 'full-compaction.delta-commits' = '10' —— Full Compaction 触发间隔 ✅
  • ⚠️ num-sorted-run.stop-trigger 在专用压缩模式下完全失效
    • 原因:写入作业使用 NoopCompactManagershouldWaitForLatestCompaction() 永远返回 false
    • 风险:若压缩速度 < 写入速度,sorted run 数会无限增长,最终导致:
      • 读性能严重恶化(需合并大量文件)
      • 文件数失控(如文档中提到的 90 万文件案例)
    • 缓解措施:
      • 监控 L0 文件数和 sorted run 数指标
      • 及时调整压缩作业并行度:'sink.parallelism' = '8'
      • 必要时临时暂停部分写入作业

4.4 启动专用压缩作业

-- 压缩单个表
CALL sys.compact('database.table_name');

-- 压缩指定分区
CALL sys.compact(
    `table` => 'database.table_name',
    `partitions` => 'dt=2025-12-08',
    `options` => 'sink.parallelism=4'
);
Minor Compaction(流式模式)
cd /Users/10281539/WorkSpace/software/flink-1.17.2

./bin/flink run \
  /path/to/paimon-flink-action-1.3.1.jar compact \
  --warehouse file:///Users/10281539/WorkSpace/warehouse \
  --database my_db \
  --table my_table \
  --compact_strategy minor
Full Compaction(批处理模式)
./bin/flink run \
  /path/to/paimon-flink-action-1.3.1.jar compact \
  --warehouse file:///Users/10281539/WorkSpace/warehouse \
  --database my_db \
  --table my_table \
  --compact_strategy full
压缩指定分区
./bin/flink run \
  /path/to/paimon-flink-action-1.3.1.jar compact \
  --warehouse file:///Users/10281539/WorkSpace/warehouse \
  --database my_db \
  --table my_table \
  --partition dt=2025-12-08 \
  --compact_strategy full
数据库级别压缩(Combined 模式)
./bin/flink run \
  /path/to/paimon-flink-action-1.3.1.jar compact \
  --warehouse file:///Users/10281539/WorkSpace/warehouse \
  --database my_db \
  --mode combined \
  --compact_strategy minor

Combined 模式特点

  • 单个 Flink 作业压缩整个数据库的所有表
  • 自动发现新表并压缩
  • 资源共享,避免启动多个作业

4.5 历史分区压缩

对于长期空闲的分区,可以使用专用作业进行全量压缩:

./bin/flink run \
  /path/to/paimon-flink-action-1.3.1.jar compact \
  --warehouse file:///Users/10281539/WorkSpace/warehouse \
  --database my_db \
  --table my_table \
  --partition_idle_time 7d \
  --compact_strategy full

参数说明

  • --partition_idle_time:指定分区空闲时长阈值,只压缩闲置时间超过该值的分区
    • 格式:7d(7天)、12h(12小时)、30m(30分钟)
    • 作用:过滤出最后接收数据时间 < (当前时间 - partition_idle_time) 的分区
    • 使用场景:
      • ✅ 对历史分区进行 Full Compaction,避免影响"热"分区
      • ✅ 减少不必要的压缩开销,只处理不再接收数据的分区
      • ✅ 配合 --compact_strategy full 进行深度优化
  • --compact_strategy:压缩策略(fullminor
    • full:全量压缩,合并所有文件到单层(仅 Batch 模式)
    • minor:增量压缩,选择性合并部分文件(默认由运行模式决定)

重要限制(基于源码验证):

  • ⚠️ partition_idle_time 仅支持 Batch 模式(必须添加 -Dexecution.runtime-mode=BATCH
  • ⚠️ Streaming 模式会抛出异常:Streaming mode does not support partitionIdleTime
  • ⚠️ 不能与 --order_strategy 参数一起使用(Sort Compaction 不支持)
  • ⚠️ 功能在 Paimon 1.0+ 版本引入,0.8 版本不支持

五、核心配置参数详解

5.1 基础触发参数

参数 默认值 类型 说明
num-sorted-run.compaction-trigger 5 Integer 触发压缩的最小 Sorted Run 数量(包含 L0 文件和高层 Run)
num-sorted-run.stop-trigger trigger+3 Integer 暂停写入的 Sorted Run 阈值
sort-spill-threshold (none) Integer 超过该数量的 Sort Reader 会溢出到磁盘,防止 OOM

调优建议

-- 写密集型场景(最大化写入性能)
'num-sorted-run.compaction-trigger' = '10'
'num-sorted-run.stop-trigger' = '2147483647'  -- 永不停止
'sort-spill-threshold' = '15'

-- 读密集型场景(最小化文件数)
'num-sorted-run.compaction-trigger' = '3'
'num-sorted-run.stop-trigger' = '5'
'full-compaction.delta-commits' = '5'

-- 平衡场景(推荐)
'num-sorted-run.compaction-trigger' = '5'      -- 默认
'num-sorted-run.stop-trigger' = '10'           -- 默认
'sort-spill-threshold' = '10'

5.2 压缩策略参数

主键表参数

参数 默认值 类型 说明
compaction.optimization-interval (none) Duration 优化压缩执行频率(如 '1h', '30min')
full-compaction.delta-commits (none) Integer 每N次增量提交后执行全量压缩(同步)
compaction.total-size-threshold (none) MemorySize 文件总大小低于该值时触发全量压缩
compaction.force-rewrite-all-files false Boolean 是否强制选择所有文件进行全量压缩

Append-Only 表参数

参数 默认值 类型 说明
write-only false Boolean 是否禁用 compaction(true 时使用 NoopCompactManager)
compaction.min.file-num 5 Integer 触发 compaction 的最小文件数(配合文件总大小条件)
compaction.max.file-num 50 Integer 触发 compaction 的最大文件数(单独触发条件)
target-file-size 128MB MemorySize 目标文件大小,compaction 会尽量合并到此大小
compaction.delete-ratio-threshold 0.2 Double 删除行比例超过该值时强制压缩(带删除向量的 Append 表)

5.3 Lookup Compaction 参数

参数 默认值 类型 说明
lookup-compact RADICAL Enum Lookup 压缩模式:RADICAL(激进)或 GENTLE(温和)
lookup-compact.max-interval (none) Integer Gentle 模式下,最多N次 commit 必须执行一次 L0 压缩
lookup-wait true Boolean Commit 是否等待 Lookup Compaction 完成
force-lookup false Boolean 是否强制使用 Lookup Compaction

5.4 Sort Compaction 参数

参数 默认值 类型 说明
sort-compaction false Boolean 是否启用排序压缩
sort-compaction.order-by (none) String 普通排序的列(逗号分隔)
sort-compaction.zorder-by (none) String Z-Order 排序的列(逗号分隔)
sort-compaction.hilbert-by (none) String Hilbert 排序的列(逗号分隔)
sort-compaction.range-strategy QUANTITY Enum 范围分布策略:SIZE 或 QUANTITY
sort-compaction.local-sample.magnification 1000 Integer 本地采样放大倍数

5.5 高级参数

参数 默认值 类型 说明
compaction.size-ratio 1 Integer Changelog 模式下比较 Sorted Run 大小的灵活百分比
compaction.max-size-amplification-percent 200 Integer Changelog 模式下存储放大的最大百分比
compaction.delete-ratio-threshold 0.2 Double Append-Only 表中删除行比例超过该值时强制压缩
compaction.force-up-level-0 false Boolean 是否始终将 L0 文件包含在压缩候选中
compaction.offpeak-ratio 0 Integer 离峰时间的压缩比例
compaction.offpeak.start.hour -1 Integer 离峰时间开始小时(0-23)
compaction.offpeak.end.hour -1 Integer 离峰时间结束小时(0-23)

5.6 离峰时间压缩

针对写入有明显高峰和低峰的场景,可以配置离峰时间更激进的压缩:

CREATE TABLE offpeak_compact_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'compaction.offpeak.start.hour' = '22',   -- 晚上10点开始
    'compaction.offpeak.end.hour' = '6',      -- 早上6点结束
    'compaction.offpeak-ratio' = '50'         -- 离峰时间压缩比例提升50%
);

六、不同场景的最佳实践

6.1 场景一:流式写入 + 实时查询(推荐 MOW 模式)

需求

  • Flink 流式作业持续写入
  • 需要支持实时查询
  • 读写性能需要平衡

配置方案

CREATE TABLE realtime_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    user_id BIGINT,
    event_type STRING,
    event_time TIMESTAMP(3)
) TBLPROPERTIES (
    'bucket' = '-1',                              -- 动态桶
    'deletion-vectors.enabled' = 'true',          -- MOW 模式
    'changelog-producer' = 'lookup',
    'lookup-compact' = 'GENTLE',                  -- 温和压缩,降低资源消耗
    'lookup-compact.max-interval' = '100',
    'num-sorted-run.compaction-trigger' = '5',
    'num-sorted-run.stop-trigger' = '12',
    'sort-spill-threshold' = '10'
);

6.2 场景二:批量写入 + 读密集型(推荐 COW 模式)

需求

  • Spark 批处理定期写入(如每小时一次)
  • 查询频繁,对读性能要求高
  • 写入不频繁,可接受写入开销

配置方案

CREATE TABLE batch_cow_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'bucket' = '8',
    'full-compaction.delta-commits' = '1',        -- 每次写入后全量压缩
    'file-index.bloom-filter.columns' = 'id'      -- 添加索引优化查询
);

6.3 场景三:多写入者 + 专用压缩

需求

  • 多个 Flink 作业写入同一张表
  • 避免压缩冲突
  • 需要统一的压缩策略

配置方案

步骤 1:创建表,启用 write-only

CREATE TABLE multi_writer_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'write-only' = 'true',                        -- 写入作业不执行压缩
    'bucket' = '-1'
);

步骤 2:启动专用压缩作业

# 使用 Combined 模式压缩整个数据库
cd /Users/10281539/WorkSpace/software/flink-1.17.2

./bin/flink run \
  /path/to/paimon-flink-action-1.3.1.jar compact \
  --warehouse file:///Users/10281539/WorkSpace/warehouse \
  --database my_db \
  --mode combined \
  --compact_strategy minor

6.4 场景四:大表 + Sort Compaction

需求

  • 表数据量大(TB 级)
  • 查询经常包含范围过滤和多维条件
  • 需要优化数据局部性

配置方案

CREATE TABLE large_table_with_sort (
    id BIGINT,
    region STRING,
    city STRING,
    category STRING,
    amount DECIMAL(10,2),
    create_time TIMESTAMP(3)
) TBLPROPERTIES (
    'bucket' = '16',
    'sort-compaction' = 'true',
    'sort-compaction.zorder-by' = 'region,city,category',  -- Z-Order 多维优化
    'compaction.optimization-interval' = '2 h',             -- 每2小时优化一次
    'file-index.bloom-filter.columns' = 'id',
    'file-index.bitmap.columns' = 'region,category'
);

6.5 场景五:高吞吐写入 + 异步压缩

需求

  • 峰值写入 QPS 极高
  • 可接受查询性能在高峰期下降
  • 低峰期资源充足

配置方案

CREATE TABLE high_throughput_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'bucket' = '32',                               -- 增加并行度
    'num-sorted-run.stop-trigger' = '2147483647',  -- 永不停止写入
    'sort-spill-threshold' = '15',                 -- 防止 OOM
    'lookup-wait' = 'false',                       -- 不等待 lookup 压缩
    'compaction.optimization-interval' = '30 min'  -- 低峰期定期优化
);

6.6 场景六:Append-Only 日志表(推荐配置)

需求

  • 日志收集、事件流存储
  • 只追加不更新
  • 需要查询历史数据
  • 避免小文件问题

配置方案

CREATE TABLE event_logs (
    event_time TIMESTAMP(3),
    event_type STRING,
    user_id BIGINT,
    event_data STRING,
    dt STRING
) PARTITIONED BY (dt)
TBLPROPERTIES (
    'bucket' = '-1',                    -- UNAWARE 模式
    'write-only' = 'false',             -- ✅ 启用 compaction(推荐)
    'compaction.min.file-num' = '10',   -- 至少 10 个文件才触发
    'compaction.max.file-num' = '100',  -- 最多 100 个文件必须触发
    'target-file-size' = '256MB'        -- 合并到 256MB
);

特点

  • ✅ 后台自动合并小文件,减少 HDFS NameNode 压力
  • ✅ 提高查询性能(减少文件打开次数)
  • ✅ 保持数据追加顺序(按 sequence number)
  • ✅ 不影响写入性能(异步执行)

6.7 场景七:极高吞吐 Append-Only 表(禁用 Compaction)

需求

  • 每秒数百万条日志写入
  • 数据写入后几乎不查询
  • 追求极致写入性能
  • 可接受大量小文件

配置方案

CREATE TABLE ultra_high_throughput_logs (
    log_time TIMESTAMP(3),
    log_level STRING,
    log_message STRING
) TBLPROPERTIES (
    'bucket' = '-1',
    'write-only' = 'true'  -- ❌ 禁用 compaction,最大化写入性能
);

注意事项

  • ⚠️ 会产生大量小文件
  • ⚠️ 需要定期手动执行 compaction 或使用专用 compaction 作业
  • ⚠️ 查询性能会随文件数增长而下降
  • ✅ 适合"写多读少"的场景

七、监控与调优

7.1 关键监控指标

指标 说明 监控方法
文件数量 当前 Bucket 的 Sorted Run 数量 查看 manifest 文件
压缩频率 Compaction 触发次数和间隔 Flink Metrics
压缩延迟 Compaction 完成耗时 Flink Metrics
写入延迟 是否因压缩导致写入暂停 应用日志
查询性能 查询响应时间变化 查询日志

7.2 性能问题排查

常见问题速查

问题 原因 解决方案
写入频繁暂停 stop-trigger 太低 增加 stop-trigger=20
查询性能下降 文件数过多 降低 compaction-trigger=3 或手动压缩
OOM 错误 同时打开文件过多 设置 sort-spill-threshold=8
压缩作业冲突 多作业并发压缩 使用 write-only=true + 专用压缩作业

问题 5:Hive 锁超时导致 L0 文件疯狂膨胀(生产案例)

真实生产案例

场景描述

  • 多个写入作业,配置 write-only = true
  • 使用 Hive Metastore(启用了锁)
  • 单独启动 Dedicated Compaction 作业
  • 正常运行几天后突然出现问题

症状

错误信息:
Failed to acquire lock after timeout

现象:
1. Compaction 作业获取 Hive 锁超时
2. Compaction 失败后任务"跳过"(可能是静默失败或重试放弃)
3. L0 文件数量疯狂增长:从几十个膨胀到 90+ 万个 ❌
4. 查询性能急剧下降,甚至无法查询
5. 存储空间浪费严重(大量小文件)

根本原因分析

  1. lock.enabled 配置来源排查

    • Paimon 0.8/0.9 默认值lock.enabled 默认为 false
    • Hive Metastore 不会自动启用锁:仅使用 metastore=hive 不会自动启用锁机制 ✅
    • 可能的锁来源
      • Flink 0.9 写入作业的 catalog 配置中显式设置了 lock.enabled=true
      • flink-conf.yaml 全局配置中设置了锁
      • Dedicated Compaction 作业启动时的 catalog 配置继承或默认启用了锁
      • 跨版本配置迁移(Spark 0.8 → Flink 0.9)可能导致配置不一致
    • 建议排查方向
      # 检查 Flink 写入作业的 catalog 配置
      # 查看 CREATE CATALOG 语句或作业提交参数
      
      # 检查 Flink 全局配置
      cat $FLINK_HOME/conf/flink-conf.yaml | grep lock
      
      # 检查 Compaction 作业启动日志
      # 查看实际使用的 catalog 配置
      
  2. Hive Metastore 锁机制问题

    • Hive 锁依赖于 Metastore 的性能和稳定性
    • 如果 Metastore 繁忙、网络抖动、或锁表损坏,会导致获取锁超时
    • 长时间运行后,Metastore 可能积累大量锁记录
  3. Compaction 失败处理不当

    • 获取锁失败后,Compaction 作业可能静默失败跳过本次压缩
    • 并未明确报错导致任务失败,而是继续运行
    • 写入作业持续产生 L0 文件,但没有 Compaction 清理
  4. L0 文件膨胀恶性循环

    写入作业 → 持续产生 L0 文件
         ↓
    Compaction 作业获取锁超时 → 压缩失败
         ↓
    L0 文件继续增长(几天内到 90+ 万)
         ↓
    查询需要读取海量文件 → 性能急剧下降
         ↓
    甚至 OOM、查询超时
    

为什么 lock.enabled=false 解决了问题?

  1. 绕过 Hive Metastore 锁

    • 不再依赖 Hive Metastore 的锁机制
    • 避免锁超时问题
  2. 专用压缩作业本身就是单例

    • 只有一个 Compaction 作业运行,不存在并发冲突
    • 不需要额外的分布式锁保护
  3. 写入作业已设置 write-only=true

    • 写入作业不执行压缩
    • 所有压缩由专用作业负责
    • 天然避免了并发压缩

解决方案

✅ 推荐方案:禁用 Hive 锁

./bin/flink run \
  /path/to/paimon-flink-action-1.3.1.jar compact \
  --warehouse hdfs://namenode:8020/warehouse \
  --database my_db \
  --mode combined \
  --catalog_conf metastore=hive \
  --catalog_conf uri=thrift://hive-metastore:9083 \
  --catalog_conf lock.enabled=false  # 关键:禁用锁

原因:专用压缩作业本身就是单例,配合 write-only=true 不会有并发冲突。

⚠️ 预防措施:监控 L0 文件数量

  • 告警阈值:1000(警告)、10000(严重)、100000(危险)
  • 监控方式:SELECT COUNT(*) FROM my_table$files WHERE level = 0

配置建议速查表

场景 lock.enabled 说明
专用压缩 + write-only false(推荐) 不会有并发冲突,无需锁
多写入者 + 每个都压缩 true(必须) 存在并发冲突,必须用锁
单写入者 ❌ false 单作业本身不冲突
S3 存储 + 专用压缩 ❓ 视情况 如果只有一个压缩作业,可以不启用

经验总结

  1. 专用压缩作业 + write-only 模式下,通常不需要 Hive 锁

    • 架构本身已经保证了不会有并发压缩
    • 启用锁反而引入了复杂性和不稳定性
  2. 监控是关键

    • 定期检查 L0 文件数量
    • 监控 Compaction 作业是否正常执行
    • 设置告警阈值
  3. L0 文件膨胀恢复方案

    • 如果已经膨胀到 90+ 万,直接修复需要很长时间
    • 可以考虑:
      • 停止写入,专注清理 L0 文件
      • 增加 Compaction 并行度
      • 或者备份数据,重建表
  4. Hive Metastore 不是万能的

    • Hive 锁依赖 Metastore 的稳定性
    • 生产环境需要考虑 Metastore 的性能和可靠性
    • 对于专用压缩场景,锁不是必需的

7.3 调优原则

  1. 写密集型场景优先

    • 提高 stop-trigger
    • 启用异步压缩
    • 使用 Minor Compaction
  2. 读密集型场景优先

    • 降低 compaction-trigger
    • 启用 Full Compaction
    • 考虑 COW 模式
  3. 平衡型场景

    • 使用 MOW 模式(推荐)
    • 保持默认 trigger 值
    • 定期优化压缩
  4. 资源受限场景

    • 使用 Gentle Lookup Compaction
    • 设置离峰时间压缩
    • 控制 sort-spill-threshold

八、常见问题 FAQ

Q1: Compaction 是同步还是异步的?

:取决于引擎和配置:

场景 同步/异步 说明
Flink 流式作业 异步 后台线程执行,不阻塞写入
Spark 批处理作业 同步 作业提交前执行,阻塞提交
full-compaction.delta-commits 同步 每次提交时执行,影响写入性能
compaction.optimization-interval 异步 定时触发,后台执行

Q2: 为什么设置了 write-only = true 还会压缩?

:可能原因:

  1. 使用了旧版本 Paimon,该参数未生效
  2. 同时运行了其他未设置 write-only 的作业
  3. 手动触发了压缩(CALL sys.compact()

Q3: Minor Compaction 和 Full Compaction 可以同时配置吗?配置之间有相关性吗?

:可以同时配置,且推荐同时配置。两者配置相互独立,各自触发互不影响:

独立触发机制

  • Minor Compaction:由文件数量驱动(num-sorted-run.compaction-trigger),持续工作
  • Full Compaction:由提交次数或时间间隔驱动(full-compaction.delta-commitscompaction.optimization-interval),定期执行

配置示例

CREATE TABLE mixed_compact_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'num-sorted-run.compaction-trigger' = '5',        -- Minor: 5个文件触发
    'compaction.optimization-interval' = '1 h'        -- Full: 每小时触发
);

运行效果

  • Minor Compaction 持续工作,保持文件数在可控范围(避免 OOM)
  • Full Compaction 定期执行,生成单层文件,更新 $ro 表(优化查询)
  • 两者协同工作,实现性能平衡

只配置一个的缺点

  • 只配置 Minor:文件数控制住了,但 $ro 表永远不更新,查询仍需内存合并
  • 只配置 Full:Full Compaction 间隔期间,文件数可能失控(依赖默认 Minor 配置)
  • 两者都配置:获得最佳性能平衡

Q4: Lookup Compaction 一定要配置吗?

:不一定,只在以下情况下需要:

  • 使用 changelog-producer = 'lookup'
  • 使用 merge-engine = 'first-row'
  • 使用 MOW 模式(deletion-vectors.enabled = true

Q5: 如何知道当前表有多少个文件?

方法 1:查询系统表

SELECT * FROM my_table$files;

方法 2:查看存储目录

# 查看某个 bucket 的文件
ls /Users/10281539/WorkSpace/warehouse/my_db.db/my_table/bucket-0/

方法 3:使用统计命令

CALL sys.query_statistics('database.table');

Q6: Compaction 会删除旧数据吗?

:不会直接删除,但会产生以下效果:

  1. 合并重复数据:根据 Merge Engine 合并同主键的多条记录
  2. 过期记录删除:如果配置了 record-level.expire-time,压缩时会删除过期记录
  3. 旧文件清理:压缩后旧文件成为历史快照的一部分,需要通过 expire_snapshots 清理

Q7: 如何强制执行一次全量压缩?

:有以下几种方式:

方式 1:使用 SQL 过程

CALL sys.compact('database.table');

方式 2:使用 Action Jar

./bin/flink run \
  /path/to/paimon-flink-action-1.3.1.jar compact \
  --warehouse file:///Users/10281539/WorkSpace/warehouse \
  --database my_db \
  --table my_table \
  --compact_strategy full

方式 3:临时修改表属性

ALTER TABLE my_table SET ('full-compaction.delta-commits' = '1');
-- 写入一条数据触发压缩
INSERT INTO my_table VALUES (1, 'test');
-- 改回原配置
ALTER TABLE my_table RESET ('full-compaction.delta-commits');

Q8: 使用 S3 存储时,为什么作业频繁失败并报文件冲突?

:这是 S3/OSS/OBS 等对象存储的 RENAME 操作非原子性导致的典型问题。

核心原因

  • S3 等对象存储的 RENAME 是"复制+删除",不是原子操作
  • 多作业并发压缩时会读取到中间状态的文件,导致冲突

解决方案

  • 多作业并发压缩:必须启用分布式锁(lock.enabled=true
  • 专用压缩作业(write-only模式):通常不需要启用锁

存储类型对比

存储类型 需要 lock.enabled 原因
HDFS/本地文件系统 ❌ 不需要 RENAME 是原子操作
S3/OSS/OBS/COS 等 多作业压缩时 ✅必须
专用压缩作业 ❓ 可选
RENAME 非原子

详细配置方案和生产案例,请参见 第四章 4.2 节:S3 等对象存储的重要注意事项

Q9: Minor 和 Full Compaction 怎么区分?

:Paimon 会根据配置和文件状态自动决定

Minor Compaction(默认)

  • 触发方式:num-sorted-run.compaction-trigger = 5(文件数达到阈值)
  • 行为:Paimon 使用 Universal Compaction 算法,选择性合并部分文件
  • 结果:文件数减少,但不会合并到 1 个

Full Compaction(需显式配置)

  • 触发方式 1:full-compaction.delta-commits = 10(每 10 次提交)
  • 触发方式 2:compaction.optimization-interval = '1 h'(每小时)
  • 触发方式 3:手动执行 CALL sys.compact()
  • 行为:合并所有文件
  • 结果:理想情况下合并为 1 个大文件

两者可以共存

CREATE TABLE hybrid_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'num-sorted-run.compaction-trigger' = '5',     -- Minor 持续执行
    'compaction.optimization-interval' = '2 h'     -- Full 每2小时执行
);

如何判断当前使用的是哪种?

  • 观察文件数变化:Minor 减少但不会到 1,Full 通常减少到 1-2 个
  • 查看 Flink 日志:Compaction task triggered: type=MINOR/FULL
  • 查询系统表:SELECT * FROM my_table$files

详细说明:参见第二章:Compaction 类型详解

Q10: 为什么 Minor Compaction 不更新 $ro 表?

:因为 $ro(Read-Optimized)表的设计目标是提供零合并成本的读优化性能。

核心原因

  1. Universal Compaction 的多层结构

    • Minor Compaction 后,数据仍分散在多个层级(L0、L1、L2...)
    • 查询时需要读取并合并多层文件
    • 不符合 "Read-Optimized" 的定义
  2. Full Compaction 的单层结构

    • 将所有数据合并到单层连续文件
    • 查询时直接读取,无需合并
    • 真正实现零合并成本

配置 $ro 表更新频率

CREATE TABLE my_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'compaction.optimization-interval' = '1 h'  -- 每小时更新 $ro 表
);

权衡

  • 频繁 Full Compaction → $ro 表实时性好,但写入性能下降
  • 不频繁 Full Compaction → 写入性能好,但 $ro 表数据延迟

详细说明:参见第三章:Read-Optimized 表详解

Q11: Append-Only 表会有 Compaction 吗?

会有,且默认启用,但与主键表的 Compaction 完全不同。

核心区别

  • 主键表 Compaction:合并数据(数据去重、版本合并)
  • Append-Only 表 Compaction:合并文件(文件拼接,无数据合并)

触发条件

-- 满足以下任一条件即触发
条件1: (总文件大小 >= target-file-size) AND (文件数 >= compaction.min.file-num)
条件2: 文件数 >= compaction.max.file-num

禁用方式

CREATE TABLE logs (...) TBLPROPERTIES (
    'write-only' = 'true'  -- 禁用 compaction
);

推荐配置

  • 默认启用 compactionwrite-only = false):减少小文件,提高查询性能
  • ⚠️ 禁用 compactionwrite-only = true):仅适用于极高吞吐 + 几乎不查询的场景

详细说明:参见第二章 2.4 节:Append-Only 表 Compaction

Q12: 专用压缩作业 + Hive Metastore,需要启用 lock.enabled 吗?

:取决于具体场景,大多数情况下不需要

重要说明

  • Paimon 0.8/0.9 默认值lock.enabled 默认为 false
  • Hive Metastore 不会自动启用锁:仅配置 metastore=hive 不会自动启用锁机制 ✅
  • 锁必须显式配置:如果遇到锁超时问题,说明某处配置中显式启用了锁

不需要启用锁的场景(推荐)

  • ✅ 只有一个专用压缩作业运行
  • ✅ 所有写入作业都设置了 write-only = true
  • ✅ 使用 Hive Metastore 仅用于元数据管理

配置示例

# 启动专用压缩作业(禁用锁)
./bin/flink run \
  /path/to/paimon-flink-action-1.3.1.jar compact \
  --warehouse hdfs://namenode:8020/warehouse \
  --database my_db \
  --mode combined \
  --catalog_conf metastore=hive \
  --catalog_conf uri=thrift://hive-metastore:9083 \
  --catalog_conf lock.enabled=false  # 禁用锁

为什么可以禁用?

  • 架构本身已经保证了不会有并发压缩(只有一个压缩作业)
  • 启用锁反而可能引入 Hive Metastore 的稳定性问题

需要启用锁的场景

  • ⚠️ 多个作业都会执行压缩(未使用 write-only)
  • ⚠️ 使用 S3 等对象存储(RENAME 非原子性)
  • ⚠️ 存在并发写入同一分区的可能

真实生产教训
某生产环境在启用 Hive 锁后,运行几天后遇到锁超时问题,导致 L0 文件膨胀到 90+ 万个,最终通过禁用锁(lock.enabled=false)解决。

详细案例:参见第九章问题5:Hive 锁超时导致 L0 文件疯狂膨胀

配置建议

架构模式 lock.enabled 理由
专用压缩 + write-only false 无并发冲突,无需锁
多写入者 + 每个都压缩 true 有并发冲突,必须用锁
S3 + 专用压缩 ❓ 视情况 单压缩作业可不启用
HDFS + 专用压缩 ❌ false RENAME 原子性 + 单压缩作业

九、版本差异对比(0.8 vs 1.3)

9.1 关于 delete-file.thread-num 参数

你提到的 delete-file.thread-num=100 参数,经过详细搜索 在 Paimon 0.8 和 1.3 的官方文档中均未找到

可能的情况

  • 内部参数,仅存在于源码中,未在官方文档公开
  • 特定发行版或 fork 版本添加的自定义参数
  • 0.8 早期版本存在,后续版本已移除
  • 已重命名为其他参数

在 1.3 版本中,控制文件删除并发度的推荐配置

CREATE TABLE my_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'file-operation.thread-num' = '100',           -- 文件操作线程数(包括删除)
    'snapshot.expire.limit' = '50',                -- 每次过期快照数量
    'snapshot.expire.execution-mode' = 'async'     -- 异步删除文件(1.3 新增)
);

迁移建议(从 0.8 到 1.3)

-- 如果 0.8 中使用了(假设存在)
'delete-file.thread-num' = '100'

-- 迁移到 1.3,等效配置
'file-operation.thread-num' = '100'                -- 替代参数
'snapshot.expire.execution-mode' = 'async'         -- 异步删除,提升性能
'snapshot.expire.limit' = '50'                     -- 控制批次大小

9.2 Compaction 相关新增特性(1.3)

1. Lookup Compaction(全新功能)

0.8:不支持 Lookup Compaction

1.3:新增 Radical 和 Gentle 两种模式

-- 1.3 新增配置
CREATE TABLE lookup_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'changelog-producer' = 'lookup',
    'lookup-compact' = 'GENTLE',                -- ⭐ 新增
    'lookup-compact.max-interval' = '100',      -- ⭐ 新增
    'lookup-wait' = 'true'                      -- ⭐ 增强
);

2. Sort Compaction(全新功能)

0.8:不支持 Sort Compaction

1.3:支持 Order、Z-Order、Hilbert 三种排序策略

-- 1.3 新增配置
CREATE TABLE sort_table (
    id BIGINT,
    region STRING,
    amount DECIMAL(10,2)
) TBLPROPERTIES (
    'bucket' = '4',
    'sort-compaction' = 'true',                      -- ⭐ 新增
    'sort-compaction.zorder-by' = 'region,amount'    -- ⭐ 新增
);

3. 异步快照过期(全新功能)

0.8:快照过期和文件删除是同步的,可能阻塞写入

1.3:支持异步快照过期,避免阻塞

-- 0.8 配置
'snapshot.time-retained' = '1 h'
'snapshot.num-retained.min' = '10'

-- 1.3 增强配置
'snapshot.time-retained' = '1 h'
'snapshot.num-retained.min' = '10'
'snapshot.expire.execution-mode' = 'async'    -- ⭐ 新增
'snapshot.expire.limit' = '50'                -- ⭐ 新增

4. 专用压缩作业 Combined 模式(增强)

0.8:基本的专用压缩支持

1.3:新增 Combined 模式,单作业压缩整个数据库

# 1.3 新增 Combined 模式
./bin/flink run \
  /path/to/paimon-flink-action-1.3.1.jar compact \
  --warehouse file:///path/to/warehouse \
  --database my_db \
  --mode combined \          # ⭐ 新增:整库压缩
  --compact_strategy minor

5. 离峰时间压缩(可能是新增)

-- 1.3 支持离峰时间更激进的压缩
CREATE TABLE offpeak_table (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data STRING
) TBLPROPERTIES (
    'compaction.offpeak.start.hour' = '22',   -- ⭐ 可能是新增
    'compaction.offpeak.end.hour' = '6',      -- ⭐ 可能是新增
    'compaction.offpeak-ratio' = '50'         -- ⭐ 可能是新增
);

9.3 性能提升对比

场景 0.8 基准 1.3 提升 原因
写入吞吐量 1x 1.2-1.5x MOW 模式 + Gentle Lookup
DELETE 性能 1x 2-5x Deletion Vectors 优化
范围查询 1x 1.5-3x Sort Compaction (Z-Order)
快照清理 1x 1.3-2x 异步快照过期
多写入者 ⚠️ 冲突风险 ✅ 稳定 专用压缩作业改进

9.4 关键参数变化对比

参数 0.8 1.3 说明
num-sorted-run.compaction-trigger ✅ 默认 5 ✅ 默认 5 无变化
num-sorted-run.stop-trigger ✅ 默认 trigger+3 ✅ 默认 trigger+3 无变化
lookup-compact ❌ 不支持 ✅ RADICAL/GENTLE 新增
sort-compaction ❌ 不支持 ✅ 支持 新增
snapshot.expire.execution-mode ❌ 不支持 ✅ sync/async 新增
file-operation.thread-num ❓ 不确定 ✅ 支持 替代 delete-file.thread-num
write-only ❓ 不确定 ✅ 明确支持 增强
deletion-vectors.enabled ⚠️ 实验性 ✅ 稳定 增强

9.5 升级建议

适合升级到 1.3 的场景

场景 理由
MOW 模式表 1.3 中 MOW 更稳定,性能提升明显
多写入者 专用压缩作业支持更好,冲突问题改善
写入密集型 Gentle Lookup Compaction 降低资源消耗
大表范围查询 Sort Compaction 显著提升查询性能
快照清理慢 异步快照过期避免阻塞

9.6 版本选择建议

项目类型 推荐版本 理由
新项目 1.3 功能更丰富,性能更好,MOW 模式稳定
生产环境 1.3 更稳定,Lookup/Sort Compaction 成熟
0.8 升级 评估后升级 性能提升 20-50%,需充分测试
实验性功能 1.3+/Master 最新特性,需注意稳定性

十、总结与建议

10.1 核心要点总结

  1. Compaction 机制因表类型而异
    • 主键表:基于 LSM Tree,合并数据(去重、版本合并)
    • Append-Only 表:基于文件数量,合并文件(文件拼接,保持追加顺序)
  2. 主键表 Compaction 策略
    • Minor Compaction 适合写密集型场景,Full Compaction 适合读密集型场景
    • MOW 模式 + Gentle Lookup Compaction 是大多数场景的推荐配置(1.3 稳定支持)
  3. Append-Only 表 Compaction 推荐
    • ✅ 默认启用 compaction,减少小文件,提高查询性能
    • ⚠️ 仅在极高吞吐 + 几乎不查询的场景下禁用
  4. 多写入者必须使用专用压缩作业,避免冲突
  5. 异步压缩策略可以最大化写入性能,但需监控文件数
  6. Sort Compaction 对大表的范围查询有显著优化效果(1.3 新增)
  7. 从 0.8 升级到 1.3 可获得显著性能提升,特别是 MOW 和多写入者场景

10.2 配置建议速查表

主键表

场景 推荐配置
通用推荐(1.3) MOW 模式 + Gentle Lookup + 默认 Trigger 值 + 异步快照过期
写密集型 stop-trigger=MAX + sort-spill=10 + lookup-wait=false
读密集型 COW 模式 +full-compaction.delta-commits=1
多写入者 write-only=true + 专用压缩作业(Combined 模式)
大表优化 Sort Compaction (Z-Order) + 定期优化
资源受限 Gentle Lookup + 离峰时间压缩

Append-Only 表

场景 推荐配置
日志收集/事件流 bucket=-1 + write-only=false + compaction.min.file-num=10 + target-file-size=256MB
极高吞吐写入 bucket=-1 + write-only=true(禁用 compaction)
需要查询历史数据 启用 compaction + 合理的 target-file-size
HDFS NameNode 压力大 启用 compaction + 较大的 target-file-size

10.3 运维建议

  1. 监控文件数:定期检查 Sorted Run 数量,避免失控

    • L0 文件数量告警阈值:1000(警告)、10000(严重)、100000(危险)
    • 建议每小时检查一次
  2. 定期全量压缩:对于重要表,建议每天或每周执行一次全量压缩

  3. 快照清理:配合 expire_snapshots 清理历史文件,释放存储空间

  4. 测试验证:修改压缩配置后,务必在测试环境验证性能影响

  5. 分区策略:合理设计分区,避免单分区过大导致压缩耗时过长

  6. 版本升级:评估从 0.8 升级到 1.3 的收益,制定灰度升级计划

  7. ⭐ 锁配置建议(重要)

    • 专用压缩作业场景:不启用 Hive 锁(lock.enabled=false),避免锁超时问题
    • 并发压缩场景:必须启用锁(lock.enabled=true),防止文件冲突
    • 监控 Compaction 作业:确保压缩作业持续正常执行,避免静默失败
  8. Hive Metastore 维护

    • 定期检查 Metastore 性能和稳定性
    • 必要时清理过期的锁记录
    • 监控 Metastore 连接数和响应时间

10.4 进一步学习

  • 官方文档:https://paimon.apache.org/docs/1.3/primary-key-table/compaction/
  • 专用压缩作业:https://paimon.apache.org/docs/1.3/maintenance/dedicated-compaction/
  • 配置参数大全:https://paimon.apache.org/docs/1.3/maintenance/configurations/
  • 性能调优指南:https://paimon.apache.org/docs/1.3/performance/overview/
  • 0.8 文档(对比参考):https://paimon.apache.org/docs/0.8/
Watermark与allowedLateness详解 2025-12-30
Apache Paimon Partial-Update 2025-12-12

评论区