本文主要介紹在命令行(xing)模式下使用SASL證書連接Kafka實(shi)例(li)的操作(zuo),其(qi)中包(bao)含內網(wang)訪問和(he)公網(wang)訪問兩種連接場(chang)景。
在使用SASL證書的(de)場景(jing)下,通過(guo)內網(wang)(wang)訪(fang)問(wen)(wen)和(he)通過(guo)公網(wang)(wang)訪(fang)問(wen)(wen),僅涉及連(lian)(lian)接IP和(he)端口不(bu)一(yi)致,其他操作步驟是(shi)一(yi)樣的(de),內網(wang)(wang)訪(fang)問(wen)(wen)的(de)連(lian)(lian)接端口為9093,公網(wang)(wang)訪(fang)問(wen)(wen)的(de)連(lian)(lian)接端口為9095。
文中僅介(jie)紹公網訪(fang)問(wen)的連接示例,如果使用內網訪(fang)問(wen)時,替換為相應的連接地址即可。
說明Kafka實例的每個代理允許客戶端單IP連接的個數默認為1000個,如果超過了,會出現連接失敗問題。您可以通過修改配置參數來修改單IP的連接數。
前提條件
1.已配置(zhi)正確(que)的安全(quan)組,安全(quan)組規(gui)則(ze)(ze)請(qing)參考(kao)準備環境中(zhong)的安全(quan)組規(gui)則(ze)(ze) 。
2.已獲取連(lian)接(jie)Kafka實(shi)例的地址。
- 如果是使用內網通過同一個VPC訪問,實例端口為9093,實例連接地址獲取如下圖。
圖 使用內網通過(guo)同一個VPC訪(fang)問Kafka實(shi)例的(de)連接地址(zhi)(實(shi)例已開啟(qi)SASL)


- 如果是公網訪問,實例端口為9095,實例連接地址獲取如下圖。
圖(tu) 公(gong)網訪問Kafka實例(li)的(de)連接地址(實例(li)已開啟SASL)


3.已獲(huo)取(qu)開啟的SASL認證機制(zhi)。
在Kafka實(shi)例(li)詳情(qing)頁的“連接(jie)信(xin)息”區域,查(cha)看“開(kai)(kai)啟(qi)的SASL認證機(ji)制(zhi)”。如果SCRAM-SHA-512和PLAIN都開(kai)(kai)啟(qi)了,根據實(shi)際情(qing)況(kuang)選(xuan)擇其中任(ren)意一種配置連接(jie)。如果頁面未顯示(shi)“開(kai)(kai)啟(qi)的SASL認證機(ji)制(zhi)”,默認使用(yong)PLAIN機(ji)制(zhi)。
圖(tu) 開(kai)啟的SASL認證機制


4.如果Kafka實例未開啟自(zi)動(dong)創建Topic功能,獲(huo)取Topic名稱。
在實例的Topic管理頁簽中獲取(可選)步驟三:創建Topic中創建的(de)Topic名稱。
圖 查(cha)看Topic名稱


5.已購買ECS,并完成JDK安裝、環境變量配置以及Kafka開源客戶端下載,具體操作請參考準備環境。
配置生產消費配置文件
步驟 1 登錄Linux系統的ECS。
步驟 2 在ECS的(de)“/etc/hosts”文件中配(pei)置host和IP的(de)映射關系,以便客戶端能夠快速解析實例的(de)Broker。
其中,IP地(di)址(zhi)(zhi)必須為(wei)(wei)實例(li)(li)連接地(di)址(zhi)(zhi)(從前提條件獲(huo)取的連接地(di)址(zhi)(zhi)),host為(wei)(wei)每個實例(li)(li)主機的名稱(您可以自(zi)定義(yi)主機的名稱,但不能(neng)重復(fu))。
例如:
10.154.48.120 server01
10.154.48.121 server02
10.154.48.122 server03
步驟 3 下(xia)(xia)載(zai)client.jks證書。在Kafka控(kong)制臺單(dan)(dan)擊(ji)Kafka實(shi)例名稱,進入實(shi)例詳情頁面,在“連接信息 > SSL證書”所在行,單(dan)(dan)擊(ji)“下(xia)(xia)載(zai)”。
解壓(ya)壓(ya)縮包,獲取壓(ya)縮包中的客戶端(duan)證(zheng)書文件:client.jks。
步(bu)驟 4 根(gen)據(ju)已獲取的SASL認證機制,修改Kafka命令行工具配置文件。
- PLAIN機制: 在Kafka命令行工具的“/config”目錄中找到“consumer.properties”和“producer.properties”文件,并分別在文件中增加如下內容。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="**********" \
password="**********";
sasl.mechanism=PLAIN
說明username和(he)password為創建(jian)Kafka實例過程中開啟SASL_SSL時填入的用(yong)戶名(ming)和(he)密(mi)碼,或(huo)者(zhe)創建(jian)SASL_SSL用(yong)戶時設置的用(yong)戶名(ming)和(he)密(mi)碼。
- SCRAM-SHA-512機制: 在Kafka命令行工具的“/config”目錄中找到“consumer.properties”和“producer.properties”文件,并分別在文件中增加如下內容。
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="**********" \
password="**********";
sasl.mechanism=SCRAM-SHA-512
說明username和password為創建Kafka實例過程中開啟SASL_SSL時填入的用戶名和密碼,或者創建SASL_SSL用戶時設置的用戶名和密碼。
步驟(zou)5 根據安全協議,修改Kafka命令行工具配置文件。
1.SASL_SSL: 在Kafka命令行工(gong)具(ju)的“/config”目錄(lu)中找到“consumer.properties”和(he)“producer.properties”文(wen)(wen)件(jian),并分別在文(wen)(wen)件(jian)中增加如下內容。
security.protocol=SASL_SSL
ssl.truststore.location={ssl_truststore_path}
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm=
參數說明:
- ssl.truststore.location配置為client.truststore.jks證書的存放路徑。注意,Windows系統下證書路徑中也必須使用“/”,不能使用Windows系統中復制路徑時的“\”,否則客戶端獲取證書失敗。
- ssl.truststore.password為服務器證書密碼,不可更改,需要保持為dms@kafka 。
- ssl.endpoint.identification.algorithm為證書域名校驗開關,為空則表示關閉。這里需要 保持關閉狀態,必須設置為空 。
- SASL_PLAINTEXT: 在Kafka命令行工具的“/config”目錄中找到“consumer.properties”和“producer.properties”文件,并分別在文件中增加如下內容。
security.protocol=SASL_PLAINTEXT
生產消息
進(jin)入Kafka客戶端文(wen)件(jian)的“/bin”目(mu)錄下(xia),執行(xing)如下(xia)命令(ling)進(jin)行(xing)生(sheng)產消息(xi)。
./kafka-console-producer.sh --broker-list ?{連接地址} --topic 連接地址??topic?{Topic名稱} --producer.config ../config/producer.properties
參數說明如下:
- 連接地址:從前提條件獲取的連接地址。
- Topic名稱:Kafka實例下創建的Topic名稱。
示例如下,“10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095”為Kafka實例連接地址。
執行完命令后(hou),輸(shu)(shu)入需要生產的消(xiao)息(xi)內容,按(an)“Enter”發送消(xiao)息(xi)到Kafka實例(li),輸(shu)(shu)入的每一行內容都將作為一條消(xiao)息(xi)發送到Kafka實例(li)。
[root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095 ?--topic topic-demo --producer.config ../config/producer.propertiesHello
DMS
Kafka!
^C[root@ecs-kafka bin]#
如需停止生產使用Ctrl+C命令退出。
消費消息
執(zhi)行如下命令消費(fei)消息。
./kafka-console-consumer.sh --bootstrap-server ?{連接地址} --topic 連接地址??topic?{Topic名稱} --group ${消費組名稱} --from-beginning ?--consumer.config ../config/consumer.properties
參數說明如下:
- 連接地址:從前提條件獲取的連接地址。
- Topic名稱:Kafka實例下創建的Topic名稱。
- 消費組名稱:根據您的業務需求,設定消費組名稱。 如果已經在配置文件中指定了消費組名稱,請確保命令行中的消費組名稱與配置文件中的相同,否則可能消費失敗 。 消費組名稱開頭包含特殊字符,例如下劃線“_”、#號“#”時,監控數據無法展示。
示例如下:
[root@ecs-kafka bin]# ?./kafka-console-consumer.sh --bootstrap-server 10.xxx.xxx.202:9095,10.xxx.xxx.197:9095,10.xxx.xxx.68:9095 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties
Hello
Kafka!
DMS
^CProcessed a total of 3 messages
[root@ecs-kafka bin]#
如需停止消費使用Ctrl+C命令退出。
后續步驟
您(nin)可以(yi)通過設置監控指標的告警(jing)規格,當實(shi)例、節(jie)點(dian)、隊(dui)列等有(you)異(yi)常時,可以(yi)及時接收異(yi)常信(xin)息。