消息组件
业务需求
业务系统中为了提高核心服务的并发性,需要对不是强关联的服务之间进行解耦,使用消息实现彼此异步化,保障核心能力。在应对突发高流量的场景时,也需要有异步消息的机制来实现对请求的削峰填谷,保障核心业务不受影响。
解决方案
iuap的消息组件提供基于消息队列RabbitMQ和阿里云 MNS消息服务的功能封装,使用统一的方式来实现点对点消息和发布订阅消息,同时实现了对消息队列的自动监听和消费者调用,使用镜像集群的方式来保证消息的可靠存储,使用ACK机制保证消息的可靠消息,上下文信息自动在生产者和消费者之间传递,开发者只需要关注业务逻辑,更方便实现异步业务。
功能说明
- 支持点对点消息模型;
- 支持发布订阅消息模型;
- 同时支持RabbitMQ和阿里云MNS服务;
- 支持上下文信息在队列生产者和消费者之间传递;
使用说明
Maven依赖
<dependency>
<groupId>com.yonyou.iuap</groupId>
<artifactId>iuap-mq</artifactId>
<version>${iuap.modules.version}</version>
</dependency>
iuap.modules.version为在pom.xml定义的需要引用组件的版本。
配置属性文件
在application.properties属性文件中,配置连接信息,根据项目选择配置不同的消息连接方式
#mq
mq.username=admin
mq.password=admin
mq.addresses=localhost:5672
#mns
mns.accountendpoint=http://账号.mns.cn-区域.aliyuncs.com/
mns.accesskeyid=阿里云开发者账号
mns.accesskeysecret=对应开发者账号的secret
调整spring配置文件
如果是RabbitMQ方式,配置消息生产者和消费者对应的spring配置文件,文件中定义消息队列、监听等信息
消息生产者对应的关键bean声明如下,更详细的配置请参考示例工程.
<!-- 连接服务配置 -->
<rabbit:connection-factory id="connectionFactory" addresses="${mq.addresses}" username="${mq.username}" password="${mq.password}" publisher-confirms="false"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!-- queue 队列声明-->
<rabbit:queue id="simple_queue" durable="true" auto-delete="false" exclusive="false" name="simple_queue"/>
<!-- exchange queue binging key 绑定,作为点对点模式使用 -->
<rabbit:direct-exchange name="iuap-direct-exchange" durable="true" auto-delete="false" id="iuap-direct-exchange">
<rabbit:bindings>
<rabbit:binding queue="simple_queue" key="simple_queue_key"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 通用RabbitMQ producer声明 -->
<bean id="rabbitMQProducer" class="com.yonyou.iuap.mq.rabbit.RabbitMQProducer">
<property name="rabbitTemplate" ref="rabbitTemplate"></property>
</bean>
<!-- 增加失败重试机制,发送失败之后,会尝试重发三次,重发间隔(ms)为
第一次 initialInterval
此后:initialInterval*multiplier > maxInterval ? maxInterval : initialInterval*multiplier。
配合集群使用的时候,当mq集群中一个down掉之后,重试机制尝试其他可用的mq。
-->
<bean id="retryConnTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500"/>
<property name="multiplier" value="10.0"/>
<property name="maxInterval" value="5000"/>
</bean>
</property>
</bean>
消息消费者方对应的关键bean声明如下,更详细的配置请参考示例工程.
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="simple_queue" ref="queueLitener"/>
</rabbit:listener-container>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="simple_queue" ref="queueLitener"/>
</rabbit:listener-container>
<bean id="queueLitener" class="com.yonyou.iuap.mq.rabbitmq.MqTestListener"></bean>
如果是阿里云MNS方式,配置MNS所需的客户端和服务,listener中可以定制轮循取消息空闲时候的间隔时间
<bean id="mnsAccount" class="com.aliyun.mns.client.CloudAccount">
<constructor-arg index="0">
<value>${mns.accesskeyid}</value>
</constructor-arg>
<constructor-arg index="1">
<value>${mns.accesskeysecret}</value>
</constructor-arg>
<constructor-arg index="2">
<value>${mns.accountendpoint}</value>
</constructor-arg>
</bean>
<bean id="mqService" class="com.yonyou.iuap.mq.mns.AliyunMnsService">
<property name="mnsAccount" ref="mnsAccount"></property>
</bean>
<bean id="simpleListener" class="com.yonyou.iuap.mq.mns.MnsSimpleListener">
<constructor-arg index="0">
<value>testali-mq-01</value>
</constructor-arg>
<constructor-arg index="1">
<ref bean="mnsAccount"/>
</constructor-arg>
<constructor-arg index="2">
<value>5</value>
</constructor-arg>
</bean>
API调用
消息发送
业务代码中,在需要发送消息的服务中,引入消息发送的Service,调用发送消息的API
@Autowired
private IMqService mqService;
//rabbitmq
@Test
public void queueMessage1() throws Exception {
String qName = "iuap-direct-exchange";
String key = "simple_queue_key";
String msg = "iuap mq msg test! ";
mqService.sendMsg(qName, key, msg);
}
//mns
@Test
public void queueMessage2() throws Exception {
String qName = "testali-mq-01";
String key = null;
String msg = "iuap mq msg test! ";
mqService.sendMsg(qName, key, msg);
}
消息监听
如果是MNS的普通队列的监听,需要编写监听类继承AbstractMessageListener
public class MnsSimpleListener extends AbstractMessageListener {
public MnsSimpleListener(String queueName, CloudAccount mnsAccount, int waitSeconds) {
super(queueName, mnsAccount, waitSeconds);
}
@Override
public void onMessage(Message message) {
System.out.println(message.getMessageBody());
}
}
如果是RabbitMQ的普通队列的监听,需要编写监听类实现MessageListener
@Service
public class MqTestListener implements MessageListener{
private static Logger logger = LoggerFactory.getLogger(MqTestListener.class);
@Override
public void onMessage(Message message) {
logger.info("MQ ======== mq MqTestListener :" + new String(message.getBody()));
}
}
消息订阅
如果是MNS的topic的监听,需要编写REST服务,供MNS回调,注意MNS要求服务的路径只能映射一层
@Controller
public class MnsTopicListener extends AbstracTopicListener{
private final Logger logger = LoggerFactory.getLogger(getClass());
@RequestMapping(value = "/notifications", method = RequestMethod.POST)
public void receiveMnsTopic(HttpServletRequest request,HttpServletResponse response) {
//获取消息
String message = fetchMessage(request,response);
if(StringUtils.isNotBlank(message)){
onMessage(message);
}
}
// 业务逻辑,拿到消息体后的处理
private void onMessage(String message) {
logger.info(message);
}
}
更加详细的配置和示例请参考示例工程(DevTool/examples/example_iuap_mq)和官网文档
上下文传递
注意:如果需要消息发送和接收时传递上下文,需要引入iuap-saas-mq,引入方式类似。
Rabbitmq传递上下文信息
消息发送配置
<bean id="rabbitMqContextService" class="com.yonyou.iuap.mq.rabbit.RabbitMqContextService"> <property name="rabbitTemplate" ref="rabbitTemplate"></property> </bean>
发送代码
@Resource(name="rabbitMqContextService") IMqService mqService ; /** 发送普通rabbitmq 上下文消息传递*/ @Test public void queueMessageContextTest() throws InterruptedException { long t1 = System.currentTimeMillis(); int msgCount = 2 ; for(int i = 0; i < msgCount; i++){ TestMq t = new TestMq("testmq" + i, "guyz" + i ) ; JsonMapper objectMapper = new JsonMapper(); String msg = objectMapper.toJson(t) ; mqService.sendMsg("iuap-direct-exchange", "simple_queue_key" , msg); } }
消息接收配置
接收方继承类 com.yonyou.iuap.mq.rabbit.consumer.MqSaasListener ,并覆写 handleMessage() 方法。
public class MqSaasTestListener extends MqSaasListener{ private static Logger logger = LoggerFactory.getLogger(MqSaasTestListener.class); @Override //接收消息处理具体业务逻辑 public void handleMessage(String message){ System.out.println( InvocationInfoProxy.getCallid() ); System.out.println( InvocationInfoProxy.getLocale() ); } }
阿里云MNS传递上下文信息
普通队列发送配置
<bean id="mqService" class="com.yonyou.iuap.mq.mns.AliyunMnsSaasService"> <property name="mnsAccount" ref="mnsAccount"></property> </bean>
发送代码
@Resource(name="mqService")
IMqService mqService ;
//发送普通queue 到ali云
@Test
public void queueMessageContext() throws InterruptedException {
int msgCount = 4 ;
for (int i = 0; i < msgCount; i++) {
TestMq t = new TestMq("testmq" + i, "guyz" + i) ;
JsonMapper objectMapper = new JsonMapper();
String msg = objectMapper.toJson(t) ;
mqService.sendMsg("testali-mq-01", null, msg);
}
}
接收普通queue消息
接收阿里云的普通queue消息是通过主动去阿里获取消息的,需要实现MnsSaasListener类
配置:
<bean id="simpleListener" class="com.yonyou.iuap.mq.mns.MnsSimpleListener"> <constructor-arg index="0"> <value>testali-mq-01</value> </constructor-arg> <constructor-arg index="1"> <ref bean="mnsAccount"/> </constructor-arg> <constructor-arg index="2"> <value>5</value> </constructor-arg> </bean>
接收代码样例
public class MnsSimpleListener extends MnsSaasListener {
public MnsSimpleListener(String queueName, CloudAccount mnsAccount, int waitSeconds) {
super(queueName, mnsAccount, waitSeconds);
}
public void handleMessage(String message){//根据业务需要处理
System.out.println( message );
System.out.println(" ====InvocationInfoProxy.getCallid() " + InvocationInfoProxy.getCallid() );
}
public void afterHandleMessage(){
InvocationInfoProxy.reset() ;
}
}
阿里云MNS topic类型消息发送
<bean id="mqService" class="com.yonyou.iuap.mq.mns.AliyunMnsSaasService"> <property name="mnsAccount" ref="mnsAccount"></property> </bean>
java代码:
@Resource(name="mqService")
IMqService mqService ;
//发送给阿里云 topic ,包含上下文信息
@Test
public void testTopicMns() throws Exception {
String topicName = "test-topic-01";
String msg = "topic context message test! ";
mqService.publishMsg(topicName, msg);
System.out.println("topic方式发送消息完成" );
}
MNS topic 消息订阅
需要编写REST服务,供MNS回调,注意MNS要求服务的路径只能映射一层,如果为/appname/notifications,出现两个/则不合法
要接收上下文信息需要实现 MnsTopicSaasListener
@Controller
public class MnsTopicSaasTestListener extends MnsTopicSaasListener{
private final Logger logger = LoggerFactory.getLogger(getClass());
@RequestMapping(value = "/notifications", method = RequestMethod.POST)
public void receiveMnsTopic(HttpServletRequest request,HttpServletResponse response) {
super.receiveMnsTopic(request, response);
}
// 业务逻辑,拿到消息体后的处理
@Override
public void onMessage(String message) {
//TODO 业务逻辑
System.out.println("get topic message :" + message );
System.out.println(" ====InvocationInfoProxy.getCallid() " + InvocationInfoProxy.getCallid() );
}
}