topic主題交換器,通過路由key模糊匹配
/---------------\ /------------\
producer -----> | | ------topic.info.routing.key-----> | info queue | -------channel-------> consumer
| | \------------/
| |
| | /-------------\
producer -----> | exchange | ------topic.error.routing.key-----> | error queue | -------channel-------> consumer
| | \-------------/
| |
| | /---------------\
producer -----> | | ------topic.warning.routing.key-----> | warning queue | ------channel-----> consumer
\---------------/ \---------------/
一、maven引用
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
二、配置文件
# 自定義配置應用于topic交換器
mq:
config:
#自定義交換器名稱
exchange: log.topic
queue:
#自定義error、info、all隊列名稱
errorName: topic.error.log
infoName: topic.info.log
allName: topic.all.log
#自定義error、info、all路由鍵的名稱
routingInfoKey: topic.info.routing.key
routingErrorKey: topic.error.routing.key
三、生產者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class Sender {
/**
* spring整合的操作類
* Message 發送的消息對象
* void send(Message var1) throws AmqpException;
* <p>
* var1 路由鍵 Message 發送的消息對象
* void send(String var1, Message var2) throws AmqpException;
* <p>
* var1 指定交換器名稱 var2 路由鍵 Message 發送的消息對象
* void send(String var1, String var2, Message var3) throws AmqpException;
*
* convertAndSend() 方法不需要指定MessageProperties屬性即可發布
*/
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${mq.config.queue.routingInfoKey}")
private String routingInfoKey;
@Value("${mq.config.queue.routingErrorKey}")
private String routingErrorKey;
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg) {
//需要指定交換器和路由鍵就可以轉發
rabbitTemplate.convertAndSend(exchange, routingInfoKey, "info+"+msg);
rabbitTemplate.convertAndSend(exchange, routingErrorKey,"error+"+ msg);
rabbitTemplate.convertAndSend(exchange, "topic.order.routing.key","order+"+ msg);
rabbitTemplate.convertAndSend(exchange, "topic.warn.routing.key", "warn+"+msg);
}
}
四、消費者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* 模糊匹配所有的數據隊列,注意在配置路由key的時候是*代表階段的配置,.不在匹配范圍內
* @RabbitListener 自定義監聽事件
* @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器
* value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列
* exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型
* key 指定路由鍵
*/
@Component
@Slf4j
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
value = "${mq.config.queue.allName}", autoDelete = "true"
),
exchange = @Exchange(
value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "*.*.routing.*")
)
public class AllReceiver {
/**
* 設置監聽方法
*
* @param msg
* @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體
*/
@RabbitHandler(isDefault = true)
public void process(String msg) {
log.info("接受到消息:all {}", msg);
}
}
package com.niu.topic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* error的消費端
* @RabbitListener 自定義監聽事件
* @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器
* value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列
* exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型
* key 指定路由鍵
*/
@Component
@Slf4j
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.errorName}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "${mq.config.queue.routingErrorKey}")
)
public class ErrorReceiver {
/**
* 設置監聽方法
* @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體
*
* @param msg
*/
@RabbitHandler(isDefault = true)
public void process(String msg) {
log.info("接受到消息:error {}", msg);
}
}
package com.niu.topic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* info的消費端
* @RabbitListener 自定義監聽事件
* @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器
* value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列
* exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型
* key 指定路由鍵
*/
@Component
@Slf4j
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(
value = "${mq.config.queue.infoName}", autoDelete = "true"
),
exchange = @Exchange(
value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "${mq.config.queue.routingInfoKey}")
)
public class InfoReceiver {
/**
* 設置監聽方法
*
* @param msg
* @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體
*/
@RabbitHandler(isDefault = true)
public void process(String msg) {
log.info("接受到消息:info {}", msg);
}
}