Flink Source耗尽判定机制详解
一、核心原理
Flink通过监控SourceFunction.run()方法是否返回来判定Source是否耗尽。
SourceFunction接口契约
public interface SourceFunction<T> extends Function, Serializable {
/**
* 数据生成的主方法
*
* 契约规则:
* 1. 有界源:当所有数据发送完毕后,run()方法返回
* 2. 无界源:run()方法进入无限循环,永不返回(除非cancel()被调用)
*/
void run(SourceContext<T> ctx) throws Exception;
/**
* 取消数据源的方法
* 实现应该设置标志位,让run()方法尽快退出
*/
void cancel();
}
二、Flink内部执行流程
1. Source Task启动
flowchart TD
A[TaskManager接收到Source Task] --> B[创建SourceStreamTask实例]
B --> C[调用StreamSource.run<br/>Object lock, StreamStatusMaintainer]
C --> D[执行用户定义的<br/>SourceFunction.run SourceContext ctx]
D --> E[关键点<br/>Task线程阻塞在这里<br/>直到run 返回]
style E fill:#ffeb3b,stroke:#f57c00,stroke-width:3px
2. Flink内部伪代码
// Flink内部的StreamSource实现(简化版)
public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunction<OUT>> {
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
// 调用用户的SourceFunction
userFunction.run(new SourceContext<OUT>() {
@Override
public void collect(OUT element) {
output.collect(element); // 发送数据到下游
}
// ... 其他方法
});
// ✅ 关键点:当run()返回时,Flink执行以下逻辑
// 1. 标记此Source已完成
markSourceAsFinished();
// 2. 向下游发送"结束标记"(EndOfPartitionEvent)
sendEndOfPartitionEvent();
// 3. 如果所有Source都完成,触发最终的Checkpoint
// 4. 等待所有算子处理完毕
// 5. 任务优雅退出
}
}
3. 多并行度Source的协调
如果Source有多个并行度,Flink需要等待所有并行实例都耗尽:
场景1:部分实例完成
graph TD
A[Source 并行度=3] --> B[Source实例1: run 返回 ✅]
A --> C[Source实例2: run 返回 ✅]
A --> D[Source实例3: 仍在运行... ⏳]
B --> E[Flink判定]
C --> E
D --> E
E --> F[Source未完全耗尽<br/>任务继续运行]
style D fill:#fff9c4,stroke:#f57c00
style F fill:#ffcdd2,stroke:#c62828
场景2:所有实例完成
flowchart TD
A[Source 并行度=3] --> B[Source实例1: run 返回 ✅]
A --> C[Source实例2: run 返回 ✅]
A --> D[Source实例3: run 返回 ✅]
B --> E[Flink判定]
C --> E
D --> E
E --> F[所有Source都耗尽]
F --> G[向下游发送结束标记]
G --> H[触发窗口最终计算]
H --> I[任务进入关闭流程]
style F fill:#c8e6c9,stroke:#388e3c
style I fill:#c8e6c9,stroke:#388e3c
三、实际代码示例
示例1:有界源(会耗尽)
public class BoundedSource implements SourceFunction<Event> {
@Override
public void run(SourceContext<Event> ctx) throws Exception {
Event[] events = {
new Event("user1", "click", 1000L),
new Event("user2", "view", 2000L)
};
for (Event event : events) {
ctx.collect(event);
}
// ✅ 方法返回 → Flink认为Source耗尽
System.out.println("有界源:数据发送完毕,run()即将返回");
}
@Override
public void cancel() {
// 有界源通常不需要实现
}
}
执行流程:
flowchart LR
A[Task线程执行run ] --> B[发送2条数据]
B --> C[for循环结束]
C --> D[run 返回]
D --> E[Flink检测到返回<br/>标记Source为已耗尽]
E --> F[向下游发送<br/>EndOfPartitionEvent]
F --> G[下游算子处理完<br/>所有数据后结束]
G --> H[任务退出]
style E fill:#c8e6c9,stroke:#388e3c
style H fill:#c8e6c9,stroke:#388e3c
示例2:无界源(永不耗尽)
public class UnboundedSource implements SourceFunction<Event> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
// ✅ 无限循环,永不返回
while (isRunning) {
Event event = generateEvent();
ctx.collect(event);
Thread.sleep(1000);
}
// ⚠️ 只有cancel()被调用,isRunning=false,才会执行到这里
System.out.println("无界源:收到取消信号,run()即将返回");
}
@Override
public void cancel() {
System.out.println("无界源:cancel()被调用");
isRunning = false; // 让while循环退出
}
}
正常运行流程:
flowchart LR
A[Task线程执行run ] --> B[进入while isRunning 循环]
B --> C[持续生成和发送数据]
C --> D[run 永不返回]
D --> E[Flink认为Source仍在运行]
E --> F[任务永不自动结束]
style D fill:#fff9c4,stroke:#f57c00
style F fill:#fff9c4,stroke:#f57c00
取消流程:
flowchart TD
A[用户触发取消<br/>Web UI / CLI] --> B[Flink调用source.cancel ]
B --> C[isRunning = false]
C --> D[while循环退出]
D --> E[run 返回]
E --> F[Flink标记Source已完成]
F --> G[任务进入关闭流程]
style A fill:#ffcdd2,stroke:#c62828
style G fill:#c8e6c9,stroke:#388e3c
四、Source耗尽后的连锁反应
完整的任务结束流程
flowchart TD
A[Source: run 返回] --> B[Flink内部标记<br/>sourceExhausted = true]
B --> C[向下游Operator发送<br/>EndOfPartitionEvent]
C --> D[下游Operator收到EndOfPartitionEvent]
D --> E1[窗口Operator<br/>触发所有未触发的窗口]
D --> E2[Process算子<br/>调用close 方法]
D --> E3[Sink算子<br/>刷新缓冲区关闭外部连接]
E1 --> F[所有Operator处理完毕]
E2 --> F
E3 --> F
F --> G[触发最终Checkpoint<br/>如果配置了]
G --> H[清理状态]
H --> I[TaskManager向JobManager<br/>报告Task完成]
I --> J{JobManager检查<br/>所有Task都完成了吗}
J -->|是| K[Job完成退出]
J -->|否| L[继续等待]
style A fill:#ffeb3b,stroke:#f57c00,stroke-width:2px
style K fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style L fill:#fff9c4,stroke:#f57c00
代码追踪(Flink 1.18源码)
// org.apache.flink.streaming.runtime.tasks.SourceStreamTask
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>>
extends StreamTask<OUT, SRC> {
@Override
protected void run() throws Exception {
headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
// ✅ 当headOperator.run()返回时,执行到这里
// 说明Source已经耗尽所有数据
// 等待所有数据被下游处理完
synchronized (getCheckpointLock()) {
// 发送最终的水位线(如果有)
// 关闭输出
operatorChain.flushOutputs();
}
}
}
// org.apache.flink.streaming.api.operators.StreamSource
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
try {
// 调用用户定义的SourceFunction
userFunction.run(new SourceContextImpl());
// ✅ run()返回到这里
} finally {
// 标记输出已结束
if (isStopped) {
output.emitWatermark(Watermark.MAX_WATERMARK);
}
}
}
五、如何观察Source是否耗尽
方法1:通过日志
public class LoggingSource implements SourceFunction<Event> {
@Override
public void run(SourceContext<Event> ctx) throws Exception {
System.out.println("[Source] 开始运行");
for (int i = 0; i < 10; i++) {
ctx.collect(new Event("user", "click", i));
System.out.println("[Source] 发送数据: " + i);
}
System.out.println("[Source] 所有数据发送完毕,run()即将返回");
// run()返回后,Flink会继续执行StreamSource的finally块
}
@Override
public void cancel() {
System.out.println("[Source] cancel()被调用");
}
}
日志输出:
sequenceDiagram
participant Source
participant Flink
participant Downstream as 下游算子
participant Job
Source->>Source: 开始运行
loop 发送数据
Source->>Source: 发送数据 0-9
end
Source->>Source: 所有数据发送完毕<br/>run()即将返回
Source->>Flink: run()返回
Flink->>Flink: 检测到Source耗尽
Flink->>Downstream: 发送EndOfPartitionEvent
Downstream->>Downstream: 收到结束信号<br/>触发最终计算
Downstream->>Job: 处理完成
Job->>Job: 所有任务完成,退出
方法2:通过Web UI观察
访问 http://localhost:8081 (本地运行):
Job详情页面
└─ Source算子
├─ 状态:FINISHED ✅ (run()已返回)
└─ 状态:RUNNING ⏰ (run()仍在执行)
方法3:通过Metrics
public class MetricsSource implements SourceFunction<Event> {
private transient Counter recordCounter;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
// 获取Metrics
RuntimeContext runtimeContext = getRuntimeContext();
recordCounter = runtimeContext.getMetricGroup().counter("numRecords");
for (int i = 0; i < 100; i++) {
ctx.collect(new Event("user", "click", i));
recordCounter.inc();
}
// run()返回时,numRecords停止增长
}
}
六、常见问题
Q1: 为什么有界源需要waitAfterFinish?
问题场景:
// OutOfOrderEventSource
public void run(SourceContext<Event> ctx) throws Exception {
for (Event event : events) {
ctx.collect(event);
Thread.sleep(500);
}
// run()立即返回
}
问题:
- 最后一条数据发送后,run()立即返回
- Flink开始关闭流程
- 但ProcessingTime窗口可能还没触发(需要等系统时钟)
- 任务可能在窗口触发前就退出了
解决方案:
public void run(SourceContext<Event> ctx) throws Exception {
for (Event event : events) {
ctx.collect(event);
}
// ✅ 等待一段时间,让窗口有机会触发
Thread.sleep(waitAfterFinish);
// 然后才返回
}
Q2: cancel()和run()返回有什么区别?
| 场景 | run()返回 | cancel()调用 |
|---|---|---|
| 触发条件 | 数据自然耗尽 | 用户主动取消 |
| 是否正常 | ✅ 正常结束 | ⚠️ 被中断 |
| 数据完整性 | 所有数据已发送 | 可能有数据未发送 |
| 下游处理 | 正常处理完所有数据 | 可能有数据丢失 |
Q3: 如何实现"发送N条数据后自动结束"的Source?
public class LimitedSource implements SourceFunction<Event> {
private final int maxRecords;
private volatile boolean isRunning = true;
public LimitedSource(int maxRecords) {
this.maxRecords = maxRecords;
}
@Override
public void run(SourceContext<Event> ctx) throws Exception {
int count = 0;
// 结合有界和无界的特点
while (isRunning && count < maxRecords) {
ctx.collect(generateEvent());
count++;
}
System.out.println("发送了 " + count + " 条数据,Source耗尽");
// run()返回 → Flink认为Source耗尽
}
@Override
public void cancel() {
isRunning = false;
}
}
七、总结
Source耗尽的本质
SourceFunction.run()方法返回
=
Flink认为该Source已耗尽所有数据
判定规则
- 有界源:run()方法返回 → Source耗尽
- 无界源:run()方法永不返回 → Source永不耗尽
- 多并行度:所有并行实例的run()都返回 → Source耗尽
关键点
- ✅ 有界源要确保run()返回
- ✅ 无界源要确保run()永不返回(除非cancel)
- ✅ ProcessingTime窗口需要额外时间触发
- ✅ EventTime窗口依赖Watermark,不受Source耗尽直接影响
实践建议
| 场景 | 建议 |
|---|---|
| 测试/演示 | 使用有界源,添加waitAfterFinish |
| 生产环境 | 使用无界源(Kafka/Socket) |
| 调试 | 添加日志,观察run()返回时机 |
| 单元测试 | 使用env.fromElements()(自动耗尽) |