← 返回文章列表

Flink 底层原理系列(七):网络与反压

2021-03-04·8 分钟阅读

前言

网络通信是 Flink 分布式处理的基础,而反压机制则是保证系统稳定性的关键。理解网络缓冲区管理和 Credit-based 流控的实现原理,对于优化 Flink 作业性能至关重要。本章将从源码层面深入解析这些核心机制。

技术亮点

技术点难度面试价值本文覆盖
Network Buffer⭐⭐⭐⭐进阶考点
Credit-based 流控⭐⭐⭐⭐⭐高频考点
反压传播⭐⭐⭐⭐高频考点
网络优化⭐⭐⭐⭐实战价值

面试考点

  1. 什么是反压?Flink 如何实现反压?
  2. Credit-based 流控是如何工作的?源码实现是什么?
  3. Network Buffer 是如何管理的?
  4. 如何诊断和解决反压问题?

网络缓冲区架构

整体架构

┌─────────────────────────────────────────────────────────────────────────┐
│                        网络缓冲区架构                                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  TaskManager                                                            │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                                                                 │   │
│  │                  NetworkBufferPool                              │   │
│  │  ┌─────────────────────────────────────────────────────────┐   │   │
│  │  │  全局网络缓冲池(所有 Task 共享)                        │   │   │
│  │  │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐              │   │   │
│  │  │  │Buffer│ │Buffer│ │Buffer│ │Buffer│ │ ... │              │   │   │
│  │  │  └─────┘ └─────┘ └─────┘ └─────┘ └─────┘              │   │   │
│  │  │              │                                          │   │   │
│  │  │              ▼ 申请/回收                                │   │   │
│  │  └──────────────┼──────────────────────────────────────────┘   │   │
│  │                 │                                               │   │
│  │     ┌───────────┴───────────┐                                  │   │
│  │     ▼                       ▼                                  │   │
│  │  InputGate               OutputGate                            │   │
│  │  ┌─────────────┐        ┌─────────────┐                       │   │
│  │  │LocalBufferPool│       │LocalBufferPool│                       │   │
│  │  │┌───────────┐│        │┌───────────┐│                       │   │
│  │  ││InputChannel││        ││ResultPartition│                     │   │
│  │  ││  Buffer   ││        ││  Buffer   ││                       │   │
│  │  │└───────────┘│        │└───────────┘│                       │   │
│  │  └─────────────┘        └─────────────┘                       │   │
│  │         │                      │                               │   │
│  │         ▼                      ▼                               │   │
│  │  ┌──────────┐            ┌──────────┐                         │   │
│  │  │Netty     │            │Netty     │                         │   │
│  │  │Client    │◄──────────►│Server    │                         │   │
│  │  └──────────┘            └──────────┘                         │   │
│  │                                                                 │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

NetworkBufferPool 源码

// 位于 org.apache.flink.runtime.io.network.buffer.NetworkBufferPool

/**
 * NetworkBufferPool 是 TaskManager 级别的全局缓冲池
 * 所有网络缓冲区都从这里分配
 */
public class NetworkBufferPool implements BufferPoolFactory {
    
    // 总缓冲区数量
    private final int totalNumberOfMemorySegments;
    
    // 缓冲区大小
    private final int memorySegmentSize;
    
    // 可用缓冲区队列
    private final ArrayDeque<MemorySegment> availableMemorySegments;
    
    // 统计信息
    private final AtomicInteger numberOfAvailableMemorySegments;
    private final AtomicInteger numberOfRegisteredBufferPools;
    
    public NetworkBufferPool(
            long maxNumberOfMemorySegmentsToAllocate,
            int memorySegmentSize) {
        
        this.totalNumberOfMemorySegments = (int) maxNumberOfMemorySegmentsToAllocate;
        this.memorySegmentSize = memorySegmentSize;
        this.availableMemorySegments = new ArrayDeque<>(totalNumberOfMemorySegments);
        
        // 预分配所有 MemorySegment
        for (int i = 0; i < totalNumberOfMemorySegments; i++) {
            availableMemorySegments.add(MemorySegment.allocateHeapMemory(memorySegmentSize));
        }
        
        this.numberOfAvailableMemorySegments = new AtomicInteger(totalNumberOfMemorySegments);
        this.numberOfRegisteredBufferPools = new AtomicInteger(0);
    }
    
    // 请求缓冲区
    public MemorySegment requestMemorySegment() {
        MemorySegment segment = availableMemorySegments.poll();
        if (segment != null) {
            numberOfAvailableMemorySegments.decrementAndGet();
        }
        return segment;
    }
    
    // 批量请求缓冲区
    public List<MemorySegment> requestMemorySegmentsBlocking(int numberOfSegments) 
            throws IOException {
        
        List<MemorySegment> segments = new ArrayList<>(numberOfSegments);
        
        while (segments.size() < numberOfSegments) {
            MemorySegment segment = requestMemorySegment();
            if (segment != null) {
                segments.add(segment);
            } else {
                // 等待缓冲区可用
                waitForAvailableSegments();
            }
        }
        
        return segments;
    }
    
    // 回收缓冲区
    public void recycle(MemorySegment segment) {
        availableMemorySegments.add(segment);
        numberOfAvailableMemorySegments.incrementAndGet();
        
        // 通知等待的请求
        notifyAvailableSegments();
    }
    
    // 创建 LocalBufferPool
    @Override
    public BufferPool createBufferPool(
            int numRequiredBuffers,
            int maxUsedBuffers) {
        
        LocalBufferPool bufferPool = new LocalBufferPool(
            this,
            numRequiredBuffers,
            maxUsedBuffers);
        
        numberOfRegisteredBufferPools.incrementAndGet();
        
        // 预分配必需的缓冲区
        List<MemorySegment> initialSegments = 
            requestMemorySegmentsBlocking(numRequiredBuffers);
        bufferPool.setMemorySegments(initialSegments);
        
        return bufferPool;
    }
}

LocalBufferPool 源码

// 位于 org.apache.flink.runtime.io.network.buffer.LocalBufferPool

/**
 * LocalBufferPool 是 InputGate/OutputGate 级别的本地缓冲池
 * 从 NetworkBufferPool 申请缓冲区
 */
public class LocalBufferPool implements BufferPool {
    
    // 全局缓冲池
    private final NetworkBufferPool networkBufferPool;
    
    // 必需的缓冲区数量
    private final int numRequiredBuffers;
    
    // 最大可用缓冲区数量
    private final int maxBuffersPerChannel;
    
    // 当前持有的缓冲区
    private final ArrayDeque<MemorySegment> availableMemorySegments;
    
    // 缓冲区数量
    private final AtomicInteger currentNumberOfMemorySegments;
    
    // 等待缓冲区的请求者
    private final Deque<BufferAvailabilityListener> bufferAvailabilityListeners;
    
    public LocalBufferPool(
            NetworkBufferPool networkBufferPool,
            int numRequiredBuffers,
            int maxBuffersPerChannel) {
        
        this.networkBufferPool = networkBufferPool;
        this.numRequiredBuffers = numRequiredBuffers;
        this.maxBuffersPerChannel = maxBuffersPerChannel;
        this.availableMemorySegments = new ArrayDeque<>();
        this.currentNumberOfMemorySegments = new AtomicInteger(0);
        this.bufferAvailabilityListeners = new ArrayDeque<>();
    }
    
    // 请求缓冲区
    @Override
    public MemorySegment requestMemorySegment() {
        MemorySegment segment = availableMemorySegments.poll();
        
        if (segment == null) {
            // 尝试从全局池申请
            if (currentNumberOfMemorySegments.get() < maxBuffersPerChannel) {
                segment = networkBufferPool.requestMemorySegment();
                if (segment != null) {
                    currentNumberOfMemorySegments.incrementAndGet();
                }
            }
        }
        
        return segment;
    }
    
    // 请求缓冲区(阻塞)
    @Override
    public MemorySegment requestMemorySegmentBlocking() throws InterruptedException {
        while (true) {
            MemorySegment segment = requestMemorySegment();
            if (segment != null) {
                return segment;
            }
            
            // 等待缓冲区可用
            synchronized (this) {
                wait();
            }
        }
    }
    
    // 回收缓冲区
    @Override
    public void recycle(MemorySegment segment) {
        if (currentNumberOfMemorySegments.get() > numRequiredBuffers) {
            // 超过必需数量,归还给全局池
            networkBufferPool.recycle(segment);
            currentNumberOfMemorySegments.decrementAndGet();
        } else {
            // 放回本地池
            availableMemorySegments.add(segment);
        }
        
        // 通知等待者
        synchronized (this) {
            notifyAll();
        }
        notifyBufferAvailable();
    }
    
    // 设置缓冲区数量
    public void setMemorySegments(List<MemorySegment> memorySegments) {
        availableMemorySegments.addAll(memorySegments);
        currentNumberOfMemorySegments.set(memorySegments.size());
    }
}

InputChannel 实现

InputChannel 接口

// 位于 org.apache.flink.runtime.io.network.partition.consumer.InputChannel

/**
 * InputChannel 是数据输入通道的抽象
 */
public abstract class InputChannel {
    
    // 通道 ID
    protected final int channelIndex;
    
    // 所属 InputGate
    protected final SingleInputGate inputGate;
    
    // 缓冲区队列
    protected final BufferQueue bufferQueue;
    
    // Channel 状态
    protected ChannelState state = ChannelState.ACTIVE;
    
    // 请求分区
    public abstract void requestSubpartition(int subpartitionIndex) throws IOException;
    
    // 获取下一个缓冲区
    public abstract BufferOrEvent getNextBuffer() throws IOException, InterruptedException;
    
    // 通知缓冲区可用
    public abstract void notifyBufferAvailable(Buffer buffer);
    
    // 检查是否收到 Credit
    public abstract boolean isBacklogged();
    
    // 获取未读缓冲区数量
    public abstract int getNumberOfQueuedBuffers();
    
    // 销毁通道
    public abstract void releaseAllResources() throws IOException;
}

RemoteInputChannel 源码

// 位于 org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel

/**
 * RemoteInputChannel 处理远程数据接收
 * 是 Credit-based 流控的核心组件
 */
public class RemoteInputChannel extends InputChannel {
    
    // 远程连接
    private final ConnectionID connectionId;
    
    // Netty 连接
    private final PartitionRequestClient partitionRequestClient;
    
    // 初始 Credit(缓冲区数量)
    private final int initialCredit;
    
    // 当前可用 Credit
    private final AtomicInteger currentCredit;
    
    // 未读取的 Buffer 数量
    private final AtomicLong numBytesIn;
    
    // Buffer 队列
    private final Queue<Buffer> receivedBuffers;
    
    // Credit 监听器
    private final CreditListener creditListener;
    
    // 是否发送了 Credit 通知
    private volatile boolean notifyCreditAvailable = false;
    
    public RemoteInputChannel(
            SingleInputGate inputGate,
            int channelIndex,
            ConnectionID connectionId,
            PartitionRequestClient partitionRequestClient,
            int initialCredit) {
        
        super(inputGate, channelIndex);
        this.connectionId = connectionId;
        this.partitionRequestClient = partitionRequestClient;
        this.initialCredit = initialCredit;
        this.currentCredit = new AtomicInteger(initialCredit);
        this.receivedBuffers = new ConcurrentLinkedQueue<>();
        this.numBytesIn = new AtomicLong(0);
    }
    
    // 请求远程分区
    @Override
    public void requestSubpartition(int subpartitionIndex) throws IOException {
        // 发送分区请求
        partitionRequestClient.requestSubpartition(
            this,
            subpartitionIndex,
            initialCredit);
    }
    
    // 接收远程 Buffer
    public void onBuffer(Buffer buffer, int sequenceNumber) {
        // 检查序列号
        if (expectedSequenceNumber != sequenceNumber) {
            throw new IllegalStateException("Sequence number mismatch");
        }
        expectedSequenceNumber++;
        
        // 减少可用 Credit
        int remainingCredit = currentCredit.decrementAndGet();
        
        if (remainingCredit < 0) {
            // 超过 Credit 限制,这是错误情况
            throw new IllegalStateException("Received buffer without credit");
        }
        
        // 添加到接收队列
        synchronized (receivedBuffers) {
            receivedBuffers.add(buffer);
            numBytesIn.addAndGet(buffer.getSize());
        }
        
        // 通知 InputGate 有数据可读
        inputGate.notifyChannelNonEmpty(this);
    }
    
    // 获取下一个 Buffer
    @Override
    public BufferOrEvent getNextBuffer() throws IOException {
        Buffer buffer;
        synchronized (receivedBuffers) {
            buffer = receivedBuffers.poll();
        }
        
        if (buffer == null) {
            return null;
        }
        
        // 更新统计信息
        numBytesIn.addAndGet(-buffer.getSize());
        
        // 检查是否需要发送 Credit 通知
        // 当 Credit 低于阈值时,通知上游
        checkCreditAvailable();
        
        return new BufferOrEvent(buffer, channelIndex);
    }
    
    // 检查并通知 Credit 可用
    private void checkCreditAvailable() {
        // Credit 增加了(消费了 Buffer)
        // 检查是否需要发送 Credit 通知
        if (!notifyCreditAvailable && currentCredit.get() < initialCredit / 2) {
            notifyCreditAvailable = true;
            // 通知 Credit 监听器
            inputGate.notifyCreditAvailable(this);
        }
    }
    
    // 获取当前 Credit
    public int getCredit() {
        return currentCredit.get();
    }
    
    // 增加 Credit(消费 Buffer 后调用)
    public int increaseCredit(int numCredits) {
        return currentCredit.addAndGet(numCredits);
    }
    
    // 发送 Credit 通知给上游
    public void sendCreditNotification(int numCredits) throws IOException {
        partitionRequestClient.notifyCreditAvailable(
            this,
            numCredits);
    }
    
    // 通知 Buffer 已消费(由 InputGate 调用)
    public void notifyBufferConsumed() {
        // 增加 Credit
        currentCredit.incrementAndGet();
        checkCreditAvailable();
    }
    
    // 获取未读 Buffer 数量
    @Override
    public int getNumberOfQueuedBuffers() {
        synchronized (receivedBuffers) {
            return receivedBuffers.size();
        }
    }
    
    // 是否有积压
    @Override
    public boolean isBacklogged() {
        return getNumberOfQueuedBuffers() > 0;
    }
}

ResultPartition 实现

ResultPartition 源码

// 位于 org.apache.flink.runtime.io.network.partition.ResultPartition

/**
 * ResultPartition 是数据输出的核心组件
 */
public class ResultPartition {
    
    // 分区 ID
    private final ResultPartitionID partitionId;
    
    // 分区类型
    private final ResultPartitionType partitionType;
    
    // 子分区数量
    private final int numberOfSubpartitions;
    
    // 子分区数组
    private final ResultSubpartition[] subpartitions;
    
    // Buffer 池
    private BufferPool bufferPool;
    
    // 分区状态
    private volatile boolean isReleased = false;
    
    public ResultPartition(
            String owningTaskName,
            ResultPartitionID partitionId,
            ResultPartitionType partitionType,
            int numberOfSubpartitions,
            int networkBufferSize) {
        
        this.partitionId = partitionId;
        this.partitionType = partitionType;
        this.numberOfSubpartitions = numberOfSubpartitions;
        this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
        
        // 创建子分区
        for (int i = 0; i < numberOfSubpartitions; i++) {
            subpartitions[i] = new PipelinedSubpartition(i, this);
        }
    }
    
    // 设置 Buffer 池
    public void setBufferPool(BufferPool bufferPool) {
        this.bufferPool = bufferPool;
    }
    
    // 发送 Buffer 到指定子分区
    public void emit(Buffer buffer, int targetSubpartition) throws IOException {
        if (isReleased) {
            throw new IllegalStateException("Partition already released");
        }
        
        subpartitions[targetSubpartition].add(buffer);
    }
    
    // 广播 Buffer 到所有子分区
    public void broadcast(Buffer buffer) throws IOException {
        for (int i = 0; i < numberOfSubpartitions; i++) {
            // 复制 Buffer(除了最后一个)
            Buffer toEmit = (i < numberOfSubpartitions - 1) 
                ? buffer.copy() 
                : buffer;
            emit(toEmit, i);
        }
    }
    
    // 广播事件
    public void broadcastEvent(AbstractEvent event) throws IOException {
        for (int i = 0; i < numberOfSubpartitions; i++) {
            Buffer eventBuffer = EventSerializer.toBuffer(event);
            emit(eventBuffer, i);
        }
    }
    
    // 获取子分区
    public ResultSubpartition getSubpartition(int index) {
        return subpartitions[index];
    }
    
    // 请求 Buffer
    public BufferBuilder getBufferBuilder() throws InterruptedException {
        return bufferPool.requestBufferBuilderBlocking();
    }
    
    // 获取 Backlog(积压数量)
    public int getNumberOfQueuedBuffers() {
        int total = 0;
        for (ResultSubpartition subpartition : subpartitions) {
            total += subpartition.getNumberOfQueuedBuffers();
        }
        return total;
    }
}

PipelinedSubpartition 源码

// 位于 org.apache.flink.runtime.io.network.partition.PipelinedSubpartition

/**
 * PipelinedSubpartition 是流水线子分区实现
 */
public class PipelinedSubpartition extends ResultSubpartition {
    
    // Buffer 队列
    private final ArrayDeque<Buffer> buffers;
    
    // Buffer 数量
    private final AtomicInteger buffersInBacklog;
    
    // 数据可用监听器
    private BufferAvailabilityListener availabilityListener;
    
    // 消费者视图
    private PipelinedSubpartitionView readerView;
    
    public PipelinedSubpartition(int index, ResultPartition parent) {
        super(index, parent);
        this.buffers = new ArrayDeque<>();
        this.buffersInBacklog = new AtomicInteger(0);
    }
    
    // 添加 Buffer
    @Override
    public boolean add(Buffer buffer) {
        // 检查是否已释放
        if (isReleased) {
            buffer.recycleBuffer();
            return false;
        }
        
        // 添加到队列
        synchronized (buffers) {
            buffers.add(buffer);
            buffersInBacklog.incrementAndGet();
        }
        
        // 通知监听器
        notifyDataAvailable();
        
        return true;
    }
    
    // 获取下一个 Buffer
    public BufferAndBacklog pollBuffer() {
        Buffer buffer;
        synchronized (buffers) {
            buffer = buffers.poll();
            if (buffer != null) {
                buffersInBacklog.decrementAndGet();
            }
        }
        
        if (buffer == null) {
            return null;
        }
        
        return new BufferAndBacklog(
            buffer,
            buffersInBacklog.get(),  // 剩余 backlog
            getNextBufferIsEvent());
    }
    
    // 获取积压数量
    @Override
    public int getNumberOfQueuedBuffers() {
        return buffersInBacklog.get();
    }
    
    // 通知数据可用
    private void notifyDataAvailable() {
        if (availabilityListener != null) {
            availabilityListener.notifyDataAvailable();
        }
    }
    
    // 创建读取视图
    public PipelinedSubpartitionView createReadView(
            BufferAvailabilityListener availabilityListener) {
        
        this.availabilityListener = availabilityListener;
        this.readerView = new PipelinedSubpartitionView(this);
        
        // 如果已有数据,立即通知
        if (getNumberOfQueuedBuffers() > 0) {
            notifyDataAvailable();
        }
        
        return readerView;
    }
}

// Buffer 和 Backlog 的组合
public class BufferAndBacklog {
    private final Buffer buffer;
    private final int backlogSize;
    private final boolean nextBufferIsEvent;
    
    // ...
}

Credit-based 流控源码

流控原理

┌─────────────────────────────────────────────────────────────────────────┐
│                        Credit-based 流控原理                             │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  核心思想:下游告知上游可发送的数据量                                    │
│                                                                         │
│  Sender (上游 TaskManager)         Receiver (下游 TaskManager)         │
│  ┌─────────────────────┐          ┌─────────────────────┐             │
│  │                     │          │                     │             │
│  │  ResultPartition    │          │  RemoteInputChannel │             │
│  │  ┌───────────────┐  │          │  ┌───────────────┐  │             │
│  │  │ Subpartition  │  │          │  │ Buffer Queue  │  │             │
│  │  │ ┌───────────┐ │  │          │  │ ┌───────────┐ │  │             │
│  │  │ │ Buffer 1  │ │  │          │  │ │ Buffer 1  │ │  │             │
│  │  │ │ Buffer 2  │ │  │          │  │ └───────────┘ │  │             │
│  │  │ │ Buffer 3  │ │  │          │  │               │  │             │
│  │  │ └───────────┘ │  │          │  │ Credit: 2     │  │             │
│  │  │ Backlog: 3    │  │          │  │ (可用缓冲区)  │  │             │
│  │  └───────────────┘  │          │  └───────────────┘  │             │
│  │                     │          │                     │             │
│  └──────────┬──────────┘          └──────────┬──────────┘             │
│             │                                │                         │
│             │  1. Request (initialCredit=2)  │                         │
│             │ ◄──────────────────────────────│                         │
│             │                                │                         │
│             │  2. Data (Buffer1) + Backlog=3 │                         │
│             │ ──────────────────────────────►│                         │
│             │                                │                         │
│             │  3. Data (Buffer2) + Backlog=2 │  Credit 变为 0          │
│             │ ──────────────────────────────►│                         │
│             │                                │                         │
│             │        (停止发送,等待 Credit) │                         │
│             │                                │                         │
│             │  4. Credit Notification (2)    │  消费 Buffer,恢复 Credit│
│             │ ◄──────────────────────────────│                         │
│             │                                │                         │
│             │  5. Data (Buffer3) + Backlog=1 │                         │
│             │ ──────────────────────────────►│                         │
│             │                                │                         │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

PartitionRequestClientHandler 源码

// 位于 org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler

/**
 * Netty 客户端处理器
 * 处理来自上游的数据和 Credit 通知
 */
public class PartitionRequestClientHandler 
        extends ChannelInboundHandlerAdapter {
    
    // 输入通道映射
    private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels;
    
    // 待发送的 Credit 通知队列
    private final Queue<CreditNotification> pendingCreditNotifications;
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
        if (msg instanceof BufferResponse) {
            // 接收 Buffer 数据
            BufferResponse bufferResponse = (BufferResponse) msg;
            
            RemoteInputChannel inputChannel = inputChannels.get(
                bufferResponse.getReceiverId());
            
            if (inputChannel != null) {
                // 将 Buffer 交给 InputChannel
                inputChannel.onBuffer(
                    bufferResponse.getBuffer(),
                    bufferResponse.getSequenceNumber());
                
                // 更新 Backlog
                int backlog = bufferResponse.getBacklog();
                inputChannel.onBacklog(backlog);
            }
        } else if (msg instanceof CreditRequest) {
            // 上游请求 Credit 信息(通常不需要处理)
        }
    }
    
    // 发送 Credit 通知
    public void notifyCreditAvailable(
            RemoteInputChannel inputChannel, 
            int numCredits) throws IOException {
        
        // 创建 Credit 通知
        CreditNotification notification = new CreditNotification(
            inputChannel.getInputChannelId(),
            numCredits);
        
        // 发送到 Netty Channel
        Channel channel = inputChannel.getChannel();
        if (channel != null && channel.isActive()) {
            channel.writeAndFlush(notification);
        }
    }
    
    // 注册 InputChannel
    public void registerInputChannel(RemoteInputChannel inputChannel) {
        inputChannels.put(inputChannel.getInputChannelId(), inputChannel);
    }
}

PartitionRequestServerHandler 源码

// 位于 org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler

/**
 * Netty 服务端处理器
 * 处理来自下游的请求和 Credit 通知
 */
public class PartitionRequestServerHandler 
        extends ChannelInboundHandlerAdapter {
    
    // 分区提供者
    private final NetworkSequenceViewReader reader;
    
    // 当前 Backlog
    private int currentBacklog = 0;
    
    // 可用 Credit
    private int availableCredit = 0;
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
        if (msg instanceof PartitionRequest) {
            // 处理分区请求
            PartitionRequest request = (PartitionRequest) msg;
            
            // 创建读取视图
            reader = createViewReader(
                request.getPartitionId(),
                request.getSubpartitionIndex());
            
            // 设置初始 Credit
            availableCredit = request.getCredit();
            
            // 开始发送数据
            writeAndFlushNextMessage(ctx);
            
        } else if (msg instanceof CreditNotification) {
            // 处理 Credit 通知
            CreditNotification creditNotification = (CreditNotification) msg;
            
            // 增加 Credit
            availableCredit += creditNotification.getCredit();
            
            // 尝试发送更多数据
            writeAndFlushNextMessage(ctx);
        }
    }
    
    // 发送下一个消息
    private void writeAndFlushNextMessage(ChannelHandlerContext ctx) throws Exception {
        
        while (availableCredit > 0) {
            // 获取下一个 Buffer
            BufferAndBacklog bufferAndBacklog = reader.getNextBuffer();
            
            if (bufferAndBacklog == null) {
                // 没有更多数据
                break;
            }
            
            // 减少 Credit
            availableCredit--;
            
            // 更新 Backlog
            currentBacklog = bufferAndBacklog.getBacklog();
            
            // 构造响应
            BufferResponse response = new BufferResponse(
                bufferAndBacklog.getBuffer(),
                reader.getSequenceNumber(),
                reader.getReceiverId(),
                currentBacklog);
            
            // 发送
            ctx.writeAndFlush(response);
        }
    }
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof BacklogAnnouncement) {
            // Backlog 通知(用于反压检测)
            BacklogAnnouncement announcement = (BacklogAnnouncement) msg;
            currentBacklog = announcement.getBacklog();
        }
    }
}

Credit 分配策略

// 位于 org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel

/**
 * Credit 分配和通知策略
 */
public class RemoteInputChannel extends InputChannel {
    
    // 初始 Credit
    private final int initialCredit;
    
    // 最小 Credit(低于此值时通知上游)
    private final int minCreditThreshold;
    
    // 最大 Credit
    private final int maxCredit;
    
    // 当前 Credit
    private final AtomicInteger currentCredit;
    
    // 上次通知的 Credit
    private final AtomicInteger lastAnnouncedCredit;
    
    // 判断是否需要发送 Credit 通知
    public boolean shouldAnnounceCredit() {
        int current = currentCredit.get();
        int last = lastAnnouncedCredit.get();
        
        // Credit 增加了,且超过阈值
        return current > last && 
               current >= minCreditThreshold;
    }
    
    // 发送 Credit 通知
    public void announceCredit() throws IOException {
        int credit = currentCredit.get();
        
        // 发送通知
        partitionRequestClient.notifyCreditAvailable(this, credit);
        
        // 更新上次通知的值
        lastAnnouncedCredit.set(credit);
    }
    
    // 动态 Credit 调整
    public int getDynamicCredit() {
        // 根据消费速率动态调整 Credit
        // 如果消费快,增加 Credit;消费慢,减少 Credit
        double consumptionRate = getConsumptionRate();
        
        if (consumptionRate > 0.8) {
            // 消费快,增加 Credit
            return Math.min(maxCredit, initialCredit * 2);
        } else if (consumptionRate < 0.3) {
            // 消费慢,减少 Credit
            return Math.max(1, initialCredit / 2);
        }
        
        return initialCredit;
    }
}

反压传播机制

反压检测

// 位于 org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel

/**
 * 反压检测实现
 */
public class RemoteInputChannel extends InputChannel {
    
    // 反压检测阈值
    private final float backpressureThreshold = 0.7f;
    
    // 反压状态
    private volatile boolean isBackPressured = false;
    
    // 检测反压
    public boolean checkBackpressure() {
        // 计算缓冲区使用率
        int queuedBuffers = getNumberOfQueuedBuffers();
        int availableCredit = currentCredit.get();
        int totalBuffers = queuedBuffers + availableCredit;
        
        if (totalBuffers == 0) {
            return false;
        }
        
        float usage = (float) queuedBuffers / totalBuffers;
        
        boolean wasBackPressured = isBackPressured;
        isBackPressured = usage > backpressureThreshold;
        
        // 状态变化时记录
        if (isBackPressured != wasBackPressured) {
            if (isBackPressured) {
                // 进入反压
                metrics.backpressureEnter();
            } else {
                // 退出反压
                metrics.backpressureExit();
            }
        }
        
        return isBackPressured;
    }
}

反压传播流程

┌─────────────────────────────────────────────────────────────────────────┐
│                        反压传播流程                                      │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Task A (上游)          Task B (中游)         Task C (下游, 慢)         │
│  ┌─────────┐           ┌─────────┐           ┌─────────┐              │
│  │         │           │         │           │         │              │
│  │ Output  │──────────►│ Input   │──────────►│ Input   │              │
│  │ Buffer  │           │ Buffer  │           │ Buffer  │              │
│  │ Pool    │           │ Pool    │           │ Pool    │              │
│  │ ┌─────┐ │           │ ┌─────┐ │           │ ┌─────┐ │              │
│  │ │Free │ │           │ │Free │ │           │ │Full! │ │              │
│  │ └─────┘ │           │ └─────┘ │           │ └─────┘ │              │
│  └─────────┘           └─────────┘           └─────────┘              │
│       │                     │                     │                    │
│       │                     │    Credit=0         │                    │
│       │                     │◄────────────────────│                    │
│       │                     │                     │                    │
│       │                     │ 停止发送数据        │                    │
│       │                     │                     │                    │
│       │    Credit=0         │                     │                    │
│       │◄────────────────────│                     │                    │
│       │                     │                     │                    │
│       │ 停止发送数据        │                     │                    │
│       │                     │                     │                    │
│                                                                         │
│  反压传播链:                                                           │
│  Task C 慢 → Buffer 满 → Credit=0 → Task B 停止发送                    │
│           → Task B Buffer 满 → Credit=0 → Task A 停止发送              │
│                                                                         │
│  结果:整个链路降速到最慢节点的处理速度                                  │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

反压监控指标

// 位于 org.apache.flink.runtime.metrics.MetricNames

/**
 * 反压相关指标
 */
public class MetricNames {
    
    // 反压状态
    public static final String BACKPRESSURE_STATUS = "backpressureStatus";
    
    // 反压比率
    public static final String BACKPRESSURE_RATIO = "backpressureRatio";
    
    // 输入缓冲区使用率
    public static final String INPUT_BUFFER_USAGE = "inputBufferUsage";
    
    // 输出缓冲区使用率
    public static final String OUTPUT_BUFFER_USAGE = "outputBufferUsage";
    
    // 网络输入缓冲区数量
    public static final String NUM_BUFFERS_IN_LOCAL = "numBuffersInLocal";
    public static final String NUM_BUFFERS_IN_REMOTE = "numBuffersInRemote";
    
    // 网络输出缓冲区数量
    public static final String NUM_BUFFERS_OUT = "numBuffersOut";
}

// 反压监控实现
public class BackpressureMonitor {
    
    private final MetricGroup metricGroup;
    private volatile float backpressureRatio = 0.0f;
    
    // 更新反压比率
    public void updateBackpressureRatio(
            int blockedChannels, 
            int totalChannels) {
        
        this.backpressureRatio = (float) blockedChannels / totalChannels;
        metricGroup.gauge(MetricNames.BACKPRESSURE_RATIO, 
            () -> backpressureRatio);
    }
    
    // 获取反压状态
    public BackpressureStatus getBackpressureStatus() {
        if (backpressureRatio < 0.1) {
            return BackpressureStatus.OK;
        } else if (backpressureRatio < 0.5) {
            return BackpressureStatus.LOW;
        } else {
            return BackpressureStatus.HIGH;
        }
    }
}

public enum BackpressureStatus {
    OK,      // 0-10%
    LOW,     // 10-50%
    HIGH     // 50-100%
}

网络优化策略

Buffer Debloat 机制

// 位于 org.apache.flink.runtime.io.network.buffer.BufferDebloater

/**
 * Buffer Debloat 自动调整缓冲区大小
 * 根据实际吞吐和延迟动态调整
 */
public class BufferDebloater {
    
    // 目标缓冲时间(毫秒)
    private final long targetTotalBufferSize;
    
    // 最小缓冲区大小
    private final long minBufferSize;
    
    // 最大缓冲区大小
    private final long maxBufferSize;
    
    // 计算新的缓冲区大小
    public long calculateNewBufferSize(
            long currentBufferSize,
            long throughputBytesPerSecond,
            long inputGateCount) {
        
        // 根据吞吐量计算理想缓冲区大小
        // 理想大小 = 吞吐量 * 目标延迟 / 1000
        long idealBufferSize = 
            throughputBytesPerSecond * targetTotalBufferSize / 1000;
        
        // 平均分配到每个 InputGate
        long idealBufferSizePerGate = 
            idealBufferSize / inputGateCount;
        
        // 限制在最小和最大范围内
        long newSize = Math.max(
            minBufferSize,
            Math.min(maxBufferSize, idealBufferSizePerGate));
        
        // 平滑过渡(避免剧烈变化)
        return (long) (currentBufferSize * 0.9 + newSize * 0.1);
    }
}

网络配置优化

# 网络缓冲区配置
# 网络内存占 Flink 内存的比例
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb

# 每个 Channel 的缓冲区数量
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8

# Buffer Debloat(自适应调整)
taskmanager.network.memory.buffer-debloat.enabled: true
taskmanager.network.memory.buffer-debloat.period: 200ms
taskmanager.network.memory.buffer-debloat.samples: 20
taskmanager.network.memory.buffer-debloat.target: 100ms

# Netty 配置
taskmanager.network.netty.client.connectTimeout: 1min
taskmanager.network.netty.sendReceiveBufferSize: 0  # 使用系统默认
taskmanager.network.netty.transport: auto

总结

本章从源码层面深入解析了 Flink 网络与反压机制:

概念源码位置核心机制
NetworkBufferPoolbuffer.NetworkBufferPool全局缓冲池管理
LocalBufferPoolbuffer.LocalBufferPool本地缓冲池管理
RemoteInputChannelconsumer.RemoteInputChannel远程数据接收,Credit 管理
ResultPartitionpartition.ResultPartition数据输出,子分区管理
Credit-basednetty.PartitionRequestServerHandler流控实现

关键要点

  1. NetworkBufferPool 是全局缓冲池,LocalBufferPool 从中申请缓冲区
  2. Credit-based 流控通过 Credit 通知控制发送速率
  3. 反压通过 Credit=0 传播到上游
  4. Buffer Debloat 自动调整缓冲区大小优化性能

参考资料

下一章预告

下一章将深入解析 内存管理,包括:

  • MemorySegment 机制
  • 内存分配器实现
  • 托管内存使用
分享: