亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

springboot對接rabbitmq(topic)

2024-03-28 09:29:34
8
0

topic主題交換器,通過路由key模糊匹配

				/---------------\									 /------------\
producer ----->	|				| ------topic.info.routing.key-----> | info queue | -------channel-------> consumer
				|				| 									 \------------/
				|				| 
				|				| 								      /-------------\
producer -----> |	 exchange	| ------topic.error.routing.key-----> | error queue | -------channel-------> consumer
				|				| 									  \-------------/
				|				| 
				|				|                                       /---------------\
producer -----> |				| ------topic.warning.routing.key-----> | warning queue | ------channel-----> consumer
				\---------------/										\---------------/

一、maven引用

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

二、配置文件

# 自定義配置應用于topic交換器
mq:
  config:
   #自定義交換器名稱
     exchange: log.topic
     queue:
        #自定義error、info、all隊列名稱
        errorName: topic.error.log
        infoName: topic.info.log
        allName: topic.all.log
        #自定義error、info、all路由鍵的名稱
        routingInfoKey: topic.info.routing.key
        routingErrorKey: topic.error.routing.key

三、生產者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class Sender {
    /**
     * spring整合的操作類
     * Message 發送的消息對象
     * void send(Message var1) throws AmqpException;
     * <p>
     * var1 路由鍵 Message 發送的消息對象
     * void send(String var1, Message var2) throws AmqpException;
     * <p>
     * var1 指定交換器名稱 var2 路由鍵 Message 發送的消息對象
     * void send(String var1, String var2, Message var3) throws AmqpException;
     *
     * convertAndSend() 方法不需要指定MessageProperties屬性即可發布
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${mq.config.queue.routingInfoKey}")
    private String routingInfoKey;
    @Value("${mq.config.queue.routingErrorKey}")
    private String routingErrorKey;
    @Value("${mq.config.exchange}")
    private String exchange;

    public void send(String msg) {
        //需要指定交換器和路由鍵就可以轉發
        rabbitTemplate.convertAndSend(exchange, routingInfoKey, "info+"+msg);
        rabbitTemplate.convertAndSend(exchange, routingErrorKey,"error+"+ msg);
        rabbitTemplate.convertAndSend(exchange, "topic.order.routing.key","order+"+ msg);
        rabbitTemplate.convertAndSend(exchange, "topic.warn.routing.key", "warn+"+msg);
    }

}

四、消費者

 

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * 模糊匹配所有的數據隊列,注意在配置路由key的時候是*代表階段的配置,.不在匹配范圍內
 * @RabbitListener 自定義監聽事件
 * @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器
 * value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列
 * exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型
 * key  指定路由鍵
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        value = "${mq.config.queue.allName}", autoDelete = "true"
                ),
                exchange = @Exchange(
                        value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.*.routing.*")
)
public class AllReceiver {

    /**
     * 設置監聽方法
     *
     * @param msg
     * @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:all {}", msg);
    }
}
package com.niu.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * error的消費端
 * @RabbitListener 自定義監聽事件
 * @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器
 * value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列
 * exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型
 * key  指定路由鍵
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.errorName}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "${mq.config.queue.routingErrorKey}")
)
public class ErrorReceiver {

    /**
     * 設置監聽方法
     *  @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體
     *
     * @param msg
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:error {}", msg);
    }
}
package com.niu.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * info的消費端
 * @RabbitListener 自定義監聽事件
 * @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器
 * value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列
 * exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型
 * key  指定路由鍵
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        value = "${mq.config.queue.infoName}", autoDelete = "true"
                ),
                exchange = @Exchange(
                        value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "${mq.config.queue.routingInfoKey}")
)
public class InfoReceiver {

    /**
     * 設置監聽方法
     *
     * @param msg
     * @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:info {}", msg);
    }
}
0條評論
0 / 1000
cactusii
15文章數
0粉絲數
cactusii
15 文章 | 0 粉絲
原創

springboot對接rabbitmq(topic)

2024-03-28 09:29:34
8
0

topic主題交換器,通過路由key模糊匹配

				/---------------\									 /------------\
producer ----->	|				| ------topic.info.routing.key-----> | info queue | -------channel-------> consumer
				|				| 									 \------------/
				|				| 
				|				| 								      /-------------\
producer -----> |	 exchange	| ------topic.error.routing.key-----> | error queue | -------channel-------> consumer
				|				| 									  \-------------/
				|				| 
				|				|                                       /---------------\
producer -----> |				| ------topic.warning.routing.key-----> | warning queue | ------channel-----> consumer
				\---------------/										\---------------/

一、maven引用

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

二、配置文件

# 自定義配置應用于topic交換器
mq:
  config:
   #自定義交換器名稱
     exchange: log.topic
     queue:
        #自定義error、info、all隊列名稱
        errorName: topic.error.log
        infoName: topic.info.log
        allName: topic.all.log
        #自定義error、info、all路由鍵的名稱
        routingInfoKey: topic.info.routing.key
        routingErrorKey: topic.error.routing.key

三、生產者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class Sender {
    /**
     * spring整合的操作類
     * Message 發送的消息對象
     * void send(Message var1) throws AmqpException;
     * <p>
     * var1 路由鍵 Message 發送的消息對象
     * void send(String var1, Message var2) throws AmqpException;
     * <p>
     * var1 指定交換器名稱 var2 路由鍵 Message 發送的消息對象
     * void send(String var1, String var2, Message var3) throws AmqpException;
     *
     * convertAndSend() 方法不需要指定MessageProperties屬性即可發布
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${mq.config.queue.routingInfoKey}")
    private String routingInfoKey;
    @Value("${mq.config.queue.routingErrorKey}")
    private String routingErrorKey;
    @Value("${mq.config.exchange}")
    private String exchange;

    public void send(String msg) {
        //需要指定交換器和路由鍵就可以轉發
        rabbitTemplate.convertAndSend(exchange, routingInfoKey, "info+"+msg);
        rabbitTemplate.convertAndSend(exchange, routingErrorKey,"error+"+ msg);
        rabbitTemplate.convertAndSend(exchange, "topic.order.routing.key","order+"+ msg);
        rabbitTemplate.convertAndSend(exchange, "topic.warn.routing.key", "warn+"+msg);
    }

}

四、消費者

 

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * 模糊匹配所有的數據隊列,注意在配置路由key的時候是*代表階段的配置,.不在匹配范圍內
 * @RabbitListener 自定義監聽事件
 * @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器
 * value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列
 * exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型
 * key  指定路由鍵
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        value = "${mq.config.queue.allName}", autoDelete = "true"
                ),
                exchange = @Exchange(
                        value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.*.routing.*")
)
public class AllReceiver {

    /**
     * 設置監聽方法
     *
     * @param msg
     * @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:all {}", msg);
    }
}
package com.niu.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * error的消費端
 * @RabbitListener 自定義監聽事件
 * @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器
 * value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列
 * exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型
 * key  指定路由鍵
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.errorName}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "${mq.config.queue.routingErrorKey}")
)
public class ErrorReceiver {

    /**
     * 設置監聽方法
     *  @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體
     *
     * @param msg
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:error {}", msg);
    }
}
package com.niu.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * info的消費端
 * @RabbitListener 自定義監聽事件
 * @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器
 * value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列
 * exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型
 * key  指定路由鍵
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        value = "${mq.config.queue.infoName}", autoDelete = "true"
                ),
                exchange = @Exchange(
                        value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "${mq.config.queue.routingInfoKey}")
)
public class InfoReceiver {

    /**
     * 設置監聽方法
     *
     * @param msg
     * @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:info {}", msg);
    }
}
文章來自個人專欄
文章 | 訂閱
0條評論
0 / 1000
請輸入你的評論
0
0