RocketMQ里的一個Consumer Group代表一個Consumer群組。對于大多數分布式應用來說,一個Consumer Group下通常會有多個Consumer實例。訂閱關系一致指的是同一個Consumer Group下所有Consumer實例的處理邏輯必須完全一致,一旦訂閱關系不一致,消息消費的邏輯就會混亂,甚至導致消息丟失。
背景信息
RocketMQ 中一個消費者代表一個Consumer實例群組。在大多數場景中,一個消費者組下面包含多個Consumer實例。
由于分布式消息服務RocketMQ的訂閱關系主要由Topic+Tag共同組成,因此,保持訂閱關系一致意味著同一個消費者Group ID下所有的Consumer實例訂閱關系的一致性大概包括下面幾個方面:
同一個消費組訂閱的Topic必須一致,例如:在同一個消費組下,ConsumerA訂閱Topic1和Topic2,ConsumerB也必須訂閱Topic1和Topic2,只訂閱Topic1、只訂閱Topic2或訂閱Topic2和Topic3都是不允許的。
同一個消費者訂閱的同一個Topic的場景下Tag必須一致,包括Tag的數量和T順序,例如:ConsumerA訂閱Topic1的Tag配置為Tag1||Tag2,ConsumerB訂閱Topic1的Tag也必須是Tag1||Tag2,只訂閱Tag1、只訂閱Tag2或者訂閱Tag2||Tag1都是不允許的。
正確的訂閱關系如下,多個不同的topic可以被多個消費組訂閱,但是同一個消費組下的多個Consumer實例訂閱Topic和Tag都必須一致。
代碼示例
- 訂閱一個Topic、一個Tag
同一個消費組下面的全部消費者實例均訂閱一個topic,且均配置同一個tag這種是符合訂閱關系一致性原則的。
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag1");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}
- 訂閱一個topic多個tag
每個消費者訂閱消息的代碼必須一致
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag1||Tag2");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}
- 訂閱多個topic且訂閱多個tag
consumer.setConsumerGroup("group1");
consumer.subscribe(topic1,"Tag1");
consumer.subscribe(topic2,"Tag1|Tag2");
consumer.subscribe(topic3,"*");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}
常見的訂閱關系不一致情況
如果Rocketmq實例消費到的消息不符合預期,可以檢查一下消費者邏輯是否存在訂閱關系不一致的情況
下面列舉幾種常見的錯誤示例
同一個消費組下訂閱的topic不一致
消費者實例1的代碼:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic1,"*");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}
消費者實例2的代碼:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic2,"*");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}
消費者實例3的代碼:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic3,"*");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}
同一個消費組的消費實例訂閱的topic相同但訂閱的tag不一致
消費者實例1的代碼:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag1");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}
消費者實例2的代碼:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag2");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}
消費者實例3的代碼:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag2");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}
同一個消費組下全部消費者實例訂閱的topic以及tag都一致但訂閱tag的順序不一致
消費者實例1的代碼
:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag1||Tag2");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}
消費者實例2的代碼:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag2||Tag1");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}
消費者實例3的代碼:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag2||Tag1");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs,?context)->{
//?????????do?something
}