Topic,即消(xiao)息主題。創(chuang)建(jian)(jian)Kafka實例成(cheng)功后,如(ru)果沒有開啟(qi)“Kafka自動(dong)創(chuang)建(jian)(jian)Topic”,需要手(shou)動(dong)創(chuang)建(jian)(jian)Topic,然后才能進行生(sheng)產消(xiao)息和(he)消(xiao)費(fei)消(xiao)息。如(ru)果實例開啟(qi)了“Kafka自動(dong)創(chuang)建(jian)(jian)Topic”,則該操作為可選。
“Kafka自(zi)動(dong)創建(jian)(jian)Topic”表示在(zai)生產或(huo)(huo)消費一(yi)個(ge)未創建(jian)(jian)的(de)Topic時,會自(zi)動(dong)創建(jian)(jian)此Topic,此Topic的(de)默認參(can)(can)數(shu)(shu)(shu)(shu)值(zhi)如下(xia):分(fen)(fen)區數(shu)(shu)(shu)(shu)為(wei)(wei)3,副(fu)本(ben)(ben)數(shu)(shu)(shu)(shu)為(wei)(wei)3,老化時間為(wei)(wei)72小時,不(bu)開啟同(tong)步(bu)復制(zhi)和同(tong)步(bu)落盤。如果在(zai)“配(pei)置參(can)(can)數(shu)(shu)(shu)(shu)”中修改“log.retention.hours”、“default.replication.factor”或(huo)(huo)“num.partitions”的(de)參(can)(can)數(shu)(shu)(shu)(shu)值(zhi),此后(hou)自(zi)動(dong)創建(jian)(jian)的(de)Topic參(can)(can)數(shu)(shu)(shu)(shu)值(zhi)為(wei)(wei)修改后(hou)的(de)參(can)(can)數(shu)(shu)(shu)(shu)值(zhi)。例如:“num.partitions”修改為(wei)(wei)“5”,自(zi)動(dong)創建(jian)(jian)的(de)Topic參(can)(can)數(shu)(shu)(shu)(shu)值(zhi)如下(xia):分(fen)(fen)區數(shu)(shu)(shu)(shu)為(wei)(wei)5,副(fu)本(ben)(ben)數(shu)(shu)(shu)(shu)為(wei)(wei)3,老化時間為(wei)(wei)72小時,不(bu)開啟同(tong)步(bu)復制(zhi)和同(tong)步(bu)落盤。
Kafka實例對Topic的總分區數設置了上限, 當Topic的總分區數達到上限后,用戶就無法繼續創建Topic 。不同規格配置的Topic總分區數不同,具體請參考Kafka產品規格說明。
本(ben)文主要(yao)介紹手動(dong)創建Topic的操(cao)作,有以(yi)(yi)下幾種方(fang)式,您可以(yi)(yi)根據實際(ji)情況選擇任意一種方(fang)式:
- 方式1:在控制臺創建
- 方式2:在Kafka Manager創建
- 方式3:在Kafka客戶端上創建
說明實(shi)例節點出現(xian)故(gu)障的情況下,單副本(ben)(ben)Topic查詢消息時可能會報“內部服務錯誤”,因此(ci)不建議(yi)使(shi)用單副本(ben)(ben)Topic。
方式1:在控制臺創建
步驟(zou) 1 登錄(lu)管理控(kong)制(zhi)臺。
步驟 2 在管理控制臺右上角單擊
,選擇區域。
說明請選擇Kafka實例所在的區域。
步驟 3 在管理控制臺左上角單擊
,選擇(ze)“企(qi)業中間件”-“分布(bu)(bu)式消(xiao)息服務”-“Kafka專享版”,進入(ru)分布(bu)(bu)式消(xiao)息服務Kafka專享版頁(ye)面。
步驟 4 單擊Kafka實例的名(ming)稱(cheng),進入實例詳情頁面。
步驟 5 選擇“Topic管(guan)理”頁簽(qian),單擊“創建Topic”。
彈出“創建(jian)Topic”對話框。
圖 創建Topic


步驟 6 填寫Topic名稱和配置信息。
表(biao) Topic參數說明
參考 說明 Topic名稱 系(xi)統為您自(zi)動(dong)生成了Topic名稱,您可以根據(ju)需要修改(gai)。
創建Topic后(hou)不(bu)能(neng)修改名稱。
分區數 您(nin)可以設(she)置Topic的分(fen)區(qu)數(shu),分(fen)區(qu)數(shu)越(yue)大消(xiao)費(fei)的并發(fa)度越(yue)大。
該參數(shu)設置為1時,消(xiao)(xiao)費(fei)消(xiao)(xiao)息(xi)時會(hui)按照(zhao)先入(ru)先出(chu)的(de)順序進行(xing)消(xiao)(xiao)費(fei)。
取(qu)值范圍:1~200
默認值:3
副本數 您可以為每(mei)個(ge)Topic設(she)置副本(ben)(ben)的(de)數(shu)量,Kafka會(hui)自動在每(mei)個(ge)副本(ben)(ben)上備份(fen)數(shu)據,當(dang)其(qi)中一個(ge)Broker故障時數(shu)據依然是可用的(de),副本(ben)(ben)數(shu)越大可靠(kao)性越高。
該參數(shu)設置為1時(shi),表示只(zhi)有(you)一(yi)份數(shu)據(ju)。
默認值:3
說明實例節點出現故障(zhang)的情(qing)況下,單(dan)副(fu)本Topic查詢(xun)消息時(shi)可(ke)能會(hui)報“內部服(fu)務錯誤”,因此(ci)不建議使用單(dan)副(fu)本Topic。
老化時間(小時) 消(xiao)(xiao)(xiao)(xiao)息的最長保留時(shi)間(jian),消(xiao)(xiao)(xiao)(xiao)費(fei)者(zhe)必須在(zai)此時(shi)間(jian)結束前(qian)消(xiao)(xiao)(xiao)(xiao)費(fei)消(xiao)(xiao)(xiao)(xiao)息,否則(ze)消(xiao)(xiao)(xiao)(xiao)息將被(bei)刪除(chu)。刪除(chu)的消(xiao)(xiao)(xiao)(xiao)息,無(wu)法(fa)被(bei)消(xiao)(xiao)(xiao)(xiao)費(fei)。
取值范(fan)圍:1~720
默認值:72
同步復制 指后(hou)端收到生產消息請求并復制給所(suo)有副(fu)本(ben)后(hou),才返回(hui)客戶端。
開啟同(tong)步復(fu)制后,需(xu)要(yao)在客戶(hu)端配置acks=all或(huo)者-1,否(fou)則無效。
當副本數(shu)為1時,不能(neng)選(xuan)擇同步(bu)復(fu)制功能(neng)。
同步落盤 同步落盤(pan)是指生產的每條消(xiao)息都會(hui)立即(ji)寫入磁盤(pan)。
開啟:生產的每(mei)條消(xiao)息(xi)都會立即寫(xie)入磁(ci)盤,可靠性(xing)更高。
關(guan)閉:生產的消息存在(zai)內存中,不會(hui)立(li)即寫入磁盤。
消息時間戳類型 定義消息中的時間戳類型,取值如下:
CreateTime:生產者創建(jian)消息(xi)的時(shi)間(jian)。
LogAppendTime:broker將(jiang)消息寫入日志(zhi)的(de)時間。
批(pi)處理消息(xi)最大(da)值 Kafka允許(xu)的最(zui)大批處理大小(xiao),如果啟用消息壓縮,則表示壓縮后的最(zui)大批處理大小(xiao)。
如果增加(jia)“max.message.bytes”的(de)值,且(qie)存在消費(fei)(fei)者版(ban)本早于0.10.2,此時消費(fei)(fei)者的(de)“fetch size”值也必須(xu)增加(jia),以(yi)便消費(fei)(fei)者可以(yi)獲取增加(jia)后(hou)的(de)批(pi)處理大小。
描述 Topic的描述信息。
步驟 7 配置完(wan)成后(hou),單擊(ji)“確定”,完(wan)成創建Topic。
方式2:在Kafka Manager創建
登錄Kafka Manager后,在頁面(mian)頂部選擇(ze)“Topic > Create”,然后按照(zhao)界(jie)面(mian)參數(shu)填寫即可。
圖 在Kafka Manager中創建(jian)Topic


說明Topic名稱(cheng)開(kai)頭包含特殊字符,例如下劃線“_”、#號“#”時,監控數(shu)據無法展示。
方式3:在Kafka客戶端上創建
Kafka客戶端版本為2.2以上時,支持通過kafka-topics.sh創建Topic,以(yi)及管(guan)理Topic的各類參數。
說明Topic名稱開頭包含特殊字符,例如下劃線“_”、#號“#”時,監控數據無法展示。
- 未開啟SASL的Kafka實例,在“/{命令行工具所在目錄}/kafka_{version}/bin/”目錄下,通過以下命令創建Topic。
./kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num}
- 已開啟SASL的Kafka實例,通過以下步驟創建Topic。
- (可選)如果已經設置了SSL證書配置,請跳過此步驟。否則請執行以下操作。在Kafka客戶端的“/config”目錄中創建“ssl-user-config.properties”文件,參考步驟3增加SSL證書配置。
- 在“/{命令行工具所在目錄}/kafka_{version}/bin/”目錄下,通過以下命令創建Topic。
./kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num} --command-config ./config/ssl-user-config.properties