package com.brilliance.eibs.server;

import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.main.Client;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketMQServer extends AbsServer {

    private DefaultMQPushConsumer consumer;

    public RocketMQServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
        super(context, serviceDef, elParser);
    }

    @Override
    public void run() {
        logger.info( LOG_FLAG + "RocketMQServer starting ." + LOG_FLAG);

        final String interfaceName = getRequiredPropertyValue("interfaceName");
        final String transactionName = getRequiredPropertyValue("transactionName");

        String groupName = getRequiredPropertyValue("groupName");
        String serverAddress = getRequiredPropertyValue("serverAddress");
        String topic = getRequiredPropertyValue("topic");
        String subExpression = getPropertyValue("subExpression", "*");
        // boolean isPush = getPropertyValue("push",false);
        try {
            // Instantiate with specified consumer group name.
            consumer = new DefaultMQPushConsumer(groupName);

            // Specify name server addresses.
            consumer.setNamesrvAddr(serverAddress);

            // Subscribe one more more topics to consume.
            consumer.subscribe(topic, subExpression);

            // Register callback to execute on arrival of messages fetched from brokers.
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {

                    Client client = new Client();
                    client.call(interfaceName, transactionName, new Object[] {msgs});

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            // Launch the consumer instance.
            consumer.start();
        } catch (MQClientException e) {
            logger.error( LOG_FLAG + "RocketMQServer error." + LOG_FLAG, e);
            close();
        }

        logger.info( LOG_FLAG + "RocketMQServer is finished." + LOG_FLAG);
    }

    @Override
    public void close() {
        if (null != consumer)
            consumer.shutdown();
    }

}