Flink 底层原理系列(五):状态管理
2021-02-12·9 分钟阅读
前言
状态管理是 Flink 的核心特性之一。通过状态管理,Flink 能够实现精确一次语义、支持复杂的业务逻辑,并在故障恢复后继续处理。本章将从源码层面深入解析状态访问和存储的底层实现。
技术亮点
| 技术点 | 难度 | 面试价值 | 本文覆盖 |
|---|---|---|---|
| Keyed State 源码 | ⭐⭐⭐⭐ | 高频考点 | ✅ |
| Operator State | ⭐⭐⭐ | 高频考点 | ✅ |
| State Backend 架构 | ⭐⭐⭐⭐ | 高频考点 | ✅ |
| RocksDB 写入流程 | ⭐⭐⭐⭐⭐ | 进阶考点 | ✅ |
面试考点
- Keyed State 和 Operator State 的底层实现有什么区别?
- State Backend 的架构是怎样的?
- RocksDB 如何管理 Flink 状态?
- 状态快照是如何实现的?
状态类型与架构
状态类型对比
┌─────────────────────────────────────────────────────────────────────────┐
│ Flink 状态类型架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Keyed State(键控状态) │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ │ │ │
│ │ │ 特点: │ │ │
│ │ │ • 绑定到 Key,只能在 KeyedStream 上使用 │ │ │
│ │ │ • 每个 Key 有独立的状态实例 │ │ │
│ │ │ • 自动分区,随 Key 迁移 │ │ │
│ │ │ │ │ │
│ │ │ 类型: │ │ │
│ │ │ • ValueState<T>: 单值状态 │ │ │
│ │ │ • ListState<T>: 列表状态 │ │ │
│ │ │ • MapState<K, V>: 映射状态 │ │ │
│ │ │ • ReducingState<T>: 归约状态 │ │ │
│ │ │ • AggregatingState<IN, OUT>: 聚合状态 │ │ │
│ │ │ │ │ │
│ │ │ 存储位置:State Backend 决定 │ │ │
│ │ │ • HeapStateBackend: JVM 堆内存 │ │ │
│ │ │ • RocksDBStateBackend: RocksDB(堆外) │ │ │
│ │ │ │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ Operator State(算子状态) │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ │ │ │
│ │ │ 特点: │ │ │
│ │ │ • 绑定到算子并行度,与 Key 无关 │ │ │
│ │ │ • 每个并行实例有独立的状态 │ │ │
│ │ │ • 需要手动管理恢复方式 │ │ │
│ │ │ │ │ │
│ │ │ 类型: │ │ │
│ │ │ • ListState<T>: 列表状态 │ │ │
│ │ │ • UnionListState<T>: 联合列表状态 │ │ │
│ │ │ • BroadcastState<K, V>: 广播状态 │ │ │
│ │ │ │ │ │
│ │ │ 恢复策略: │ │ │
│ │ │ • EVEN_DISTRIBUTION: 均匀分配 │ │ │
│ │ │ • UNION: 联合(所有实例获取全量状态) │ │ │
│ │ │ • BROADCAST: 广播(所有实例获取相同状态) │ │ │
│ │ │ │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
状态管理架构
┌─────────────────────────────────────────────────────────────────────────┐
│ 状态管理架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ StreamOperator │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ RuntimeContext │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ │ │ │
│ │ │ getKeyedStateStore() getOperatorStateStore() │ │ │
│ │ │ │ │ │ │ │
│ │ └─────────┼─────────────────────────┼────────────────────────┘ │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ KeyedStateStore │ │OperatorStateStore│ │ │
│ │ └────────┬────────┘ └────────┬────────┘ │ │
│ │ │ │ │ │
│ │ └───────────┬─────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────┐ │ │
│ │ │ StateBackend │ │ │
│ │ │ │ │ │
│ │ │ • HashMapStateBE│ │ │
│ │ │ • RocksDBStateBE│ │ │
│ │ └────────┬────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────┐ │ │
│ │ │ CheckpointStorage│ │ │
│ │ │ │ │ │
│ │ │ • FileSystem │ │ │
│ │ │ • Memory │ │ │
│ │ └─────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Keyed State 底层实现
StateDescriptor 与 State 绑定
// 位于 org.apache.flink.api.common.state.StateDescriptor
/**
* StateDescriptor 是状态的描述符
* 定义状态的名称、类型、默认值等
*/
public abstract class StateDescriptor<S extends State, T> implements Serializable {
// 状态名称
protected final String name;
// 类型序列化器
protected TypeSerializer<T> serializer;
// 默认值
protected final T defaultValue;
// TTL 配置
protected StateTtlConfig ttlConfig;
protected StateDescriptor(String name, TypeSerializer<T> serializer, T defaultValue) {
this.name = name;
this.serializer = serializer;
this.defaultValue = defaultValue;
}
// 获取状态名称
public String getName() {
return name;
}
// 获取序列化器
public TypeSerializer<T> getSerializer() {
return serializer;
}
// 启用 TTL
public void enableTimeToLive(StateTtlConfig ttlConfig) {
this.ttlConfig = ttlConfig;
}
}
// ValueStateDescriptor 实现
public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
public ValueStateDescriptor(String name, Class<T> typeClass) {
super(name, new TypeInformation<T>(){}.createSerializer(null), null);
}
public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
super(name, typeSerializer, null);
}
}
// ListStateDescriptor 实现
public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
public ListStateDescriptor(String name, Class<T> elementTypeClass) {
super(name, new ListSerializer<>(...), null);
}
}
// MapStateDescriptor 实现
public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {
public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
super(name, new MapSerializer<>(...), null);
}
}
KeyedStateStore 接口
// 位于 org.apache.flink.runtime.state.KeyedStateStore
/**
* KeyedStateStore 提供获取 Keyed State 的接口
*/
public interface KeyedStateStore {
// 获取 ValueState
<T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor) throws Exception;
// 获取 ListState
<T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;
// 获取 MapState
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateDescriptor) throws Exception;
// 获取 ReducingState
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateDescriptor) throws Exception;
// 获取 AggregatingState
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) throws Exception;
}
// DefaultKeyedStateStore 实现
public class DefaultKeyedStateStore implements KeyedStateStore {
private final KeyedStateBackend<?> keyedStateBackend;
private final ExecutionConfig executionConfig;
@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor) throws Exception {
// 初始化序列化器
if (stateDescriptor.getSerializer() == null) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
// 从 StateBackend 获取状态
return keyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
stateDescriptor);
}
}
KeyedStateBackend 接口
// 位于 org.apache.flink.runtime.state.KeyedStateBackend
/**
* KeyedStateBackend 是 Keyed State 的核心接口
*/
public interface KeyedStateBackend<K> extends KeyedStateStore, Closeable {
// 获取当前 Key
K getCurrentKey();
// 设置当前 Key
void setCurrentKey(K key);
// 获取分区状态
<N, S extends State, T> S getPartitionedState(
N namespace,
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, T> stateDescriptor) throws Exception;
// 应用状态
void applyToAllKeys(
N namespace,
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, KV> stateDescriptor,
KeyedStateFunction<K, S> function) throws Exception;
// 创建状态快照
RunnableFuture<KeyedStateHandle> snapshot(
long checkpointId,
long timestamp,
CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception;
}
HeapKeyedStateBackend 实现
// 位于 org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
/**
* HeapKeyedStateBackend 将状态存储在 JVM 堆内存
*/
public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// 状态表:Key -> (Namespace -> State)
private final Map<String, StateTable<K, N, ?>> registeredKVStates;
// 状态名称到状态的映射
private final Map<String, InternalKvState<K, N, ?>> keyValueStatesByName;
// Key 序列化器
private final TypeSerializer<K> keySerializer;
// Key Group 数量
private final int numberOfKeyGroups;
// 当前 Key Group
private KeyGroupRange keyGroupRange;
@Override
public <N, S extends State, T> S getPartitionedState(
N namespace,
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, T> stateDescriptor) throws Exception {
// 1. 获取状态名称
String stateName = stateDescriptor.getName();
// 2. 检查是否已创建
StateTable<K, N, ?> stateTable = registeredKVStates.get(stateName);
if (stateTable == null) {
// 3. 创建新的状态表
stateTable = createStateTable(stateDescriptor, namespaceSerializer);
registeredKVStates.put(stateName, stateTable);
}
// 4. 创建状态访问器
return (S) createStateAccessor(stateTable, stateDescriptor, namespace);
}
// 创建状态表
private <N, T> StateTable<K, N, T> createStateTable(
StateDescriptor<?, T> stateDescriptor,
TypeSerializer<N> namespaceSerializer) {
TypeSerializer<T> stateSerializer = stateDescriptor.getSerializer();
return new StateTable<>(
keySerializer,
namespaceSerializer,
stateSerializer,
numberOfKeyGroups);
}
// 创建 ValueState 访问器
private <N, T> ValueState<T> createValueState(
StateTable<K, N, T> stateTable,
ValueStateDescriptor<T> stateDescriptor,
N namespace) {
return new HeapValueState<>(
stateTable,
stateDescriptor,
namespace,
stateTable.getKeySerializer(),
stateTable.getNamespaceSerializer(),
stateTable.getStateSerializer());
}
}
// HeapValueState 实现
public class HeapValueState<K, N, T>
extends AbstractHeapState<K, N, T>
implements InternalValueState<K, N, T> {
public HeapValueState(
StateTable<K, N, T> stateTable,
ValueStateDescriptor<T> stateDescriptor,
N namespace) {
super(stateTable, stateDescriptor, namespace);
}
@Override
public T value() {
// 从状态表获取值
return stateTable.get(currentKey, namespace);
}
@Override
public void update(T value) {
// 更新状态表
if (value == null) {
stateTable.remove(currentKey, namespace);
} else {
stateTable.put(currentKey, namespace, value);
}
}
@Override
public void clear() {
stateTable.remove(currentKey, namespace);
}
}
StateTable 实现
// 位于 org.apache.flink.runtime.state.heap.StateTable
/**
* StateTable 是堆内存状态的核心数据结构
* 使用嵌套 Map 实现:Key -> Namespace -> State
*/
public class StateTable<K, N, S> implements StateSnapshotRestore {
// Key 序列化器
private final TypeSerializer<K> keySerializer;
// Namespace 序列化器
private final TypeSerializer<N> namespaceSerializer;
// State 序列化器
private final TypeSerializer<S> stateSerializer;
// 状态数据结构
// 使用 Key Group 分组,支持增量快照
private final KeyGroupedInternalMap<K, N, S> stateMap;
// Key Group 数量
private final int numberOfKeyGroups;
public StateTable(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<S> stateSerializer,
int numberOfKeyGroups) {
this.keySerializer = keySerializer;
this.namespaceSerializer = namespaceSerializer;
this.stateSerializer = stateSerializer;
this.numberOfKeyGroups = numberOfKeyGroups;
this.stateMap = new KeyGroupedInternalMap<>(numberOfKeyGroups);
}
// 获取状态
public S get(K key, N namespace) {
return stateMap.get(key, namespace);
}
// 设置状态
public void put(K key, N namespace, S state) {
stateMap.put(key, namespace, state);
}
// 删除状态
public void remove(K key, N namespace) {
stateMap.remove(key, namespace);
}
// 获取指定 Key Group 的所有状态
public Iterator<Tuple2<K, S>> iterator(int keyGroup) {
return stateMap.iterator(keyGroup);
}
// 创建快照
@Override
public StateSnapshot createSnapshot() {
return new StateTableSnapshot(this);
}
}
// Key Group 分组的内部 Map
public class KeyGroupedInternalMap<K, N, S> {
// 每个 Key Group 一个 Map
private final List<Map<N, Map<K, S>>> keyGroupMaps;
public KeyGroupedInternalMap(int numberOfKeyGroups) {
this.keyGroupMaps = new ArrayList<>(numberOfKeyGroups);
for (int i = 0; i < numberOfKeyGroups; i++) {
keyGroupMaps.add(new HashMap<>());
}
}
// 计算 Key 所属的 Key Group
private int getKeyGroup(K key) {
return Math.abs(key.hashCode()) % keyGroupMaps.size();
}
public S get(K key, N namespace) {
int keyGroup = getKeyGroup(key);
Map<N, Map<K, S>> namespaceMap = keyGroupMaps.get(keyGroup);
if (namespaceMap == null) {
return null;
}
Map<K, S> keyMap = namespaceMap.get(namespace);
if (keyMap == null) {
return null;
}
return keyMap.get(key);
}
public void put(K key, N namespace, S state) {
int keyGroup = getKeyGroup(key);
Map<N, Map<K, S>> namespaceMap = keyGroupMaps.get(keyGroup);
if (namespaceMap == null) {
namespaceMap = new HashMap<>();
keyGroupMaps.set(keyGroup, namespaceMap);
}
Map<K, S> keyMap = namespaceMap.computeIfAbsent(namespace, k -> new HashMap<>());
keyMap.put(key, state);
}
}
RocksDB State Backend
RocksDB 架构
┌─────────────────────────────────────────────────────────────────────────┐
│ RocksDB State Backend 架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ RocksDBKeyedStateBackend │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Column Family (每个状态类型一个) │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ │ │ │
│ │ │ ValueState Column Family │ │ │
│ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │
│ │ │ │ Key: KeyGroup + Key + Namespace │ │ │ │
│ │ │ │ Value: State Value (序列化后) │ │ │ │
│ │ │ └─────────────────────────────────────────────────────┘ │ │ │
│ │ │ │ │ │
│ │ │ MapState Column Family │ │ │
│ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │
│ │ │ │ Key: KeyGroup + Key + Namespace + MapKey │ │ │ │
│ │ │ │ Value: MapValue (序列化后) │ │ │ │
│ │ │ └─────────────────────────────────────────────────────┘ │ │ │
│ │ │ │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ Write Buffer (MemTable) │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ 内存中的写入缓冲区,先写入这里 │ │ │
│ │ │ 当 MemTable 满后,Flush 到 SST 文件 │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ SST Files (磁盘) │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ Sorted String Table,有序的磁盘文件 │ │ │
│ │ │ 通过 Compaction 合并和清理 │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ Block Cache │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ 读取缓存,缓存热数据块 │ │ │
│ │ │ 使用 LRU 算法淘汰 │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
RocksDBKeyedStateBackend 源码
// 位于 org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
/**
* RocksDBKeyedStateBackend 将状态存储在 RocksDB
*/
public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// RocksDB 实例
private final RocksDB db;
// Column Family 描述符
private final Map<String, ColumnFamilyHandle> columnFamilyHandles;
// 默认 Column Family
private final ColumnFamilyHandle defaultColumnFamily;
// 写入选项
private final WriteOptions writeOptions;
// 读取选项
private final ReadOptions readOptions;
// Key 序列化器
private final RocksDBKeySerializationUtils.KeySerializer<K> keySerializer;
// 托管内存
private final RocksDBMemoryController memoryController;
public RocksDBKeyedStateBackend(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
File instanceBasePath,
RocksDBOptions options,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups) throws Exception {
super(operatorIdentifier, userCodeClassLoader, keySerializer, numberOfKeyGroups);
// 初始化 RocksDB
this.db = openDB(instanceBasePath, options);
// 初始化 Column Family
this.columnFamilyHandles = new HashMap<>();
this.defaultColumnFamily = db.getDefaultColumnFamily();
// 初始化写入选项
this.writeOptions = new WriteOptions()
.setSync(false)
.setDisableWAL(false);
// 初始化读取选项
this.readOptions = new ReadOptions();
// 初始化内存控制器
this.memoryController = new RocksDBMemoryController(options);
}
// 打开 RocksDB
private RocksDB openDB(File basePath, RocksDBOptions options) throws RocksDBException {
// 配置 Column Family
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(
RocksDB.DEFAULT_COLUMN_FAMILY,
getColumnFamilyOptions(options)));
// 配置 DB 选项
DBOptions dbOptions = new DBOptions()
.setCreateIfMissing(true)
.setUseFsync(false)
.setMaxOpenFiles(options.getMaxOpenFiles());
// 打开 DB
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
return RocksDB.open(dbOptions, basePath.getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles);
}
// 获取或创建状态
@Override
public <N, S extends State, T> S getPartitionedState(
N namespace,
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, T> stateDescriptor) throws Exception {
String stateName = stateDescriptor.getName();
// 获取或创建 Column Family
ColumnFamilyHandle columnFamily = columnFamilyHandles.get(stateName);
if (columnFamily == null) {
columnFamily = createColumnFamily(stateName, stateDescriptor);
columnFamilyHandles.put(stateName, columnFamily);
}
// 创建状态访问器
return createRocksDBState(columnFamily, stateDescriptor, namespace, namespaceSerializer);
}
// 创建 Column Family
private ColumnFamilyHandle createColumnFamily(
String name,
StateDescriptor<?, ?> stateDescriptor) throws RocksDBException {
ColumnFamilyDescriptor descriptor = new ColumnFamilyDescriptor(
name.getBytes(),
getColumnFamilyOptions(stateDescriptor));
return db.createColumnFamily(descriptor);
}
// 创建 ValueState
private <N, T> ValueState<T> createRocksDBValueState(
ColumnFamilyHandle columnFamily,
ValueStateDescriptor<T> stateDescriptor,
N namespace,
TypeSerializer<N> namespaceSerializer) {
return new RocksDBValueState<>(
db,
columnFamily,
writeOptions,
readOptions,
stateDescriptor,
namespace,
namespaceSerializer,
keySerializer);
}
}
RocksDBValueState 实现
// 位于 org.apache.flink.contrib.streaming.state.RocksDBValueState
/**
* RocksDB 中的 ValueState 实现
*/
public class RocksDBValueState<K, N, T>
extends AbstractRocksDBState<K, N, T>
implements InternalValueState<K, N, T> {
private final TypeSerializer<T> valueSerializer;
public RocksDBValueState(
RocksDB db,
ColumnFamilyHandle columnFamily,
WriteOptions writeOptions,
ReadOptions readOptions,
ValueStateDescriptor<T> stateDescriptor,
N namespace,
TypeSerializer<N> namespaceSerializer,
RocksDBKeySerializationUtils.KeySerializer<K> keySerializer) {
super(db, columnFamily, writeOptions, readOptions,
stateDescriptor, namespace, namespaceSerializer, keySerializer);
this.valueSerializer = stateDescriptor.getSerializer();
}
@Override
public T value() throws IOException {
try {
// 1. 构造 Key
byte[] keyBytes = serializeKey(currentKey, namespace);
// 2. 从 RocksDB 读取
byte[] valueBytes = db.get(columnFamily, readOptions, keyBytes);
// 3. 反序列化
if (valueBytes == null) {
return stateDescriptor.getDefaultValue();
}
return valueSerializer.deserialize(new DataInputViewStreamWrapper(
new ByteArrayInputStream(valueBytes)));
} catch (RocksDBException | IOException e) {
throw new IOException("Error while getting value from RocksDB", e);
}
}
@Override
public void update(T value) throws IOException {
if (value == null) {
clear();
return;
}
try {
// 1. 构造 Key
byte[] keyBytes = serializeKey(currentKey, namespace);
// 2. 序列化 Value
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
valueSerializer.serialize(value, out);
byte[] valueBytes = baos.toByteArray();
// 3. 写入 RocksDB
db.put(columnFamily, writeOptions, keyBytes, valueBytes);
} catch (RocksDBException | IOException e) {
throw new IOException("Error while putting value to RocksDB", e);
}
}
@Override
public void clear() {
try {
// 构造 Key 并删除
byte[] keyBytes = serializeKey(currentKey, namespace);
db.delete(columnFamily, writeOptions, keyBytes);
} catch (Exception e) {
// 忽略删除错误
}
}
// 序列化 Key
private byte[] serializeKey(K key, N namespace) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
// 写入 Key Group
int keyGroup = getKeyGroup(key);
out.writeInt(keyGroup);
// 写入 Key
keySerializer.serialize(key, out);
// 写入 Namespace
namespaceSerializer.serialize(namespace, out);
return baos.toByteArray();
}
}
RocksDB 写入流程
┌─────────────────────────────────────────────────────────────────────────┐
│ RocksDB 写入流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Flink State Update │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. state.update(value) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ 2. 序列化 Key 和 Value │ │
│ │ │ Key = KeyGroup + Key + Namespace │ │
│ │ │ Value = 序列化的状态值 │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ 3. db.put(columnFamily, key, value) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐│ │
│ │ │ RocksDB 内部 ││ │
│ │ │ ││ │
│ │ │ 4. 写入 MemTable (内存) ││ │
│ │ │ │ ││ │
│ │ │ │ MemTable 满 ││ │
│ │ │ ▼ ││ │
│ │ │ 5. 切换到 Immutable MemTable ││ │
│ │ │ │ ││ │
│ │ │ │ 后台 Flush ││ │
│ │ │ ▼ ││ │
│ │ │ 6. Flush 到 SST 文件 (磁盘) ││ │
│ │ │ │ ││ │
│ │ │ │ SST 文件过多 ││ │
│ │ │ ▼ ││ │
│ │ │ 7. Compaction (合并) ││ │
│ │ │ • 合并多个 SST 文件 ││ │
│ │ │ • 清理过期/删除的数据 ││ │
│ │ │ • 生成新的 SST 文件 ││ │
│ │ │ ││ │
│ │ └─────────────────────────────────────────────────────────────┘│ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ 性能考量: │
│ • MemTable 写入:极快(内存) │
│ • Flush:后台异步,不阻塞写入 │
│ • Compaction:可能影响性能,需要调优 │
│ • Block Cache:读取性能的关键 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
状态快照机制
CheckpointedFunction 接口
// 位于 org.apache.flink.api.common.state.CheckpointedFunction
/**
* CheckpointedFunction 接口用于自定义状态快照
*/
@Public
public interface CheckpointedFunction {
// 创建快照
void snapshotState(FunctionSnapshotContext context) throws Exception;
// 初始化状态
void initializeState(FunctionInitializationContext context) throws Exception;
}
// FunctionSnapshotContext 接口
public interface FunctionSnapshotContext {
// 获取 Checkpoint ID
long getCheckpointId();
// 获取 Checkpoint 时间戳
long getCheckpointTimestamp();
// 获取 OperatorStateStore
OperatorStateStore getOperatorStateStore();
// 获取 KeyedStateStore
KeyedStateStore getKeyedStateStore();
}
状态快照流程
// 位于 org.apache.flink.runtime.state.AbstractStreamOperator
/**
* 算子状态快照实现
*/
public abstract class AbstractStreamOperator<OUT>
implements StreamOperator<OUT>, Triggerable<Object, VoidNamespace> {
// 快照状态
public OperatorSnapshotFutures snapshotState(
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory checkpointStreamFactory) throws Exception {
OperatorSnapshotFutures snapshotFutures = new OperatorSnapshotFutures();
// 1. 快照 Keyed State
if (keyedStateBackend != null) {
RunnableFuture<KeyedStateHandle> keyedStateFuture =
keyedStateBackend.snapshot(
checkpointId,
timestamp,
checkpointStreamFactory,
checkpointOptions);
snapshotFutures.setKeyedStateFuture(keyedStateFuture);
}
// 2. 快照 Operator State
if (operatorStateBackend != null) {
RunnableFuture<OperatorStateHandle> operatorStateFuture =
operatorStateBackend.snapshot(
checkpointId,
timestamp,
checkpointStreamFactory,
checkpointOptions);
snapshotFutures.setOperatorStateFuture(operatorStateFuture);
}
// 3. 快照 Timer
if (internalTimeServiceManager != null) {
snapshotFutures.setTimerFuture(
internalTimeServiceManager.snapshot(
checkpointId,
timestamp,
checkpointStreamFactory));
}
return snapshotFutures;
}
}
RocksDB 增量快照
// 位于 org.apache.flink.contrib.streaming.state.RocksIncrementalSnapshotUtils
/**
* RocksDB 增量快照实现
*/
public class RocksIncrementalSnapshotUtils {
// 创建增量快照
public static IncrementalKeyedStateHandle createIncrementalSnapshot(
RocksDB db,
long checkpointId,
long timestamp,
Path checkpointPath,
KeyGroupRange keyGroupRange) throws Exception {
// 1. 触发 Checkpoint(RocksDB 内部)
db.getSnapshot();
// 2. 记录自上次快照以来的变化
// 使用 RocksDB 的 Checkpoint 功能
Checkpoint checkpoint = Checkpoint.create(db, checkpointPath.toString());
// 3. 收集新增的 SST 文件
List<StateHandle> stateHandles = new ArrayList<>();
for (File sstFile : collectNewSstFiles(checkpointPath)) {
// 上传到分布式存储
StateHandle handle = uploadToDfs(sstFile, checkpointPath);
stateHandles.add(handle);
}
// 4. 创建增量状态句柄
return new IncrementalKeyedStateHandle(
checkpointId,
timestamp,
keyGroupRange,
stateHandles,
db.getLatestSequenceNumber());
}
// 从增量快照恢复
public static void restoreFromIncrementalSnapshot(
RocksDB db,
IncrementalKeyedStateHandle stateHandle,
Path restorePath) throws Exception {
// 1. 下载 SST 文件
for (StateHandle handle : stateHandle.getStateHandles()) {
downloadFromDfs(handle, restorePath);
}
// 2. RocksDB 自动加载 SST 文件
// (通过 Checkpoint 创建的目录)
}
}
状态 TTL 实现
// 位于 org.apache.flink.api.common.state.StateTtlConfig
/**
* 状态 TTL 配置
*/
public class StateTtlConfig implements Serializable {
// TTL 时长
private final Time ttl;
// 更新类型
private final UpdateType updateType;
// 状态可见性
private final StateVisibility stateVisibility;
// 清理策略
private final CleanupStrategies cleanupStrategies;
public enum UpdateType {
Disabled, // 禁用 TTL
OnCreateAndWrite, // 创建和写入时更新
OnReadAndWrite // 读取和写入时更新
}
public enum StateVisibility {
ReturnExpiredIfNotCleanedUp, // 返回过期但未清理的状态
NeverReturnExpired // 不返回过期状态
}
public static class CleanupStrategies {
// 全量快照时清理
private final CleanupStrategy fullSnapshotCleanup;
// 增量清理
private final CleanupStrategy incrementalCleanup;
// RocksDB Compaction 时清理
private final CleanupStrategy inRocksdbCompactionFilter;
}
}
// Heap 状态 TTL 实现
public class TtlValueState<K, N, T> implements ValueState<T> {
private final ValueState<TtlValue<T>> wrappedState;
private final TtlConfig ttlConfig;
private final TtlTimeProvider timeProvider;
@Override
public T value() throws IOException {
TtlValue<T> ttlValue = wrappedState.value();
if (ttlValue == null) {
return null;
}
// 检查是否过期
if (isExpired(ttlValue)) {
// 根据配置决定是否返回过期值
if (ttlConfig.getStateVisibility() == StateVisibility.NeverReturnExpired) {
return null;
}
}
return ttlValue.getValue();
}
@Override
public void update(T value) throws IOException {
// 包装值并设置时间戳
long timestamp = timeProvider.currentTimestamp();
wrappedState.update(new TtlValue<>(value, timestamp));
}
private boolean isExpired(TtlValue<?> ttlValue) {
long now = timeProvider.currentTimestamp();
long expirationTime = ttlValue.getLastAccessTimestamp() + ttlConfig.getTtl().toMilliseconds();
return now > expirationTime;
}
}
总结
本章从源码层面深入解析了 Flink 状态管理:
| 概念 | 源码位置 | 核心机制 |
|---|---|---|
| StateDescriptor | api.common.state.StateDescriptor | 状态描述与序列化 |
| KeyedStateBackend | runtime.state.KeyedStateBackend | 状态访问接口 |
| StateTable | runtime.state.heap.StateTable | 堆内存状态存储 |
| RocksDBKeyedStateBackend | contrib.streaming.state.RocksDBKeyedStateBackend | RocksDB 状态存储 |
| StateTtlConfig | api.common.state.StateTtlConfig | TTL 配置与清理 |
关键要点:
- Keyed State 绑定到 Key,自动分区管理
- HeapStateBackend 使用 StateTable 存储状态
- RocksDB 使用 Column Family 隔离不同状态
- 增量快照减少数据传输量
参考资料
下一章预告
下一章将深入解析 容错机制,包括:
- Checkpoint 原理
- Barrier 对齐算法
- 两阶段提交