創建實例時開啟SASL_SSL訪問,則數據加密傳輸,安全性更高。
由于安全問題,支持的加密套件為 TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256 ,
TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256和
TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 。
本章節介紹如何使用開源的Kafka客戶端訪問開啟SASL的Kafka實例的方法,其中包含在內網中通過同一個VPC連接實例和通過公網連接實例兩個場景。如果是使用DNAT訪問實例,請參考使用DNAT訪問Kafka實例。
說明Kafka實例的每個代理允許客戶端單IP連接的個數默認為1000個,如果超過了,會出現連接失敗問題。您可以通過修改配置參數來修改單IP的連接數。
前提條件
1.已配置正確的安全組。
訪問開啟SASL的Kafka實例時,實例需要配置正確的安全組規則,具體安全組配置要求,請參考上表。
2.已獲取連接Kafka實例的地址。
- 如果是使用內網通過同一個VPC訪問,實例端口為9093,實例連接地址獲取如下圖。
圖 使用內網通過同一個VPC訪問Kafka實例的連接地址(實例已開啟SASL)


- 如果是公網訪問,實例端口為9095,實例連接地址獲取如下圖。
圖 公網訪問Kafka實例的連接地址(實例已開啟SASL)


3.已獲取開啟的SASL認證機制。
在Kafka實例詳情頁的“連接信息”區域,查看“開啟的SASL認證機制”。如果SCRAM-SHA-512和PLAIN都開啟了,根據實際情況選擇其中任意一種配置連接。如果頁面未顯示“開啟的SASL認證機制”,默認使用PLAIN機制。
圖 開啟的SASL認證機制


4.如果Kafka實例未開啟自動創建Topic功能,在連接實例前,請先創建Topic。
5.已下載client.jks證書。如果沒有,在控制臺單擊Kafka實例名稱,進入實例詳情頁面,在“連接信息 > SSL證書”所在行,單擊“下載”。下載壓縮包后解壓,獲取壓縮包中的客戶端證書文件:client.jks。
6.已下載或者或者,確保Kafka實例版本與命令行工具版本相同。
7.已創建彈性云服務器,如果使用內網通過同一個VPC訪問實例,請設置彈性云服務器的VPC、子網、安全組與Kafka實例的VPC、子網、安全組一致。在彈性云服務器中安裝,并配置JAVA_HOME與PATH環境變量,具體方法如下:
使用執行用戶在用戶家目錄下修改“.bash_profile”,添加如下行。其中“/opt/java/jdk1.8.0_151”為JDK的安裝路徑,請根據實際情況修改。
export JAVA_HOME=/opt/java/jdk1.8.0_151 ?
export PATH=$?JAVA_HOME/bin:JAVAH?OME/bin:$?PATH
執行source .bash_profile命令使修改生效。
命令行模式連接實例
以下操作命令以Linux系統為例進行說明。
步驟 1 在客戶端所在主機的“/etc/hosts”文件中配置host和IP的映射關系,以便客戶端能夠快速解析實例的Broker。
其中,IP地址必須為實例連接地址(從前提條件獲取的連接地址),host為每個實例主機的名稱(主機的名稱由您自行設置,但不能重復)。
例如:
10.154.48.120 server01
10.154.48.121 server02
10.154.48.122 server03
步驟 2 解壓Kafka命令行工具。
進入文件壓縮包所在目錄,然后執行以下命令解壓文件。
tar -zxf [kafka_tar]
其中, [kafka_tar] 表示命令行工具的壓縮包名稱。
例如:
tar -zxf kafka_2.12-2.7.2.tgz
步驟 3 根據SASL認證機制,修改Kafka命令行工具配置文件。
1、PLAIN機制: 在Kafka命令行工具的“/config”目錄中找到“consumer.properties”和“producer.properties”文件,并分別在文件中增加如下內容。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="**********"
password="**********"; ???
?????
sasl.mechanism=PLAINsecurity.protocol=SASL_SSL
ssl.truststore.location={ssl_truststore_path}
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm=
參數說明:
- username和password為創建Kafka實例過程中開啟SASL_SSL時填入的用戶名和密碼,或者創建SASL_SSL用戶時設置的用戶名和密碼。
- ssl.truststore.location配置為client.jks證書的存放路徑。注意,Windows系統下證書路徑中也必須使用“/”,不能使用Windows系統中復制路徑時的“\”,否則客戶端獲取證書失敗。
- ssl.truststore.password為服務器證書密碼,不可更改,需要保持為dms@kafka 。
- ssl.endpoint.identification.algorithm為證書域名校驗開關,為空則表示關閉。這里需要 保持關閉狀態,必須設置為空 。
2、SCRAM-SHA-512機制: 在Kafka命令行工具的“/config”目錄中找到“consumer.properties”和“producer.properties”文件,并分別在文件中增加如下內容。
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="**********" \
password="**********"; ????????
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_SSL
ssl.truststore.location={ssl_truststore_path}
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm=
參數說明:
- username和password為創建Kafka實例過程中開啟SASL_SSL時填入的用戶名和密碼,或者創建SASL_SSL用戶時設置的用戶名和密碼。
- ssl.truststore.location配置為client.jks證書的存放路徑。注意,Windows系統下證書路徑中也必須使用“/”,不能使用Windows系統中復制路徑時的“\”,否則客戶端獲取證書失敗。
- ssl.truststore.password為服務器證書密碼,不可更改,需要保持為dms@kafka 。
- ssl.endpoint.identification.algorithm為證書域名校驗開關,為空則表示關閉。這里需要 保持關閉狀態,必須設置為空 。
步驟 4 進入Kafka命令行工具的“/bin”目錄下。
注意,Windows系統下需要進入“/bin/windows”目錄下。
步驟 5 執行如下命令進行生產消息。
./kafka-console-producer.sh --broker-list {連接地址} --topic {Topic名稱} --producer.config ../config/producer.properties
參數說明如下:
- 連接地址:從前提條件獲取的連接地址,如果是公網訪問,請使用“公網連接地址”,如果是VPC內訪問,請使用“內網連接地址”,請根據實際情況選擇。
- Topic名稱:Kafka實例下創建的Topic名稱。如果Kafka實例開啟了自動創建Topic功能,此參數值可以填寫已創建的Topic名稱,也可以填寫未創建的Topic名稱。
本文以公網訪問為例,Kafka實例連接地址為“10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095”。
執行完命令后,輸入需要生產的消息內容,按“Enter”發送消息到Kafka實例,輸入的每一行內容都將作為一條消息發送到Kafka實例。
[root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095 --topic topic-demo --producer.config ../config/producer.properties
>Hello
>DMS
>Kafka!
>^C[root@ecs-kafka bin]#
如需停止生產使用Ctrl+C命令退出。
步驟 6 執行如下命令消費消息。
./kafka-console-consumer.sh --bootstrap-server ${連接地址} --topic ${Topic名稱} --group ${消費組名稱} --from-beginning ?--consumer.config ../config/consumer.properties
參數說明如下:
- 連接地址:從前提條件獲取的連接地址,如果是公網訪問,請使用“公網連接地址”,如果是VPC內訪問,請使用“內網連接地址”,請根據實際情況選擇。
- Topic名稱:Kafka實例下創建的Topic名稱。
- 消費組名稱:根據您的業務需求,設定消費組名稱。 如果已經在配置文件中指定了消費組名稱,請確保命令行中的消費組名稱與配置文件中的相同,否則可能消費失敗。 消費組名稱開頭包含特殊字符,例如下劃線“_”、#號“#”時,監控數據無法展示。
示例如下:
[root@ecs-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 10.3.196.45:9095,10.78.42.127:9095,10.4.49.103:9095 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties
Hello
DMS
Kafka!
^CProcessed a total of 3 messages
[root@ecs-kafka bin]#
如需停止消費使用Ctrl+C命令退出。