package com.brilliance.eibs.server; import com.brilliance.eibs.core.model.IServiceDef; import com.brilliance.eibs.core.service.Context; import com.brilliance.eibs.el.ApacheEL; import com.brilliance.eibs.main.Client; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class RocketMQServer extends AbsServer { private DefaultMQPushConsumer consumer; public RocketMQServer(Context context, IServiceDef serviceDef, ApacheEL elParser) { super(context, serviceDef, elParser); } @Override public void run() { logger.info( LOG_FLAG + "RocketMQServer starting ." + LOG_FLAG); final String interfaceName = getRequiredPropertyValue("interfaceName"); final String transactionName = getRequiredPropertyValue("transactionName"); String groupName = getRequiredPropertyValue("groupName"); String serverAddress = getRequiredPropertyValue("serverAddress"); String topic = getRequiredPropertyValue("topic"); String subExpression = getPropertyValue("subExpression", "*"); // boolean isPush = getPropertyValue("push",false); try { // Instantiate with specified consumer group name. consumer = new DefaultMQPushConsumer(groupName); // Specify name server addresses. consumer.setNamesrvAddr(serverAddress); // Subscribe one more more topics to consume. consumer.subscribe(topic, subExpression); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { Client client = new Client(); client.call(interfaceName, transactionName, new Object[] {msgs}); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // Launch the consumer instance. consumer.start(); } catch (MQClientException e) { logger.error( LOG_FLAG + "RocketMQServer error." + LOG_FLAG, e); close(); } logger.info( LOG_FLAG + "RocketMQServer is finished." + LOG_FLAG); } @Override public void close() { if (null != consumer) consumer.shutdown(); } }