安全接入點接入
更新時間 2025-08-26 16:37:04
最近更新時間: 2025-08-26 16:37:04
分享文章
本節介紹如何使用安全接入點接入Kafka的方法,文檔以Java代碼為例。
前提條件
已配置正確的安全組。
已獲取連接Kafka實例的地址。
如果Kafka實例未開啟自動創建Topic功能,在連接實例前,請先創建Topic。
已創建彈性云服務器,如果使用內網同一個VPC訪問實例,請設置彈性云服務器的VPC、子網、安全組與Kafka實例的VPC、子網、安全組一致。
需要用戶先在用戶管理頁面創建用戶,然后給對應的topic授予生產消費權限。
使用內網同一個VPC訪問,實例端口為8092,實例連接地址從控制臺實例詳情菜單處獲取,如下圖所示。
Maven中引入Kafka客戶端
Kafka實例基于社區版本2.8.2/3.6.2,推薦客戶端保持一致。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.2</version> <!-- 與實例版本保持一致:實例為3.6.2時請改為3.6.2 -->
</dependency>
客戶端關鍵參數
// Key client-side settings for SASL_SCRAM access (shared by producer and consumer).
Properties props = new Properties();
// Bootstrap broker list obtained from the instance detail page (port 8092 for in-VPC access).
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// SASL authentication over plaintext transport (no TLS).
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
// SCRAM-SHA-512 is the mechanism configured on the instance for SASL users.
props.put("sasl.mechanism", "SCRAM-SHA-512");
// JAAS login config: replace username/password with the user created on the user-management page.
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test_user\" password=\"Kafka@Test\";");
生產者代碼示例
package com.justin.kafka.service.gw.sasl;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

/**
 * Example producer that connects to a Kafka instance through the secure
 * (SASL_PLAINTEXT + SCRAM-SHA-512) access point and sends ten demo messages.
 */
public class Producer {
    private final KafkaProducer<String, String> producer;
    /** Target topic; must exist (or auto-creation must be enabled) before sending. */
    public final static String TOPIC = "test-topic2";
    /** Broker addresses from the instance detail page (in-VPC access, port 8092). */
    public final static String BROKER_ADDR = "192.168.0.11:8092,192.168.0.9:8092,192.168.0.10:8092";

    public Producer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // SASL authentication over a plaintext transport (no TLS).
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        // Replace username/password with the SASL user created on the user-management page.
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test_user\" password=\"Kafka@Test\";");
        // Wait for all in-sync replicas to acknowledge each record.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        producer = new KafkaProducer<>(props);
    }

    /**
     * Sends ten demo messages asynchronously, then flushes and closes the
     * producer. flush()/close() run in a finally block so the client is
     * released even if send() throws.
     */
    public void produce() {
        try {
            for (int i = 0; i < 10; i++) {
                String data = "The msg is " + i;
                // Asynchronous send; the callback reports per-record success/failure.
                producer.send(new ProducerRecord<>(TOPIC, data), (RecordMetadata metadata, Exception exception) -> {
                    if (exception != null) {
                        // TODO: exception handling
                        exception.printStackTrace();
                        return;
                    }
                    System.out.println("produce msg completed, partition id = " + metadata.partition());
                });
            }
        } catch (Exception e) {
            // TODO: exception handling
            e.printStackTrace();
        } finally {
            producer.flush();
            producer.close();
        }
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.produce();
    }
}
消費者代碼示例
package com.justin.kafka.service.gw.sasl;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
 * Example consumer that connects to a Kafka instance through the secure
 * (SASL_PLAINTEXT + SCRAM-SHA-512) access point and polls messages forever.
 */
public class Consumer {
    // Declared as the Consumer interface so implementations can be swapped.
    private final org.apache.kafka.clients.consumer.Consumer<String, String> consumer;
    private static final String GROUP_ID = "test-group2";
    private static final String TOPIC = "test-topic2";
    /** Broker addresses from the instance detail page (in-VPC access, port 8092). */
    public final static String BROKER_ADDR = "192.168.0.11:8092,192.168.0.9:8092,192.168.0.10:8092";

    public Consumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // Start from the earliest offset when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // SASL authentication over a plaintext transport (no TLS).
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        // Replace username/password with the SASL user created on the user-management page.
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test_user\" password=\"Kafka@Test\";");
        consumer = new KafkaConsumer<>(props);
    }

    /**
     * Subscribes to the demo topic and polls in an endless loop, printing the
     * batch size and each record value. Uses poll(Duration) — poll(long) is
     * deprecated since Kafka 2.0 and removed in newer client versions.
     */
    public void consume() {
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true){
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                System.out.println("the numbers of topic:" + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("the data is " + record.value());
                }
            }catch (Exception e){
                // TODO: exception handling
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new Consumer().consume();
    }
}