Watermark与allowedLateness详解
概述
在Flink流处理中,处理乱序数据需要两层机制:
- Watermark:决定何时触发窗口计算
- allowedLateness:决定窗口触发后是否继续接受迟到数据
一、两者的关系
它们是两层不同的乱序容忍机制:
1. Watermark(第一层防护)
- 作用时机:数据进入窗口前
- 容忍度:
forBoundedOutOfOrderness(2秒) - 判定逻辑:数据是否能进入窗口
- 公式:
Watermark = 当前最大事件时间 - 容忍时间
2. allowedLateness(第二层防护)
- 作用时机:窗口触发后
- 容忍度:例如
allowedLateness(Time.seconds(5)) - 判定逻辑:窗口关闭后,是否还接受迟到数据并重新计算
- 窗口真正关闭时间:
窗口结束时间 + allowedLateness
二、完整的数据判定流程
以窗口[0-5000)为例(Watermark容忍2秒,allowedLateness=5秒):
情况1️⃣:正常数据(在Watermark容忍内)
事件时间2000ms,在7000ms到达
→ Watermark = 7000 - 2000 = 5000ms
→ 2000 < 5000,进入窗口[0-5000)
→ ✅ 正常处理
情况2️⃣:迟到数据(超出Watermark但在allowedLateness内)
事件时间3000ms,在12000ms到达
→ Watermark已经是10000ms,窗口[0-5000)已触发
→ 但窗口真正关闭时间 = 5000 + 5000 = 10000ms
→ 当前Watermark(10000) <= 关闭时间(10000),数据仍被接受
→ ✅ 窗口重新计算并再次输出结果
情况3️⃣:真正丢弃的数据(超出Watermark + allowedLateness)
事件时间4000ms,在15000ms到达
→ 窗口真正关闭时间 = 10000ms < 当前Watermark(15000-2000=13000)
→ ❌ 数据进入late data侧输出流
三、为什么需要allowedLateness?
问题场景
假设只有Watermark(容忍2秒乱序):
时间线:
1000ms → 3000ms → 7000ms (Watermark=5000) → 窗口[0-5000)触发并关闭
此时2000ms的数据迟到了(延迟很久才到达)
→ 虽然2000属于[0-5000)窗口
→ 但窗口已经关闭了
→ 数据只能丢弃到late data
矛盾点:
- 如果增大Watermark容忍度(比如10秒),能减少数据丢失
- 但会导致窗口触发延迟增加(要等更久才能看到结果)
allowedLateness的解决方案
核心思想:窗口触发 ≠ 窗口关闭
窗口生命周期:
1. Watermark到达窗口结束时间 → 首次触发,输出第一版结果
2. 窗口继续保持打开 allowedLateness 时间
3. 这期间到达的迟到数据 → 窗口重新计算,输出更新后的结果
4. 超过allowedLateness → 窗口真正关闭,late data进入侧输出流
四、实际例子对比
场景:实时大屏统计UV
方案A:只用Watermark(容忍10秒)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
)
缺点:
- 窗口[0-5000)要等到Watermark=15000才触发
- 用户要等15秒才能看到第一个5秒窗口的结果
- 延迟太高!
方案B:Watermark(2秒) + allowedLateness(10秒)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
)
.allowedLateness(Time.seconds(10))
优点:
- 7000ms时就触发窗口,用户快速看到初步结果(延迟2秒)
- 后续7-17秒内到达的迟到数据会触发窗口更新
- 用户看到的是逐步修正的结果,而非漫长等待
权衡:
- ✅ 低延迟:首次结果快速返回
- ✅ 高准确性:允许后续修正
- ⚠️ 代价:同一窗口可能输出多次(需要下游处理更新)
五、两者的职责分工
| 特性 | Watermark | allowedLateness |
|---|---|---|
| 主要职责 | 触发窗口计算 | 延长窗口生命周期 |
| 优化目标 | 平衡乱序与延迟 | 允许结果修正 |
| 影响 | 首次结果延迟 | 窗口关闭时间 |
| 适用场景 | 大部分乱序容忍 | 处理极端迟到数据 |
| 数据处理 | 决定何时触发 | 决定是否重算 |
| 输出特点 | 触发首次输出 | 可能触发多次更新 |
六、典型使用场景
场景1:实时监控告警
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
)
.allowedLateness(Time.seconds(10))
设计思路:
- Watermark控制快速告警(容忍5秒乱序)
- allowedLateness给10秒缓冲,避免漏告警
- 用户先看到初步告警,10秒内可能有修正
场景2:实时报表
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(1))
)
.allowedLateness(Time.minutes(5))
设计思路:
- 窗口结束后1分钟立即出报表
- 5分钟内允许补录数据并更新报表
- 用户看到的是"准实时修正"的结果
场景3:严格准确性场景(金融结算)
.window(TumblingEventTimeWindows.of(Time.days(1)))
.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(30))
)
.allowedLateness(Time.hours(2))
设计思路:
- 每天零点后30分钟触发结算
- 2小时内允许补账
- 之后完全关闭窗口,保证数据一致性
七、窗口状态管理
窗口的三个关键时间点
窗口[0-5000)的生命周期(Watermark=2s, allowedLateness=5s):
0ms ─────────── 5000ms ────────── 10000ms ───────── 13000ms
窗口范围 ↑ 首次触发 ↑ 真正关闭 ↑ 迟到数据被丢弃
(WM>=5000) (WM>=10000)
首次触发时间 = 窗口结束时间 (当Watermark >= 5000ms)
窗口保持打开 = 窗口结束时间 ~ 窗口结束时间 + allowedLateness
真正关闭时间 = 窗口结束时间 + allowedLateness (10000ms)
窗口触发次数
// 窗口可能触发多次
.process(new ProcessWindowFunction<Event, String, String, TimeWindow>() {
@Override
public void process(String key, Context context,
Iterable<Event> elements, Collector<String> out) {
// 每次有迟到数据到达,这个函数都会被调用
// 下游需要处理重复/更新的结果
}
});
八、最佳实践
1. 如何选择Watermark容忍度?
原则:覆盖95-99%的正常乱序情况
监控指标:
- P95延迟:大部分数据的延迟分布
- P99延迟:包含少量异常但可接受的延迟
示例:
如果P95延迟是3秒,设置 forBoundedOutOfOrderness(Duration.ofSeconds(3))
2. 如何选择allowedLateness?
原则:根据业务对"迟到修正"的容忍度
考虑因素:
- 下游系统能否处理更新?
- 业务能否接受结果被修正?
- 状态存储压力(窗口保持打开会占用资源)
示例:
- 实时大屏:允许5-10秒修正
- 告警系统:允许10-30秒修正
- 离线报表:允许小时级修正
3. 处理侧输出流(Late Data)
// 定义侧输出标签
final OutputTag<Event> lateDataOutputTag = new OutputTag<Event>("late-data"){};
// 配置窗口
SingleOutputStreamOperator<String> result = stream
.window(...)
.allowedLateness(Time.seconds(10))
.sideOutputLateData(lateDataOutputTag) // 捕获真正的late data
.process(...);
// 处理late data
DataStream<Event> lateData = result.getSideOutput(lateDataOutputTag);
// 方案1:记录到日志/监控
lateData.map(event -> "Late: " + event).print();
// 方案2:写入离线存储,后续人工处理
lateData.addSink(new LateDataSink());
// 方案3:发送告警
lateData.map(event -> new Alert("数据严重延迟: " + event));
九、总结
为什么需要allowedLateness?
- 解耦延迟与准确性:Watermark控制延迟,allowedLateness控制准确性
- 支持渐进式结果:先快速返回,后逐步修正
- 处理长尾延迟:网络抖动、系统故障等导致的极端迟到
- 业务灵活性:不同业务对"多久算真正迟到"有不同定义
如果没有allowedLateness会怎样?
- 要么选择大Watermark(高延迟,用户体验差)
- 要么选择小Watermark(丢数据,准确性差)
- 无法同时满足"低延迟 + 高准确性"
核心公式
数据是否被处理 =
(事件时间在Watermark容忍内)
OR
(窗口触发后但在allowedLateness时间内)
窗口真正关闭时间 = 窗口结束时间 + allowedLateness
Late Data条件 = Watermark > (窗口结束时间 + allowedLateness)