docker单点kafka扩容

背景

公司为了快速上线,几个月前通过docker镜像 kafka 部署了一台kafka-borker。Dockerfile如下

1
2
3
4
5
6
7
8
FROM wurstmeister/kafka:latest

ENV KAFKA_MESSAGE_MAX_BYTES=21474836
ENV KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
ENV KAFKA_CREATE_TOPICS=test:1:1
ENV KAFKA_ZOOKEEPER_CONNECT=zk1:2181

VOLUME ["/kafka"]

随着业务的增长,单节点的kafka-broker已经成为一个潜在的可靠性隐患。所以借着N年难得一遇的停机维护,决定把kafka-broker扩容为主备集群模式,提升稳定性,并为日后的分区扩容提供可能(不停服)。

问题

除了单节点问题,docker的镜像配置也存在几个问题。

未指定broker.id

broker.id是kafka-broker的唯一标识,选举、服务查找等操作都是基于此标识。如果没有显式的指定,kafka-broker会由zk生成并下发一个大于1000的id,如:(1001),并保存在kafka-broker本地的meta.properties文件中。如果手动指定broker.id是无法指定大于1000的值。测试环境每次会执行rm -rf操作,但是zk不会同时操作。所以每次重启的brokerId都会+1。为了方便后续管理,需要指定brokerId。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
object KafkaServer {
if (brokerIdSet.size > 1)
throw new InconsistentBrokerIdException(
s"Failed to match broker.id across log.dirs. This could happen if multiple brokers shared a log directory (log.dirs) " +
s"or partial data was manually copied from another broker. Found $brokerIdSet")
else if (brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId)
throw new InconsistentBrokerIdException(
s"Configured broker.id $brokerId doesn't match stored broker.id ${brokerIdSet.last} in meta.properties. " +
s"If you moved your data, make sure your configured broker.id matches. " +
s"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
else if (brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) // generate a new brokerId from Zookeeper
brokerId = generateBrokerId
else if (brokerIdSet.size == 1) // pick broker.id from meta.properties
brokerId = brokerIdSet.last

/**
* Return a sequence id generated by updating the broker sequence id path in ZK.
* Users can provide brokerId in the config. To avoid conflicts between ZK generated
* sequence id and configured brokerId, we increment the generated sequence id by KafkaConfig.MaxReservedBrokerId.
*/
private def generateBrokerId: Int = {
try {
zkClient.generateBrokerSequenceId() + config.maxReservedBrokerId
} catch {
case e: Exception =>
error("Failed to generate broker.id due to ", e)
throw new GenerateBrokerIdException("Failed to generate broker.id", e)
}
}
}

开启自动创建topic

通过KAFKA_AUTO_CREATE_TOPICS_ENABLE开启自动创建topic,会在producer发送消息时创建topic,但是不会指定partition和replica。需要关闭此配置,统一由运维人员审核创建。

镜像采用了latest标签

开发环境可以使用到最近的镜像,但是生产环境使用此tag,容易造成版本不一致。本次升级过程中遇到的问题,就是基于此,后面会详细介绍。

方案

  1. 制作Dockerfile,修改环境变量。
  2. 停止producer应用
  3. 观察consumer应用消费完成
  4. 部署broker2
  5. 修改测试topic的replica并同步数据
  6. 观察测试topic在broker2中的数据及zk节点元数据
  7. 测试producer及consumer
  8. 批量执行replica脚本
  9. 删除broker1数据并重新部署
  10. 启动应用

制作Dockerfile,修改环境变量。

1
2
3
4
5
6
7
8
FROM wurstmeister/kafka:latest

BROKER_ID_COMMAND="echo 2"
ENV KAFKA_MESSAGE_MAX_BYTES=21474836
ENV KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
ENV KAFKA_ZOOKEEPER_CONNECT=zk1:2181,zk2:2181,zk3:2181

VOLUME ["/kafka"]

停止producer应用 & 观察consumer应用消费完成

进入broker docker容器

1
docker exec -it kafka-broker1 bash

查看group列表

1
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看group的消费情况

1
kafka-consumer-groups.sh --bootstrap-server localhost:9092  --group group_name --describe

可以看到

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 6901 6901 0 consumer-1-80496f05-3be9-49c3-8b84-45c26f07a4aa /172.20.0.1 consumer-1 consumer-1

LAG = LOG-END-OFFSET - CURRENT-OFFSET,如果LAG=0,则表示消息已经消费完成。

部署broker2

可以将BROKER_ID_COMMAND等环境变量信息,通过 docker run -e的形式执行,本次通过Dockerfile设置,每个broker对应一个Dockerfile。

修改测试topic的replica并同步数据

通过kafka-reassign-partitions.sh脚本进行replica重分配,分配规则以json文件保存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    "version":1,
"partitions":
[
{
"topic": "test.topic.1",
"partition": 0,
"replicas": [1001,2]
},
{
"topic": "test.topic.1",
"partition": 1,
"replicas": [1001,2]
}
]
}
1
2
3
/opt/kafka/bin/kafka-reassign-partitions.sh  --zookeeper  zk1:2181,zk2:2181,zk3:2181 
\ --reassignment-json-file test.topic.1.json
\ --execute

此时出现了故障,因为Dockerfile使用了lastest标签FROM wurstmeister/kafka:latest导致获取到的版本高于生产环境的broker版本,此时日志提示版本问题,无法同步,且kafka-reassign-partitions一直处于pending状态,无法再次执行。

1
There is an existing assignment running.

解决方案

查找kafka-reassign-partitions强制终止命令

查看源码ReassignPartitionsCommand,只找到verify,没有关闭操作。

删除zk对应节点

kafka-reassign-partitions指定了zk地址,查找在zk中保存的元数据信息,并删除。

kafka/core/src/main/scala/kafka/zk/ZkData.scala

1
2
3
object ReassignPartitionsZNode {
def path = s"${AdminZNode.path}/reassign_partitions"
}

zk命令

1
delete /admin/reassign_partitions
修改dockerfile版本后重新执行kafka-reassign-partitions

再次执行kafka-reassign-partitions,很快执行完成,并在broker2中观察到对应数据。

批量执行replica脚本

批量脚本内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/bin/bash
KFK_CMD='/opt/kafka/bin/kafka-reassign-partitions.sh
\ --zookeeper zk1:2181,zk2:2181,zk3:2181
\ --reassignment-json-file'
CHK_CMD='/opt/kafka/bin/kafka-topics.sh
\ --describe
\ --zookeeper zk1:2181,zk2:2181,zk3:2181
\ --topic '

topic_list='
test.topic.1
test.topic.2
test.topic.3
test.topic.4
test.topic.5
__consumer_offsets
'

for i in $topic_list
do
$KFK_CMD $i.json --execute
sleep 2
$CHK_CMD i
done

需要提前准备好分配json,并已topic命名,如 __consumer_offsets.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"version":1,
"partitions":
[
{
"topic": "__consumer_offsets",
"partition": 0,
"replicas": [1001,2]
},
{
"topic": "__consumer_offsets",
"partition": 1,
"replicas": [1001,2]
}
]
}

删除broker1数据并重新部署

删除broker1镜像数据并重新制作Dockerfile升级。一定要删除镜像数据,否者会优先读取本地的meta.properties,环境变量不会生效。

结论

至此,docker下单节点kafka扩容有惊无险的完成了。