Spring RabbitMQ

题外

由于项目上必须使用RabbitMQ,特地学习及研究了一段时间,项目成功稳定上线,故总结回顾

RabbitMQ

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。

使用体验

  • 各个系统解耦,同步转为异步。
  • 减少服务器并发压力,针对大批量数据可按顺序执行,避免并发问题

依赖

<dependency>  
    <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.5.1</version>
</dependency>  

消费者配置

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    <description>rabbitmq 消费者服务配置</description>

    <!-- 基于注解的根包扫描路径 -->
    <context:component-scan base-package="com.epoch.customizeproject.bsht.rabbitmq" />

    <!-- 连接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="xxx" username="xxx" password="xxx" port="5672" virtual-host="xxx" />

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- 监听器 -->
    <bean id="queueListenter" class="com.epoch.customizeproject.bsht.rabbitmq.QueueListenter"/>

    <!-- 配置监听queue -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queue-names="FSSC.NODE.UAT" ref="queueListenter"/>
    </rabbit:listener-container>

    <!--定义queue -->
    <rabbit:queue name="FSSC.NODE.UAT"  exclusive="false"  durable="true" />

</beans>  
  • connection-factory:MQ服务器连接信息配置

    host:地址/username:用户/password:密码/port:端口/virtual-host:虚拟主机

  • queueListenter:消息监听类配置

  • listener-containe:配置监听容器

    acknowledge:消息自动确认(auto)或手动确认(manual)

    connection-factory:MQ服务器连接名称

    queue-names:队列名

    ref:监听类名

  • rabbit:queue

    durable:接收到的消息持久化(true)为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失

    exclusive:排他性(false)

生产者配置

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/rabbit
      http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    <description>rabbitmq生产者服务配置</description>
    <!-- 连接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="" username="xxx" password="xxx" port="5672" virtual-host="xxx" />
    <!--连接配置-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义queue -->
    <rabbit:queue name="FSSC.NODE.UAT" durable="true" auto-delete="false" />

    <!-- 定义fanout exchange,绑定queueTest -->
    <rabbit:fanout-exchange id="amq.fanout" name="amq.fanout" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="FSSC.NODE.UAT"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!--创建消息队列模板-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="amq.fanout"   />
    <!--rabbit链接配置文件 -->

</beans>  

基本配置与消费者配置相同,主要是Exchange 模式,绑定队列,我这边使用的是Fanout模式

  • Fanout Exchange(传播订阅者模式)就是一个exchange,可以绑定多个队列,消息到来时同时传给多个队列。

  • topic Exchange

  • direct Exchange (单个交换) 该交换机收到消息后会把消息发送到指定routing-key的队列中

监听队列

@Component
public class QueueListenter implements MessageListener {

    @Override
    public void onMessage(Message msg) {
        try{
            System.out.print(msg.toString());
        }catch(Exception e){
            e.printStackTrace();
        }
    }

kxind

性别:男. 敢于尝试 爱折腾的死宅程序猿. https://github.com/kxinds