← 返回文章列表

Flink 底层原理系列(五):状态管理

2021-02-12·9 分钟阅读

前言

状态管理是 Flink 的核心特性之一。通过状态管理,Flink 能够实现精确一次语义、支持复杂的业务逻辑,并在故障恢复后继续处理。本章将从源码层面深入解析状态访问和存储的底层实现。

技术亮点

技术点难度面试价值本文覆盖
Keyed State 源码⭐⭐⭐⭐高频考点
Operator State⭐⭐⭐高频考点
State Backend 架构⭐⭐⭐⭐高频考点
RocksDB 写入流程⭐⭐⭐⭐⭐进阶考点

面试考点

  1. Keyed State 和 Operator State 的底层实现有什么区别?
  2. State Backend 的架构是怎样的?
  3. RocksDB 如何管理 Flink 状态?
  4. 状态快照是如何实现的?

状态类型与架构

状态类型对比

┌─────────────────────────────────────────────────────────────────────────┐
│                        Flink 状态类型架构                                │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  Keyed State(键控状态)                                        │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │                                                           │ │   │
│  │  │  特点:                                                    │ │   │
│  │  │  • 绑定到 Key,只能在 KeyedStream 上使用                   │ │   │
│  │  │  • 每个 Key 有独立的状态实例                               │ │   │
│  │  │  • 自动分区,随 Key 迁移                                   │ │   │
│  │  │                                                           │ │   │
│  │  │  类型:                                                    │ │   │
│  │  │  • ValueState<T>: 单值状态                                 │ │   │
│  │  │  • ListState<T>: 列表状态                                  │ │   │
│  │  │  • MapState<K, V>: 映射状态                                │ │   │
│  │  │  • ReducingState<T>: 归约状态                              │ │   │
│  │  │  • AggregatingState<IN, OUT>: 聚合状态                     │ │   │
│  │  │                                                           │ │   │
│  │  │  存储位置:State Backend 决定                              │ │   │
│  │  │  • HeapStateBackend: JVM 堆内存                           │ │   │
│  │  │  • RocksDBStateBackend: RocksDB(堆外)                   │ │   │
│  │  │                                                           │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                                                                 │   │
│  │  Operator State(算子状态)                                     │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │                                                           │ │   │
│  │  │  特点:                                                    │ │   │
│  │  │  • 绑定到算子并行度,与 Key 无关                           │ │   │
│  │  │  • 每个并行实例有独立的状态                                 │ │   │
│  │  │  • 需要手动管理恢复方式                                     │ │   │
│  │  │                                                           │ │   │
│  │  │  类型:                                                    │ │   │
│  │  │  • ListState<T>: 列表状态                                  │ │   │
│  │  │  • UnionListState<T>: 联合列表状态                         │ │   │
│  │  │  • BroadcastState<K, V>: 广播状态                          │ │   │
│  │  │                                                           │ │   │
│  │  │  恢复策略:                                                │ │   │
│  │  │  • EVEN_DISTRIBUTION: 均匀分配                            │ │   │
│  │  │  • UNION: 联合(所有实例获取全量状态)                     │ │   │
│  │  │  • BROADCAST: 广播(所有实例获取相同状态)                 │ │   │
│  │  │                                                           │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

状态管理架构

┌─────────────────────────────────────────────────────────────────────────┐
│                        状态管理架构                                      │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  StreamOperator                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  RuntimeContext                                                 │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │                                                           │ │   │
│  │  │  getKeyedStateStore()     getOperatorStateStore()         │ │   │
│  │  │         │                         │                        │ │   │
│  │  └─────────┼─────────────────────────┼────────────────────────┘ │   │
│  │            │                         │                          │   │
│  │            ▼                         ▼                          │   │
│  │  ┌─────────────────┐       ┌─────────────────┐                │   │
│  │  │ KeyedStateStore │       │OperatorStateStore│                │   │
│  │  └────────┬────────┘       └────────┬────────┘                │   │
│  │           │                         │                          │   │
│  │           └───────────┬─────────────┘                          │   │
│  │                       │                                        │   │
│  │                       ▼                                        │   │
│  │              ┌─────────────────┐                               │   │
│  │              │ StateBackend    │                               │   │
│  │              │                 │                               │   │
│  │              │ • HashMapStateBE│                               │   │
│  │              │ • RocksDBStateBE│                               │   │
│  │              └────────┬────────┘                               │   │
│  │                       │                                        │   │
│  │                       ▼                                        │   │
│  │              ┌─────────────────┐                               │   │
│  │              │ CheckpointStorage│                              │   │
│  │              │                 │                               │   │
│  │              │ • FileSystem    │                               │   │
│  │              │ • Memory        │                               │   │
│  │              └─────────────────┘                               │   │
│  │                                                               │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Keyed State 底层实现

StateDescriptor 与 State 绑定

// 位于 org.apache.flink.api.common.state.StateDescriptor

/**
 * StateDescriptor 是状态的描述符
 * 定义状态的名称、类型、默认值等
 */
public abstract class StateDescriptor<S extends State, T> implements Serializable {
    
    // 状态名称
    protected final String name;
    
    // 类型序列化器
    protected TypeSerializer<T> serializer;
    
    // 默认值
    protected final T defaultValue;
    
    // TTL 配置
    protected StateTtlConfig ttlConfig;
    
    protected StateDescriptor(String name, TypeSerializer<T> serializer, T defaultValue) {
        this.name = name;
        this.serializer = serializer;
        this.defaultValue = defaultValue;
    }
    
    // 获取状态名称
    public String getName() {
        return name;
    }
    
    // 获取序列化器
    public TypeSerializer<T> getSerializer() {
        return serializer;
    }
    
    // 启用 TTL
    public void enableTimeToLive(StateTtlConfig ttlConfig) {
        this.ttlConfig = ttlConfig;
    }
}

// ValueStateDescriptor 实现
public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
    
    public ValueStateDescriptor(String name, Class<T> typeClass) {
        super(name, new TypeInformation<T>(){}.createSerializer(null), null);
    }
    
    public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
        super(name, typeSerializer, null);
    }
}

// ListStateDescriptor 实现
public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
    
    public ListStateDescriptor(String name, Class<T> elementTypeClass) {
        super(name, new ListSerializer<>(...), null);
    }
}

// MapStateDescriptor 实现
public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {
    
    public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
        super(name, new MapSerializer<>(...), null);
    }
}

KeyedStateStore 接口

// 位于 org.apache.flink.runtime.state.KeyedStateStore

/**
 * KeyedStateStore 提供获取 Keyed State 的接口
 */
public interface KeyedStateStore {
    
    // 获取 ValueState
    <T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor) throws Exception;
    
    // 获取 ListState
    <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;
    
    // 获取 MapState
    <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateDescriptor) throws Exception;
    
    // 获取 ReducingState
    <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateDescriptor) throws Exception;
    
    // 获取 AggregatingState
    <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
        AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) throws Exception;
}

// DefaultKeyedStateStore 实现
public class DefaultKeyedStateStore implements KeyedStateStore {
    
    private final KeyedStateBackend<?> keyedStateBackend;
    private final ExecutionConfig executionConfig;
    
    @Override
    public <T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor) throws Exception {
        // 初始化序列化器
        if (stateDescriptor.getSerializer() == null) {
            stateDescriptor.initializeSerializerUnlessSet(executionConfig);
        }
        
        // 从 StateBackend 获取状态
        return keyedStateBackend.getPartitionedState(
            VoidNamespace.INSTANCE,
            VoidNamespaceSerializer.INSTANCE,
            stateDescriptor);
    }
}

KeyedStateBackend 接口

// 位于 org.apache.flink.runtime.state.KeyedStateBackend

/**
 * KeyedStateBackend 是 Keyed State 的核心接口
 */
public interface KeyedStateBackend<K> extends KeyedStateStore, Closeable {
    
    // 获取当前 Key
    K getCurrentKey();
    
    // 设置当前 Key
    void setCurrentKey(K key);
    
    // 获取分区状态
    <N, S extends State, T> S getPartitionedState(
        N namespace,
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, T> stateDescriptor) throws Exception;
    
    // 应用状态
    void applyToAllKeys(
        N namespace,
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, KV> stateDescriptor,
        KeyedStateFunction<K, S> function) throws Exception;
    
    // 创建状态快照
    RunnableFuture<KeyedStateHandle> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions) throws Exception;
}

HeapKeyedStateBackend 实现

// 位于 org.apache.flink.runtime.state.heap.HeapKeyedStateBackend

/**
 * HeapKeyedStateBackend 将状态存储在 JVM 堆内存
 */
public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
    
    // 状态表:Key -> (Namespace -> State)
    private final Map<String, StateTable<K, N, ?>> registeredKVStates;
    
    // 状态名称到状态的映射
    private final Map<String, InternalKvState<K, N, ?>> keyValueStatesByName;
    
    // Key 序列化器
    private final TypeSerializer<K> keySerializer;
    
    // Key Group 数量
    private final int numberOfKeyGroups;
    
    // 当前 Key Group
    private KeyGroupRange keyGroupRange;
    
    @Override
    public <N, S extends State, T> S getPartitionedState(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor) throws Exception {
        
        // 1. 获取状态名称
        String stateName = stateDescriptor.getName();
        
        // 2. 检查是否已创建
        StateTable<K, N, ?> stateTable = registeredKVStates.get(stateName);
        
        if (stateTable == null) {
            // 3. 创建新的状态表
            stateTable = createStateTable(stateDescriptor, namespaceSerializer);
            registeredKVStates.put(stateName, stateTable);
        }
        
        // 4. 创建状态访问器
        return (S) createStateAccessor(stateTable, stateDescriptor, namespace);
    }
    
    // 创建状态表
    private <N, T> StateTable<K, N, T> createStateTable(
            StateDescriptor<?, T> stateDescriptor,
            TypeSerializer<N> namespaceSerializer) {
        
        TypeSerializer<T> stateSerializer = stateDescriptor.getSerializer();
        
        return new StateTable<>(
            keySerializer,
            namespaceSerializer,
            stateSerializer,
            numberOfKeyGroups);
    }
    
    // 创建 ValueState 访问器
    private <N, T> ValueState<T> createValueState(
            StateTable<K, N, T> stateTable,
            ValueStateDescriptor<T> stateDescriptor,
            N namespace) {
        
        return new HeapValueState<>(
            stateTable,
            stateDescriptor,
            namespace,
            stateTable.getKeySerializer(),
            stateTable.getNamespaceSerializer(),
            stateTable.getStateSerializer());
    }
}

// HeapValueState 实现
public class HeapValueState<K, N, T> 
        extends AbstractHeapState<K, N, T>
        implements InternalValueState<K, N, T> {
    
    public HeapValueState(
            StateTable<K, N, T> stateTable,
            ValueStateDescriptor<T> stateDescriptor,
            N namespace) {
        super(stateTable, stateDescriptor, namespace);
    }
    
    @Override
    public T value() {
        // 从状态表获取值
        return stateTable.get(currentKey, namespace);
    }
    
    @Override
    public void update(T value) {
        // 更新状态表
        if (value == null) {
            stateTable.remove(currentKey, namespace);
        } else {
            stateTable.put(currentKey, namespace, value);
        }
    }
    
    @Override
    public void clear() {
        stateTable.remove(currentKey, namespace);
    }
}

StateTable 实现

// 位于 org.apache.flink.runtime.state.heap.StateTable

/**
 * StateTable 是堆内存状态的核心数据结构
 * 使用嵌套 Map 实现:Key -> Namespace -> State
 */
public class StateTable<K, N, S> implements StateSnapshotRestore {
    
    // Key 序列化器
    private final TypeSerializer<K> keySerializer;
    
    // Namespace 序列化器
    private final TypeSerializer<N> namespaceSerializer;
    
    // State 序列化器
    private final TypeSerializer<S> stateSerializer;
    
    // 状态数据结构
    // 使用 Key Group 分组,支持增量快照
    private final KeyGroupedInternalMap<K, N, S> stateMap;
    
    // Key Group 数量
    private final int numberOfKeyGroups;
    
    public StateTable(
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<S> stateSerializer,
            int numberOfKeyGroups) {
        
        this.keySerializer = keySerializer;
        this.namespaceSerializer = namespaceSerializer;
        this.stateSerializer = stateSerializer;
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.stateMap = new KeyGroupedInternalMap<>(numberOfKeyGroups);
    }
    
    // 获取状态
    public S get(K key, N namespace) {
        return stateMap.get(key, namespace);
    }
    
    // 设置状态
    public void put(K key, N namespace, S state) {
        stateMap.put(key, namespace, state);
    }
    
    // 删除状态
    public void remove(K key, N namespace) {
        stateMap.remove(key, namespace);
    }
    
    // 获取指定 Key Group 的所有状态
    public Iterator<Tuple2<K, S>> iterator(int keyGroup) {
        return stateMap.iterator(keyGroup);
    }
    
    // 创建快照
    @Override
    public StateSnapshot createSnapshot() {
        return new StateTableSnapshot(this);
    }
}

// Key Group 分组的内部 Map
public class KeyGroupedInternalMap<K, N, S> {
    
    // 每个 Key Group 一个 Map
    private final List<Map<N, Map<K, S>>> keyGroupMaps;
    
    public KeyGroupedInternalMap(int numberOfKeyGroups) {
        this.keyGroupMaps = new ArrayList<>(numberOfKeyGroups);
        for (int i = 0; i < numberOfKeyGroups; i++) {
            keyGroupMaps.add(new HashMap<>());
        }
    }
    
    // 计算 Key 所属的 Key Group
    private int getKeyGroup(K key) {
        return Math.abs(key.hashCode()) % keyGroupMaps.size();
    }
    
    public S get(K key, N namespace) {
        int keyGroup = getKeyGroup(key);
        Map<N, Map<K, S>> namespaceMap = keyGroupMaps.get(keyGroup);
        if (namespaceMap == null) {
            return null;
        }
        Map<K, S> keyMap = namespaceMap.get(namespace);
        if (keyMap == null) {
            return null;
        }
        return keyMap.get(key);
    }
    
    public void put(K key, N namespace, S state) {
        int keyGroup = getKeyGroup(key);
        Map<N, Map<K, S>> namespaceMap = keyGroupMaps.get(keyGroup);
        if (namespaceMap == null) {
            namespaceMap = new HashMap<>();
            keyGroupMaps.set(keyGroup, namespaceMap);
        }
        Map<K, S> keyMap = namespaceMap.computeIfAbsent(namespace, k -> new HashMap<>());
        keyMap.put(key, state);
    }
}

RocksDB State Backend

RocksDB 架构

┌─────────────────────────────────────────────────────────────────────────┐
│                        RocksDB State Backend 架构                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  RocksDBKeyedStateBackend                                               │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  Column Family (每个状态类型一个)                               │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │                                                           │ │   │
│  │  │  ValueState Column Family                                 │ │   │
│  │  │  ┌─────────────────────────────────────────────────────┐ │ │   │
│  │  │  │ Key: KeyGroup + Key + Namespace                      │ │ │   │
│  │  │  │ Value: State Value (序列化后)                        │ │ │   │
│  │  │  └─────────────────────────────────────────────────────┘ │ │   │
│  │  │                                                           │ │   │
│  │  │  MapState Column Family                                   │ │   │
│  │  │  ┌─────────────────────────────────────────────────────┐ │ │   │
│  │  │  │ Key: KeyGroup + Key + Namespace + MapKey            │ │ │   │
│  │  │  │ Value: MapValue (序列化后)                           │ │ │   │
│  │  │  └─────────────────────────────────────────────────────┘ │ │   │
│  │  │                                                           │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                                                                 │   │
│  │  Write Buffer (MemTable)                                       │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │  内存中的写入缓冲区,先写入这里                             │ │   │
│  │  │  当 MemTable 满后,Flush 到 SST 文件                      │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                                                                 │   │
│  │  SST Files (磁盘)                                              │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │  Sorted String Table,有序的磁盘文件                       │ │   │
│  │  │  通过 Compaction 合并和清理                                │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                                                                 │   │
│  │  Block Cache                                                   │   │
│  │  ┌───────────────────────────────────────────────────────────┐ │   │
│  │  │  读取缓存,缓存热数据块                                     │ │   │
│  │  │  使用 LRU 算法淘汰                                         │ │   │
│  │  └───────────────────────────────────────────────────────────┘ │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

RocksDBKeyedStateBackend 源码

// 位于 org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend

/**
 * RocksDBKeyedStateBackend 将状态存储在 RocksDB
 */
public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
    
    // RocksDB 实例
    private final RocksDB db;
    
    // Column Family 描述符
    private final Map<String, ColumnFamilyHandle> columnFamilyHandles;
    
    // 默认 Column Family
    private final ColumnFamilyHandle defaultColumnFamily;
    
    // 写入选项
    private final WriteOptions writeOptions;
    
    // 读取选项
    private final ReadOptions readOptions;
    
    // Key 序列化器
    private final RocksDBKeySerializationUtils.KeySerializer<K> keySerializer;
    
    // 托管内存
    private final RocksDBMemoryController memoryController;
    
    public RocksDBKeyedStateBackend(
            String operatorIdentifier,
            ClassLoader userCodeClassLoader,
            File instanceBasePath,
            RocksDBOptions options,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups) throws Exception {
        
        super(operatorIdentifier, userCodeClassLoader, keySerializer, numberOfKeyGroups);
        
        // 初始化 RocksDB
        this.db = openDB(instanceBasePath, options);
        
        // 初始化 Column Family
        this.columnFamilyHandles = new HashMap<>();
        this.defaultColumnFamily = db.getDefaultColumnFamily();
        
        // 初始化写入选项
        this.writeOptions = new WriteOptions()
            .setSync(false)
            .setDisableWAL(false);
        
        // 初始化读取选项
        this.readOptions = new ReadOptions();
        
        // 初始化内存控制器
        this.memoryController = new RocksDBMemoryController(options);
    }
    
    // 打开 RocksDB
    private RocksDB openDB(File basePath, RocksDBOptions options) throws RocksDBException {
        
        // 配置 Column Family
        List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
        columnFamilyDescriptors.add(new ColumnFamilyDescriptor(
            RocksDB.DEFAULT_COLUMN_FAMILY,
            getColumnFamilyOptions(options)));
        
        // 配置 DB 选项
        DBOptions dbOptions = new DBOptions()
            .setCreateIfMissing(true)
            .setUseFsync(false)
            .setMaxOpenFiles(options.getMaxOpenFiles());
        
        // 打开 DB
        List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
        return RocksDB.open(dbOptions, basePath.getAbsolutePath(), 
            columnFamilyDescriptors, columnFamilyHandles);
    }
    
    // 获取或创建状态
    @Override
    public <N, S extends State, T> S getPartitionedState(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor) throws Exception {
        
        String stateName = stateDescriptor.getName();
        
        // 获取或创建 Column Family
        ColumnFamilyHandle columnFamily = columnFamilyHandles.get(stateName);
        if (columnFamily == null) {
            columnFamily = createColumnFamily(stateName, stateDescriptor);
            columnFamilyHandles.put(stateName, columnFamily);
        }
        
        // 创建状态访问器
        return createRocksDBState(columnFamily, stateDescriptor, namespace, namespaceSerializer);
    }
    
    // 创建 Column Family
    private ColumnFamilyHandle createColumnFamily(
            String name, 
            StateDescriptor<?, ?> stateDescriptor) throws RocksDBException {
        
        ColumnFamilyDescriptor descriptor = new ColumnFamilyDescriptor(
            name.getBytes(),
            getColumnFamilyOptions(stateDescriptor));
        
        return db.createColumnFamily(descriptor);
    }
    
    // 创建 ValueState
    private <N, T> ValueState<T> createRocksDBValueState(
            ColumnFamilyHandle columnFamily,
            ValueStateDescriptor<T> stateDescriptor,
            N namespace,
            TypeSerializer<N> namespaceSerializer) {
        
        return new RocksDBValueState<>(
            db,
            columnFamily,
            writeOptions,
            readOptions,
            stateDescriptor,
            namespace,
            namespaceSerializer,
            keySerializer);
    }
}

RocksDBValueState 实现

// 位于 org.apache.flink.contrib.streaming.state.RocksDBValueState

/**
 * RocksDB 中的 ValueState 实现
 */
public class RocksDBValueState<K, N, T>
        extends AbstractRocksDBState<K, N, T>
        implements InternalValueState<K, N, T> {
    
    private final TypeSerializer<T> valueSerializer;
    
    public RocksDBValueState(
            RocksDB db,
            ColumnFamilyHandle columnFamily,
            WriteOptions writeOptions,
            ReadOptions readOptions,
            ValueStateDescriptor<T> stateDescriptor,
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            RocksDBKeySerializationUtils.KeySerializer<K> keySerializer) {
        
        super(db, columnFamily, writeOptions, readOptions, 
              stateDescriptor, namespace, namespaceSerializer, keySerializer);
        
        this.valueSerializer = stateDescriptor.getSerializer();
    }
    
    @Override
    public T value() throws IOException {
        try {
            // 1. 构造 Key
            byte[] keyBytes = serializeKey(currentKey, namespace);
            
            // 2. 从 RocksDB 读取
            byte[] valueBytes = db.get(columnFamily, readOptions, keyBytes);
            
            // 3. 反序列化
            if (valueBytes == null) {
                return stateDescriptor.getDefaultValue();
            }
            
            return valueSerializer.deserialize(new DataInputViewStreamWrapper(
                new ByteArrayInputStream(valueBytes)));
            
        } catch (RocksDBException | IOException e) {
            throw new IOException("Error while getting value from RocksDB", e);
        }
    }
    
    @Override
    public void update(T value) throws IOException {
        if (value == null) {
            clear();
            return;
        }
        
        try {
            // 1. 构造 Key
            byte[] keyBytes = serializeKey(currentKey, namespace);
            
            // 2. 序列化 Value
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
            valueSerializer.serialize(value, out);
            byte[] valueBytes = baos.toByteArray();
            
            // 3. 写入 RocksDB
            db.put(columnFamily, writeOptions, keyBytes, valueBytes);
            
        } catch (RocksDBException | IOException e) {
            throw new IOException("Error while putting value to RocksDB", e);
        }
    }
    
    @Override
    public void clear() {
        try {
            // 构造 Key 并删除
            byte[] keyBytes = serializeKey(currentKey, namespace);
            db.delete(columnFamily, writeOptions, keyBytes);
        } catch (Exception e) {
            // 忽略删除错误
        }
    }
    
    // 序列化 Key
    private byte[] serializeKey(K key, N namespace) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        
        // 写入 Key Group
        int keyGroup = getKeyGroup(key);
        out.writeInt(keyGroup);
        
        // 写入 Key
        keySerializer.serialize(key, out);
        
        // 写入 Namespace
        namespaceSerializer.serialize(namespace, out);
        
        return baos.toByteArray();
    }
}

RocksDB 写入流程

┌─────────────────────────────────────────────────────────────────────────┐
│                        RocksDB 写入流程                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Flink State Update                                                     │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │  1. state.update(value)                                         │   │
│  │     │                                                           │   │
│  │     ▼                                                           │   │
│  │  2. 序列化 Key 和 Value                                         │   │
│  │     │  Key = KeyGroup + Key + Namespace                         │   │
│  │     │  Value = 序列化的状态值                                    │   │
│  │     │                                                           │   │
│  │     ▼                                                           │   │
│  │  3. db.put(columnFamily, key, value)                            │   │
│  │     │                                                           │   │
│  │     ▼                                                           │   │
│  │  ┌─────────────────────────────────────────────────────────────┐│   │
│  │  │                     RocksDB 内部                            ││   │
│  │  │                                                             ││   │
│  │  │  4. 写入 MemTable (内存)                                    ││   │
│  │  │     │                                                       ││   │
│  │  │     │ MemTable 满                                           ││   │
│  │  │     ▼                                                       ││   │
│  │  │  5. 切换到 Immutable MemTable                               ││   │
│  │  │     │                                                       ││   │
│  │  │     │ 后台 Flush                                            ││   │
│  │  │     ▼                                                       ││   │
│  │  │  6. Flush 到 SST 文件 (磁盘)                                ││   │
│  │  │     │                                                       ││   │
│  │  │     │ SST 文件过多                                          ││   │
│  │  │     ▼                                                       ││   │
│  │  │  7. Compaction (合并)                                       ││   │
│  │  │     • 合并多个 SST 文件                                     ││   │
│  │  │     • 清理过期/删除的数据                                    ││   │
│  │  │     • 生成新的 SST 文件                                     ││   │
│  │  │                                                             ││   │
│  │  └─────────────────────────────────────────────────────────────┘│   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  性能考量:                                                             │
│  • MemTable 写入:极快(内存)                                          │
│  • Flush:后台异步,不阻塞写入                                          │
│  • Compaction:可能影响性能,需要调优                                   │
│  • Block Cache:读取性能的关键                                          │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

状态快照机制

CheckpointedFunction 接口

// 位于 org.apache.flink.api.common.state.CheckpointedFunction

/**
 * CheckpointedFunction 接口用于自定义状态快照
 */
@Public
public interface CheckpointedFunction {
    
    // 创建快照
    void snapshotState(FunctionSnapshotContext context) throws Exception;
    
    // 初始化状态
    void initializeState(FunctionInitializationContext context) throws Exception;
}

// FunctionSnapshotContext 接口
public interface FunctionSnapshotContext {
    
    // 获取 Checkpoint ID
    long getCheckpointId();
    
    // 获取 Checkpoint 时间戳
    long getCheckpointTimestamp();
    
    // 获取 OperatorStateStore
    OperatorStateStore getOperatorStateStore();
    
    // 获取 KeyedStateStore
    KeyedStateStore getKeyedStateStore();
}

状态快照流程

// 位于 org.apache.flink.runtime.state.AbstractStreamOperator

/**
 * 算子状态快照实现
 */
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, Triggerable<Object, VoidNamespace> {
    
    // 快照状态
    public OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory checkpointStreamFactory) throws Exception {
        
        OperatorSnapshotFutures snapshotFutures = new OperatorSnapshotFutures();
        
        // 1. 快照 Keyed State
        if (keyedStateBackend != null) {
            RunnableFuture<KeyedStateHandle> keyedStateFuture = 
                keyedStateBackend.snapshot(
                    checkpointId,
                    timestamp,
                    checkpointStreamFactory,
                    checkpointOptions);
            
            snapshotFutures.setKeyedStateFuture(keyedStateFuture);
        }
        
        // 2. 快照 Operator State
        if (operatorStateBackend != null) {
            RunnableFuture<OperatorStateHandle> operatorStateFuture = 
                operatorStateBackend.snapshot(
                    checkpointId,
                    timestamp,
                    checkpointStreamFactory,
                    checkpointOptions);
            
            snapshotFutures.setOperatorStateFuture(operatorStateFuture);
        }
        
        // 3. 快照 Timer
        if (internalTimeServiceManager != null) {
            snapshotFutures.setTimerFuture(
                internalTimeServiceManager.snapshot(
                    checkpointId,
                    timestamp,
                    checkpointStreamFactory));
        }
        
        return snapshotFutures;
    }
}

RocksDB 增量快照

// 位于 org.apache.flink.contrib.streaming.state.RocksIncrementalSnapshotUtils

/**
 * RocksDB 增量快照实现
 */
public class RocksIncrementalSnapshotUtils {
    
    // 创建增量快照
    public static IncrementalKeyedStateHandle createIncrementalSnapshot(
            RocksDB db,
            long checkpointId,
            long timestamp,
            Path checkpointPath,
            KeyGroupRange keyGroupRange) throws Exception {
        
        // 1. 触发 Checkpoint(RocksDB 内部)
        db.getSnapshot();
        
        // 2. 记录自上次快照以来的变化
        //    使用 RocksDB 的 Checkpoint 功能
        Checkpoint checkpoint = Checkpoint.create(db, checkpointPath.toString());
        
        // 3. 收集新增的 SST 文件
        List<StateHandle> stateHandles = new ArrayList<>();
        for (File sstFile : collectNewSstFiles(checkpointPath)) {
            // 上传到分布式存储
            StateHandle handle = uploadToDfs(sstFile, checkpointPath);
            stateHandles.add(handle);
        }
        
        // 4. 创建增量状态句柄
        return new IncrementalKeyedStateHandle(
            checkpointId,
            timestamp,
            keyGroupRange,
            stateHandles,
            db.getLatestSequenceNumber());
    }
    
    // 从增量快照恢复
    public static void restoreFromIncrementalSnapshot(
            RocksDB db,
            IncrementalKeyedStateHandle stateHandle,
            Path restorePath) throws Exception {
        
        // 1. 下载 SST 文件
        for (StateHandle handle : stateHandle.getStateHandles()) {
            downloadFromDfs(handle, restorePath);
        }
        
        // 2. RocksDB 自动加载 SST 文件
        //    (通过 Checkpoint 创建的目录)
    }
}

状态 TTL 实现

// 位于 org.apache.flink.api.common.state.StateTtlConfig

/**
 * 状态 TTL 配置
 */
public class StateTtlConfig implements Serializable {
    
    // TTL 时长
    private final Time ttl;
    
    // 更新类型
    private final UpdateType updateType;
    
    // 状态可见性
    private final StateVisibility stateVisibility;
    
    // 清理策略
    private final CleanupStrategies cleanupStrategies;
    
    public enum UpdateType {
        Disabled,              // 禁用 TTL
        OnCreateAndWrite,      // 创建和写入时更新
        OnReadAndWrite         // 读取和写入时更新
    }
    
    public enum StateVisibility {
        ReturnExpiredIfNotCleanedUp,  // 返回过期但未清理的状态
        NeverReturnExpired            // 不返回过期状态
    }
    
    public static class CleanupStrategies {
        
        // 全量快照时清理
        private final CleanupStrategy fullSnapshotCleanup;
        
        // 增量清理
        private final CleanupStrategy incrementalCleanup;
        
        // RocksDB Compaction 时清理
        private final CleanupStrategy inRocksdbCompactionFilter;
    }
}

// Heap 状态 TTL 实现
public class TtlValueState<K, N, T> implements ValueState<T> {
    
    private final ValueState<TtlValue<T>> wrappedState;
    private final TtlConfig ttlConfig;
    private final TtlTimeProvider timeProvider;
    
    @Override
    public T value() throws IOException {
        TtlValue<T> ttlValue = wrappedState.value();
        
        if (ttlValue == null) {
            return null;
        }
        
        // 检查是否过期
        if (isExpired(ttlValue)) {
            // 根据配置决定是否返回过期值
            if (ttlConfig.getStateVisibility() == StateVisibility.NeverReturnExpired) {
                return null;
            }
        }
        
        return ttlValue.getValue();
    }
    
    @Override
    public void update(T value) throws IOException {
        // 包装值并设置时间戳
        long timestamp = timeProvider.currentTimestamp();
        wrappedState.update(new TtlValue<>(value, timestamp));
    }
    
    private boolean isExpired(TtlValue<?> ttlValue) {
        long now = timeProvider.currentTimestamp();
        long expirationTime = ttlValue.getLastAccessTimestamp() + ttlConfig.getTtl().toMilliseconds();
        return now > expirationTime;
    }
}

总结

本章从源码层面深入解析了 Flink 状态管理:

概念源码位置核心机制
StateDescriptorapi.common.state.StateDescriptor状态描述与序列化
KeyedStateBackendruntime.state.KeyedStateBackend状态访问接口
StateTableruntime.state.heap.StateTable堆内存状态存储
RocksDBKeyedStateBackendcontrib.streaming.state.RocksDBKeyedStateBackendRocksDB 状态存储
StateTtlConfigapi.common.state.StateTtlConfigTTL 配置与清理

关键要点

  1. Keyed State 绑定到 Key,自动分区管理
  2. HeapStateBackend 使用 StateTable 存储状态
  3. RocksDB 使用 Column Family 隔离不同状态
  4. 增量快照减少数据传输量

参考资料

下一章预告

下一章将深入解析 容错机制,包括:

  • Checkpoint 原理
  • Barrier 对齐算法
  • 两阶段提交
分享: