消息组件
业务需求
业务系统中为了提高核心服务的并发性,需要对不是强关联的服务之间进行解耦,使用消息实现彼此异步化,保障核心能力。在应对突发高流量的场景时,也需要有异步消息的机制来实现对请求的削峰填谷,保障核心业务不受影响。
解决方案
iuap的消息组件提供基于消息队列RabbitMQ和阿里云 MNS消息服务的功能封装,使用统一的方式来实现点对点消息和发布订阅消息,同时实现了对消息队列的自动监听和消费者调用,使用镜像集群的方式来保证消息的可靠存储,使用ACK机制保证消息的可靠消息,上下文信息自动在生产者和消费者之间传递,开发者只需要关注业务逻辑,更方便实现异步业务。
功能说明
- 支持点对点消息模型;
 - 支持发布订阅消息模型;
 - 同时支持RabbitMQ和阿里云MNS服务;
 - 支持上下文信息在队列生产者和消费者之间传递;
 
使用说明
Maven依赖
<dependency>
    <groupId>com.yonyou.iuap</groupId>
    <artifactId>iuap-mq</artifactId>
    <version>${iuap.modules.version}</version>
</dependency>
iuap.modules.version为在pom.xml定义的需要引用组件的版本。
配置属性文件
在application.properties属性文件中,配置连接信息,根据项目选择配置不同的消息连接方式
#mq
mq.username=admin
mq.password=admin
mq.addresses=localhost:5672
#mns
mns.accountendpoint=http://账号.mns.cn-区域.aliyuncs.com/
mns.accesskeyid=阿里云开发者账号
mns.accesskeysecret=对应开发者账号的secret
调整spring配置文件
如果是RabbitMQ方式,配置消息生产者和消费者对应的spring配置文件,文件中定义消息队列、监听等信息
消息生产者对应的关键bean声明如下,更详细的配置请参考示例工程.
<!-- 连接服务配置  -->
<rabbit:connection-factory id="connectionFactory" addresses="${mq.addresses}"  username="${mq.username}" password="${mq.password}" publisher-confirms="false"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!-- queue 队列声明-->
<rabbit:queue id="simple_queue" durable="true" auto-delete="false" exclusive="false" name="simple_queue"/>
<!-- exchange queue binging key 绑定,作为点对点模式使用 -->
<rabbit:direct-exchange name="iuap-direct-exchange" durable="true" auto-delete="false" id="iuap-direct-exchange">
    <rabbit:bindings>
        <rabbit:binding queue="simple_queue" key="simple_queue_key"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
<!-- 通用RabbitMQ producer声明 -->
<bean id="rabbitMQProducer" class="com.yonyou.iuap.mq.rabbit.RabbitMQProducer">
    <property name="rabbitTemplate" ref="rabbitTemplate"></property>
</bean>
<!-- 增加失败重试机制,发送失败之后,会尝试重发三次,重发间隔(ms)为 
    第一次 initialInterval 
    此后:initialInterval*multiplier > maxInterval ? maxInterval : initialInterval*multiplier。
    配合集群使用的时候,当mq集群中一个down掉之后,重试机制尝试其他可用的mq。
 -->
<bean id="retryConnTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="500"/>
            <property name="multiplier" value="10.0"/>
            <property name="maxInterval" value="5000"/>
        </bean>
    </property>
</bean>
消息消费者方对应的关键bean声明如下,更详细的配置请参考示例工程.
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener queues="simple_queue" ref="queueLitener"/>
</rabbit:listener-container>
<!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
    <rabbit:listener queues="simple_queue" ref="queueLitener"/>
</rabbit:listener-container>
<bean id="queueLitener" class="com.yonyou.iuap.mq.rabbitmq.MqTestListener"></bean>
如果是阿里云MNS方式,配置MNS所需的客户端和服务,listener中可以定制轮循取消息空闲时候的间隔时间
<bean id="mnsAccount" class="com.aliyun.mns.client.CloudAccount">
    <constructor-arg index="0">           
        <value>${mns.accesskeyid}</value>       
    </constructor-arg>       
    <constructor-arg index="1">           
        <value>${mns.accesskeysecret}</value>       
    </constructor-arg>
    <constructor-arg index="2">           
        <value>${mns.accountendpoint}</value>       
    </constructor-arg>        
</bean>
<bean id="mqService" class="com.yonyou.iuap.mq.mns.AliyunMnsService">
    <property name="mnsAccount" ref="mnsAccount"></property>
</bean>
<bean id="simpleListener" class="com.yonyou.iuap.mq.mns.MnsSimpleListener">
    <constructor-arg index="0">           
        <value>testali-mq-01</value>       
    </constructor-arg>       
    <constructor-arg index="1">           
        <ref bean="mnsAccount"/>    
    </constructor-arg>
    <constructor-arg index="2">           
        <value>5</value>       
    </constructor-arg>
</bean>
API调用
消息发送
业务代码中,在需要发送消息的服务中,引入消息发送的Service,调用发送消息的API
@Autowired
private IMqService mqService;
//rabbitmq
@Test
public void queueMessage1() throws Exception {
    String qName = "iuap-direct-exchange";
    String key = "simple_queue_key";
    String msg = "iuap mq msg test! ";
    mqService.sendMsg(qName, key, msg);
}
//mns
@Test
public void queueMessage2() throws Exception {
    String qName = "testali-mq-01";
    String key = null;
    String msg = "iuap mq msg test! ";
    mqService.sendMsg(qName, key, msg);
}
消息监听
如果是MNS的普通队列的监听,需要编写监听类继承AbstractMessageListener
 public class MnsSimpleListener extends AbstractMessageListener {
    public MnsSimpleListener(String queueName, CloudAccount mnsAccount, int waitSeconds) {
        super(queueName, mnsAccount, waitSeconds);
    }
    @Override
    public void onMessage(Message message) {
        System.out.println(message.getMessageBody());
    }
}
如果是RabbitMQ的普通队列的监听,需要编写监听类实现MessageListener
@Service
public class MqTestListener implements MessageListener{
    private static Logger logger = LoggerFactory.getLogger(MqTestListener.class);
    @Override
    public void onMessage(Message message) {
        logger.info("MQ ======== mq MqTestListener :" + new String(message.getBody()));
    }
}
消息订阅
如果是MNS的topic的监听,需要编写REST服务,供MNS回调,注意MNS要求服务的路径只能映射一层
@Controller
public class MnsTopicListener extends AbstracTopicListener{
    private final Logger logger = LoggerFactory.getLogger(getClass());
    @RequestMapping(value = "/notifications", method = RequestMethod.POST)
    public void receiveMnsTopic(HttpServletRequest request,HttpServletResponse response) {
    //获取消息
    String message = fetchMessage(request,response);
    if(StringUtils.isNotBlank(message)){
        onMessage(message);
    }
    }
        // 业务逻辑,拿到消息体后的处理
    private void onMessage(String message) {
        logger.info(message);
    }
}
更加详细的配置和示例请参考示例工程(DevTool/examples/example_iuap_mq)和官网文档
上下文传递
注意:如果需要消息发送和接收时传递上下文,需要引入iuap-saas-mq,引入方式类似。
Rabbitmq传递上下文信息
消息发送配置
<bean id="rabbitMqContextService" class="com.yonyou.iuap.mq.rabbit.RabbitMqContextService"> <property name="rabbitTemplate" ref="rabbitTemplate"></property> </bean>发送代码
@Resource(name="rabbitMqContextService") IMqService mqService ; /** 发送普通rabbitmq 上下文消息传递*/ @Test public void queueMessageContextTest() throws InterruptedException { long t1 = System.currentTimeMillis(); int msgCount = 2 ; for(int i = 0; i < msgCount; i++){ TestMq t = new TestMq("testmq" + i, "guyz" + i ) ; JsonMapper objectMapper = new JsonMapper(); String msg = objectMapper.toJson(t) ; mqService.sendMsg("iuap-direct-exchange", "simple_queue_key" , msg); } }
消息接收配置
接收方继承类 com.yonyou.iuap.mq.rabbit.consumer.MqSaasListener ,并覆写 handleMessage() 方法。
public class MqSaasTestListener extends MqSaasListener{ private static Logger logger = LoggerFactory.getLogger(MqSaasTestListener.class); @Override //接收消息处理具体业务逻辑 public void handleMessage(String message){ System.out.println( InvocationInfoProxy.getCallid() ); System.out.println( InvocationInfoProxy.getLocale() ); } }
阿里云MNS传递上下文信息
普通队列发送配置
<bean id="mqService" class="com.yonyou.iuap.mq.mns.AliyunMnsSaasService"> <property name="mnsAccount" ref="mnsAccount"></property> </bean>
发送代码
    @Resource(name="mqService") 
    IMqService mqService ;
    //发送普通queue 到ali云 
    @Test
    public void queueMessageContext() throws InterruptedException {
        int msgCount = 4 ;
        for (int i = 0; i < msgCount; i++) {
            TestMq t = new TestMq("testmq" + i, "guyz" + i) ;
            JsonMapper objectMapper = new JsonMapper();
            String msg = objectMapper.toJson(t) ;     
            mqService.sendMsg("testali-mq-01", null, msg);
        }
    }
接收普通queue消息
接收阿里云的普通queue消息是通过主动去阿里获取消息的,需要实现MnsSaasListener类
配置:
<bean id="simpleListener" class="com.yonyou.iuap.mq.mns.MnsSimpleListener"> <constructor-arg index="0"> <value>testali-mq-01</value> </constructor-arg> <constructor-arg index="1"> <ref bean="mnsAccount"/> </constructor-arg> <constructor-arg index="2"> <value>5</value> </constructor-arg> </bean>接收代码样例
    public class MnsSimpleListener extends MnsSaasListener {    
        public MnsSimpleListener(String queueName, CloudAccount mnsAccount, int waitSeconds) {
            super(queueName, mnsAccount, waitSeconds);
        } 
        public void handleMessage(String message){//根据业务需要处理
            System.out.println(   message );
            System.out.println(" ====InvocationInfoProxy.getCallid()  " + InvocationInfoProxy.getCallid() );
        }
        public void afterHandleMessage(){
            InvocationInfoProxy.reset() ; 
        }
    }
阿里云MNS topic类型消息发送
<bean id="mqService" class="com.yonyou.iuap.mq.mns.AliyunMnsSaasService"> <property name="mnsAccount" ref="mnsAccount"></property> </bean>
java代码:
    @Resource(name="mqService") 
    IMqService mqService ;
    //发送给阿里云 topic ,包含上下文信息
    @Test
    public void testTopicMns() throws Exception {
        String topicName = "test-topic-01";
        String msg = "topic context message test! ";
        mqService.publishMsg(topicName, msg);
        System.out.println("topic方式发送消息完成"   );
    }
MNS topic 消息订阅
需要编写REST服务,供MNS回调,注意MNS要求服务的路径只能映射一层,如果为/appname/notifications,出现两个/则不合法
要接收上下文信息需要实现 MnsTopicSaasListener
    @Controller
    public class MnsTopicSaasTestListener extends MnsTopicSaasListener{
        private final Logger logger = LoggerFactory.getLogger(getClass());
        @RequestMapping(value = "/notifications", method = RequestMethod.POST)
        public void receiveMnsTopic(HttpServletRequest request,HttpServletResponse response) {
            super.receiveMnsTopic(request, response);
        }
        // 业务逻辑,拿到消息体后的处理
        @Override
        public void onMessage(String message) {
            //TODO  业务逻辑
            System.out.println("get  topic message :" + message );
            System.out.println(" ====InvocationInfoProxy.getCallid()  " + InvocationInfoProxy.getCallid() );
        }
    }