# Topic partition migration script for scaling in (shrinking) a Kafka cluster
# Reference: https://l5555afz2r.feishu.cn/docx/QupHdCozeotVONx9lkvcfr89nbd
# Reference: https://xie.infoq.cn/article/c0d28a5e6c9ccb18e72faca2e
# Directory that contains the Kafka command-line tools used below.
kafka_bin="/root/kafka/bin"
# Working directory (relative to the current directory) for the JSON files
# this script writes.
data_dir="reassign"
# ZooKeeper connection string handed to the Kafka tools via --zookeeper.
zookeeper_conn="x.x.x.x:2181"
# Comma-separated broker ids passed as --broker-list when generating plans
# (original note: broker id / 1 2 3 4).
reserve_brokers="1,3,4"

# Create the working directory if it is not there yet.
[ -d "${data_dir}" ] || mkdir "${data_dir}"
# Generate a reassignment plan for one topic, targeting the brokers listed in
# ${reserve_brokers}.
#
# Globals:   kafka_bin, zookeeper_conn, reserve_brokers, data_dir (read)
# Arguments: $1 - topic name
# Outputs:   echoes the topic name and the command line to stdout; writes
#            ${data_dir}/<topic>.json (the topics-to-move request) and
#            ${data_dir}/<topic>_reassign_plan.json (the extracted plan)
# Returns:   0 when a non-empty plan file was written, 1 when the tool exits
#            non-zero or no plan could be extracted from its output
reassign_generate()
{
  local topic=$1
  local topics_file="${data_dir}/${topic}.json"
  local plan_file="${data_dir}/${topic}_reassign_plan.json"
  local cmd result

  echo "${topic}"
  printf '{"topics": [{"topic": "%s"}],"version": 1}\n' "${topic}" > "${topics_file}"

  cmd="${kafka_bin}/kafka-reassign-partitions.sh --zookeeper ${zookeeper_conn} --topics-to-move-json-file ${topics_file} --broker-list ${reserve_brokers} --generate"
  echo "${cmd}"
  # ${cmd} is left unquoted on purpose: it holds a whole command line that the
  # shell must split into words.
  result=$(${cmd})
  if [ $? -ne 0 ]; then
    echo "reassign generate error, cmd=${cmd}"
    return 1
  fi

  # Keep only the line that follows the "Proposed ..." header.
  printf '%s\n' "${result}" | grep Proposed -A1 | grep -v Proposed > "${plan_file}"
  # An empty plan file means the header was missing; do not let callers
  # execute a plan that does not exist.
  if [ ! -s "${plan_file}" ]; then
    echo "reassign generate error, cmd=${cmd}"
    return 1
  fi
  return 0
}
# Poll the --verify status of a topic's reassignment plan until it finishes.
#
# Globals:   kafka_bin, zookeeper_conn, data_dir (read)
# Arguments: $1 - topic name
# Outputs:   echoes the command line once, then one status line per poll
# Returns:   0 once no output line is left after dropping the status header
#            and the lines containing "successfully";
#            1 when the verify command exits non-zero or reports a failure
_reassign_success_check()
{
  local topic=$1
  local cmd output pending

  cmd="${kafka_bin}/kafka-reassign-partitions.sh --zookeeper ${zookeeper_conn} --reassignment-json-file ${data_dir}/${topic}_reassign_plan.json --verify"
  echo "${cmd}"
  while true; do
    # Capture first so the tool's own exit status is visible; piping straight
    # into grep would hide it and make "no output" look like success.
    # ${cmd} is unquoted on purpose so it is split into words.
    output=$(${cmd})
    if [ $? -ne 0 ]; then
      echo "_reassign_success_check error, topic: $topic"
      return 1
    fi

    # Whatever is left after removing the header and the finished partitions
    # is still outstanding.
    pending=$(printf '%s\n' "${output}" \
      | grep -v "Status of partition reassignment:" \
      | grep -v successfully)
    if [ -z "${pending}" ]; then
      echo "_reassign_success_check success, topic: $topic"
      return 0
    fi

    # A failed partition never turns into "successfully", so polling again
    # would loop forever.
    # NOTE(review): the patterns assume the tool words failures as
    # "... failed" / "reassignment failed ..." - confirm for the Kafka
    # version in use.
    if printf '%s\n' "${pending}" | grep -q -E ' failed$|reassignment failed'; then
      echo "_reassign_success_check failed, topic: $topic"
      return 1
    fi

    sleep 0.2
    echo "_reassign_success_check sleep 0.2, topic: $topic"
  done
}
# Execute the previously generated reassignment plan of one topic, save the
# pre-change assignment printed by the tool, and wait for completion.
#
# Globals:   kafka_bin, zookeeper_conn, data_dir (read)
# Arguments: $1 - topic name
# Outputs:   echoes the command line; writes the two lines that follow the
#            "Current partition replica assignment" header to
#            ${data_dir}/<topic>_reassign_excute.json
# Returns:   0 when the plan was started, the saved file parses with jq and
#            the completion check succeeded; non-zero otherwise
reassign_execute()
{
  local topic=$1
  local plan_file="${data_dir}/${topic}_reassign_plan.json"
  # The "excute" spelling is kept so the file name stays the same as before.
  local saved_file="${data_dir}/${topic}_reassign_excute.json"
  local cmd output status=0

  cmd="${kafka_bin}/kafka-reassign-partitions.sh --zookeeper ${zookeeper_conn} --reassignment-json-file ${plan_file} --execute"
  echo "${cmd}"
  # Capture the output first: inside a pipeline the tool's exit status would
  # be replaced by grep's. ${cmd} is unquoted on purpose so it is word-split.
  output=$(${cmd})
  if [ $? -ne 0 ]; then
    echo "reassign execute error, cmd=${cmd}"
    return 1
  fi

  printf '%s\n' "${output}" \
    | grep "Current partition replica assignment" -A2 \
    | grep -v "Current partition replica assignment" > "${saved_file}"
  # The last grep exits non-zero when it selected no line, i.e. the expected
  # header was not in the output; there is nothing to wait for then.
  if [ $? -ne 0 ]; then
    echo "reassign execute error, cmd=${cmd}"
    return 1
  fi

  # The reassignment is already running at this point, so a bad saved file is
  # reported through the return status but must not skip the wait below.
  if ! jq . < "${saved_file}" > /dev/null; then
    echo "reassign execute error, cmd=${cmd}"
    status=1
  fi

  _reassign_success_check "${topic}" || status=1
  return "${status}"
}
# Run the full reassignment of a single topic: generate the plan, then
# execute it.
#
# Arguments: $1 - topic name
# Returns:   the non-zero status of reassign_generate when it fails (the
#            execute step is skipped in that case), otherwise the status of
#            reassign_execute
one_topic_reassign()
{
  local topic=$1

  # Executing without a freshly generated plan makes no sense, so stop here
  # and hand the failure to the caller.
  reassign_generate "${topic}" || return
  reassign_execute "${topic}"
}
# Reassign every topic reported by kafka-topics.sh, one topic at a time.
# Only the first word of each line is used as the topic name; anything after
# it on the same line is ignored.
"${kafka_bin}/kafka-topics.sh" --list --zookeeper "${zookeeper_conn}" | while read -r topic _rest; do
  # Skip blank lines so an empty topic name never reaches the tools.
  [ -n "${topic}" ] || continue
  # stdin is detached so nothing run inside the loop can consume the rest of
  # the topic list coming through the pipe.
  if ! one_topic_reassign "${topic}" < /dev/null; then
    echo "reassign execute error, topic=${topic}"
  fi
done
# Last modified: 2023-11-08
# © Reposting is permitted with proper attribution