Flink 底层原理系列(七):网络与反压
2021-03-04·8 分钟阅读
前言
网络通信是 Flink 分布式处理的基础,而反压机制则是保证系统稳定性的关键。理解网络缓冲区管理和 Credit-based 流控的实现原理,对于优化 Flink 作业性能至关重要。本章将从源码层面深入解析这些核心机制。
技术亮点
| 技术点 | 难度 | 面试价值 | 本文覆盖 |
|---|---|---|---|
| Network Buffer | ⭐⭐⭐⭐ | 进阶考点 | ✅ |
| Credit-based 流控 | ⭐⭐⭐⭐⭐ | 高频考点 | ✅ |
| 反压传播 | ⭐⭐⭐⭐ | 高频考点 | ✅ |
| 网络优化 | ⭐⭐⭐⭐ | 实战价值 | ✅ |
面试考点
- 什么是反压?Flink 如何实现反压?
- Credit-based 流控是如何工作的?源码实现是什么?
- Network Buffer 是如何管理的?
- 如何诊断和解决反压问题?
网络缓冲区架构
整体架构
┌─────────────────────────────────────────────────────────────────────────┐
│ 网络缓冲区架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ TaskManager │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ NetworkBufferPool │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ 全局网络缓冲池(所有 Task 共享) │ │ │
│ │ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │
│ │ │ │Buffer│ │Buffer│ │Buffer│ │Buffer│ │ ... │ │ │ │
│ │ │ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ 申请/回收 │ │ │
│ │ └──────────────┼──────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌───────────┴───────────┐ │ │
│ │ ▼ ▼ │ │
│ │ InputGate OutputGate │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │LocalBufferPool│ │LocalBufferPool│ │ │
│ │ │┌───────────┐│ │┌───────────┐│ │ │
│ │ ││InputChannel││ ││ResultPartition│ │ │
│ │ ││ Buffer ││ ││ Buffer ││ │ │
│ │ │└───────────┘│ │└───────────┘│ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ ┌──────────┐ ┌──────────┐ │ │
│ │ │Netty │ │Netty │ │ │
│ │ │Client │◄──────────►│Server │ │ │
│ │ └──────────┘ └──────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
NetworkBufferPool 源码
// 位于 org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
/**
* NetworkBufferPool 是 TaskManager 级别的全局缓冲池
* 所有网络缓冲区都从这里分配
*/
public class NetworkBufferPool implements BufferPoolFactory {
// 总缓冲区数量
private final int totalNumberOfMemorySegments;
// 缓冲区大小
private final int memorySegmentSize;
// 可用缓冲区队列
private final ArrayDeque<MemorySegment> availableMemorySegments;
// 统计信息
private final AtomicInteger numberOfAvailableMemorySegments;
private final AtomicInteger numberOfRegisteredBufferPools;
public NetworkBufferPool(
long maxNumberOfMemorySegmentsToAllocate,
int memorySegmentSize) {
this.totalNumberOfMemorySegments = (int) maxNumberOfMemorySegmentsToAllocate;
this.memorySegmentSize = memorySegmentSize;
this.availableMemorySegments = new ArrayDeque<>(totalNumberOfMemorySegments);
// 预分配所有 MemorySegment
for (int i = 0; i < totalNumberOfMemorySegments; i++) {
availableMemorySegments.add(MemorySegment.allocateHeapMemory(memorySegmentSize));
}
this.numberOfAvailableMemorySegments = new AtomicInteger(totalNumberOfMemorySegments);
this.numberOfRegisteredBufferPools = new AtomicInteger(0);
}
// 请求缓冲区
public MemorySegment requestMemorySegment() {
MemorySegment segment = availableMemorySegments.poll();
if (segment != null) {
numberOfAvailableMemorySegments.decrementAndGet();
}
return segment;
}
// 批量请求缓冲区
public List<MemorySegment> requestMemorySegmentsBlocking(int numberOfSegments)
throws IOException {
List<MemorySegment> segments = new ArrayList<>(numberOfSegments);
while (segments.size() < numberOfSegments) {
MemorySegment segment = requestMemorySegment();
if (segment != null) {
segments.add(segment);
} else {
// 等待缓冲区可用
waitForAvailableSegments();
}
}
return segments;
}
// 回收缓冲区
public void recycle(MemorySegment segment) {
availableMemorySegments.add(segment);
numberOfAvailableMemorySegments.incrementAndGet();
// 通知等待的请求
notifyAvailableSegments();
}
// 创建 LocalBufferPool
@Override
public BufferPool createBufferPool(
int numRequiredBuffers,
int maxUsedBuffers) {
LocalBufferPool bufferPool = new LocalBufferPool(
this,
numRequiredBuffers,
maxUsedBuffers);
numberOfRegisteredBufferPools.incrementAndGet();
// 预分配必需的缓冲区
List<MemorySegment> initialSegments =
requestMemorySegmentsBlocking(numRequiredBuffers);
bufferPool.setMemorySegments(initialSegments);
return bufferPool;
}
}
LocalBufferPool 源码
// 位于 org.apache.flink.runtime.io.network.buffer.LocalBufferPool
/**
* LocalBufferPool 是 InputGate/OutputGate 级别的本地缓冲池
* 从 NetworkBufferPool 申请缓冲区
*/
public class LocalBufferPool implements BufferPool {
// 全局缓冲池
private final NetworkBufferPool networkBufferPool;
// 必需的缓冲区数量
private final int numRequiredBuffers;
// 最大可用缓冲区数量
private final int maxBuffersPerChannel;
// 当前持有的缓冲区
private final ArrayDeque<MemorySegment> availableMemorySegments;
// 缓冲区数量
private final AtomicInteger currentNumberOfMemorySegments;
// 等待缓冲区的请求者
private final Deque<BufferAvailabilityListener> bufferAvailabilityListeners;
public LocalBufferPool(
NetworkBufferPool networkBufferPool,
int numRequiredBuffers,
int maxBuffersPerChannel) {
this.networkBufferPool = networkBufferPool;
this.numRequiredBuffers = numRequiredBuffers;
this.maxBuffersPerChannel = maxBuffersPerChannel;
this.availableMemorySegments = new ArrayDeque<>();
this.currentNumberOfMemorySegments = new AtomicInteger(0);
this.bufferAvailabilityListeners = new ArrayDeque<>();
}
// 请求缓冲区
@Override
public MemorySegment requestMemorySegment() {
MemorySegment segment = availableMemorySegments.poll();
if (segment == null) {
// 尝试从全局池申请
if (currentNumberOfMemorySegments.get() < maxBuffersPerChannel) {
segment = networkBufferPool.requestMemorySegment();
if (segment != null) {
currentNumberOfMemorySegments.incrementAndGet();
}
}
}
return segment;
}
// 请求缓冲区(阻塞)
@Override
public MemorySegment requestMemorySegmentBlocking() throws InterruptedException {
while (true) {
MemorySegment segment = requestMemorySegment();
if (segment != null) {
return segment;
}
// 等待缓冲区可用
synchronized (this) {
wait();
}
}
}
// 回收缓冲区
@Override
public void recycle(MemorySegment segment) {
if (currentNumberOfMemorySegments.get() > numRequiredBuffers) {
// 超过必需数量,归还给全局池
networkBufferPool.recycle(segment);
currentNumberOfMemorySegments.decrementAndGet();
} else {
// 放回本地池
availableMemorySegments.add(segment);
}
// 通知等待者
synchronized (this) {
notifyAll();
}
notifyBufferAvailable();
}
// 设置缓冲区数量
public void setMemorySegments(List<MemorySegment> memorySegments) {
availableMemorySegments.addAll(memorySegments);
currentNumberOfMemorySegments.set(memorySegments.size());
}
}
InputChannel 实现
InputChannel 接口
// 位于 org.apache.flink.runtime.io.network.partition.consumer.InputChannel
/**
* InputChannel 是数据输入通道的抽象
*/
public abstract class InputChannel {
// 通道 ID
protected final int channelIndex;
// 所属 InputGate
protected final SingleInputGate inputGate;
// 缓冲区队列
protected final BufferQueue bufferQueue;
// Channel 状态
protected ChannelState state = ChannelState.ACTIVE;
// 请求分区
public abstract void requestSubpartition(int subpartitionIndex) throws IOException;
// 获取下一个缓冲区
public abstract BufferOrEvent getNextBuffer() throws IOException, InterruptedException;
// 通知缓冲区可用
public abstract void notifyBufferAvailable(Buffer buffer);
// 检查是否收到 Credit
public abstract boolean isBacklogged();
// 获取未读缓冲区数量
public abstract int getNumberOfQueuedBuffers();
// 销毁通道
public abstract void releaseAllResources() throws IOException;
}
RemoteInputChannel 源码
// 位于 org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
/**
* RemoteInputChannel 处理远程数据接收
* 是 Credit-based 流控的核心组件
*/
public class RemoteInputChannel extends InputChannel {
// 远程连接
private final ConnectionID connectionId;
// Netty 连接
private final PartitionRequestClient partitionRequestClient;
// 初始 Credit(缓冲区数量)
private final int initialCredit;
// 当前可用 Credit
private final AtomicInteger currentCredit;
// 未读取的 Buffer 数量
private final AtomicLong numBytesIn;
// Buffer 队列
private final Queue<Buffer> receivedBuffers;
// Credit 监听器
private final CreditListener creditListener;
// 是否发送了 Credit 通知
private volatile boolean notifyCreditAvailable = false;
public RemoteInputChannel(
SingleInputGate inputGate,
int channelIndex,
ConnectionID connectionId,
PartitionRequestClient partitionRequestClient,
int initialCredit) {
super(inputGate, channelIndex);
this.connectionId = connectionId;
this.partitionRequestClient = partitionRequestClient;
this.initialCredit = initialCredit;
this.currentCredit = new AtomicInteger(initialCredit);
this.receivedBuffers = new ConcurrentLinkedQueue<>();
this.numBytesIn = new AtomicLong(0);
}
// 请求远程分区
@Override
public void requestSubpartition(int subpartitionIndex) throws IOException {
// 发送分区请求
partitionRequestClient.requestSubpartition(
this,
subpartitionIndex,
initialCredit);
}
// 接收远程 Buffer
public void onBuffer(Buffer buffer, int sequenceNumber) {
// 检查序列号
if (expectedSequenceNumber != sequenceNumber) {
throw new IllegalStateException("Sequence number mismatch");
}
expectedSequenceNumber++;
// 减少可用 Credit
int remainingCredit = currentCredit.decrementAndGet();
if (remainingCredit < 0) {
// 超过 Credit 限制,这是错误情况
throw new IllegalStateException("Received buffer without credit");
}
// 添加到接收队列
synchronized (receivedBuffers) {
receivedBuffers.add(buffer);
numBytesIn.addAndGet(buffer.getSize());
}
// 通知 InputGate 有数据可读
inputGate.notifyChannelNonEmpty(this);
}
// 获取下一个 Buffer
@Override
public BufferOrEvent getNextBuffer() throws IOException {
Buffer buffer;
synchronized (receivedBuffers) {
buffer = receivedBuffers.poll();
}
if (buffer == null) {
return null;
}
// 更新统计信息
numBytesIn.addAndGet(-buffer.getSize());
// 检查是否需要发送 Credit 通知
// 当 Credit 低于阈值时,通知上游
checkCreditAvailable();
return new BufferOrEvent(buffer, channelIndex);
}
// 检查并通知 Credit 可用
private void checkCreditAvailable() {
// Credit 增加了(消费了 Buffer)
// 检查是否需要发送 Credit 通知
if (!notifyCreditAvailable && currentCredit.get() < initialCredit / 2) {
notifyCreditAvailable = true;
// 通知 Credit 监听器
inputGate.notifyCreditAvailable(this);
}
}
// 获取当前 Credit
public int getCredit() {
return currentCredit.get();
}
// 增加 Credit(消费 Buffer 后调用)
public int increaseCredit(int numCredits) {
return currentCredit.addAndGet(numCredits);
}
// 发送 Credit 通知给上游
public void sendCreditNotification(int numCredits) throws IOException {
partitionRequestClient.notifyCreditAvailable(
this,
numCredits);
}
// 通知 Buffer 已消费(由 InputGate 调用)
public void notifyBufferConsumed() {
// 增加 Credit
currentCredit.incrementAndGet();
checkCreditAvailable();
}
// 获取未读 Buffer 数量
@Override
public int getNumberOfQueuedBuffers() {
synchronized (receivedBuffers) {
return receivedBuffers.size();
}
}
// 是否有积压
@Override
public boolean isBacklogged() {
return getNumberOfQueuedBuffers() > 0;
}
}
ResultPartition 实现
ResultPartition 源码
// 位于 org.apache.flink.runtime.io.network.partition.ResultPartition
/**
* ResultPartition 是数据输出的核心组件
*/
public class ResultPartition {
// 分区 ID
private final ResultPartitionID partitionId;
// 分区类型
private final ResultPartitionType partitionType;
// 子分区数量
private final int numberOfSubpartitions;
// 子分区数组
private final ResultSubpartition[] subpartitions;
// Buffer 池
private BufferPool bufferPool;
// 分区状态
private volatile boolean isReleased = false;
public ResultPartition(
String owningTaskName,
ResultPartitionID partitionId,
ResultPartitionType partitionType,
int numberOfSubpartitions,
int networkBufferSize) {
this.partitionId = partitionId;
this.partitionType = partitionType;
this.numberOfSubpartitions = numberOfSubpartitions;
this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
// 创建子分区
for (int i = 0; i < numberOfSubpartitions; i++) {
subpartitions[i] = new PipelinedSubpartition(i, this);
}
}
// 设置 Buffer 池
public void setBufferPool(BufferPool bufferPool) {
this.bufferPool = bufferPool;
}
// 发送 Buffer 到指定子分区
public void emit(Buffer buffer, int targetSubpartition) throws IOException {
if (isReleased) {
throw new IllegalStateException("Partition already released");
}
subpartitions[targetSubpartition].add(buffer);
}
// 广播 Buffer 到所有子分区
public void broadcast(Buffer buffer) throws IOException {
for (int i = 0; i < numberOfSubpartitions; i++) {
// 复制 Buffer(除了最后一个)
Buffer toEmit = (i < numberOfSubpartitions - 1)
? buffer.copy()
: buffer;
emit(toEmit, i);
}
}
// 广播事件
public void broadcastEvent(AbstractEvent event) throws IOException {
for (int i = 0; i < numberOfSubpartitions; i++) {
Buffer eventBuffer = EventSerializer.toBuffer(event);
emit(eventBuffer, i);
}
}
// 获取子分区
public ResultSubpartition getSubpartition(int index) {
return subpartitions[index];
}
// 请求 Buffer
public BufferBuilder getBufferBuilder() throws InterruptedException {
return bufferPool.requestBufferBuilderBlocking();
}
// 获取 Backlog(积压数量)
public int getNumberOfQueuedBuffers() {
int total = 0;
for (ResultSubpartition subpartition : subpartitions) {
total += subpartition.getNumberOfQueuedBuffers();
}
return total;
}
}
PipelinedSubpartition 源码
// 位于 org.apache.flink.runtime.io.network.partition.PipelinedSubpartition
/**
* PipelinedSubpartition 是流水线子分区实现
*/
public class PipelinedSubpartition extends ResultSubpartition {
// Buffer 队列
private final ArrayDeque<Buffer> buffers;
// Buffer 数量
private final AtomicInteger buffersInBacklog;
// 数据可用监听器
private BufferAvailabilityListener availabilityListener;
// 消费者视图
private PipelinedSubpartitionView readerView;
public PipelinedSubpartition(int index, ResultPartition parent) {
super(index, parent);
this.buffers = new ArrayDeque<>();
this.buffersInBacklog = new AtomicInteger(0);
}
// 添加 Buffer
@Override
public boolean add(Buffer buffer) {
// 检查是否已释放
if (isReleased) {
buffer.recycleBuffer();
return false;
}
// 添加到队列
synchronized (buffers) {
buffers.add(buffer);
buffersInBacklog.incrementAndGet();
}
// 通知监听器
notifyDataAvailable();
return true;
}
// 获取下一个 Buffer
public BufferAndBacklog pollBuffer() {
Buffer buffer;
synchronized (buffers) {
buffer = buffers.poll();
if (buffer != null) {
buffersInBacklog.decrementAndGet();
}
}
if (buffer == null) {
return null;
}
return new BufferAndBacklog(
buffer,
buffersInBacklog.get(), // 剩余 backlog
getNextBufferIsEvent());
}
// 获取积压数量
@Override
public int getNumberOfQueuedBuffers() {
return buffersInBacklog.get();
}
// 通知数据可用
private void notifyDataAvailable() {
if (availabilityListener != null) {
availabilityListener.notifyDataAvailable();
}
}
// 创建读取视图
public PipelinedSubpartitionView createReadView(
BufferAvailabilityListener availabilityListener) {
this.availabilityListener = availabilityListener;
this.readerView = new PipelinedSubpartitionView(this);
// 如果已有数据,立即通知
if (getNumberOfQueuedBuffers() > 0) {
notifyDataAvailable();
}
return readerView;
}
}
// Buffer 和 Backlog 的组合
public class BufferAndBacklog {
private final Buffer buffer;
private final int backlogSize;
private final boolean nextBufferIsEvent;
// ...
}
Credit-based 流控源码
流控原理
┌─────────────────────────────────────────────────────────────────────────┐
│ Credit-based 流控原理 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 核心思想:下游告知上游可发送的数据量 │
│ │
│ Sender (上游 TaskManager) Receiver (下游 TaskManager) │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ │ │ │ │
│ │ ResultPartition │ │ RemoteInputChannel │ │
│ │ ┌───────────────┐ │ │ ┌───────────────┐ │ │
│ │ │ Subpartition │ │ │ │ Buffer Queue │ │ │
│ │ │ ┌───────────┐ │ │ │ │ ┌───────────┐ │ │ │
│ │ │ │ Buffer 1 │ │ │ │ │ │ Buffer 1 │ │ │ │
│ │ │ │ Buffer 2 │ │ │ │ │ └───────────┘ │ │ │
│ │ │ │ Buffer 3 │ │ │ │ │ │ │ │
│ │ │ └───────────┘ │ │ │ │ Credit: 2 │ │ │
│ │ │ Backlog: 3 │ │ │ │ (可用缓冲区) │ │ │
│ │ └───────────────┘ │ │ └───────────────┘ │ │
│ │ │ │ │ │
│ └──────────┬──────────┘ └──────────┬──────────┘ │
│ │ │ │
│ │ 1. Request (initialCredit=2) │ │
│ │ ◄──────────────────────────────│ │
│ │ │ │
│ │ 2. Data (Buffer1) + Backlog=3 │ │
│ │ ──────────────────────────────►│ │
│ │ │ │
│ │ 3. Data (Buffer2) + Backlog=2 │ Credit 变为 0 │
│ │ ──────────────────────────────►│ │
│ │ │ │
│ │ (停止发送,等待 Credit) │ │
│ │ │ │
│ │ 4. Credit Notification (2) │ 消费 Buffer,恢复 Credit│
│ │ ◄──────────────────────────────│ │
│ │ │ │
│ │ 5. Data (Buffer3) + Backlog=1 │ │
│ │ ──────────────────────────────►│ │
│ │ │ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
PartitionRequestClientHandler 源码
// 位于 org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler
/**
* Netty 客户端处理器
* 处理来自上游的数据和 Credit 通知
*/
public class PartitionRequestClientHandler
extends ChannelInboundHandlerAdapter {
// 输入通道映射
private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels;
// 待发送的 Credit 通知队列
private final Queue<CreditNotification> pendingCreditNotifications;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof BufferResponse) {
// 接收 Buffer 数据
BufferResponse bufferResponse = (BufferResponse) msg;
RemoteInputChannel inputChannel = inputChannels.get(
bufferResponse.getReceiverId());
if (inputChannel != null) {
// 将 Buffer 交给 InputChannel
inputChannel.onBuffer(
bufferResponse.getBuffer(),
bufferResponse.getSequenceNumber());
// 更新 Backlog
int backlog = bufferResponse.getBacklog();
inputChannel.onBacklog(backlog);
}
} else if (msg instanceof CreditRequest) {
// 上游请求 Credit 信息(通常不需要处理)
}
}
// 发送 Credit 通知
public void notifyCreditAvailable(
RemoteInputChannel inputChannel,
int numCredits) throws IOException {
// 创建 Credit 通知
CreditNotification notification = new CreditNotification(
inputChannel.getInputChannelId(),
numCredits);
// 发送到 Netty Channel
Channel channel = inputChannel.getChannel();
if (channel != null && channel.isActive()) {
channel.writeAndFlush(notification);
}
}
// 注册 InputChannel
public void registerInputChannel(RemoteInputChannel inputChannel) {
inputChannels.put(inputChannel.getInputChannelId(), inputChannel);
}
}
PartitionRequestServerHandler 源码
// 位于 org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler
/**
* Netty 服务端处理器
* 处理来自下游的请求和 Credit 通知
*/
public class PartitionRequestServerHandler
extends ChannelInboundHandlerAdapter {
// 分区提供者
private final NetworkSequenceViewReader reader;
// 当前 Backlog
private int currentBacklog = 0;
// 可用 Credit
private int availableCredit = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof PartitionRequest) {
// 处理分区请求
PartitionRequest request = (PartitionRequest) msg;
// 创建读取视图
reader = createViewReader(
request.getPartitionId(),
request.getSubpartitionIndex());
// 设置初始 Credit
availableCredit = request.getCredit();
// 开始发送数据
writeAndFlushNextMessage(ctx);
} else if (msg instanceof CreditNotification) {
// 处理 Credit 通知
CreditNotification creditNotification = (CreditNotification) msg;
// 增加 Credit
availableCredit += creditNotification.getCredit();
// 尝试发送更多数据
writeAndFlushNextMessage(ctx);
}
}
// 发送下一个消息
private void writeAndFlushNextMessage(ChannelHandlerContext ctx) throws Exception {
while (availableCredit > 0) {
// 获取下一个 Buffer
BufferAndBacklog bufferAndBacklog = reader.getNextBuffer();
if (bufferAndBacklog == null) {
// 没有更多数据
break;
}
// 减少 Credit
availableCredit--;
// 更新 Backlog
currentBacklog = bufferAndBacklog.getBacklog();
// 构造响应
BufferResponse response = new BufferResponse(
bufferAndBacklog.getBuffer(),
reader.getSequenceNumber(),
reader.getReceiverId(),
currentBacklog);
// 发送
ctx.writeAndFlush(response);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof BacklogAnnouncement) {
// Backlog 通知(用于反压检测)
BacklogAnnouncement announcement = (BacklogAnnouncement) msg;
currentBacklog = announcement.getBacklog();
}
}
}
Credit 分配策略
// 位于 org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
/**
* Credit 分配和通知策略
*/
public class RemoteInputChannel extends InputChannel {
// 初始 Credit
private final int initialCredit;
// 最小 Credit(低于此值时通知上游)
private final int minCreditThreshold;
// 最大 Credit
private final int maxCredit;
// 当前 Credit
private final AtomicInteger currentCredit;
// 上次通知的 Credit
private final AtomicInteger lastAnnouncedCredit;
// 判断是否需要发送 Credit 通知
public boolean shouldAnnounceCredit() {
int current = currentCredit.get();
int last = lastAnnouncedCredit.get();
// Credit 增加了,且超过阈值
return current > last &&
current >= minCreditThreshold;
}
// 发送 Credit 通知
public void announceCredit() throws IOException {
int credit = currentCredit.get();
// 发送通知
partitionRequestClient.notifyCreditAvailable(this, credit);
// 更新上次通知的值
lastAnnouncedCredit.set(credit);
}
// 动态 Credit 调整
public int getDynamicCredit() {
// 根据消费速率动态调整 Credit
// 如果消费快,增加 Credit;消费慢,减少 Credit
double consumptionRate = getConsumptionRate();
if (consumptionRate > 0.8) {
// 消费快,增加 Credit
return Math.min(maxCredit, initialCredit * 2);
} else if (consumptionRate < 0.3) {
// 消费慢,减少 Credit
return Math.max(1, initialCredit / 2);
}
return initialCredit;
}
}
反压传播机制
反压检测
// 位于 org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
/**
* 反压检测实现
*/
public class RemoteInputChannel extends InputChannel {
// 反压检测阈值
private final float backpressureThreshold = 0.7f;
// 反压状态
private volatile boolean isBackPressured = false;
// 检测反压
public boolean checkBackpressure() {
// 计算缓冲区使用率
int queuedBuffers = getNumberOfQueuedBuffers();
int availableCredit = currentCredit.get();
int totalBuffers = queuedBuffers + availableCredit;
if (totalBuffers == 0) {
return false;
}
float usage = (float) queuedBuffers / totalBuffers;
boolean wasBackPressured = isBackPressured;
isBackPressured = usage > backpressureThreshold;
// 状态变化时记录
if (isBackPressured != wasBackPressured) {
if (isBackPressured) {
// 进入反压
metrics.backpressureEnter();
} else {
// 退出反压
metrics.backpressureExit();
}
}
return isBackPressured;
}
}
反压传播流程
┌─────────────────────────────────────────────────────────────────────────┐
│ 反压传播流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Task A (上游) Task B (中游) Task C (下游, 慢) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ │ │ │ │ │ │
│ │ Output │──────────►│ Input │──────────►│ Input │ │
│ │ Buffer │ │ Buffer │ │ Buffer │ │
│ │ Pool │ │ Pool │ │ Pool │ │
│ │ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │
│ │ │Free │ │ │ │Free │ │ │ │Full! │ │ │
│ │ └─────┘ │ │ └─────┘ │ │ └─────┘ │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │
│ │ │ Credit=0 │ │
│ │ │◄────────────────────│ │
│ │ │ │ │
│ │ │ 停止发送数据 │ │
│ │ │ │ │
│ │ Credit=0 │ │ │
│ │◄────────────────────│ │ │
│ │ │ │ │
│ │ 停止发送数据 │ │ │
│ │ │ │ │
│ │
│ 反压传播链: │
│ Task C 慢 → Buffer 满 → Credit=0 → Task B 停止发送 │
│ → Task B Buffer 满 → Credit=0 → Task A 停止发送 │
│ │
│ 结果:整个链路降速到最慢节点的处理速度 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
反压监控指标
// 位于 org.apache.flink.runtime.metrics.MetricNames
/**
* 反压相关指标
*/
public class MetricNames {
// 反压状态
public static final String BACKPRESSURE_STATUS = "backpressureStatus";
// 反压比率
public static final String BACKPRESSURE_RATIO = "backpressureRatio";
// 输入缓冲区使用率
public static final String INPUT_BUFFER_USAGE = "inputBufferUsage";
// 输出缓冲区使用率
public static final String OUTPUT_BUFFER_USAGE = "outputBufferUsage";
// 网络输入缓冲区数量
public static final String NUM_BUFFERS_IN_LOCAL = "numBuffersInLocal";
public static final String NUM_BUFFERS_IN_REMOTE = "numBuffersInRemote";
// 网络输出缓冲区数量
public static final String NUM_BUFFERS_OUT = "numBuffersOut";
}
// 反压监控实现
public class BackpressureMonitor {
private final MetricGroup metricGroup;
private volatile float backpressureRatio = 0.0f;
// 更新反压比率
public void updateBackpressureRatio(
int blockedChannels,
int totalChannels) {
this.backpressureRatio = (float) blockedChannels / totalChannels;
metricGroup.gauge(MetricNames.BACKPRESSURE_RATIO,
() -> backpressureRatio);
}
// 获取反压状态
public BackpressureStatus getBackpressureStatus() {
if (backpressureRatio < 0.1) {
return BackpressureStatus.OK;
} else if (backpressureRatio < 0.5) {
return BackpressureStatus.LOW;
} else {
return BackpressureStatus.HIGH;
}
}
}
public enum BackpressureStatus {
OK, // 0-10%
LOW, // 10-50%
HIGH // 50-100%
}
网络优化策略
Buffer Debloat 机制
// 位于 org.apache.flink.runtime.io.network.buffer.BufferDebloater
/**
* Buffer Debloat 自动调整缓冲区大小
* 根据实际吞吐和延迟动态调整
*/
public class BufferDebloater {
// 目标缓冲时间(毫秒)
private final long targetTotalBufferSize;
// 最小缓冲区大小
private final long minBufferSize;
// 最大缓冲区大小
private final long maxBufferSize;
// 计算新的缓冲区大小
public long calculateNewBufferSize(
long currentBufferSize,
long throughputBytesPerSecond,
long inputGateCount) {
// 根据吞吐量计算理想缓冲区大小
// 理想大小 = 吞吐量 * 目标延迟 / 1000
long idealBufferSize =
throughputBytesPerSecond * targetTotalBufferSize / 1000;
// 平均分配到每个 InputGate
long idealBufferSizePerGate =
idealBufferSize / inputGateCount;
// 限制在最小和最大范围内
long newSize = Math.max(
minBufferSize,
Math.min(maxBufferSize, idealBufferSizePerGate));
// 平滑过渡(避免剧烈变化)
return (long) (currentBufferSize * 0.9 + newSize * 0.1);
}
}
网络配置优化
# 网络缓冲区配置
# 网络内存占 Flink 内存的比例
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
# 每个 Channel 的缓冲区数量
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8
# Buffer Debloat(自适应调整)
taskmanager.network.memory.buffer-debloat.enabled: true
taskmanager.network.memory.buffer-debloat.period: 200ms
taskmanager.network.memory.buffer-debloat.samples: 20
taskmanager.network.memory.buffer-debloat.target: 100ms
# Netty 配置
taskmanager.network.netty.client.connectTimeout: 1min
taskmanager.network.netty.sendReceiveBufferSize: 0 # 使用系统默认
taskmanager.network.netty.transport: auto
总结
本章从源码层面深入解析了 Flink 网络与反压机制:
| 概念 | 源码位置 | 核心机制 |
|---|---|---|
| NetworkBufferPool | buffer.NetworkBufferPool | 全局缓冲池管理 |
| LocalBufferPool | buffer.LocalBufferPool | 本地缓冲池管理 |
| RemoteInputChannel | consumer.RemoteInputChannel | 远程数据接收,Credit 管理 |
| ResultPartition | partition.ResultPartition | 数据输出,子分区管理 |
| Credit-based | netty.PartitionRequestServerHandler | 流控实现 |
关键要点:
- NetworkBufferPool 是全局缓冲池,LocalBufferPool 从中申请缓冲区
- Credit-based 流控通过 Credit 通知控制发送速率
- 反压通过 Credit=0 传播到上游
- Buffer Debloat 自动调整缓冲区大小优化性能
参考资料
下一章预告
下一章将深入解析 内存管理,包括:
- MemorySegment 机制
- 内存分配器实现
- 托管内存使用