← 返回文章列表

Flink 底层原理系列(三):数据流模型

2021-01-21·10 分钟阅读

前言

Flink 的数据流模型是其核心抽象。理解 DataStream API 如何转换为底层执行图,以及 Operator Chain 的形成机制,对于优化作业性能至关重要。本章将从源码层面深入解析数据流模型。

技术亮点

技术点难度面试价值本文覆盖
StreamGraph 构建⭐⭐⭐⭐⭐高频考点
Transformation 转换⭐⭐⭐⭐进阶考点
Operator Chain⭐⭐⭐⭐⭐高频考点
数据交换模式⭐⭐⭐⭐进阶考点

面试考点

  1. DataStream API 如何转换为执行图?
  2. Transformation 和 StreamNode 的关系是什么?
  3. Operator Chain 是如何形成的?有哪些条件?
  4. Forward 和 Hash 分区的底层实现有什么区别?

DataStream API 概述

核心 API 层级

┌─────────────────────────────────────────────────────────────────────────┐
│                        Flink API 层级                                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │                     应用层                                       │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │         SQL / Table API(高级 API)                        │ │   │
│  │  │         声明式、易于使用                                    │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                              │                                  │   │
│  │                              ▼                                  │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │         DataStream API(核心 API)                         │ │   │
│  │  │         流式处理、灵活控制                                  │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                              │                                  │   │
│  │                              ▼                                  │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │         ProcessFunction(底层 API)                        │ │   │
│  │  │         事件处理、状态管理、定时器                          │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

DataStream 类型

┌─────────────────────────────────────────────────────────────────────────┐
│                        DataStream 类型                                   │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  DataStream<T>                                                  │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │ • 最基本的数据流类型                                       │ │   │
│  │  │ • 表示一个无界或有限的数据元素流                           │ │   │
│  │  │ • 支持各种转换操作                                         │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                                                                 │   │
│  │  KeyedStream<K, T>                                              │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │ • DataStream.keyBy() 后的结果                              │ │   │
│  │  │ • 按 Key 分区的数据流                                      │ │   │
│  │  │ • 支持 Keyed State 和定时器                                │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                                                                 │   │
│  │  WindowedStream<K, T, W>                                       │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │ • KeyedStream.window() 后的结果                            │ │   │
│  │  │ • 支持窗口聚合操作                                         │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                                                                 │   │
│  │  ConnectedStream<T1, T2>                                        │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │ • connect() 两个 DataStream 的结果                         │ │   │
│  │  │ • 两个流可以共享状态                                       │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

StreamGraph 构建源码分析

核心类关系

┌─────────────────────────────────────────────────────────────────────────┐
│                        核心类关系图                                      │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  用户代码                     内部表示                          │   │
│  │  ┌─────────────────┐         ┌─────────────────┐               │   │
│  │  │ DataStream API  │────────►│ Transformation  │               │   │
│  │  │ .map()          │         │ (逻辑算子树)     │               │   │
│  │  │ .filter()       │         └────────┬────────┘               │   │
│  │  │ .keyBy()        │                  │                         │   │
│  │  └─────────────────┘                  │ StreamGraphGenerator    │   │
│  │                                       ▼                         │   │
│  │                          ┌─────────────────┐                    │   │
│  │                          │   StreamGraph   │                    │   │
│  │                          │  (执行图)        │                    │   │
│  │                          │                 │                    │   │
│  │                          │  StreamNode     │                    │   │
│  │                          │  StreamEdge     │                    │   │
│  │                          └─────────────────┘                    │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Transformation 类层次

// Transformation 是 DataStream API 的逻辑表示
// 位于 org.apache.flink.api.dag.Transformation

/**
 * Transformation 抽象基类
 * 每个 DataStream 操作都会创建一个 Transformation
 */
@Internal
public abstract class Transformation<T> {
    
    // 唯一标识符
    protected final int id;
    
    // 输出类型信息
    protected TypeInformation<T> outputType;
    
    // 名称
    protected String name;
    
    // 并行度
    protected Integer parallelism;
    
    // 上游输入 Transformation
    // 用于构建 DAG
}

// 具体实现类:

// 1. SourceTransformation - 数据源
public class SourceTransformation<OUT> extends Transformation<OUT> {
    private final Source<OUT, Split, Enum> source;
    private final WatermarkStrategy<OUT> watermarkStrategy;
}

// 2. OneInputTransformation - 单输入算子(map, filter 等)
public class OneInputTransformation<IN, OUT> extends Transformation<OUT> {
    private final Transformation<IN> input;  // 上游输入
    private final StreamOperatorFactory<OUT> operatorFactory;  // 算子工厂
}

// 3. TwoInputTransformation - 双输入算子(connect, join)
public class TwoInputTransformation<IN1, IN2, OUT> extends Transformation<OUT> {
    private final Transformation<IN1> input1;
    private final Transformation<IN2> input2;
    private final StreamOperatorFactory<OUT> operatorFactory;
}

// 4. PartitionTransformation - 分区转换(keyBy, rebalance)
public class PartitionTransformation<T> extends Transformation<T> {
    private final Transformation<T> input;
    private final StreamPartitioner<T> partitioner;  // 分区器
}

// 5. SinkTransformation - 数据输出
public class SinkTransformation<IN> extends Transformation<Void> {
    private final Transformation<IN> input;
    private final Sink<IN> sink;
}

StreamGraphGenerator 核心逻辑

// 位于 org.apache.flink.streaming.api.graph.StreamGraphGenerator

public class StreamGraphGenerator {
    
    // Transformation 到 StreamGraph 的转换入口
    public StreamGraph generate() {
        // 创建空的 StreamGraph
        streamGraph = new StreamGraph(
            configuration,
            executionConfig,
            checkpointConfig);
        
        // 递归处理所有 Transformation
        for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }
        
        return streamGraph;
    }
    
    // 核心:Transformation 转换为 StreamNode/StreamEdge
    private Collection<Integer> transform(Transformation<?> transform) {
        
        // 检查是否已处理(避免重复处理)
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }
        
        // 根据 Transformation 类型分发处理
        Collection<Integer> transformedIds;
        switch (transform.getTransformType()) {
            case SOURCE:
                transformedIds = transformSource((SourceTransformation<?>) transform);
                break;
            case ONE_INPUT:
                transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
                break;
            case TWO_INPUT:
                transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
                break;
            case PARTITION:
                transformedIds = transformPartition((PartitionTransformation<?>) transform);
                break;
            case SINK:
                transformedIds = transformSink((SinkTransformation<?>) transform);
                break;
            // ... 其他类型
        }
        
        return transformedIds;
    }
    
    // OneInputTransformation 转换示例
    private <IN, OUT> Collection<Integer> transformOneInputTransform(
            OneInputTransformation<IN, OUT> transform) {
        
        // 1. 先递归处理上游输入
        Collection<Integer> inputIds = transform(transform.getInput());
        
        // 2. 创建 StreamNode
        String operatorName = transform.getName();
        StreamOperatorFactory<OUT> operatorFactory = transform.getOperatorFactory();
        
        int vertexId = streamGraph.addOperator(
            transform.getId(),
            operatorName,
            operatorFactory,
            transform.getInputType(),
            transform.getOutputType()
        );
        
        // 3. 创建 StreamEdge 连接上下游
        for (Integer inputId : inputIds) {
            streamGraph.addEdge(
                inputId,
                vertexId,
                0,  // 输出端口
                0,  // 输入端口
                // 分区器、输出标签等
            );
        }
        
        return Collections.singletonList(vertexId);
    }
}

StreamGraph 核心数据结构

// 位于 org.apache.flink.streaming.api.graph.StreamGraph

public class StreamGraph implements Pipeline {
    
    // StreamNode 集合:算子节点
    private final Map<Integer, StreamNode> streamNodes;
    
    // StreamEdge 集合:算子之间的边
    private final Set<StreamEdge> streamEdges;
    
    // Source 节点 ID
    private final Set<Integer> sources;
    
    // Sink 节点 ID
    private final Set<Integer> sinks;
    
    // 添加算子节点
    public <IN, OUT> int addOperator(
            int vertexId,
            String operatorName,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo) {
        
        StreamNode vertex = new StreamNode(
            vertexId,
            operatorName,
            operatorFactory,
            inTypeInfo,
            outTypeInfo
        );
        
        streamNodes.put(vertexId, vertex);
        return vertexId;
    }
    
    // 添加边
    public void addEdge(
            Integer upStreamVertexID,
            Integer downStreamVertexID,
            int outputIndex,
            int inputIndex,
            StreamPartitioner<?> partitioner,
            OutputTag outputTag) {
        
        StreamEdge edge = new StreamEdge(
            getSourceVertex(upStreamVertexID),
            getTargetVertex(downStreamVertexID),
            outputIndex,
            inputIndex,
            partitioner,
            outputTag
        );
        
        streamEdges.add(edge);
        
        // 建立节点间的连接关系
        streamNodes.get(upStreamVertexID).addOutEdge(edge);
        streamNodes.get(downStreamVertexID).addInEdge(edge);
    }
}

StreamNode 与 StreamEdge

// StreamNode:算子节点
public class StreamNode {
    
    private final int id;
    private final String operatorName;
    private final StreamOperatorFactory<?> operatorFactory;
    
    // 输入输出边
    private List<StreamEdge> inEdges = new ArrayList<>();
    private List<StreamEdge> outEdges = new ArrayList<>();
    
    // 类型信息
    private TypeInformation<?> inputTypeInfo;
    private TypeInformation<?> outputTypeInfo;
    
    // 资源配置
    private ResourceSpec minResources;
    private ResourceSpec preferredResources;
    
    // Operator Chain 相关
    private int outputIndex;  // 在 Chain 中的输出位置
}

// StreamEdge:算子之间的连接
public class StreamEdge {
    
    private final StreamNode sourceVertex;    // 上游节点
    private final StreamNode targetVertex;    // 下游节点
    
    // 分区器(决定数据如何分发)
    private final StreamPartitioner<?> outputPartitioner;
    
    // 输出标签(侧输出)
    private final OutputTag outputTag;
    
    // 端口索引
    private final int outputIndex;
    private final int inputIndex;
}

完整转换流程示例

┌─────────────────────────────────────────────────────────────────────────┐
│                    DataStream API → StreamGraph 转换流程                 │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  用户代码:                                                              │
│  DataStream<String> stream = env.addSource(new MySource())             │
│      .map(new MyMap())                                                 │
│      .filter(new MyFilter())                                           │
│      .keyBy(x -> x.getKey())                                           │
│      .sum("value")                                                     │
│ .addSink(new MySink());                                                │
│                                                                         │
│  转换过程:                                                              │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  Step 1: 构建 Transformation 链                                 │   │
│  │  ┌─────────────────────────────────────────────────────────┐   │   │
│  │  │                                                         │   │   │
│  │  │  SourceTransformation(id=1)                             │   │   │
│  │  │         │                                              │   │   │
│  │  │         ▼                                              │   │   │
│  │  │  OneInputTransformation(id=2, Map)                      │   │   │
│  │  │         │                                              │   │   │
│  │  │         ▼                                              │   │   │
│  │  │  OneInputTransformation(id=3, Filter)                   │   │   │
│  │  │         │                                              │   │   │
│  │  │         ▼                                              │   │   │
│  │  │  PartitionTransformation(id=4, KeyBy)                   │   │   │
│  │  │         │                                              │   │   │
│  │  │         ▼                                              │   │   │
│  │  │  OneInputTransformation(id=5, Sum)                      │   │   │
│  │  │         │                                              │   │   │
│  │  │         ▼                                              │   │   │
│  │  │  SinkTransformation(id=6)                               │   │   │
│  │  │                                                         │   │   │
│  │  └─────────────────────────────────────────────────────────┘   │   │
│  │                                                                 │   │
│  │  Step 2: StreamGraphGenerator.transform() 递归转换             │   │
│  │  ┌─────────────────────────────────────────────────────────┐   │   │
│  │  │                                                         │   │   │
│  │  │  StreamNode[1] ──► StreamNode[2] ──► StreamNode[3]      │   │   │
│  │  │  (Source)      (Map)         (Filter)                   │   │   │
│  │  │       │              │              │                    │   │   │
│  │  │       └──────────────┴──────────────┘                    │   │   │
│  │  │                    │                                     │   │   │
│  │  │                    ▼ (HashPartition)                     │   │   │
│  │  │              StreamNode[4] ──► StreamNode[5]             │   │   │
│  │  │              (KeyedProcess)   (Sink)                     │   │   │
│  │  │                                                         │   │   │
│  │  │  StreamEdge 携带分区器信息                               │   │   │
│  │  │                                                         │   │   │
│  │  └─────────────────────────────────────────────────────────┘   │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Operator Chain 形成机制

Chain 条件判断

// 位于 org.apache.flink.streaming.api.graph.StreamGraph

public class StreamGraph {
    
    // 判断两个算子是否可以 Chain
    public boolean isChainable(StreamEdge edge) {
        StreamNode upStreamVertex = edge.getSourceVertex();
        StreamNode downStreamVertex = edge.getTargetVertex();
        
        // 1. 下游算子的输入只有一个(一对一连接)
        if (downStreamVertex.getInEdges().size() != 1) {
            return false;
        }
        
        // 2. 分区器必须是 Forward(一对一,不重分区)
        if (!(edge.getOutputPartitioner() instanceof ForwardPartitioner)) {
            return false;
        }
        
        // 3. 上下游并行度必须相同
        if (upStreamVertex.getParallelism() != downStreamVertex.getParallelism()) {
            return false;
        }
        
        // 4. 下游算子没有显式禁用 Chain
        if (downStreamVertex.isDisableChain()) {
            return false;
        }
        
        // 5. 上游算子允许链接到下游
        if (!upStreamVertex.getOperatorFactory().isOutputTypeBlocking()) {
            // 检查类型兼容性
        }
        
        // 6. Slot Sharing Group 相同
        if (!upStreamVertex.getSlotSharingGroup().equals(
                downStreamVertex.getSlotSharingGroup())) {
            return false;
        }
        
        return true;
    }
}

Chain 形成过程

// 位于 org.apache.flink.streaming.api.graph.StreamGraph

public class StreamGraph {
    
    // 构建 Operator Chain
    public void setOperatorChain() {
        // 收集所有可以 Chain 的边
        Map<Integer, List<StreamEdge>> chains = new HashMap<>();
        
        for (StreamEdge edge : streamEdges) {
            if (isChainable(edge)) {
                int startNodeId = edge.getSourceVertex().getId();
                chains.computeIfAbsent(startNodeId, k -> new ArrayList<>())
                      .add(edge);
            }
        }
        
        // 构建 Chain
        for (StreamNode node : streamNodes.values()) {
            if (chains.containsKey(node.getId())) {
                // 设置 Chain 头
                node.setChainStart(node.getId());
                
                // 设置 Chain 尾
                for (StreamEdge edge : chains.get(node.getId())) {
                    StreamNode targetNode = edge.getTargetVertex();
                    targetNode.setChainStart(node.getId());
                }
            }
        }
    }
}

Chain 数据结构

// OperatorChain 类
public class OperatorChain<IN, OUT> implements StreamOperator<OUT> {
    
    // Chain 中的所有算子
    private final StreamOperator<?>[] allOperators;
    
    // 输出收集器(Chain 内部传递)
    private final Output<StreamRecord<OUT>> chainEntryPoint;
    
    // Chain 输出(发送到下游)
    private final Output<StreamRecord<?>> chainEndOutput;
    
    // 初始化 Chain
    public OperatorChain(
            StreamTask<OUT, ?> containingTask,
            StreamOperatorFactory<OUT> operatorFactory,
            MailboxExecutor mailboxExecutor) {
        
        // 构建算子链
        // 从 Chain 头开始,依次创建算子
        // 每个算子的输出连接到下一个算子的输入
        // 最后一个算子的输出连接到 chainEndOutput
    }
    
    // Chain 内部数据传递
    // 避免了序列化和网络传输
    private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
        
        private final Input<T> input;  // 下游算子的输入
        private final Counter numRecordsIn;
        
        @Override
        public void collect(StreamRecord<T> record) {
            // 直接调用下游算子的 processElement
            input.processElement(record);
        }
    }
}

Chain 示意图

┌─────────────────────────────────────────────────────────────────────────┐
│                        Operator Chain 形成                               │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  原 StreamGraph:                                                       │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  Source(2) ──Forward──► Map(2) ──Forward──► Filter(2)          │   │
│  │      │                       │                      │            │   │
│  │      ▼                       ▼                      ▼            │   │
│  │  [Node 1]                 [Node 2]              [Node 3]        │   │
│  │                                                         │         │   │
│  │                                              HashPartition         │   │
│  │                                                         ▼         │   │
│  │                                               KeyBy(2) ──► Sink(2) │   │
│  │                                              [Node 4]     [Node 5] │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                              │                                          │
│                              ▼ Chain 优化                               │
│  优化后的 JobGraph:                                                    │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  JobVertex 1                    JobVertex 2                     │   │
│  │  ┌──────────────────────┐      ┌──────────────────────┐        │   │
│  │  │ Operator Chain:      │      │ Operator Chain:      │        │   │
│  │  │ Source → Map → Filter│      │ KeyBy → Sink         │        │   │
│  │  │ (parallelism=2)      │      │ (parallelism=2)      │        │   │
│  │  │                      │      │                      │        │   │
│  │  │ 内部直接方法调用      │      │ Hash分区后写入       │        │   │
│  │  │ 无序列化/网络传输     │      │ 需要网络传输         │        │   │
│  │  └──────────────────────┘      └──────────────────────┘        │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  Chain 条件分析:                                                       │
│  • Source → Map:Forward + 并行度相同 = 可 Chain ✓                     │
│  • Map → Filter:Forward + 并行度相同 = 可 Chain ✓                     │
│  • Filter → KeyBy:Hash 分区 = 不可 Chain ✗                            │
│  • KeyBy → Sink:Forward + 并行度相同 = 可 Chain ✓                     │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Chain 的性能优势

┌─────────────────────────────────────────────────────────────────────────┐
│                        Chain 性能优势                                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  无 Chain 模式:                                                        │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  ┌────────┐    序列化    ┌────────┐    序列化    ┌────────┐   │   │
│  │  │Source  │ ──────────► │  Map   │ ──────────► │ Filter │   │   │
│  │  └────────┘   网络传输   └────────┘   网络传输   └────────┘   │   │
│  │                                                                 │   │
│  │  开销:                                                         │   │
│  │  • 数据序列化/反序列化                                          │   │
│  │  • 网络传输延迟                                                 │   │
│  │  • 线程切换开销                                                 │   │
│  │  • 缓冲区分配                                                   │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  有 Chain 模式:                                                        │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  ┌─────────────────────────────────────────────────────────┐   │   │
│  │  │              Operator Chain                             │   │   │
│  │  │  ┌────────┐         ┌────────┐         ┌────────┐      │   │   │
│  │  │  │Source  │ ──────► │  Map   │ ──────► │ Filter │      │   │   │
│  │  │  └────────┘ 直接调用 └────────┘ 直接调用 └────────┘      │   │   │
│  │  │                                                         │   │   │
│  │  │         同一线程,无序列化,无网络                       │   │   │
│  │  └─────────────────────────────────────────────────────────┘   │   │
│  │                                                                 │   │
│  │  优势:                                                         │   │
│  │  • 无序列化/反序列化 ✓                                          │   │
│  │  • 无网络传输 ✓                                                 │   │
│  │  • 无线程切换 ✓                                                 │   │
│  │  • 内存访问友好 ✓                                               │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

StreamOperator 生命周期

StreamOperator 接口

// 位于 org.apache.flink.streaming.api.operators.StreamOperator

public interface StreamOperator<OUT> {
    
    // 初始化(Task 启动时调用)
    void open() throws Exception;
    
    // 关闭(Task 结束时调用)
    void close() throws Exception;
    
    // 生命周期事件处理
    void finish() throws Exception;
    
    // 保存算子状态
    OperatorSnapshotFutures snapshotState(
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory) throws Exception;
    
    // 初始化算子状态
    void initializeState(StreamTaskStateInitializer streamTaskStateManager) 
        throws Exception;
    
    // 设置度量组
    void setMetricGroup(MetricGroup metricGroup);
    
    // 设置 KeyContext(用于 KeyedStream)
    void setKeyContextElement1(StreamRecord<?> record) throws Exception;
}

AbstractStreamOperator 基类

// 位于 org.apache.flink.streaming.api.operators.AbstractStreamOperator

public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, Serializable {
    
    // 运行时上下文
    protected StreamingRuntimeContext runtimeContext;
    
    // 状态后端
    protected KeyedStateBackend<?> keyedStateBackend;
    protected OperatorStateBackend operatorStateBackend;
    
    // 度量组
    protected MetricGroup metrics;
    
    // 输出收集器
    protected Output<StreamRecord<OUT>> output;
    
    // 初始化
    @Override
    public void open() throws Exception {
        // 子类可以覆盖此方法进行初始化
    }
    
    // 关闭
    @Override
    public void close() throws Exception {
        // 清理资源
    }
    
    // 状态快照
    @Override
    public OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {
        
        KeyGroupRange keyGroupRange = keyedStateBackend != null
            ? keyedStateBackend.getKeyGroupRange()
            : null;
        
        OperatorSnapshotFutures snapshotFutures = new OperatorSnapshotFutures();
        
        // 快照 Keyed State
        if (keyedStateBackend != null) {
            snapshotFutures.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(
                    checkpointId, timestamp, factory, checkpointOptions));
        }
        
        // 快照 Operator State
        if (operatorStateBackend != null) {
            snapshotFutures.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(
                    checkpointId, timestamp, factory, checkpointOptions));
        }
        
        return snapshotFutures;
    }
}

生命周期完整流程

┌─────────────────────────────────────────────────────────────────────────┐
│                        StreamOperator 生命周期                           │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Task 启动流程:                                                        │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  1. 创建 Operator                                               │   │
│  │     StreamOperatorFactory.createOperator()                      │   │
│  │         │                                                       │   │
│  │         ▼                                                       │   │
│  │  2. setup() - 设置运行时上下文                                   │   │
│  │     operator.setup(containingTask, config, output)              │   │
│  │         │                                                       │   │
│  │         ▼                                                       │   │
│  │  3. initializeState() - 初始化状态                               │   │
│  │     从 Checkpoint 恢复或初始化空状态                             │   │
│  │         │                                                       │   │
│  │         ▼                                                       │   │
│  │  4. open() - 打开算子                                           │   │
│  │     用户初始化逻辑                                               │   │
│  │         │                                                       │   │
│  │         ▼                                                       │   │
│  │  5. 处理数据                                                    │   │
│  │     processElement() / processWatermark()                       │   │
│  │         │                                                       │   │
│  │         ▼                                                       │   │
│  │  6. snapshotState() - Checkpoint 时快照状态                      │   │
│  │     保存算子状态                                                 │   │
│  │         │                                                       │   │
│  │         ▼                                                       │   │
│  │  7. finish() - 完成处理                                         │   │
│  │     处理完所有输入数据                                           │   │
│  │         │                                                       │   │
│  │         ▼                                                       │   │
│  │  8. close() - 关闭算子                                          │   │
│  │     清理资源                                                     │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  关键时机:                                                             │
│  • open() 和 close() 成对调用                                          │
│  • snapshotState() 在 Checkpoint 期间调用                              │
│  • finish() 在输入结束后、close() 之前调用                              │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

数据交换模式底层实现

StreamPartitioner 接口

// 位于 org.apache.flink.runtime.plugable.StreamPartitioner

public interface StreamPartitioner<T> extends Serializable {
    
    // 选择目标通道
    int[] select(T record, int numChannels);
    
    // 分区器类型
    StreamPartitionerType getPartitionerType();
}

// 分区器类型枚举
public enum StreamPartitionerType {
    FORWARD,        // 前向传递
    KEYBY,          // 哈希分区
    SHUFFLE,        // 随机分区
    REBALANCE,      // 轮询分区
    RESCALE,        // 缩放分区
    BROADCAST,      // 广播
    GLOBAL,         // 全局分区
    CUSTOM          // 自定义
}

ForwardPartitioner 实现

// 前向分区:一对一,用于 Chain 内部

public class ForwardPartitioner<T> implements StreamPartitioner<T> {
    
    @Override
    public int[] select(T record, int numChannels) {
        // 始终返回第一个通道
        // 因为上游和下游是一一对应的
        return new int[]{0};
    }
    
    @Override
    public StreamPartitionerType getPartitionerType() {
        return StreamPartitionerType.FORWARD;
    }
    
    @Override
    public String toString() {
        return "FORWARD";
    }
}

// 使用场景:
// 1. Operator Chain 内部
// 2. 上下游并行度相同且不需要重分区
// 3. 本地数据传递,无网络传输

KeyGroupStreamPartitioner 实现

// KeyBy 分区:哈希分区到特定 Key Group

public class KeyGroupStreamPartitioner<T, K> implements StreamPartitioner<T> {
    
    // Key 选择器
    private final KeySelector<T, K> keySelector;
    
    // Key Group 数量(等于最大并行度)
    private final int maxParallelism;
    
    @Override
    public int[] select(T record, int numChannels) {
        K key;
        try {
            key = keySelector.getKey(record);
        } catch (Exception e) {
            throw new RuntimeException("Failed to extract key", e);
        }
        
        // 计算 Key Group ID
        int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        
        // 将 Key Group 映射到物理通道(并行度)
        int channel = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
            maxParallelism, numChannels, keyGroupId);
        
        return new int[]{channel};
    }
}

// Key Group 分配算法
public class KeyGroupRangeAssignment {
    
    // 将 Key 分配到 Key Group
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }
    
    // 根据哈希值计算 Key Group
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        // 使用 MurmurHash 进行均匀分布
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }
    
    // 将 Key Group 映射到 Operator 实例
    public static int computeOperatorIndexForKeyGroup(
            int maxParallelism, int parallelism, int keyGroupId) {
        
        int keyGroupRangeSize = maxParallelism / parallelism;
        int start = keyGroupId / keyGroupRangeSize * keyGroupRangeSize;
        
        // 确保均匀分布
        int rangeIndex = keyGroupId / keyGroupRangeSize;
        return rangeIndex;
    }
}

BroadcastPartitioner 实现

// 广播分区:发送到所有下游通道

public class BroadcastPartitioner<T> implements StreamPartitioner<T> {
    
    @Override
    public int[] select(T record, int numChannels) {
        // 返回所有通道
        int[] channels = new int[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channels[i] = i;
        }
        return channels;
    }
    
    @Override
    public StreamPartitionerType getPartitionerType() {
        return StreamPartitionerType.BROADCAST;
    }
}

// 使用场景:
// 1. 广播状态(维度表)
// 2. 配置信息分发
// 3. 规则引擎规则广播

RebalancePartitioner 实现

// 轮询分区:均匀分配到下游通道

public class RebalancePartitioner<T> implements StreamPartitioner<T> {
    
    // 当前通道索引(使用 Atomic 保证线程安全)
    private final AtomicInteger currentChannel = new AtomicInteger(0);
    
    @Override
    public int[] select(T record, int numChannels) {
        // 轮询选择下一个通道
        int nextChannel = currentChannel.incrementAndGet() % numChannels;
        return new int[]{nextChannel};
    }
    
    @Override
    public StreamPartitionerType getPartitionerType() {
        return StreamPartitionerType.REBALANCE;
    }
}

// 使用场景:
// 1. 负载均衡
// 2. 数据重新分布
// 3. 消除数据倾斜

分区模式对比

┌─────────────────────────────────────────────────────────────────────────┐
│                        分区模式底层实现对比                              │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  分区器           目标通道          使用场景       网络传输     │   │
│  │  ────────────────────────────────────────────────────────────── │   │
│  │  Forward          单通道(固定 0)   Chain 内部     无           │   │
│  │  KeyBy            Key 哈希取模      聚合/Join      有           │   │
│  │  Broadcast        所有通道          维度广播       有(N份)     │   │
│  │  Rebalance        轮询              负载均衡       有           │   │
│  │  Shuffle          随机              数据打散       有           │   │
│  │  Rescale          局部轮询          缩放           有(部分)    │   │
│  │  Global           单通道(最后)     全局聚合       有           │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  性能排序(低到高):                                                    │
│  Broadcast > Shuffle > Rebalance > Rescale > KeyBy > Forward           │
│                                                                         │
│  关键点:                                                               │
│  • Forward 完全本地,无网络开销                                         │
│  • KeyBy 保证相同 Key 到同一通道,支持状态                              │
│  • Broadcast 数据复制,网络开销最大                                     │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Source 与 Sink 实现原理

Source 实现架构

// Flink 1.12+ 新 Source API

public interface Source<T, SplitT extends SourceSplit, EnumChkT> 
        extends SourceReaderFactory<T, SplitT> {
    
    // 创建 Split 枚举器(负责发现和分配 Split)
    SplitEnumerator<SplitT, EnumChkT> createEnumerator(
        SplitEnumeratorContext<SplitT> enumContext) throws Exception;
    
    // 从 Checkpoint 恢复枚举器
    SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
        SplitEnumeratorContext<SplitT> enumContext,
        EnumChkT checkpoint) throws Exception;
    
    // 创建 SourceReader(负责读取数据)
    SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) 
        throws Exception;
    
    // 获取 Split 序列化器
    SimpleVersionedSerializer<SplitT> getSplitSerializer();
    
    // 获取枚举器状态序列化器
    SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}

// SourceReader 接口
public interface SourceReader<T, SplitT extends SourceSplit> {
    
    // 启动 Reader
    void start();
    
    // 获取下一个记录
    InputStatus pollNext(RecordEmitter<T> output) throws Exception;
    
    // 处理来自枚举器的 Split 分配
    void addSplits(List<SplitT> splits);
    
    // 通知没有更多 Split
    void notifyNoMoreSplits();
    
    // 创建状态快照
    List<SplitT> snapshotState(long checkpointId);
}

KafkaSource 核心实现

// KafkaSource 核心组件

public class KafkaSource<T> implements Source<T, KafkaPartitionSplit, KafkaSourceEnumState> {
    
    // 配置
    private final KafkaSourceReaderFactory<T> readerFactory;
    private final KafkaPartitionSplitEnumeratorFactory enumFactory;
    
    // 创建 Split 枚举器
    @Override
    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(
            SplitEnumeratorContext<KafkaPartitionSplit> enumContext) {
        
        // 发现 Kafka Topic 分区
        KafkaPartitionDiscoverer discoverer = new KafkaPartitionDiscoverer(
            topics, kafkaProperties);
        
        // 创建枚举器
        return new KafkaPartitionSplitEnumerator(
            enumContext,
            discoverer,
            assignmentStrategy);
    }
}

// KafkaPartitionSplitEnumerator
public class KafkaPartitionSplitEnumerator 
        implements SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> {
    
    // 已发现的分区
    private final Set<TopicPartition> discoveredPartitions;
    
    // 已分配的分区
    private final Map<Integer, Set<TopicPartition>> assignedPartitions;
    
    @Override
    public void start() {
        // 定期发现新分区
        enumContext.callAsync(
            this::discoverPartitions,
            this::handleDiscoveredPartitions,
            partitionDiscoveryInterval,
            partitionDiscoveryInterval);
    }
    
    @Override
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        // 分配分区给 SourceReader
        Set<TopicPartition> partitionsToAssign = selectPartitionsForSubtask(subtaskId);
        if (!partitionsToAssign.isEmpty()) {
            enumContext.assignSplit(
                new KafkaPartitionSplit(partitionsToAssign), 
                subtaskId);
        }
    }
}

Sink 实现架构

// Flink 1.12+ 新 Sink API

public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> 
        extends Serializable {
    
    // 创建 Writer
    SinkWriter<InputT, CommT, WriterStateT> createWriter(
        InitContext context) throws IOException;
    
    // 创建提交器(用于两阶段提交)
    Optional<Committer<CommT, GlobalCommT>> createCommitter() throws IOException;
    
    // 创建全局提交器
    Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() 
        throws IOException;
}

// SinkWriter 接口
public interface SinkWriter<InputT, CommT, WriterStateT> {
    
    // 写入数据
    void write(InputT element, Context context) throws IOException;
    
    // 准备提交(返回待提交的 CommT)
    List<CommT> prepareCommit(boolean flush) throws IOException;
    
    // 创建状态快照
    List<WriterStateT> snapshotState(long checkpointId) throws IOException;
    
    // 关闭 Writer
    void close() throws IOException;
}

总结

本章从源码层面深入解析了 Flink 数据流模型:

概念源码位置核心机制
Transformationapi.dag.TransformationDataStream API 的逻辑表示
StreamGraphstreaming.api.graph.StreamGraph执行图,包含 StreamNode 和 StreamEdge
Operator ChainStreamGraph.isChainable()Forward 连接、相同并行度的算子合并
StreamOperatorstreaming.api.operators.StreamOperator实际执行数据处理的算子
StreamPartitionerruntime.plugable.StreamPartitioner数据分区策略

关键要点

  1. DataStream API 通过 Transformation 构建逻辑执行图
  2. StreamGraphGenerator 递归转换 Transformation 为 StreamGraph
  3. Operator Chain 减少 GC 和网络开销,提升性能
  4. 分区器决定数据如何分发到下游算子

参考资料

下一章预告

下一章将深入解析 时间与窗口,包括:

  • Watermark 生成与传播源码分析
  • 窗口分配器与触发器实现
  • 窗口状态管理
分享: