← 返回文章列表

Kafka核心原理(八):集群管理与运维

2020-06-21·5 分钟阅读

Kafka核心原理(八):集群管理与运维

前言

生产环境的 Kafka 集群需要精心的管理和运维。本章将深入探讨 KRaft 部署模式、集群监控、性能调优和故障排查,帮助你构建高可用、高性能的 Kafka 集群。

KRaft 模式部署

KRaft vs ZooKeeper

┌─────────────────────────────────────────────────────────────────────────┐
│                    架构对比                                              │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ZooKeeper 模式:                                                        │
│  ┌────────────────────────────────────────────────────────────────┐    │
│  │                                                                 │    │
│  │    ┌─────────────────────┐                                     │    │
│  │    │     ZooKeeper       │                                     │    │
│  │    │     Ensemble        │                                     │    │
│  │    │  ┌───┐ ┌───┐ ┌───┐ │                                     │    │
│  │    │  │ZK1│ │ZK2│ │ZK3│ │                                     │    │
│  │    │  └───┘ └───┘ └───┘ │                                     │    │
│  │    └──────────┬──────────┘                                     │    │
│  │               │ 元数据存储                                      │    │
│  │               ▼                                                │    │
│  │    ┌─────────────────────┐                                     │    │
│  │    │    Kafka Cluster    │                                     │    │
│  │    │  ┌───┐ ┌───┐ ┌───┐ │                                     │    │
│  │    │  │ B1│ │ B2│ │ B3│ │                                     │    │
│  │    │  └───┘ └───┘ └───┘ │                                     │    │
│  │    └─────────────────────┘                                     │    │
│  │                                                                 │    │
│  │  缺点:                                                         │    │
│  │  ├── 需要维护两套系统                                          │    │
│  │  ├── 元数据存储在外部系统                                      │    │
│  │  └── 扩展性受限                                                │    │
│  │                                                                 │    │
│  └────────────────────────────────────────────────────────────────┘    │
│                                                                         │
│  KRaft 模式(Kafka 3.x):                                              │
│  ┌────────────────────────────────────────────────────────────────┐    │
│  │                                                                 │    │
│  │    ┌───────────────────────────────────────────────────────┐  │    │
│  │    │               Kafka Cluster (KRaft)                   │  │    │
│  │    │                                                        │  │    │
│  │    │   Controller Quorum        Broker Nodes               │  │    │
│  │    │   ┌───┐ ┌───┐ ┌───┐     ┌───┐ ┌───┐ ┌───┐           │  │    │
│  │    │   │C1 │ │C2 │ │C3 │     │B1 │ │B2 │ │B3 │           │  │    │
│  │    │   │(L)│ │(F)│ │(F)│     │   │ │   │ │   │           │  │    │
│  │    │   └───┘ └───┘ └───┘     └───┘ └───┘ └───┘           │  │    │
│  │    │        │                                        │    │  │    │
│  │    │        └──────── Raft 共识 ────────────────────┘    │  │    │
│  │    │                                                        │  │    │
│  │    │   元数据存储:__cluster_metadata 主题                  │  │    │
│  │    │                                                        │  │    │
│  │    └────────────────────────────────────────────────────────┘  │    │
│  │                                                                 │    │
│  │  优点:                                                         │    │
│  │  ├── 统一架构,无需 ZooKeeper                                   │    │
│  │  ├── 元数据存储在 Kafka 内部                                   │    │
│  │  ├── 更好的扩展性                                              │    │
│  │  └── 更快的故障恢复                                            │    │
│  │                                                                 │    │
│  └────────────────────────────────────────────────────────────────┘    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

KRaft 集群部署

# 1. 生成集群 ID
KAFKA_CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
echo "Cluster ID: $KAFKA_CLUSTER_ID"

# 2. 准备配置文件 (server.properties)
# 节点 1 (Controller + Broker)
cat > config/kraft/server-1.properties << EOF
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093,2@localhost:9193,3@localhost:9293
controller.listener.names=CONTROLLER
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kraft-combined-logs-1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
EOF

# 3. 格式化存储目录
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server-1.properties

# 4. 启动服务
bin/kafka-server-start.sh config/kraft/server-1.properties

# 5. 验证集群状态
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status

KRaft 配置详解

# ========== 角色配置 ==========
# combined 模式:同时运行 Controller 和 Broker
process.roles=broker,controller

# 分离模式:
# Controller 节点:process.roles=controller
# Broker 节点:process.roles=broker

# ========== 节点标识 ==========
node.id=1                              # 唯一节点 ID
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093

# ========== 监听器配置 ==========
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://host1:9092

# ========== 存储配置 ==========
log.dirs=/data/kafka-logs
metadata.log.dir=/data/kafka-logs    # 元数据日志目录

# ========== Controller 配置 ==========
controller.quorum.request.timeout.ms=2000
controller.quorum.election.timeout.ms=5000
controller.quorum.election.backoff.max.ms=10000

# ========== 性能配置 ==========
num.network.threads=3
num.io.threads=8
background.threads=10
log.flush.interval.messages=10000
log.flush.interval.ms=1000

集群监控

关键监控指标

┌─────────────────────────────────────────────────────────────────────────┐
│                    Kafka 核心监控指标                                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Broker 级别:                                                           │
│  ├── UnderReplicatedPartitions          # 副本不足分区数               │
│  ├── OfflinePartitionsCount             # 离线分区数                   │
│  ├── ActiveControllerCount              # 活跃 Controller 数           │
│  ├── LeaderElectionRateAndTimeMs        # Leader 选举速率              │
│  ├── UncleanLeaderElectionsPerSec       # 不完全选举数                 │
│  ├── TotalTimeMs                        # 请求处理时间                 │
│  ├── BytesInPerSec                      # 写入速率                     │
│  ├── BytesOutPerSec                     # 读取速率                     │
│  ├── MessagesInPerSec                   # 消息速率                     │
│  └── NetworkProcessorAvgIdlePercent     # 网络处理器空闲率             │
│                                                                         │
│  Topic 级别:                                                            │
│  ├── MessagesInPerSec                   # 消息速率                     │
│  ├── BytesInPerSec                      # 字节速率                     │
│  └── BytesOutPerSec                     # 读取速率                     │
│                                                                         │
│  Partition 级别:                                                        │
│  ├── LogEndOffset                       # 日志末端位移                 │
│  ├── LogStartOffset                     # 日志起始位移                 │
│  └── UnderReplicated                    # 副本不足标志                 │
│                                                                         │
│  Consumer Group:                                                        │
│  ├── consumer-lag                       # 消费延迟                     │
│  ├── consumer-lead                      # 消费领先                     │
│  ├── commit-rate                        # 提交速率                     │
│  └── join-time                          # Rebalance 时间               │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Prometheus + Grafana 监控

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka1:7071', 'kafka2:7071', 'kafka3:7071']

# 使用 JMX Exporter
# 启动参数添加:
KAFKA_OPTS="-javaagent:/path/to/jmx_prometheus_javaagent.jar=7071:/path/to/kafka.yml"

# kafka.yml (JMX Exporter 配置)
lowercaseOutputName: true
rules:
  - pattern: "kafka.server<type=(.+), name=(.+)><>Value"
    name: "kafka_server_$1_$2"
    type: GAUGE
  - pattern: "kafka.server<type=(.+), name=(.+)><>Count"
    name: "kafka_server_$1_$2_total"
    type: COUNTER
  - pattern: "kafka.server<type=(.+), name=(.+), topic=(.+)><>Value"
    name: "kafka_server_$1_$2"
    labels:
      topic: "$3"
    type: GAUGE

告警规则

# Kafka 告警规则
groups:
  - name: kafka-alerts
    rules:
      # Broker 宕机
      - alert: KafkaBrokerDown
        expr: up{job="kafka"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka Broker 宕机"
          description: "Broker {{ $labels.instance }} 已宕机超过 1 分钟"

      # 离线分区
      - alert: KafkaOfflinePartitions
        expr: kafka_server_OfflinePartitionsCount > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka 存在离线分区"
          description: "当前离线分区数: {{ $value }}"

      # 副本不足
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_UnderReplicatedPartitions > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 副本不足"
          description: "副本不足分区数: {{ $value }}"

      # 消费延迟过大
      - alert: KafkaConsumerLag
        expr: kafka_consumer_lag > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 消费延迟过大"
          description: "消费组 {{ $labels.group }} 延迟: {{ $value }}"

      # Controller 异常
      - alert: KafkaMultipleControllers
        expr: count(kafka_controller_ActiveControllerCount == 1) != 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka Controller 异常"
          description: "检测到多个活跃 Controller"

      # 磁盘使用率
      - alert: KafkaDiskUsageHigh
        expr: kafka_log_Log_Size / kafka_log_Log_Size_limit > 0.85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 磁盘使用率过高"
          description: "Broker {{ $labels.instance }} 磁盘使用率超过 85%"

运维操作

Topic 管理

# 创建 Topic
bin/kafka-topics.sh --create \
  --topic my-topic \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --bootstrap-server localhost:9092

# 查看 Topic 详情
bin/kafka-topics.sh --describe \
  --topic my-topic \
  --bootstrap-server localhost:9092

# 修改 Topic 配置
bin/kafka-configs.sh --alter \
  --topic my-topic \
  --add-config retention.ms=1209600000 \
  --bootstrap-server localhost:9092

# 增加分区(只能增加,不能减少)
bin/kafka-topics.sh --alter \
  --topic my-topic \
  --partitions 24 \
  --bootstrap-server localhost:9092

# 删除 Topic
bin/kafka-topics.sh --delete \
  --topic my-topic \
  --bootstrap-server localhost:9092

副本重分配

# 1. 生成重分配计划
cat > topics-to-move.json << EOF
{
  "topics": [
    {"topic": "topic1"},
    {"topic": "topic2"}
  ],
  "version": 1
}
EOF

bin/kafka-reassign-partitions.sh \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "1,2,3,4,5" \
  --generate \
  --bootstrap-server localhost:9092

# 2. 执行重分配
cat > reassignment.json << EOF
{
  "version": 1,
  "partitions": [
    {
      "topic": "topic1",
      "partition": 0,
      "replicas": [1, 2, 3]
    }
  ]
}
EOF

bin/kafka-reassign-partitions.sh \
  --reassignment-json-file reassignment.json \
  --execute \
  --bootstrap-server localhost:9092

# 3. 验证重分配状态
bin/kafka-reassign-partitions.sh \
  --reassignment-json-file reassignment.json \
  --verify \
  --bootstrap-server localhost:9092

消费者组管理

# 查看消费者组列表
bin/kafka-consumer-groups.sh --list \
  --bootstrap-server localhost:9092

# 查看消费者组详情
bin/kafka-consumer-groups.sh --describe \
  --group my-group \
  --bootstrap-server localhost:9092

# 重置消费者组 Offset
bin/kafka-consumer-groups.sh --reset-offsets \
  --group my-group \
  --topic my-topic \
  --to-earliest \
  --execute \
  --bootstrap-server localhost:9092

# 删除消费者组
bin/kafka-consumer-groups.sh --delete \
  --group my-group \
  --bootstrap-server localhost:9092

故障排查

常见问题与解决

┌─────────────────────────────────────────────────────────────────────────┐
│                    常见故障排查                                          │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  1. Broker 启动失败                                                     │
│     ├── 检查日志:logs/server.log                                       │
│     ├── 常见原因:                                                      │
│     │   ├── 端口被占用                                                  │
│     │   ├── 日志目录权限问题                                            │
│     │   ├── 配置文件错误                                                │
│     │   └── JVM 内存不足                                                │
│     └── 解决:检查配置、权限、资源                                       │
│                                                                         │
│  2. 分区 Leader 选举失败                                                │
│     ├── 检查 ISR 状态                                                   │
│     │   bin/kafka-topics.sh --describe --topic <topic>                 │
│     ├── 常见原因:                                                      │
│     │   ├── ISR 为空                                                   │
│     │   ├── 副本落后过多                                                │
│     │   └── unclean.leader.election.enable=false                       │
│     └── 解决:恢复副本同步或临时启用不完全选举                           │
│                                                                         │
│  3. 消费延迟持续增长                                                    │
│     ├── 检查消费者状态                                                  │
│     │   bin/kafka-consumer-groups.sh --describe --group <group>        │
│     ├── 常见原因:                                                      │
│     │   ├── 消费者处理能力不足                                          │
│     │   ├── 消费者频繁 Rebalance                                        │
│     │   └── 下游系统瓶颈                                                │
│     └── 解决:增加消费者、优化处理逻辑、扩容下游                         │
│                                                                         │
│  4. 生产者发送超时                                                      │
│     ├── 检查网络和 Broker 状态                                          │
│     ├── 常见原因:                                                      │
│     │   ├── Broker 负载过高                                             │
│     │   ├── 网络延迟                                                    │
│     │   └── 请求队列积压                                                │
│     └── 解决:增加超时时间、优化 Broker 配置                             │
│                                                                         │
│  5. 磁盘空间不足                                                        │
│     ├── 检查日志目录大小                                                │
│     ├── 解决:                                                          │
│     │   ├── 调整保留策略                                                │
│     │   ├── 扩容磁盘                                                    │
│     │   └── 迁移数据到其他 Broker                                       │
│     └── 命令:du -sh /data/kafka-logs/*                                │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

诊断命令

# 查看 Broker 状态
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# 查看 Controller 状态
bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status

# 查看集群元数据
bin/kafka-metadata.sh snapshot \
  --snapshot /tmp/metadata.log \
  --command-config config/kraft/server.properties

# 检查日志段信息
bin/kafka-log-dirs.sh --describe \
  --bootstrap-server localhost:9092 \
  --topic-list topic1,topic2

# 查看 JVM 状态
jstat -gc <pid> 1000
jmap -histo:live <pid>

# 网络连接诊断
netstat -an | grep 9092
ss -tulpn | grep 9092

最佳实践

生产环境配置清单

# ========== 基础配置 ==========
broker.id=1
log.dirs=/data/kafka-logs
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# ========== 日志配置 ==========
num.partitions=6
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleanup.policy=delete

# ========== 副本配置 ==========
default.replication.factor=3
min.insync.replicas=2
replica.lag.time.max.ms=30000
unclean.leader.election.enable=false
auto.leader.rebalance.enable=true

# ========== 事务配置 ==========
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# ========== 压缩配置 ==========
compression.type=producer

# ========== JVM 配置(启动脚本)==========
# KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
# KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20"

容量规划

容量规划公式:
┌─────────────────────────────────────────────────────────────────────────┐
│                                                                         │
│  存储容量计算:                                                          │
│  日消息量 × 平均消息大小 × 保留天数 × 副本数 × 压缩比 × 1.2(预留)    │
│                                                                         │
│  示例:                                                                  │
│  日消息量:10亿                                                         │
│  平均消息大小:1KB                                                      │
│  保留天数:7天                                                          │
│  副本数:3                                                              │
│  压缩比:0.5                                                            │
│  预留:1.2                                                              │
│                                                                         │
│  总容量 = 10^9 × 1KB × 7 × 3 × 0.5 × 1.2                              │
│         = 12.6 TB                                                       │
│                                                                         │
│  吞吐量计算:                                                            │
│  峰值 TPS = 日消息量 × 峰值系数 / 86400                                │
│                                                                         │
│  示例(峰值系数 3):                                                    │
│  峰值 TPS = 10^9 × 3 / 86400 ≈ 35,000 TPS                             │
│                                                                         │
│  Broker 数量:                                                          │
│  Broker数 = 峰值TPS / 单Broker TPS × 冗余系数                          │
│           = 35,000 / 100,000 × 1.3 ≈ 1(单Broker足够)                │
│           实际建议至少 3 节点保证高可用                                  │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

小结

本章我们学习了:

  1. KRaft 部署:无 ZooKeeper 的新架构,部署配置详解
  2. 集群监控:核心指标、Prometheus+Grafana 监控方案
  3. 运维操作:Topic 管理、副本重分配、消费者组管理
  4. 故障排查:常见问题与解决方案
  5. 最佳实践:生产配置清单、容量规划

参考资料

  1. Kafka KRaft Mode
  2. Kafka Monitoring
  3. Kafka Operations

下一章预告

在下一章《企业级实战案例》中,我们将探讨:

  • 典型企业应用场景
  • 实时数据处理架构
  • 事件溯源与 CQRS
  • 生产环境最佳实践

Kafka 核心原理系列持续更新中,欢迎关注!

分享: