# kafka 缩容时,topic partitions 迁移脚本
# 参考: https://l5555afz2r.feishu.cn/docx/QupHdCozeotVONx9lkvcfr89nbd
# 参考: https://xie.infoq.cn/article/c0d28a5e6c9ccb18e72faca2e
kafka_bin=/root/kafka/bin
data_dir="reassign"
zookeeper_conn="x.x.x.x:2181"
# broker id / 1 2 3 4
reserve_brokers="1,3,4"

if [ ! -d ${data_dir} ]; then
  mkdir ${data_dir}
fi

reassign_generate()
{
    topic=$1
    echo ${topic}
    echo "{\"topics\": [{\"topic\": \"${topic}\"}],\"version\": 1}" > ${data_dir}/${topic}.json

    cmd="${kafka_bin}/kafka-reassign-partitions.sh  --zookeeper ${zookeeper_conn} --topics-to-move-json-file  ${data_dir}/${topic}.json --broker-list ${reserve_brokers} --generate"
    echo ${cmd}

    result=`${cmd}`
    if [ $? -ne 0 ]; then
        echo "reassign generate error, cmd=${cmd}"
    fi
    # echo $result
    echo "$result"  | grep Proposed -A1 | grep -v Proposed >  ${data_dir}/${topic}_reassign_plan.json

    # ${cmd} | grep Proposed -A1 | grep -v Proposed >  ${data_dir}/${topic}_reassign_plan.json
    # if [ $? -ne 0 ]; then
    #     echo "reassign generate error, cmd=${cmd}"
    # fi
}

_reassign_success_check()
{
    topic=$1
    cmd="${kafka_bin}/kafka-reassign-partitions.sh --zookeeper ${zookeeper_conn} --reassignment-json-file ${data_dir}/${topic}_reassign_plan.json --verify"
    echo ${cmd} 
    while true
    do
        result=`${cmd} | grep -v "Status of partition reassignment:" | grep -v successfully`
        if [ -n "$result" ]; then
            sleep 0.2
            echo "_reassign_success_check sleep 0.2, topic: $topic"
        else
            echo "_reassign_success_check success, topic: $topic"
            break
        fi
    done
}

reassign_execute()
{
    topic=$1

    cmd="${kafka_bin}/kafka-reassign-partitions.sh --zookeeper ${zookeeper_conn} --reassignment-json-file  ${data_dir}/${topic}_reassign_plan.json --execute"
    echo ${cmd} 
    ${cmd} | grep "Current partition replica assignment" -A2 | grep -v "Current partition replica assignment" >  ${data_dir}/${topic}_reassign_excute.json
    if [ $? -ne 0 ]; then
        echo "reassign execute error, cmd=${cmd}"
    fi

    cat ${data_dir}/${topic}_reassign_excute.json | jq . > /dev/null
    if [ $? -ne 0 ]; then
        echo "reassign execute error, cmd=${cmd}"
    fi

    _reassign_success_check $topic
}

one_topic_reassign()
{
    topic=$1
    reassign_generate ${topic}
    reassign_execute ${topic}
}

${kafka_bin}/kafka-topics.sh --list --zookeeper ${zookeeper_conn} | while read topic; do
    one_topic_reassign ${topic}
    if [ $? -ne 0 ]; then
        echo "reassign execute error, topic=${topic}"
    fi
done
最后修改:2023 年 11 月 08 日
如果觉得我的文章对你有用,请随意赞赏