Kafka核心原理(八):集群管理与运维
2020-06-21·5 分钟阅读
Kafka核心原理(八):集群管理与运维
前言
生产环境的 Kafka 集群需要精心的管理和运维。本章将深入探讨 KRaft 部署模式、集群监控、性能调优和故障排查,帮助你构建高可用、高性能的 Kafka 集群。
KRaft 模式部署
KRaft vs ZooKeeper
┌─────────────────────────────────────────────────────────────────────────┐
│ 架构对比 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ZooKeeper 模式: │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ ZooKeeper │ │ │
│ │ │ Ensemble │ │ │
│ │ │ ┌───┐ ┌───┐ ┌───┐ │ │ │
│ │ │ │ZK1│ │ZK2│ │ZK3│ │ │ │
│ │ │ └───┘ └───┘ └───┘ │ │ │
│ │ └──────────┬──────────┘ │ │
│ │ │ 元数据存储 │ │
│ │ ▼ │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ Kafka Cluster │ │ │
│ │ │ ┌───┐ ┌───┐ ┌───┐ │ │ │
│ │ │ │ B1│ │ B2│ │ B3│ │ │ │
│ │ │ └───┘ └───┘ └───┘ │ │ │
│ │ └─────────────────────┘ │ │
│ │ │ │
│ │ 缺点: │ │
│ │ ├── 需要维护两套系统 │ │
│ │ ├── 元数据存储在外部系统 │ │
│ │ └── 扩展性受限 │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ KRaft 模式(Kafka 3.x): │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌───────────────────────────────────────────────────────┐ │ │
│ │ │ Kafka Cluster (KRaft) │ │ │
│ │ │ │ │ │
│ │ │ Controller Quorum Broker Nodes │ │ │
│ │ │ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ │ │ │
│ │ │ │C1 │ │C2 │ │C3 │ │B1 │ │B2 │ │B3 │ │ │ │
│ │ │ │(L)│ │(F)│ │(F)│ │ │ │ │ │ │ │ │ │
│ │ │ └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ │ │ │
│ │ │ │ │ │ │ │
│ │ │ └──────── Raft 共识 ────────────────────┘ │ │ │
│ │ │ │ │ │
│ │ │ 元数据存储:__cluster_metadata 主题 │ │ │
│ │ │ │ │ │
│ │ └────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ 优点: │ │
│ │ ├── 统一架构,无需 ZooKeeper │ │
│ │ ├── 元数据存储在 Kafka 内部 │ │
│ │ ├── 更好的扩展性 │ │
│ │ └── 更快的故障恢复 │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
KRaft 集群部署
# 1. 生成集群 ID
KAFKA_CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
echo "Cluster ID: $KAFKA_CLUSTER_ID"
# 2. 准备配置文件 (server.properties)
# 节点 1 (Controller + Broker)
cat > config/kraft/server-1.properties << EOF
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093,2@localhost:9193,3@localhost:9293
controller.listener.names=CONTROLLER
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kraft-combined-logs-1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
EOF
# 3. 格式化存储目录
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server-1.properties
# 4. 启动服务
bin/kafka-server-start.sh config/kraft/server-1.properties
# 5. 验证集群状态
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
KRaft 配置详解
# ========== 角色配置 ==========
# combined 模式:同时运行 Controller 和 Broker
process.roles=broker,controller
# 分离模式:
# Controller 节点:process.roles=controller
# Broker 节点:process.roles=broker
# ========== 节点标识 ==========
node.id=1 # 唯一节点 ID
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
# ========== 监听器配置 ==========
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://host1:9092
# ========== 存储配置 ==========
log.dirs=/data/kafka-logs
metadata.log.dir=/data/kafka-logs # 元数据日志目录
# ========== Controller 配置 ==========
controller.quorum.request.timeout.ms=2000
controller.quorum.election.timeout.ms=5000
controller.quorum.election.backoff.max.ms=10000
# ========== 性能配置 ==========
num.network.threads=3
num.io.threads=8
background.threads=10
log.flush.interval.messages=10000
log.flush.interval.ms=1000
集群监控
关键监控指标
┌─────────────────────────────────────────────────────────────────────────┐
│ Kafka 核心监控指标 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Broker 级别: │
│ ├── UnderReplicatedPartitions # 副本不足分区数 │
│ ├── OfflinePartitionsCount # 离线分区数 │
│ ├── ActiveControllerCount # 活跃 Controller 数 │
│ ├── LeaderElectionRateAndTimeMs # Leader 选举速率 │
│ ├── UncleanLeaderElectionsPerSec # 不完全选举数 │
│ ├── TotalTimeMs # 请求处理时间 │
│ ├── BytesInPerSec # 写入速率 │
│ ├── BytesOutPerSec # 读取速率 │
│ ├── MessagesInPerSec # 消息速率 │
│ └── NetworkProcessorAvgIdlePercent # 网络处理器空闲率 │
│ │
│ Topic 级别: │
│ ├── MessagesInPerSec # 消息速率 │
│ ├── BytesInPerSec # 字节速率 │
│ └── BytesOutPerSec # 读取速率 │
│ │
│ Partition 级别: │
│ ├── LogEndOffset # 日志末端位移 │
│ ├── LogStartOffset # 日志起始位移 │
│ └── UnderReplicated # 副本不足标志 │
│ │
│ Consumer Group: │
│ ├── consumer-lag # 消费延迟 │
│ ├── consumer-lead # 消费领先 │
│ ├── commit-rate # 提交速率 │
│ └── join-time # Rebalance 时间 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Prometheus + Grafana 监控
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['kafka1:7071', 'kafka2:7071', 'kafka3:7071']
# 使用 JMX Exporter
# 启动参数添加:
KAFKA_OPTS="-javaagent:/path/to/jmx_prometheus_javaagent.jar=7071:/path/to/kafka.yml"
# kafka.yml (JMX Exporter 配置)
lowercaseOutputName: true
rules:
- pattern: "kafka.server<type=(.+), name=(.+)><>Value"
name: "kafka_server_$1_$2"
type: GAUGE
- pattern: "kafka.server<type=(.+), name=(.+)><>Count"
name: "kafka_server_$1_$2_total"
type: COUNTER
- pattern: "kafka.server<type=(.+), name=(.+), topic=(.+)><>Value"
name: "kafka_server_$1_$2"
labels:
topic: "$3"
type: GAUGE
告警规则
# Kafka 告警规则
groups:
- name: kafka-alerts
rules:
# Broker 宕机
- alert: KafkaBrokerDown
expr: up{job="kafka"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka Broker 宕机"
description: "Broker {{ $labels.instance }} 已宕机超过 1 分钟"
# 离线分区
- alert: KafkaOfflinePartitions
expr: kafka_server_OfflinePartitionsCount > 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka 存在离线分区"
description: "当前离线分区数: {{ $value }}"
# 副本不足
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_UnderReplicatedPartitions > 0
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 副本不足"
description: "副本不足分区数: {{ $value }}"
# 消费延迟过大
- alert: KafkaConsumerLag
expr: kafka_consumer_lag > 100000
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 消费延迟过大"
description: "消费组 {{ $labels.group }} 延迟: {{ $value }}"
# Controller 异常
- alert: KafkaMultipleControllers
expr: count(kafka_controller_ActiveControllerCount == 1) != 1
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka Controller 异常"
description: "检测到多个活跃 Controller"
# 磁盘使用率
- alert: KafkaDiskUsageHigh
expr: kafka_log_Log_Size / kafka_log_Log_Size_limit > 0.85
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 磁盘使用率过高"
description: "Broker {{ $labels.instance }} 磁盘使用率超过 85%"
运维操作
Topic 管理
# 创建 Topic
bin/kafka-topics.sh --create \
--topic my-topic \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--bootstrap-server localhost:9092
# 查看 Topic 详情
bin/kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092
# 修改 Topic 配置
bin/kafka-configs.sh --alter \
--topic my-topic \
--add-config retention.ms=1209600000 \
--bootstrap-server localhost:9092
# 增加分区(只能增加,不能减少)
bin/kafka-topics.sh --alter \
--topic my-topic \
--partitions 24 \
--bootstrap-server localhost:9092
# 删除 Topic
bin/kafka-topics.sh --delete \
--topic my-topic \
--bootstrap-server localhost:9092
副本重分配
# 1. 生成重分配计划
cat > topics-to-move.json << EOF
{
"topics": [
{"topic": "topic1"},
{"topic": "topic2"}
],
"version": 1
}
EOF
bin/kafka-reassign-partitions.sh \
--topics-to-move-json-file topics-to-move.json \
--broker-list "1,2,3,4,5" \
--generate \
--bootstrap-server localhost:9092
# 2. 执行重分配
cat > reassignment.json << EOF
{
"version": 1,
"partitions": [
{
"topic": "topic1",
"partition": 0,
"replicas": [1, 2, 3]
}
]
}
EOF
bin/kafka-reassign-partitions.sh \
--reassignment-json-file reassignment.json \
--execute \
--bootstrap-server localhost:9092
# 3. 验证重分配状态
bin/kafka-reassign-partitions.sh \
--reassignment-json-file reassignment.json \
--verify \
--bootstrap-server localhost:9092
消费者组管理
# 查看消费者组列表
bin/kafka-consumer-groups.sh --list \
--bootstrap-server localhost:9092
# 查看消费者组详情
bin/kafka-consumer-groups.sh --describe \
--group my-group \
--bootstrap-server localhost:9092
# 重置消费者组 Offset
bin/kafka-consumer-groups.sh --reset-offsets \
--group my-group \
--topic my-topic \
--to-earliest \
--execute \
--bootstrap-server localhost:9092
# 删除消费者组
bin/kafka-consumer-groups.sh --delete \
--group my-group \
--bootstrap-server localhost:9092
故障排查
常见问题与解决
┌─────────────────────────────────────────────────────────────────────────┐
│ 常见故障排查 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Broker 启动失败 │
│ ├── 检查日志:logs/server.log │
│ ├── 常见原因: │
│ │ ├── 端口被占用 │
│ │ ├── 日志目录权限问题 │
│ │ ├── 配置文件错误 │
│ │ └── JVM 内存不足 │
│ └── 解决:检查配置、权限、资源 │
│ │
│ 2. 分区 Leader 选举失败 │
│ ├── 检查 ISR 状态 │
│ │ bin/kafka-topics.sh --describe --topic <topic> │
│ ├── 常见原因: │
│ │ ├── ISR 为空 │
│ │ ├── 副本落后过多 │
│ │ └── unclean.leader.election.enable=false │
│ └── 解决:恢复副本同步或临时启用不完全选举 │
│ │
│ 3. 消费延迟持续增长 │
│ ├── 检查消费者状态 │
│ │ bin/kafka-consumer-groups.sh --describe --group <group> │
│ ├── 常见原因: │
│ │ ├── 消费者处理能力不足 │
│ │ ├── 消费者频繁 Rebalance │
│ │ └── 下游系统瓶颈 │
│ └── 解决:增加消费者、优化处理逻辑、扩容下游 │
│ │
│ 4. 生产者发送超时 │
│ ├── 检查网络和 Broker 状态 │
│ ├── 常见原因: │
│ │ ├── Broker 负载过高 │
│ │ ├── 网络延迟 │
│ │ └── 请求队列积压 │
│ └── 解决:增加超时时间、优化 Broker 配置 │
│ │
│ 5. 磁盘空间不足 │
│ ├── 检查日志目录大小 │
│ ├── 解决: │
│ │ ├── 调整保留策略 │
│ │ ├── 扩容磁盘 │
│ │ └── 迁移数据到其他 Broker │
│ └── 命令:du -sh /data/kafka-logs/* │
│ │
└─────────────────────────────────────────────────────────────────────────┘
诊断命令
# 查看 Broker 状态
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# 查看 Controller 状态
bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
# 查看集群元数据
bin/kafka-metadata.sh snapshot \
--snapshot /tmp/metadata.log \
--command-config config/kraft/server.properties
# 检查日志段信息
bin/kafka-log-dirs.sh --describe \
--bootstrap-server localhost:9092 \
--topic-list topic1,topic2
# 查看 JVM 状态
jstat -gc <pid> 1000
jmap -histo:live <pid>
# 网络连接诊断
netstat -an | grep 9092
ss -tulpn | grep 9092
最佳实践
生产环境配置清单
# ========== 基础配置 ==========
broker.id=1
log.dirs=/data/kafka-logs
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# ========== 日志配置 ==========
num.partitions=6
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleanup.policy=delete
# ========== 副本配置 ==========
default.replication.factor=3
min.insync.replicas=2
replica.lag.time.max.ms=30000
unclean.leader.election.enable=false
auto.leader.rebalance.enable=true
# ========== 事务配置 ==========
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# ========== 压缩配置 ==========
compression.type=producer
# ========== JVM 配置(启动脚本)==========
# KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
# KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20"
容量规划
容量规划公式:
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ 存储容量计算: │
│ 日消息量 × 平均消息大小 × 保留天数 × 副本数 × 压缩比 × 1.2(预留) │
│ │
│ 示例: │
│ 日消息量:10亿 │
│ 平均消息大小:1KB │
│ 保留天数:7天 │
│ 副本数:3 │
│ 压缩比:0.5 │
│ 预留:1.2 │
│ │
│ 总容量 = 10^9 × 1KB × 7 × 3 × 0.5 × 1.2 │
│ = 12.6 TB │
│ │
│ 吞吐量计算: │
│ 峰值 TPS = 日消息量 × 峰值系数 / 86400 │
│ │
│ 示例(峰值系数 3): │
│ 峰值 TPS = 10^9 × 3 / 86400 ≈ 35,000 TPS │
│ │
│ Broker 数量: │
│ Broker数 = 峰值TPS / 单Broker TPS × 冗余系数 │
│ = 35,000 / 100,000 × 1.3 ≈ 1(单Broker足够) │
│ 实际建议至少 3 节点保证高可用 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
小结
本章我们学习了:
- KRaft 部署:无 ZooKeeper 的新架构,部署配置详解
- 集群监控:核心指标、Prometheus+Grafana 监控方案
- 运维操作:Topic 管理、副本重分配、消费者组管理
- 故障排查:常见问题与解决方案
- 最佳实践:生产配置清单、容量规划
参考资料
下一章预告
在下一章《企业级实战案例》中,我们将探讨:
- 典型企业应用场景
- 实时数据处理架构
- 事件溯源与 CQRS
- 生产环境最佳实践
Kafka 核心原理系列持续更新中,欢迎关注!