Flink 底层原理系列(三):数据流模型
2021-01-21·10 分钟阅读
前言
Flink 的数据流模型是其核心抽象。理解 DataStream API 如何转换为底层执行图,以及 Operator Chain 的形成机制,对于优化作业性能至关重要。本章将从源码层面深入解析数据流模型。
技术亮点
| 技术点 | 难度 | 面试价值 | 本文覆盖 |
|---|---|---|---|
| StreamGraph 构建 | ⭐⭐⭐⭐⭐ | 高频考点 | ✅ |
| Transformation 转换 | ⭐⭐⭐⭐ | 进阶考点 | ✅ |
| Operator Chain | ⭐⭐⭐⭐⭐ | 高频考点 | ✅ |
| 数据交换模式 | ⭐⭐⭐⭐ | 进阶考点 | ✅ |
面试考点
- DataStream API 如何转换为执行图?
- Transformation 和 StreamNode 的关系是什么?
- Operator Chain 是如何形成的?有哪些条件?
- Forward 和 Hash 分区的底层实现有什么区别?
DataStream API 概述
核心 API 层级
┌─────────────────────────────────────────────────────────────────────────┐
│ Flink API 层级 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 应用层 │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ SQL / Table API(高级 API) │ │ │
│ │ │ 声明式、易于使用 │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ DataStream API(核心 API) │ │ │
│ │ │ 流式处理、灵活控制 │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ ProcessFunction(底层 API) │ │ │
│ │ │ 事件处理、状态管理、定时器 │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
DataStream 类型
┌─────────────────────────────────────────────────────────────────────────┐
│ DataStream 类型 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ DataStream<T> │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ • 最基本的数据流类型 │ │ │
│ │ │ • 表示一个无界或有限的数据元素流 │ │ │
│ │ │ • 支持各种转换操作 │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ KeyedStream<K, T> │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ • DataStream.keyBy() 后的结果 │ │ │
│ │ │ • 按 Key 分区的数据流 │ │ │
│ │ │ • 支持 Keyed State 和定时器 │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ WindowedStream<K, T, W> │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ • KeyedStream.window() 后的结果 │ │ │
│ │ │ • 支持窗口聚合操作 │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ConnectedStream<T1, T2> │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ • connect() 两个 DataStream 的结果 │ │ │
│ │ │ • 两个流可以共享状态 │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
StreamGraph 构建源码分析
核心类关系
┌─────────────────────────────────────────────────────────────────────────┐
│ 核心类关系图 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 用户代码 内部表示 │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ DataStream API │────────►│ Transformation │ │ │
│ │ │ .map() │ │ (逻辑算子树) │ │ │
│ │ │ .filter() │ └────────┬────────┘ │ │
│ │ │ .keyBy() │ │ │ │
│ │ └─────────────────┘ │ StreamGraphGenerator │ │
│ │ ▼ │ │
│ │ ┌─────────────────┐ │ │
│ │ │ StreamGraph │ │ │
│ │ │ (执行图) │ │ │
│ │ │ │ │ │
│ │ │ StreamNode │ │ │
│ │ │ StreamEdge │ │ │
│ │ └─────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Transformation 类层次
// Transformation 是 DataStream API 的逻辑表示
// 位于 org.apache.flink.api.dag.Transformation
/**
* Transformation 抽象基类
* 每个 DataStream 操作都会创建一个 Transformation
*/
@Internal
public abstract class Transformation<T> {
// 唯一标识符
protected final int id;
// 输出类型信息
protected TypeInformation<T> outputType;
// 名称
protected String name;
// 并行度
protected Integer parallelism;
// 上游输入 Transformation
// 用于构建 DAG
}
// 具体实现类:
// 1. SourceTransformation - 数据源
public class SourceTransformation<OUT> extends Transformation<OUT> {
private final Source<OUT, Split, Enum> source;
private final WatermarkStrategy<OUT> watermarkStrategy;
}
// 2. OneInputTransformation - 单输入算子(map, filter 等)
public class OneInputTransformation<IN, OUT> extends Transformation<OUT> {
private final Transformation<IN> input; // 上游输入
private final StreamOperatorFactory<OUT> operatorFactory; // 算子工厂
}
// 3. TwoInputTransformation - 双输入算子(connect, join)
public class TwoInputTransformation<IN1, IN2, OUT> extends Transformation<OUT> {
private final Transformation<IN1> input1;
private final Transformation<IN2> input2;
private final StreamOperatorFactory<OUT> operatorFactory;
}
// 4. PartitionTransformation - 分区转换(keyBy, rebalance)
public class PartitionTransformation<T> extends Transformation<T> {
private final Transformation<T> input;
private final StreamPartitioner<T> partitioner; // 分区器
}
// 5. SinkTransformation - 数据输出
public class SinkTransformation<IN> extends Transformation<Void> {
private final Transformation<IN> input;
private final Sink<IN> sink;
}
StreamGraphGenerator 核心逻辑
// 位于 org.apache.flink.streaming.api.graph.StreamGraphGenerator
public class StreamGraphGenerator {
// Transformation 到 StreamGraph 的转换入口
public StreamGraph generate() {
// 创建空的 StreamGraph
streamGraph = new StreamGraph(
configuration,
executionConfig,
checkpointConfig);
// 递归处理所有 Transformation
for (Transformation<?> transformation : transformations) {
transform(transformation);
}
return streamGraph;
}
// 核心:Transformation 转换为 StreamNode/StreamEdge
private Collection<Integer> transform(Transformation<?> transform) {
// 检查是否已处理(避免重复处理)
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
// 根据 Transformation 类型分发处理
Collection<Integer> transformedIds;
switch (transform.getTransformType()) {
case SOURCE:
transformedIds = transformSource((SourceTransformation<?>) transform);
break;
case ONE_INPUT:
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
break;
case TWO_INPUT:
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
break;
case PARTITION:
transformedIds = transformPartition((PartitionTransformation<?>) transform);
break;
case SINK:
transformedIds = transformSink((SinkTransformation<?>) transform);
break;
// ... 其他类型
}
return transformedIds;
}
// OneInputTransformation 转换示例
private <IN, OUT> Collection<Integer> transformOneInputTransform(
OneInputTransformation<IN, OUT> transform) {
// 1. 先递归处理上游输入
Collection<Integer> inputIds = transform(transform.getInput());
// 2. 创建 StreamNode
String operatorName = transform.getName();
StreamOperatorFactory<OUT> operatorFactory = transform.getOperatorFactory();
int vertexId = streamGraph.addOperator(
transform.getId(),
operatorName,
operatorFactory,
transform.getInputType(),
transform.getOutputType()
);
// 3. 创建 StreamEdge 连接上下游
for (Integer inputId : inputIds) {
streamGraph.addEdge(
inputId,
vertexId,
0, // 输出端口
0, // 输入端口
// 分区器、输出标签等
);
}
return Collections.singletonList(vertexId);
}
}
StreamGraph 核心数据结构
// 位于 org.apache.flink.streaming.api.graph.StreamGraph
public class StreamGraph implements Pipeline {
// StreamNode 集合:算子节点
private final Map<Integer, StreamNode> streamNodes;
// StreamEdge 集合:算子之间的边
private final Set<StreamEdge> streamEdges;
// Source 节点 ID
private final Set<Integer> sources;
// Sink 节点 ID
private final Set<Integer> sinks;
// 添加算子节点
public <IN, OUT> int addOperator(
int vertexId,
String operatorName,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo) {
StreamNode vertex = new StreamNode(
vertexId,
operatorName,
operatorFactory,
inTypeInfo,
outTypeInfo
);
streamNodes.put(vertexId, vertex);
return vertexId;
}
// 添加边
public void addEdge(
Integer upStreamVertexID,
Integer downStreamVertexID,
int outputIndex,
int inputIndex,
StreamPartitioner<?> partitioner,
OutputTag outputTag) {
StreamEdge edge = new StreamEdge(
getSourceVertex(upStreamVertexID),
getTargetVertex(downStreamVertexID),
outputIndex,
inputIndex,
partitioner,
outputTag
);
streamEdges.add(edge);
// 建立节点间的连接关系
streamNodes.get(upStreamVertexID).addOutEdge(edge);
streamNodes.get(downStreamVertexID).addInEdge(edge);
}
}
StreamNode 与 StreamEdge
// StreamNode:算子节点
public class StreamNode {
private final int id;
private final String operatorName;
private final StreamOperatorFactory<?> operatorFactory;
// 输入输出边
private List<StreamEdge> inEdges = new ArrayList<>();
private List<StreamEdge> outEdges = new ArrayList<>();
// 类型信息
private TypeInformation<?> inputTypeInfo;
private TypeInformation<?> outputTypeInfo;
// 资源配置
private ResourceSpec minResources;
private ResourceSpec preferredResources;
// Operator Chain 相关
private int outputIndex; // 在 Chain 中的输出位置
}
// StreamEdge:算子之间的连接
public class StreamEdge {
private final StreamNode sourceVertex; // 上游节点
private final StreamNode targetVertex; // 下游节点
// 分区器(决定数据如何分发)
private final StreamPartitioner<?> outputPartitioner;
// 输出标签(侧输出)
private final OutputTag outputTag;
// 端口索引
private final int outputIndex;
private final int inputIndex;
}
完整转换流程示例
┌─────────────────────────────────────────────────────────────────────────┐
│ DataStream API → StreamGraph 转换流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 用户代码: │
│ DataStream<String> stream = env.addSource(new MySource()) │
│ .map(new MyMap()) │
│ .filter(new MyFilter()) │
│ .keyBy(x -> x.getKey()) │
│ .sum("value") │
│ .addSink(new MySink()); │
│ │
│ 转换过程: │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Step 1: 构建 Transformation 链 │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ │ │ │
│ │ │ SourceTransformation(id=1) │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ OneInputTransformation(id=2, Map) │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ OneInputTransformation(id=3, Filter) │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ PartitionTransformation(id=4, KeyBy) │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ OneInputTransformation(id=5, Sum) │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ SinkTransformation(id=6) │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ Step 2: StreamGraphGenerator.transform() 递归转换 │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ │ │ │
│ │ │ StreamNode[1] ──► StreamNode[2] ──► StreamNode[3] │ │ │
│ │ │ (Source) (Map) (Filter) │ │ │
│ │ │ │ │ │ │ │ │
│ │ │ └──────────────┴──────────────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ (HashPartition) │ │ │
│ │ │ StreamNode[4] ──► StreamNode[5] │ │ │
│ │ │ (KeyedProcess) (Sink) │ │ │
│ │ │ │ │ │
│ │ │ StreamEdge 携带分区器信息 │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Operator Chain 形成机制
Chain 条件判断
// 位于 org.apache.flink.streaming.api.graph.StreamGraph
public class StreamGraph {
// 判断两个算子是否可以 Chain
public boolean isChainable(StreamEdge edge) {
StreamNode upStreamVertex = edge.getSourceVertex();
StreamNode downStreamVertex = edge.getTargetVertex();
// 1. 下游算子的输入只有一个(一对一连接)
if (downStreamVertex.getInEdges().size() != 1) {
return false;
}
// 2. 分区器必须是 Forward(一对一,不重分区)
if (!(edge.getOutputPartitioner() instanceof ForwardPartitioner)) {
return false;
}
// 3. 上下游并行度必须相同
if (upStreamVertex.getParallelism() != downStreamVertex.getParallelism()) {
return false;
}
// 4. 下游算子没有显式禁用 Chain
if (downStreamVertex.isDisableChain()) {
return false;
}
// 5. 上游算子允许链接到下游
if (!upStreamVertex.getOperatorFactory().isOutputTypeBlocking()) {
// 检查类型兼容性
}
// 6. Slot Sharing Group 相同
if (!upStreamVertex.getSlotSharingGroup().equals(
downStreamVertex.getSlotSharingGroup())) {
return false;
}
return true;
}
}
Chain 形成过程
// 位于 org.apache.flink.streaming.api.graph.StreamGraph
public class StreamGraph {
// 构建 Operator Chain
public void setOperatorChain() {
// 收集所有可以 Chain 的边
Map<Integer, List<StreamEdge>> chains = new HashMap<>();
for (StreamEdge edge : streamEdges) {
if (isChainable(edge)) {
int startNodeId = edge.getSourceVertex().getId();
chains.computeIfAbsent(startNodeId, k -> new ArrayList<>())
.add(edge);
}
}
// 构建 Chain
for (StreamNode node : streamNodes.values()) {
if (chains.containsKey(node.getId())) {
// 设置 Chain 头
node.setChainStart(node.getId());
// 设置 Chain 尾
for (StreamEdge edge : chains.get(node.getId())) {
StreamNode targetNode = edge.getTargetVertex();
targetNode.setChainStart(node.getId());
}
}
}
}
}
Chain 数据结构
// OperatorChain 类
public class OperatorChain<IN, OUT> implements StreamOperator<OUT> {
// Chain 中的所有算子
private final StreamOperator<?>[] allOperators;
// 输出收集器(Chain 内部传递)
private final Output<StreamRecord<OUT>> chainEntryPoint;
// Chain 输出(发送到下游)
private final Output<StreamRecord<?>> chainEndOutput;
// 初始化 Chain
public OperatorChain(
StreamTask<OUT, ?> containingTask,
StreamOperatorFactory<OUT> operatorFactory,
MailboxExecutor mailboxExecutor) {
// 构建算子链
// 从 Chain 头开始,依次创建算子
// 每个算子的输出连接到下一个算子的输入
// 最后一个算子的输出连接到 chainEndOutput
}
// Chain 内部数据传递
// 避免了序列化和网络传输
private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
private final Input<T> input; // 下游算子的输入
private final Counter numRecordsIn;
@Override
public void collect(StreamRecord<T> record) {
// 直接调用下游算子的 processElement
input.processElement(record);
}
}
}
Chain 示意图
┌─────────────────────────────────────────────────────────────────────────┐
│ Operator Chain 形成 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 原 StreamGraph: │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Source(2) ──Forward──► Map(2) ──Forward──► Filter(2) │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ [Node 1] [Node 2] [Node 3] │ │
│ │ │ │ │
│ │ HashPartition │ │
│ │ ▼ │ │
│ │ KeyBy(2) ──► Sink(2) │ │
│ │ [Node 4] [Node 5] │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ Chain 优化 │
│ 优化后的 JobGraph: │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ JobVertex 1 JobVertex 2 │ │
│ │ ┌──────────────────────┐ ┌──────────────────────┐ │ │
│ │ │ Operator Chain: │ │ Operator Chain: │ │ │
│ │ │ Source → Map → Filter│ │ KeyBy → Sink │ │ │
│ │ │ (parallelism=2) │ │ (parallelism=2) │ │ │
│ │ │ │ │ │ │ │
│ │ │ 内部直接方法调用 │ │ Hash分区后写入 │ │ │
│ │ │ 无序列化/网络传输 │ │ 需要网络传输 │ │ │
│ │ └──────────────────────┘ └──────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ Chain 条件分析: │
│ • Source → Map:Forward + 并行度相同 = 可 Chain ✓ │
│ • Map → Filter:Forward + 并行度相同 = 可 Chain ✓ │
│ • Filter → KeyBy:Hash 分区 = 不可 Chain ✗ │
│ • KeyBy → Sink:Forward + 并行度相同 = 可 Chain ✓ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Chain 的性能优势
┌─────────────────────────────────────────────────────────────────────────┐
│ Chain 性能优势 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 无 Chain 模式: │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌────────┐ 序列化 ┌────────┐ 序列化 ┌────────┐ │ │
│ │ │Source │ ──────────► │ Map │ ──────────► │ Filter │ │ │
│ │ └────────┘ 网络传输 └────────┘ 网络传输 └────────┘ │ │
│ │ │ │
│ │ 开销: │ │
│ │ • 数据序列化/反序列化 │ │
│ │ • 网络传输延迟 │ │
│ │ • 线程切换开销 │ │
│ │ • 缓冲区分配 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ 有 Chain 模式: │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ Operator Chain │ │ │
│ │ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │ │
│ │ │ │Source │ ──────► │ Map │ ──────► │ Filter │ │ │ │
│ │ │ └────────┘ 直接调用 └────────┘ 直接调用 └────────┘ │ │ │
│ │ │ │ │ │
│ │ │ 同一线程,无序列化,无网络 │ │ │
│ │ └─────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ 优势: │ │
│ │ • 无序列化/反序列化 ✓ │ │
│ │ • 无网络传输 ✓ │ │
│ │ • 无线程切换 ✓ │ │
│ │ • 内存访问友好 ✓ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
StreamOperator 生命周期
StreamOperator 接口
// 位于 org.apache.flink.streaming.api.operators.StreamOperator
public interface StreamOperator<OUT> {
// 初始化(Task 启动时调用)
void open() throws Exception;
// 关闭(Task 结束时调用)
void close() throws Exception;
// 生命周期事件处理
void finish() throws Exception;
// 保存算子状态
OperatorSnapshotFutures snapshotState(
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory) throws Exception;
// 初始化算子状态
void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception;
// 设置度量组
void setMetricGroup(MetricGroup metricGroup);
// 设置 KeyContext(用于 KeyedStream)
void setKeyContextElement1(StreamRecord<?> record) throws Exception;
}
AbstractStreamOperator 基类
// 位于 org.apache.flink.streaming.api.operators.AbstractStreamOperator
public abstract class AbstractStreamOperator<OUT>
implements StreamOperator<OUT>, Serializable {
// 运行时上下文
protected StreamingRuntimeContext runtimeContext;
// 状态后端
protected KeyedStateBackend<?> keyedStateBackend;
protected OperatorStateBackend operatorStateBackend;
// 度量组
protected MetricGroup metrics;
// 输出收集器
protected Output<StreamRecord<OUT>> output;
// 初始化
@Override
public void open() throws Exception {
// 子类可以覆盖此方法进行初始化
}
// 关闭
@Override
public void close() throws Exception {
// 清理资源
}
// 状态快照
@Override
public OperatorSnapshotFutures snapshotState(
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory) throws Exception {
KeyGroupRange keyGroupRange = keyedStateBackend != null
? keyedStateBackend.getKeyGroupRange()
: null;
OperatorSnapshotFutures snapshotFutures = new OperatorSnapshotFutures();
// 快照 Keyed State
if (keyedStateBackend != null) {
snapshotFutures.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(
checkpointId, timestamp, factory, checkpointOptions));
}
// 快照 Operator State
if (operatorStateBackend != null) {
snapshotFutures.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(
checkpointId, timestamp, factory, checkpointOptions));
}
return snapshotFutures;
}
}
生命周期完整流程
┌─────────────────────────────────────────────────────────────────────────┐
│ StreamOperator 生命周期 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Task 启动流程: │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. 创建 Operator │ │
│ │ StreamOperatorFactory.createOperator() │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ 2. setup() - 设置运行时上下文 │ │
│ │ operator.setup(containingTask, config, output) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ 3. initializeState() - 初始化状态 │ │
│ │ 从 Checkpoint 恢复或初始化空状态 │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ 4. open() - 打开算子 │ │
│ │ 用户初始化逻辑 │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ 5. 处理数据 │ │
│ │ processElement() / processWatermark() │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ 6. snapshotState() - Checkpoint 时快照状态 │ │
│ │ 保存算子状态 │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ 7. finish() - 完成处理 │ │
│ │ 处理完所有输入数据 │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ 8. close() - 关闭算子 │ │
│ │ 清理资源 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ 关键时机: │
│ • open() 和 close() 成对调用 │
│ • snapshotState() 在 Checkpoint 期间调用 │
│ • finish() 在输入结束后、close() 之前调用 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
数据交换模式底层实现
StreamPartitioner 接口
// 位于 org.apache.flink.runtime.plugable.StreamPartitioner
public interface StreamPartitioner<T> extends Serializable {
// 选择目标通道
int[] select(T record, int numChannels);
// 分区器类型
StreamPartitionerType getPartitionerType();
}
// 分区器类型枚举
public enum StreamPartitionerType {
FORWARD, // 前向传递
KEYBY, // 哈希分区
SHUFFLE, // 随机分区
REBALANCE, // 轮询分区
RESCALE, // 缩放分区
BROADCAST, // 广播
GLOBAL, // 全局分区
CUSTOM // 自定义
}
ForwardPartitioner 实现
// 前向分区:一对一,用于 Chain 内部
public class ForwardPartitioner<T> implements StreamPartitioner<T> {
@Override
public int[] select(T record, int numChannels) {
// 始终返回第一个通道
// 因为上游和下游是一一对应的
return new int[]{0};
}
@Override
public StreamPartitionerType getPartitionerType() {
return StreamPartitionerType.FORWARD;
}
@Override
public String toString() {
return "FORWARD";
}
}
// 使用场景:
// 1. Operator Chain 内部
// 2. 上下游并行度相同且不需要重分区
// 3. 本地数据传递,无网络传输
KeyGroupStreamPartitioner 实现
// KeyBy 分区:哈希分区到特定 Key Group
public class KeyGroupStreamPartitioner<T, K> implements StreamPartitioner<T> {
// Key 选择器
private final KeySelector<T, K> keySelector;
// Key Group 数量(等于最大并行度)
private final int maxParallelism;
@Override
public int[] select(T record, int numChannels) {
K key;
try {
key = keySelector.getKey(record);
} catch (Exception e) {
throw new RuntimeException("Failed to extract key", e);
}
// 计算 Key Group ID
int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
// 将 Key Group 映射到物理通道(并行度)
int channel = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
maxParallelism, numChannels, keyGroupId);
return new int[]{channel};
}
}
// Key Group 分配算法
public class KeyGroupRangeAssignment {
// 将 Key 分配到 Key Group
public static int assignToKeyGroup(Object key, int maxParallelism) {
return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
// 根据哈希值计算 Key Group
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
// 使用 MurmurHash 进行均匀分布
return MathUtils.murmurHash(keyHash) % maxParallelism;
}
// 将 Key Group 映射到 Operator 实例
public static int computeOperatorIndexForKeyGroup(
int maxParallelism, int parallelism, int keyGroupId) {
int keyGroupRangeSize = maxParallelism / parallelism;
int start = keyGroupId / keyGroupRangeSize * keyGroupRangeSize;
// 确保均匀分布
int rangeIndex = keyGroupId / keyGroupRangeSize;
return rangeIndex;
}
}
BroadcastPartitioner 实现
// 广播分区:发送到所有下游通道
public class BroadcastPartitioner<T> implements StreamPartitioner<T> {
@Override
public int[] select(T record, int numChannels) {
// 返回所有通道
int[] channels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
channels[i] = i;
}
return channels;
}
@Override
public StreamPartitionerType getPartitionerType() {
return StreamPartitionerType.BROADCAST;
}
}
// 使用场景:
// 1. 广播状态(维度表)
// 2. 配置信息分发
// 3. 规则引擎规则广播
RebalancePartitioner 实现
// 轮询分区:均匀分配到下游通道
public class RebalancePartitioner<T> implements StreamPartitioner<T> {
// 当前通道索引(使用 Atomic 保证线程安全)
private final AtomicInteger currentChannel = new AtomicInteger(0);
@Override
public int[] select(T record, int numChannels) {
// 轮询选择下一个通道
int nextChannel = currentChannel.incrementAndGet() % numChannels;
return new int[]{nextChannel};
}
@Override
public StreamPartitionerType getPartitionerType() {
return StreamPartitionerType.REBALANCE;
}
}
// 使用场景:
// 1. 负载均衡
// 2. 数据重新分布
// 3. 消除数据倾斜
分区模式对比
┌─────────────────────────────────────────────────────────────────────────┐
│ 分区模式底层实现对比 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 分区器 目标通道 使用场景 网络传输 │ │
│ │ ────────────────────────────────────────────────────────────── │ │
│ │ Forward 单通道(固定 0) Chain 内部 无 │ │
│ │ KeyBy Key 哈希取模 聚合/Join 有 │ │
│ │ Broadcast 所有通道 维度广播 有(N份) │ │
│ │ Rebalance 轮询 负载均衡 有 │ │
│ │ Shuffle 随机 数据打散 有 │ │
│ │ Rescale 局部轮询 缩放 有(部分) │ │
│ │ Global 单通道(最后) 全局聚合 有 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ 性能排序(低到高): │
│ Broadcast > Shuffle > Rebalance > Rescale > KeyBy > Forward │
│ │
│ 关键点: │
│ • Forward 完全本地,无网络开销 │
│ • KeyBy 保证相同 Key 到同一通道,支持状态 │
│ • Broadcast 数据复制,网络开销最大 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Source 与 Sink 实现原理
Source 实现架构
// Flink 1.12+ 新 Source API
public interface Source<T, SplitT extends SourceSplit, EnumChkT>
extends SourceReaderFactory<T, SplitT> {
// 创建 Split 枚举器(负责发现和分配 Split)
SplitEnumerator<SplitT, EnumChkT> createEnumerator(
SplitEnumeratorContext<SplitT> enumContext) throws Exception;
// 从 Checkpoint 恢复枚举器
SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
SplitEnumeratorContext<SplitT> enumContext,
EnumChkT checkpoint) throws Exception;
// 创建 SourceReader(负责读取数据)
SourceReader<T, SplitT> createReader(SourceReaderContext readerContext)
throws Exception;
// 获取 Split 序列化器
SimpleVersionedSerializer<SplitT> getSplitSerializer();
// 获取枚举器状态序列化器
SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}
// SourceReader 接口
public interface SourceReader<T, SplitT extends SourceSplit> {
// 启动 Reader
void start();
// 获取下一个记录
InputStatus pollNext(RecordEmitter<T> output) throws Exception;
// 处理来自枚举器的 Split 分配
void addSplits(List<SplitT> splits);
// 通知没有更多 Split
void notifyNoMoreSplits();
// 创建状态快照
List<SplitT> snapshotState(long checkpointId);
}
KafkaSource 核心实现
// KafkaSource 核心组件
public class KafkaSource<T> implements Source<T, KafkaPartitionSplit, KafkaSourceEnumState> {
// 配置
private final KafkaSourceReaderFactory<T> readerFactory;
private final KafkaPartitionSplitEnumeratorFactory enumFactory;
// 创建 Split 枚举器
@Override
public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(
SplitEnumeratorContext<KafkaPartitionSplit> enumContext) {
// 发现 Kafka Topic 分区
KafkaPartitionDiscoverer discoverer = new KafkaPartitionDiscoverer(
topics, kafkaProperties);
// 创建枚举器
return new KafkaPartitionSplitEnumerator(
enumContext,
discoverer,
assignmentStrategy);
}
}
// KafkaPartitionSplitEnumerator
public class KafkaPartitionSplitEnumerator
implements SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> {
// 已发现的分区
private final Set<TopicPartition> discoveredPartitions;
// 已分配的分区
private final Map<Integer, Set<TopicPartition>> assignedPartitions;
@Override
public void start() {
// 定期发现新分区
enumContext.callAsync(
this::discoverPartitions,
this::handleDiscoveredPartitions,
partitionDiscoveryInterval,
partitionDiscoveryInterval);
}
@Override
public void handleSplitRequest(int subtaskId, String requesterHostname) {
// 分配分区给 SourceReader
Set<TopicPartition> partitionsToAssign = selectPartitionsForSubtask(subtaskId);
if (!partitionsToAssign.isEmpty()) {
enumContext.assignSplit(
new KafkaPartitionSplit(partitionsToAssign),
subtaskId);
}
}
}
Sink 实现架构
// Flink 1.12+ 新 Sink API
public interface Sink<InputT, CommT, WriterStateT, GlobalCommT>
extends Serializable {
// 创建 Writer
SinkWriter<InputT, CommT, WriterStateT> createWriter(
InitContext context) throws IOException;
// 创建提交器(用于两阶段提交)
Optional<Committer<CommT, GlobalCommT>> createCommitter() throws IOException;
// 创建全局提交器
Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter()
throws IOException;
}
// SinkWriter 接口
public interface SinkWriter<InputT, CommT, WriterStateT> {
// 写入数据
void write(InputT element, Context context) throws IOException;
// 准备提交(返回待提交的 CommT)
List<CommT> prepareCommit(boolean flush) throws IOException;
// 创建状态快照
List<WriterStateT> snapshotState(long checkpointId) throws IOException;
// 关闭 Writer
void close() throws IOException;
}
总结
本章从源码层面深入解析了 Flink 数据流模型:
| 概念 | 源码位置 | 核心机制 |
|---|---|---|
| Transformation | api.dag.Transformation | DataStream API 的逻辑表示 |
| StreamGraph | streaming.api.graph.StreamGraph | 执行图,包含 StreamNode 和 StreamEdge |
| Operator Chain | StreamGraph.isChainable() | Forward 连接、相同并行度的算子合并 |
| StreamOperator | streaming.api.operators.StreamOperator | 实际执行数据处理的算子 |
| StreamPartitioner | runtime.plugable.StreamPartitioner | 数据分区策略 |
关键要点:
- DataStream API 通过 Transformation 构建逻辑执行图
- StreamGraphGenerator 递归转换 Transformation 为 StreamGraph
- Operator Chain 减少 GC 和网络开销,提升性能
- 分区器决定数据如何分发到下游算子
参考资料
下一章预告
下一章将深入解析 时间与窗口,包括:
- Watermark 生成与传播源码分析
- 窗口分配器与触发器实现
- 窗口状态管理