Compile and Run the Demo Java Project
Last updated: 2025-06-16 11:54:11
This topic describes how to connect to Kafka and how to compile and run the demo Java project.
Adding the kafka-clients Dependency
To use Kafka, you need to add the corresponding dependency to your project. The exact dependency may vary with your project and requirements, so consult the official Kafka documentation for the latest versions and usage instructions before you start.
Taking Java as an example, you can use Kafka's Java client library. Add the following dependency in your Maven or Gradle build tool:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- Add a <version> element matching the Kafka client version you intend to use. -->
</dependency>
Sample Code
- Obtain the following information from the console:
  - Connection address: obtained from the Instance Details menu in the console, in the Access Point Information section of the Instance Details page.
  - Topic name: on the Topic Management page, select the topic you need.
  - Consumer group name: on the Consumer Group Management page, select the consumer group you need.
- Replace the above information in the sample code to produce messages.
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {
    private final KafkaProducer<String, String> producer;

    public final static String TOPIC = "test-topic";
    public final static String BROKER_ADDR = "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090";

    public Producer() {
        Properties props = new Properties();
        // Replace BROKER_ADDR with the access point addresses obtained from the console.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Wait for acknowledgement from all in-sync replicas before a send is considered successful.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retry transient send failures up to 3 times.
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        producer = new KafkaProducer<>(props);
    }

    public void produce() {
        try {
            for (int i = 0; i < 10; i++) {
                String data = "The msg is " + i;
                // Sends are asynchronous; the callback is invoked when the broker responds.
                producer.send(new ProducerRecord<>(TOPIC, data), new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            // TODO: handle the send failure
                            exception.printStackTrace();
                            return;
                        }
                        System.out.println("produce msg completed, partition id = " + metadata.partition());
                    }
                });
            }
        } catch (Exception e) {
            // TODO: handle the exception
            e.printStackTrace();
        }
        // Flush any buffered records and release client resources.
        producer.flush();
        producer.close();
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.produce();
    }
}
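The demo above sends records asynchronously and reports each result in a callback. If you prefer to block until the broker has acknowledged a record, you can wait on the Future returned by send(). The following is a minimal sketch of that variant, not part of the original demo; the class and method names (SyncSendExample, sendAndWait) are illustrative only, and the producer is assumed to be configured as in the code above.

// Minimal sketch: synchronous send by blocking on the Future returned by send().
// Assumes a KafkaProducer<String, String> configured as in the demo above.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendExample {
    public static void sendAndWait(KafkaProducer<String, String> producer, String topic, String data) throws Exception {
        // get() blocks until the broker acknowledges the record or the send fails.
        RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, data)).get();
        System.out.println("sent to partition " + metadata.partition() + ", offset " + metadata.offset());
    }
}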
- Similarly, replace the above information in the sample code to consume messages.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    private org.apache.kafka.clients.consumer.Consumer<String, String> consumer;

    private static final String GROUP_ID = "test-group";
    private static final String TOPIC = "test-topic";
    public final static String BROKER_ADDR = "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090";

    public Consumer() {
        Properties props = new Properties();
        // Replace BROKER_ADDR, GROUP_ID and TOPIC with the values obtained from the console.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // Start from the earliest offset when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Automatic offset commits are disabled; this demo does not commit offsets (see the manual-commit sketch below).
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    public void consume() {
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            try {
                // poll(long) has been removed in newer clients; use the Duration overload instead.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                System.out.println("the number of polled records: " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("the data is " + record.value());
                }
            } catch (Exception e) {
                // TODO: handle the exception
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new Consumer().consume();
    }
}
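Note that the demo above sets enable.auto.commit to false but never commits offsets, so the group's position is not persisted between runs. If you want offsets to be committed, you can commit manually after processing each polled batch. The following is a minimal sketch of that adjustment, assuming the same consumer configuration and subscription as above; the class and method names (ManualCommitLoop, pollAndCommit) and the commit placement (once per successfully processed batch) are illustrative assumptions, not part of the original demo.

// Minimal sketch: manual offset commit after each processed batch.
// Assumes a KafkaConsumer<String, String> configured and subscribed as in the demo above,
// with enable.auto.commit=false.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;

public class ManualCommitLoop {
    public static void pollAndCommit(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                // Process the record before committing its offset.
                System.out.println("the data is " + record.value());
            }
            if (!records.isEmpty()) {
                // Synchronously commit the offsets returned by the last poll().
                consumer.commitSync();
            }
        }
    }
}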