MQFilter

该插件是消息队列插件,主要是将实现消息的收发,收到消息存入到队列中,并通过对队列的监听实现消息的即时消费。

插件配置

  • MQFilter插件所在位置:lib==>be-esb-plugin-jms-2.0.jar==>com==>brilliance==>eibs==>core==>service==>instance==>impl==>MQFilter
  • MQFilter插件需要继承插件共有的父类AbsFilter

1666144275235

  • commons.xml的filters中注册该插件,该插件无parameter属性
<filter id="mqFilter" class="MQFilter"/> <!-- mq收发 -->
属性 说明
id 注册时指定的id命名属性,后续在使用时通过该名称进行插件的调用
class 声明插件时映射的插件类的位置,指向该插件的类文件,注意插件存放的包结构必须严格保持一致才可生效(com.brilliance.eibs.core.service.instance.impl)
  • 在使用MQFilter插件时,需要对MQ的连接MQConnection在commons.xml中进行配置
    • 该插件类的位置和MQFilter插件类位置相同,同样,需要在commons.xml的connections中注册该插件,注意该插件有一些配置属性
    • 该类继承AbsConnection连接插件共同父类
<!--连接MQ的配置 发送配置 暂留并置空 -->
<connection class="MQConnection" id="sendMqConnection1"><!--指定连接类-->
    <property name="ip" value="#{MQ_IP}"/><!--指定连接的ip地址-->
    <property name="port" value="#{MQ_PORT}"/><!--指定连接端口-->
    <property name="queueManager" value="#{MQ_MGR}"/><!--连接的队列管理器-->
    <property name="channel" value="#{MQ_CHN}"/><!--指定连接信道-->
    <property name="queue" value="#{MQ_SEND_QUEUE}"/><!--指定消息发送队列-->
    <property name="CCSID" value="#{MQ_CCSID}"/><!--定义字符集顺序的标识数码-->
</connection>

属性配置

属性 描述 是否必须 示例
ref 引用插件名称 引用commons.xml注册的MQFilter插件指定的id值
type 指定插件功能类型 1.当filter的type为out时,field的value值通过send函数发送信息到mq队列;
2.当filter的type为in时,当需要循环读mq队列信息时,设置foreach的condition条件为#next,可以逐条读取;
3.当filter的type为in时,field的value值通过recevie函数读取当前mq队列信息;

示例

实现MQ消息队列插件的基本使用

<!--======给mq发送消息======-->
<transaction id="mqsendtest" version="2.0">
    <step id="1">
        <!--先进行配置插件的连接-->
        <connection ref="sendMqConnection1"/>
        <!--再利用mq插件进行发送消息-->
        <filter ref="mqFilter" type="out">
            <field value="${__this.send('你好java,好难呀,再发一个')}"/>
        </filter>
    </step>
    <step id="exception">
        <import ref="comException" file="#{COREIMPORT}"/>
    </step>
</transaction>

<!--======从mq接收消息======-->
<transaction id="mqrcvtest" version="2.0">
    <step id="1">
        <!--先进行配置插件的连接-->
        <connection ref="sendMqConnection1"/>
        <!--再利用插件进行消息的获取消费-->
        <filter ref="mqFilter" type="in">
            <!--通过next依次处理队列中的消息-->
            <if condition="${__this.next()}">
                <log value="${__this.receive()}"/>
                <if condition="${!result.success}">
                    <!--消息处理成功则进行commit操作,否则进行回滚-->
                    <field etag="commit"/>
                </if>
            </if>
        </filter>
    </step>
    <step id="exception">
        <import ref="comException" file="#{COREIMPORT}"/>
    </step>
</transaction>

MQ收发String类型的消息

  • MQ发送String类型消息
<?xml version="1.0" encoding="UTF-8"?>
<root xmlns="http://www.brilliance.com.cn/interface" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.brilliance.com.cn/interface ../etc/schema/interface.xsd">

    <interface id="mqsend" type="client">
        <transaction id="mqsend">
            <!--发送 -->
            <step id="send">
                <connection  ref="sendMqConnection1" type="in"/>
                <filter ref="mqFilter" type="out">
                        <field value="${__this.send('it is a test')}"/>
                </filter>
            </step>
        </transaction>
    </interface>
</root>
  • MQ接收String类型消息
<?xml version="1.0" encoding="UTF-8"?>
<root xmlns="http://www.brilliance.com.cn/interface" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.brilliance.com.cn/interface ../etc/schema/interface.xsd">

    <interface id="actmq" type="server">
        <transaction id="kjmq" version="2.0">
            <!--接收 -->
            <step id="recv">
                <!-- 连接MQ -->
                <connection ref="sendMqConnection1" type="in"/>
                <!-- 取出队列所有消息 -->
                <filter ref="mqFilter" type="in" encoding="UTF-8">
                    <while condition="${__this.next()}">
                        <!-- 接收消息通知,并通过call节点处理消息 -->
              <field tag="reqmsg" value="${__this.receive()}" scope="transaction" />
            <call tag="result" type="chain" transactionName="mqchain"  
             interfaceName="mqchain" scope="transaction">
            <argument value="${reqmsg}"/><!--Data-->
                        </call>
                        <if condition="${!result.success}">
                                 <!--消息处理成功则进行commit操作 -->
                            <field etag="commit"/>
                        </if>
                    </while>
                </filter>
            </step>
        </transaction>
    </interface>
</root>

MQ收发压缩文件zip或者tar

  • MQ发送压缩文件
<?xml version="1.0" encoding="UTF-8"?>
<root xmlns="http://www.brilliance.com.cn/interface" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.brilliance.com.cn/interface ../etc/schema/interface.xsd">

    <interface id="mqreceive" type="client">
        <transaction id="mqreceive">
            <!--发送 -->
            <step id="recv">

                <connection  ref="sendMqConnection1" type="in"  />
                <filter ref="mqFilter" type="out">
                        <field value="${__this.sendFile(‘d:/szsb.zip’)}"/>
                </filter>
            </step>
        </transaction>
    </interface>
</root>
  • MQ接收压缩文件
<?xml version="1.0" encoding="UTF-8"?>
<root xmlns="http://www.brilliance.com.cn/interface" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.brilliance.com.cn/interface ../etc/schema/interface.xsd">

    <interface id="mqreceive" type="client">
        <transaction id="mqreceive">
            <!--接收 -->
            <step id="recv">
                <filter type="in" ref="publicFilter">
                    <field tag="i" value="0" scope="transaction" type="int" />
                    <field tag="rcvpath" value="/sstf/xmls/rcv" scope="transaction" />
                </filter>
                <connection  ref="sendMqConnection1" type="in"  />
                <filter ref="mqFilter" type="in">
                    <while condition="${next()}">
                        <field tag="buffpath" value="${rcvpath}/buff${i}.zip" type="String" />
                        <field value="${buffpath}" />
                        <field tag="abc" value="${receiveFile(buffpath)}" />
                        <field tag="i" value="${i+1}" scope="transaction"  />
                    </while>
                </filter>
                <filter type="out" ref="publicFilter">
                    <!--逐个解压接收的文件-->
                    <foreach tag="j" end="${i}">
                        <field tag="buffpath" value="${rcvpath}/buff${j}.zip" />
                        <field tag="aaa" value="${unzipFile(buffpath,rcvpath)}" scope="transaction" />    
                        <!--解压后删除压缩文件-->
                        <field value="${deleteFile(buffpath)}" />
                    </foreach>
                    <if condition="${i==0}">
                        <field value="false" />
                    </if>
                    <else>
                        <field value="true" />
                    </else>
                </filter>
            </step>
        </transaction>
    </interface>
</root>

通过增加监听模式实现自动拉取MQ消息

  • 为了实现mq队列消息的自动消费,可以通过cron定时任务进行定时拉取,还可以通过监听模式来实现
  • 通过在service中添加jmsmq监听模式(基于jms协议的mq队列监听模式),实现发送到mq队列消息的自动监听拉取
<!-- 增加jmsmq监听模式 -->
<service id="jmsService" state="off" type="jms"><!-- 设置为on时表示开启对队列的监听 -->
    <!-- 链接工厂引用,具体配置根据不同MQ厂商配置【必配】-->
    <property name="connectionFactory" value="com.ibm.mq.jms.MQQueueConnectionFactory">
        <argument tag="hostName" value="#{MQ_SERVER}-0"/><!-- 指定MQ服务的ip地址  -->
        <argument tag="port" value="#{MQ_SERVER}-0"/><!-- 指定MQ服务端口号  -->
        <argument tag="CCSID" value="#{MQ_CCSID}"/><!-- 服务器MQ服务使用的编码1381代表GBK、1208代表UTF  -->
        <argument tag="queueManager" value="#{MQ_MGR}"/><!-- 指定队列管理器 -->
        <argument tag="channel" value="#{MQ_CHN}"/><!-- 指定信道 -->
        <argument tag="transportType" value="1"/><!-- 指定传输类型 -->
        <argument tag="username" value="app"/><!-- 指定连接mq用户名 -->
        <argument tag="password" value=""/><!-- 指定连接mq用户密码 -->
    </property>
    <!-- 下面配置基于spring jms而来 -->
    <!-- 设置订阅类型,包括queue,topic,durableTopic,sharedTopic和sharedDurableTopic,【默认:queue】 -->
    <property name="destinationType" value="queue"/>
    <!-- JMS资源缓存级别,包括none,connection,session,consumer和auto,默认是session,建议不要修改 -->
    <!-- <property name="cache" value="connection"/> -->
    <!-- 初始消费者-最大消费者线程数 【默认:1-1】-->
    <property name="concurrency" value="5-10"/>
    <!-- 每个消费者处理多少消息,然后退出线程,后续由taskExecutor重新调度【默认:5】 -->
    <property name="prefetch" value="5"/>
    <!-- 接受mq消息超时时间(毫秒)【默认:1秒】 -->
    <property name="receiveTimeout" value="1000"/>
    <!-- 设置监听的队列/主题名称,支持用逗号配置多个监听队列【必配】 -->
    <property name="destination" value="#{MQ_RECV_QUEUE}"/><!--此处配置多个接收队列,请用,分割-->
    <!-- 下面两个参数指定具体处理消息的接口配置【必配】 -->
    <property name="interfaceName" value="jmslistener"/>
    <property name="transactionName" value="elcsIn"/>
</service>

results matching ""

    No results matching ""