BatchFilter

  • 多线程批量文件解析处理

插件公共配置

    <filters>
        <filter id="publicFilter" class="PublicFilter" />
        <filter id="batchFilter" class="BatchFilter">
            <parameter name="lines" value="10" /><!-- 每个线程读取的行数 -->
            <parameter name="threadnum" value="10" /><!-- 线程池大小 -->
            <parameter name="call_interface" value="test1" /><!-- 处理业务的接口 -->
            <parameter name="call_transaction" value="BATCH_CALL" /><!-- 处理业务的交易 -->
        </filter>
        <filter id="fileFilter" class="FileFilter">
            <parameter name="record_strategy" value="bysplit" /><!--行与行策略 -->
            <parameter name="record" value="\r\n" /><!--行与行换行符隔开 -->
            <parameter name="field_strategy" value="bysplit" /><!--列于列策略 -->
                <parameter name="field" value="|+|" /><!--列之间的分隔符 -->
            </filter>    
    </filters>
  • batchfilter的属性配置:
属性 描述 值类型 示例
id 标识 string batchFilter
type 实例 string BatchFilter
  • parameter:配置参数项。属性配置:
属性 描述 值类型 示例
Name 标识 string lines、threadnum、call_interface、call_transaction
value string 当前字段的值

接口配置实现

属性 描述 值类型 示例
ref 引用 string 引用已经定义好的batchFilter对象
type 数据流向 string in
<transaction id="testbatch" detail="e:/total.txt">
            <step id="1">
                <!-- 读取批量文件 -->
                <connection ref="fileConnection" type="in">
                    <property name="path" value="e:/123.txt" />
                    <property name="type" value="list" /><!-- 按行读取为List<String>,如果不配则整个文件读到一个String -->
                </connection>
<!-- 分多线程处理,每个线程产生一个结果,所有结果存放在一个list里面 -->
                <filter ref="batchFilter" type="in">
                </filter>
                <filter ref="publicFilter" type="in">
                    <!-- 获取所有结果List -->
                    <field tag="results" value="${this}" scope="step" />
                    <!-- 获取结果个数 -->
                    <field tag="len" value="${getArrayLen()}" scope="transaction">
                        <argument id="1" value="${results}" />
                    </field>
                </filter>
                <foreach tag="i" begin="0" end="${len}" step="1">
                    <!-- 获取所有结果,这个过程是阻塞的,确保所有线程执行完后返回 -->
                    <filter ref="publicFilter" type="in">
                        <!-- 获取子线程call返回的ResultMsg对象 -->
                        <field tag="result[${i}]" value="${getFutureValue()}" scope="step">
                            <argument id="1" value="${results[i]}" />
                        </field>
                        <if condition="${result[i].success}">
                            <log value="${result[i].content}" />
                            <log value="子线程${i}执行成功\r\n" />
                        </if>
                        <else>
                            <log value="${result[i].content}" />
                            <log value="子线程${i}执行失败\r\n" />
                        </else>
                    </filter>
                </foreach>
            </step>
        </transaction>
        <transaction id="BATCH_CALL" detail="">
            <process>
                <step id="ready" />
                <step id="handle" />
                <exception id="exception" />
            </process>
            <step id="ready">
                <filter ref="publicFilter" type="in">
                    <!-- 当前行 -->
                    <field tag="curLine" value="${this[1]}" scope="transaction" />
                    <!-- 子线程需要处理的文件内容 -->
                    <field value="${this[0]}" scope="this" />
                </filter>
            </step>
            <step id="handle">
                <filter ref="publicFilter" type="in">
                    <field tag="len" value="${getArrayLen()}" scope="step">
                        <argument id="1" value="${this}" />
                    </field>
                </filter>
                <filter ref="fileFilter" type="in">
                    <foreach tag="i" begin="0" end="${len}" step="1" condition="${next()}">
                        <log value="正在执行第${i+curLine}行\n" />
                        <field tag="t1_no[${i}]]" etag="0" type="int" scope="transaction" />
                        <field tag="t2_no[${i}]]" etag="0" type="int" scope="transaction" />
                    </foreach>
                </filter>
                <filter ref="publicFilter" type="in">
                    <field value="${super.logbuffer}" scope="this" />
                </filter>
            </step>
            <step id="exception">
                <!-- 执行失败,返回错误信息 -->
                <filter ref="publicFilter" type="in">
                    <field value="第${curLine}行到第${curLine+rows-1}行执行失败\n" scope="this" />
                </filter>
            </step>
        </transaction>

results matching ""

    No results matching ""