本章節介紹如何使用開源的Kafka客戶端訪問未開啟SASL的Kafka實例的方法,其中包含在內網中通過同一個VPC連接實例和通過公網連接實例兩個場景。如果是使用DNAT訪問實例,請參考使用DNAT訪問Kafka實例。
多語言客戶端(duan)的(de)使用,請參考Kafka官網:。
說明
本章(zhang)節主要描述使用(yong)命令行模式連接Kafka實(shi)例。
Kafka實例的每個代理允許客戶端單IP連接的個數默認為1000個,如果超過了,會出現連接失敗問題。您可以通過修改配置參數來修改(gai)單IP的連接數。
前提條件
1.已(yi)配置正確的(de)安全組。
訪問未開啟SASL的Kafka實例時,實例需要配置正確的安全組規則,具體安全組配置要求,請參考表安全組規則。
2.已(yi)獲取(qu)連接Kafka實例(li)的地(di)址。
- 如果是使用內網通過同一個VPC訪問,實例端口為9092,實例連接地址獲取如下圖。
圖使用內網通過同一個VPC訪問Kafka實例(li)的連接地址(實例(li)未開啟SASL)


- 如果是公網訪問,實例端口為9094,實例連接地址獲取如下圖。
圖(tu)公網訪問Kafka實例的連接地址(實例未開啟SASL)


3.如果Kafka實例未開啟自動創建Topic功能,在連接實例前,請先創建Topic。
4.已下(xia)載或(huo)者或(huo)者,確保Kafka實例版(ban)本(ben)(ben)與(yu)命令行工具版(ban)本(ben)(ben)相同(tong)。
5.已創建(jian)彈(dan)(dan)性(xing)(xing)云服務(wu)器(qi)(qi)(qi),如果使用內網通(tong)過同一(yi)個VPC訪(fang)問實例,請(qing)設置彈(dan)(dan)性(xing)(xing)云服務(wu)器(qi)(qi)(qi)的VPC、子(zi)網、安全組(zu)與Kafka實例的VPC、子(zi)網、安全組(zu)一(yi)致。在彈(dan)(dan)性(xing)(xing)云服務(wu)器(qi)(qi)(qi)中(zhong)安裝,并配置JAVA_HOME與PATH環境變量,具體方(fang)法如下:
使用執行用戶在(zai)用戶家目錄(lu)下(xia)修(xiu)改(gai)“.bash_profile”,添加如下(xia)行。其中“/opt/java/jdk1.8.0_151”為JDK的安裝路(lu)徑,請根據實際情況修(xiu)改(gai)。
export JAVA_HOME=/opt/java/jdk1.8.0_151
export PATH=$JAVA_HOME/bin:$PATH
執行source .bash_profile命令使修改生效。
命令行模式連接實例
以下(xia)操作命令以Linux系(xi)統(tong)為例進行說明:
步驟 1 解(jie)壓Kafka命(ming)令行(xing)工具(ju)。
進入文(wen)件(jian)壓(ya)縮包所在目(mu)錄(lu),然后執行以下命令(ling)解(jie)壓(ya)文(wen)件(jian)。
tar -zxf [kafka_tar]
其中, [kafka_tar] 表示命令(ling)行工具(ju)的壓(ya)縮(suo)包名稱。
例如:
tar -zxf kafka_2.12-2.7.2.tgz
步驟 2 進入Kafka命令行工具的“/bin”目錄下。
注意,Windows系統(tong)下需要進入(ru)“/bin/windows”目錄下。
步驟(zou) 3 執行(xing)如下命(ming)令進行(xing)生(sheng)產消息。
./kafka-console-producer.sh --broker-list {連接地址} --topic {Topic名稱}
參數說明如下:
- 連接地址:從前提條件中獲取的連接地址,如果是公網訪問,請使用“公網連接地址”,如果是VPC內訪問,請使用“內網連接地址”,請根據實際情況選擇。
- Topic名稱:Kafka實例下創建的Topic名稱。如果Kafka實例開啟了自動創建Topic功能,此參數值可以填寫已創建的Topic名稱,也可以填寫未創建的Topic名稱。
本文以(yi)公網連接為例,獲取的Kafka實例公網連接地址為“10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094”。執行(xing)完命令后(hou)輸入(ru)內容(rong),按“Enter”發(fa)送(song)消(xiao)息到Kafka實例,輸入(ru)的每(mei)一(yi)行(xing)內容(rong)都將作(zuo)為一(yi)條消(xiao)息發(fa)送(song)到Kafka實例。
[root@ecs-kafka bin]# ./kafka-console-producer.sh --broker-list 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 --topic topic-demo
>Hello
>DMS
>Kafka!
>^C[root@ecs-kafka bin]#
如需停止生產使用Ctrl+C命令退出。
步(bu)驟 4 執行如下命令消費消息。
./kafka-console-consumer.sh --bootstrap-server {連接地址} --topic {Topic名稱} --group ${消費組名稱} --from-beginning
參數說明如下:
- 連接地址:從前提條件中獲取的連接地址,如果是公網訪問,請使用“公網連接地址”,如果是VPC內訪問,請使用“內網連接地址”,請根據實際情況選擇。
- Topic名稱:Kafka實例下創建的Topic名稱。
- 消費組名稱:根據您的業務需求,設定消費組名稱。 如果已經在配置文件中指定了消費組名稱,請確保命令行中的消費組名稱與配置文件中的相同,否則可能消費失敗 。 消費組名稱開頭包含特殊字符,例如下劃線“_”、#號“#”時,監控數據無法展示。
示例如下:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 10.3.196.45:9094,10.78.42.127:9094,10.4.49.103:9094 --topic topic-demo --group order-test --from-beginning
Kafka!
DMS
Hello
^CProcessed a total of 3 messages
[root@ecs-kafka bin]#
如需停止消費使用Ctrl+C命令退出。