Kafka核心原理(二):消息模型与存储机制
2020-04-15·6 分钟阅读
Kafka核心原理(二):消息模型与存储机制
前言
Kafka 的消息存储是其高性能的基础。本章将深入剖析 Kafka 的消息模型、存储架构与日志段设计,理解 Kafka 如何通过精巧的存储设计实现百万级 TPS 的写入性能。
消息模型
三级消息组织结构
Kafka 采用三级结构组织消息:Topic → Partition → Message
┌─────────────────────────────────────────────────────────────────────────┐
│ Kafka 消息组织结构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Topic(主题)- 逻辑容器 │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ order-events │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Partition(分区)- 并行处理单元 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Partition 0 │ │ Partition 1 │ │ Partition 2 │ │ Partition 3 │ │
│ │ (Broker 1) │ │ (Broker 2) │ │ (Broker 3) │ │ (Broker 1) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ Message(消息)- 数据单元 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Offset: 0 │ │ Offset: 0 │ │ Offset: 0 │ │ Offset: 0 │ │
│ │ Offset: 1 │ │ Offset: 1 │ │ Offset: 1 │ │ Offset: 1 │ │
│ │ Offset: 2 │ │ Offset: 2 │ │ Offset: 2 │ │ Offset: 2 │ │
│ │ ... │ │ ... │ │ ... │ │ ... │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Topic 设计原则
Topic 设计最佳实践:
├── 命名规范
│ ├── 格式:<业务域>.<实体>.<动作/事件>
│ ├── 示例:
│ │ ├── order.payment.created # 订单支付创建
│ │ ├── user.profile.updated # 用户资料更新
│ │ ├── inventory.stock.warning # 库存预警
│ │ └── system.monitoring.metrics # 系统监控指标
│ └── 注意:避免使用空格和特殊字符
│
├── 分区数量
│ ├── 考虑因素:
│ │ ├── 目标吞吐量
│ │ ├── 消费者并行度
│ │ ├── Broker 数量
│ │ └── 未来扩展空间
│ ├── 计算公式:
│ │ 分区数 = max(目标吞吐量/单分区吞吐量, 消费者数量)
│ └── 建议:生产环境建议分区数 ≤ Broker数 × 10
│
└── 保留策略
├── 基于时间:log.retention.hours=168(默认7天)
├── 基于大小:log.retention.bytes=-1(无限制)
└── 日志压缩:cleanup.policy=compact
Partition 分区机制
分区的作用
┌─────────────────────────────────────────────────────────────────┐
│ 分区核心作用 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. 并行处理能力 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Topic: high-throughput-topic (100分区) │ │
│ │ │ │
│ │ 吞吐量 = 单分区吞吐量 × 分区数 │ │
│ │ = 50MB/s × 100 = 5GB/s │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. 数据分布均衡 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 默认分区策略:Key.hashCode % 分区数 │ │
│ │ │ │
│ │ 相同Key的消息 → 相同分区 → 保证顺序性 │ │
│ │ 不同Key的消息 → 均匀分布 → 负载均衡 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. 水平扩展能力 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 扩容前:3 Broker,100分区 │ │
│ │ 扩容后:6 Broker,200分区(新增分区) │ │
│ │ │ │
│ │ 数据自动重新分布到新节点 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
分区策略
// Kafka 内置分区策略
// 1. DefaultPartitioner(默认)
// - 有Key:按Key哈希分区
// - 无Key:Sticky分区(粘性分区)
public class DefaultPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// 使用 murmur2 哈希算法
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
// 2. RoundRobinPartitioner(轮询)
// - 消息均匀分布到所有分区
public class RoundRobinPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
return nextValue(topic) % numPartitions;
}
}
// 3. UniformStickyPartitioner(统一粘性)
// - 批量发送到同一分区,提高吞吐量
public class UniformStickyPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
return stickyPartitionCache.partition(topic, cluster);
}
}
自定义分区策略
// 业务场景:按地区分区,同地区订单进入同一分区
public class RegionPartitioner implements Partitioner {
private static final Map<String, Integer> REGION_MAP = Map.of(
"north", 0, // 北区 → 分区0-2
"south", 1, // 南区 → 分区3-5
"east", 2, // 东区 → 分区6-8
"west", 3 // 西区 → 分区9-11
);
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int partitionCount = cluster.partitionCountForTopic(topic);
int partitionsPerRegion = partitionCount / 4;
// 从消息中提取地区信息
String region = extractRegion(value);
Integer regionIndex = REGION_MAP.getOrDefault(region, 0);
// 在地区范围内随机选择分区
int startPartition = regionIndex * partitionsPerRegion;
return startPartition + ThreadLocalRandom.current().nextInt(partitionsPerRegion);
}
@Override
public void configure(Map<String, ?> configs) {}
@Override
public void close() {}
}
消息格式
消息结构
Kafka 消息由消息头和消息体组成:
┌─────────────────────────────────────────────────────────────────────────┐
│ Kafka 消息格式(v2) │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Record Batch(消息批次) │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ BASE_OFFSET (8 bytes) - 基础偏移量 │ │
│ │ BATCH_LENGTH (4 bytes) - 批次长度 │ │
│ │ PARTITION_LEADER_EPOCH (4 bytes) - 分区Leader纪元 │ │
│ │ MAGIC (1 byte) - 消息格式版本(2) │ │
│ │ CRC (4 bytes) - 校验和 │ │
│ │ ATTRIBUTES (2 bytes) - 属性标志 │ │
│ │ LAST_OFFSET_DELTA (4 bytes) - 最后一条消息偏移量增量 │ │
│ │ BASE_TIMESTAMP (8 bytes) - 基础时间戳 │ │
│ │ MAX_TIMESTAMP (8 bytes) - 最大时间戳 │ │
│ │ PRODUCER_ID (8 bytes) - 生产者ID(事务支持) │ │
│ │ PRODUCER_EPOCH (2 bytes) - 生产者纪元 │ │
│ │ BASE_SEQUENCE (4 bytes) - 基础序列号 │ │
│ │ RECORDS_COUNT (4 bytes) - 消息数量 │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ Record 1 │ │ │
│ │ │ LENGTH (varint) - 消息长度 │ │ │
│ │ │ ATTRIBUTES (int8) - 属性 │ │ │
│ │ │ TIMESTAMP_DELTA (varint) - 时间戳增量 │ │ │
│ │ │ OFFSET_DELTA (varint) - 偏移量增量 │ │ │
│ │ │ KEY_LENGTH (varint) - Key长度(-1表示null) │ │ │
│ │ │ KEY (bytes) - Key内容 │ │ │
│ │ │ VALUE_LENGTH (varint) - Value长度(-1表示null) │ │ │
│ │ │ VALUE (bytes) - Value内容 │ │ │
│ │ │ HEADERS_COUNT (varint) - Header数量 │ │ │
│ │ │ HEADERS (array) - Header数组 │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ Record 2 ... Record N │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
消息压缩
Kafka 支持多种压缩算法:
┌─────────────────────────────────────────────────────────────────┐
│ 压缩算法对比 │
├──────────────┬──────────┬──────────┬──────────┬────────────────┤
│ 压缩类型 │ 压缩率 │ 压缩速度 │ 解压速度 │ 适用场景 │
├──────────────┼──────────┼──────────┼──────────┼────────────────┤
│ none │ 1.0x │ 最快 │ 最快 │ 低带宽要求 │
│ gzip │ 3-4x │ 慢 │ 慢 │ 高压缩比 │
│ snappy │ 2x │ 快 │ 快 │ 平衡性能 │
│ lz4 │ 2.5x │ 最快 │ 最快 │ 高吞吐量 │
│ zstd │ 3-4x │ 中等 │ 快 │ 新场景首选 │
└──────────────┴──────────┴──────────┴──────────┴────────────────┘
生产者压缩配置:
compression.type=lz4 # 推荐 lz4 或 zstd
compression.lz4.level=9 # 压缩级别
序列化机制
// 常用序列化器
// 1. String 序列化器
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
public byte[] serialize(String topic, String data) {
return data == null ? null : data.getBytes(encoding);
}
}
// 2. JSON 序列化器(需要自定义)
public class JsonSerializer<T> implements Serializer<T> {
private ObjectMapper mapper = new ObjectMapper();
public byte[] serialize(String topic, T data) {
try {
return mapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
}
}
}
// 3. Avro 序列化器(推荐用于大数据场景)
// 优点:Schema演进、紧凑格式、跨语言支持
// 配置:
// value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
// schema.registry.url=http://localhost:8081
// 4. Protobuf 序列化器
// 优点:高性能、强类型、向后兼容
// 配置:
// value.serializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
存储架构
日志目录结构
┌─────────────────────────────────────────────────────────────────────────┐
│ Kafka 日志目录结构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ /kafka-logs/ │
│ │ │
│ ├── __consumer_offsets-0/ # 消费者组偏移量主题 │
│ │ ├── 00000000000000000000.log # 日志文件 │
│ │ ├── 00000000000000000000.index # 偏移量索引 │
│ │ ├── 00000000000000000000.timeindex # 时间戳索引 │
│ │ └── leader-epoch-checkpoint # Leader纪元检查点 │
│ │ │
│ ├── __consumer_offsets-1/ │
│ │ └── ... │
│ │ │
│ ├── order-events-0/ # 业务主题分区 │
│ │ ├── 00000000000000000000.log │
│ │ ├── 00000000000000000000.index │
│ │ ├── 00000000000000000000.timeindex │
│ │ ├── 00000000000005242880.log # 第二个日志段 │
│ │ ├── 00000000000005242880.index │
│ │ └── 00000000000005242880.timeindex │
│ │ │
│ ├── order-events-1/ │
│ │ └── ... │
│ │ │
│ ├── meta.properties # 集群元数据 │
│ └── recovery-point-offset-checkpoint # 恢复点 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
LogSegment 设计
每个 Partition 由多个 LogSegment 组成:
┌─────────────────────────────────────────────────────────────────────────┐
│ LogSegment 结构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Partition (order-events-0) │
│ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │
│ │ │ Segment 1: 00000000000000000000 │ │
│ │ │ ├── Base Offset: 0 │ │
│ │ │ ├── Last Offset: 1048575 │ │
│ │ │ ├── Size: 1GB │ │
│ │ │ └── 状态: 已写满(只读) │ │
│ │ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │
│ │ │ Segment 2: 000000000010485760 │ │
│ │ │ ├── Base Offset: 10485760 │ │
│ │ │ ├── Last Offset: 2097151 │ │
│ │ │ ├── Size: 1GB │ │
│ │ │ └── 状态: 已写满(只读) │ │
│ │ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │
│ │ │ Segment 3: 00000000002097152 [Active Segment] │ │
│ │ │ ├── Base Offset: 2097152 │ │
│ │ │ ├── Last Offset: 3145728 (写入中...) │ │
│ │ │ ├── Size: 512MB (增长中...) │ │
│ │ │ └── 状态: 活跃(可读写) │ │
│ │ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 写入方向(追加写入) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
LogSegment 文件组成
每个 LogSegment 由以下文件组成:
1. .log 文件 - 实际消息数据
├── 顺序追加写入
├── 文件名 = Base Offset
└── 默认大小 1GB (log.segment.bytes)
2. .index 文件 - 偏移量索引
├── 稀疏索引(每4KB建立一条索引)
├── 记录: (相对偏移量, 物理位置)
└── 用于快速定位消息位置
3. .timeindex 文件 - 时间戳索引
├── 记录: (时间戳, 相对偏移量)
└── 用于按时间查找消息
4. .txnindex 文件 - 事务索引
├── 记录事务相关元数据
└── 用于事务隔离读取
5. leader-epoch-checkpoint - Leader纪元检查点
└── 记录 Leader Epoch 信息
索引机制
偏移量索引
┌─────────────────────────────────────────────────────────────────────────┐
│ 偏移量索引结构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ .index 文件结构(稀疏索引) │
│ ┌──────────────┬────────────────┐ │
│ │ 相对偏移量 │ 物理位置 │ │
│ ├──────────────┼────────────────┤ │
│ │ 0 │ 0 │ ← 消息0,位置0 │
│ │ 47 │ 4096 │ ← 消息47,位置4096 │
│ │ 94 │ 8192 │ ← 消息94,位置8192 │
│ │ 141 │ 12288 │ │
│ │ ... │ ... │ │
│ │ 2350 │ 204800 │ │
│ └──────────────┴────────────────┘ │
│ │
│ 查找流程: │
│ 1. 二分查找定位到目标偏移量附近的索引条目 │
│ 2. 从索引位置开始扫描 .log 文件 │
│ 3. 找到精确偏移量对应的消息 │
│ │
│ 示例:查找 offset=100 的消息 │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ 1. 二分查找:找到 offset=94 的索引条目 │ │
│ │ 2. 获取物理位置:8192 │ │
│ │ 3. 从位置8192开始扫描 .log 文件 │ │
│ │ 4. 找到 offset=100 的消息(位置≈8704) │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
时间戳索引
┌─────────────────────────────────────────────────────────────────────────┐
│ 时间戳索引结构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ .timeindex 文件结构 │
│ ┌────────────────────┬────────────────┐ │
│ │ 时间戳 │ 相对偏移量 │ │
│ ├────────────────────┼────────────────┤ │
│ │ 1704067200000 │ 0 │ ← 2024-01-01 00:00:00 │
│ │ 1704067260000 │ 120 │ ← 2024-01-01 00:01:00 │
│ │ 1704067320000 │ 240 │ │
│ │ 1704067380000 │ 360 │ │
│ │ ... │ ... │ │
│ └────────────────────┴────────────────┘ │
│ │
│ 查找流程: │
│ 1. 二分查找定位到目标时间戳附近的索引条目 │
│ 2. 获取对应的相对偏移量 │
│ 3. 使用偏移量索引定位到具体消息 │
│ │
│ 应用场景: │
│ ├── 消费者从特定时间点开始消费 │
│ ├── 日志清理(基于时间删除) │
│ └── 数据恢复与回溯 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
日志清理策略
基于时间的清理
配置参数:
log.retention.hours=168 # 保留7天
log.retention.minutes=-1 # 分钟级覆盖
log.retention.ms=-1 # 毫秒级覆盖(优先级最高)
清理过程:
┌─────────────────────────────────────────────────────────────┐
│ │
│ 检查周期:log.retention.check.interval.ms=300000(5分钟) │
│ │
│ ┌─────────┐ ┌───────────────┐ ┌──────────────┐ │
│ │ 扫描日志 │ ──► │ 判断过期时间 │ ──► │ 删除过期段 │ │
│ │ 目录 │ │ lastModified │ │ │ │
│ └─────────┘ └───────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
基于大小的清理
配置参数:
log.retention.bytes=1073741824 # 每个分区保留1GB
log.segment.bytes=1073741824 # 每个段1GB
清理过程:
┌─────────────────────────────────────────────────────────────┐
│ │
│ 1. 计算分区总大小 │
│ totalSize = sum(所有段大小) │
│ │
│ 2. 判断是否超过阈值 │
│ if totalSize > log.retention.bytes: │
│ 删除最旧的段 │
│ │
│ 3. 重复直到满足大小限制 │
│ │
└─────────────────────────────────────────────────────────────┘
日志压缩(Log Compaction)
┌─────────────────────────────────────────────────────────────────────────┐
│ 日志压缩原理 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 目的:保留每个 Key 的最新值,删除历史版本 │
│ │
│ 压缩前: │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ K1:V1 | K2:V1 | K1:V2 | K3:V1 | K2:V2 | K1:V3 | K4:V1 │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ 压缩后: │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ K3:V1 | K2:V2 | K1:V3 | K4:V1 (保留每个Key的最新值) │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ 配置: │
│ cleanup.policy=compact # 启用压缩 │
│ min.cleanable.dirty.ratio=0.5 # 脏数据比例阈值 │
│ delete.retention.ms=86400000 # 删除标记保留时间 │
│ min.compaction.lag.ms=0 # 最小压缩延迟 │
│ │
│ 工作原理: │
│ 1. 后台线程定期扫描日志段 │
│ 2. 识别每个 Key 的最新值 │
│ 3. 创建新的压缩日志段 │
│ 4. 原子替换旧日志段 │
│ │
│ 适用场景: │
│ ├── __consumer_offsets(消费者偏移量) │
│ ├── 配置中心(保留最新配置) │
│ ├── 状态表(如用户状态、会话信息) │
│ └── Change Data Capture(CDC) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
存储优化实践
磁盘选择
┌─────────────────────────────────────────────────────────────────┐
│ 磁盘类型选择 │
├──────────────┬────────────────┬────────────────────────────────┤
│ 磁盘类型 │ 顺序写性能 │ 建议 │
├──────────────┼────────────────┼────────────────────────────────┤
│ HDD │ 100-200 MB/s │ 小规模、成本敏感场景 │
│ SATA SSD │ 400-500 MB/s │ 中等规模生产环境 │
│ NVMe SSD │ 2000+ MB/s │ 高吞吐量生产环境 │
└──────────────┴────────────────┴────────────────────────────────┘
关键指标:
├── 顺序写吞吐量(最重要)
├── 随机读性能(消费者回溯)
└── IOPS(索引访问)
文件系统配置
# 推荐文件系统:XFS(首选)或 ext4
# XFS 挂载选项
mount -o noatime,nodiratime,logbufs=8,logbsize=256k /dev/sdb1 /kafka-logs
# ext4 挂载选项
mount -o noatime,nodiratime,data=writeback /dev/sdb1 /kafka-logs
# 关键参数说明:
# noatime, nodiratime - 禁用访问时间更新,减少IO
# logbufs, logbsize - XFS日志缓冲区优化
# data=writeback - ext4数据写入模式优化
关键配置参数
# 日志段配置
log.segment.bytes=1073741824 # 日志段大小 1GB
log.segment.ms=604800000 # 日志段滚动周期 7天
# 索引配置
log.index.size.max.bytes=10485760 # 索引文件最大 10MB
log.index.interval.bytes=4096 # 索引间隔 4KB
# 刷盘配置(生产环境建议依赖OS页缓存)
log.flush.interval.messages=10000 # 每10000条消息刷盘
log.flush.interval.ms=1000 # 每1秒刷盘
# 文件描述符
# 需要调整系统限制:
# ulimit -n 100000
# 文件数估算 = 分区数 × (日志段数 × 3) + 缓冲
生产环境最佳实践
存储容量规划
// 容量计算公式
public class StorageCapacityCalculator {
public static long calculateRequiredStorage(
long dailyMessages, // 日消息量
int messageSize, // 平均消息大小(字节)
int replicationFactor, // 副本数
int retentionDays, // 保留天数
double compressionRatio // 压缩比
) {
long dailyDataSize = dailyMessages * messageSize;
long compressedDailySize = (long)(dailyDataSize * compressionRatio);
long totalDataSize = compressedDailySize * retentionDays;
long withReplication = totalDataSize * replicationFactor;
// 预留20%空间用于日志段、索引等
return (long)(withReplication * 1.2);
}
public static void main(String[] args) {
// 示例计算
// 日消息量:1亿
// 平均消息大小:1KB
// 副本数:3
// 保留天数:7
// 压缩比:0.5(50%压缩)
long required = calculateRequiredStorage(
100_000_000L, 1024, 3, 7, 0.5
);
System.out.println("所需存储容量: " + (required / 1024 / 1024 / 1024) + " GB");
// 输出:所需存储容量: 1260 GB
}
}
监控指标
关键监控指标:
├── 日志段大小
│ └── kafka.log.Log:size
│
├── 日志段数量
│ └── kafka.log.Log:numSegments
│
├── 索引大小
│ └── kafka.log.Log:indexSize
│
├── 清理延迟
│ └── kafka.log.Log:cleanerLag
│
├── 刷盘时间
│ └── kafka.log.Log:flushTimeMs
│
└── 磁盘使用率
└── 系统级监控(df -h)
小结
本章我们学习了:
- 消息模型:Topic-Partition-Message 三级结构
- 分区机制:分区策略、自定义分区器
- 消息格式:Record Batch 结构、压缩算法
- 存储架构:日志段设计、索引机制
- 清理策略:基于时间、基于大小、日志压缩
参考资料
下一章预告
在下一章《生产者核心原理》中,我们将深入探讨:
- 生产者发送流程与架构
- 消息分区策略详解
- ACK 机制与可靠性保证
- 批量发送与性能优化
Kafka 核心原理系列持续更新中,欢迎关注!