Commit 09b1071c by s_guodong

代码整理(日志打印,去掉注释掉的代码,if等加大括号...)

parent 2ff433cd
......@@ -18,66 +18,62 @@ import java.util.concurrent.Future;
public class BatchFilter extends AbsFilter {
@Override
public void execute(Context context) {
// logger.info( LOG_FLAG + "BatchFiter[批处理] is running." +
// LOG_FLAG);
propertySaveToContext();
String threadnumStr = (String) context.getVariable("threadnum");
String linesStr = (String) context.getVariable("lines");
String interfaceName = (String) context.getVariable("call_interface");
String transactionName = (String) context.getVariable("call_transaction");
int threadNum = Integer.parseInt(threadnumStr);
int eachReadLine = Integer.parseInt(linesStr);
List<String> contentList;
Object object = context.getObject();
if (object instanceof List) {
contentList = (List<String>) object;
} else {
String filepath = (String) context.getVariable("filepath");
FileInputStream fis = null;
try {
fis = new FileInputStream(filepath);
contentList = IOUtils.readLines(fis);
} catch (FileNotFoundException e) {
throw new InterfaceException("00601", "file [" + filepath + "] not found.", e);
} catch (IOException e) {
throw new InterfaceException("00602", "file [" + filepath + "] read error.", e);
} finally {
if (fis != null)
IOUtils.closeQuietly(fis);
}
}
String execName = "batchFilter_" + interfaceName + "_" + transactionName;
ExecutorService exec = ThreadPoolFactory.getFixedExecutor(threadNum, execName);
int lineCount = contentList.size();
int curLine = 0;
List<Future<ResultMsg>> resultMsgList = new ArrayList<Future<ResultMsg>>();
int count = 0;
while (curLine < lineCount) {
List<String> list = new ArrayList<String>();
for (int i = 0; i < eachReadLine; i++) {
if (curLine >= lineCount)
break;
list.add(contentList.get(curLine));
curLine++;
}
Future<ResultMsg> futureResultMsg = exec.submit(new CallableClient(interfaceName, transactionName, new Object[] { list,
count * eachReadLine + 1, context }));
count++;
resultMsgList.add(futureResultMsg);
}
context.setObject(resultMsgList);
// exec.shutdown();
ThreadPoolFactory.shutdown(execName);
@Override
public void execute(Context context) {
propertySaveToContext();
String threadnumStr = (String) context.getVariable("threadnum");
String linesStr = (String) context.getVariable("lines");
String interfaceName = (String) context.getVariable("call_interface");
String transactionName = (String) context.getVariable("call_transaction");
int threadNum = Integer.parseInt(threadnumStr);
int eachReadLine = Integer.parseInt(linesStr);
List<String> contentList;
Object object = context.getObject();
if (object instanceof List) {
contentList = (List<String>) object;
} else {
String filepath = (String) context.getVariable("filepath");
FileInputStream fis = null;
try {
fis = new FileInputStream(filepath);
contentList = IOUtils.readLines(fis);
} catch (FileNotFoundException e) {
throw new InterfaceException("00601", "file [" + filepath + "] not found.", e);
} catch (IOException e) {
throw new InterfaceException("00602", "file [" + filepath + "] read error.", e);
} finally {
if (fis != null) {
IOUtils.closeQuietly(fis);
}
}
}
String execName = "batchFilter_" + interfaceName + "_" + transactionName;
ExecutorService exec = ThreadPoolFactory.getFixedExecutor(threadNum, execName);
int lineCount = contentList.size();
int curLine = 0;
List<Future<ResultMsg>> resultMsgList = new ArrayList<Future<ResultMsg>>();
int count = 0;
while (curLine < lineCount) {
List<String> list = new ArrayList<String>();
for (int i = 0; i < eachReadLine; i++) {
if (curLine >= lineCount) {
break;
}
list.add(contentList.get(curLine));
curLine++;
}
Future<ResultMsg> futureResultMsg = exec.submit(new CallableClient(interfaceName, transactionName, new Object[]{list,
count * eachReadLine + 1, context}));
count++;
resultMsgList.add(futureResultMsg);
}
context.setObject(resultMsgList);
ThreadPoolFactory.shutdown(execName);
}
// logger.info( LOG_FLAG +
// "BatchFiter[批处理] wish to finish running" + LOG_FLAG);
}
@Override
public Object getFieldValue(IFieldDef fieldDef) {
return null;
}
@Override
public Object getFieldValue(IFieldDef fieldDef) {
return null;
}
}
......@@ -17,108 +17,70 @@ import java.util.concurrent.Future;
public class BatchFilter2 extends AbsFilter {
@Override
public void execute(Context context) {
// logger.info( LOG_FLAG + "BatchFiter[批处理] is running." +
// LOG_FLAG);
// System.out.println(new Date().getMinutes() + ":" + new
// Date().getTime());
propertySaveToContext();
String threadnumStr = (String) context.getVariable("threadnum");
String linesStr = (String) context.getVariable("lines");
String interfaceName = (String) context.getVariable("call_interface");
String transactionName = (String) context.getVariable("call_transaction");
String filepath = (String) context.getVariable("filepath");
int threadNum = Integer.parseInt(threadnumStr);
int eachReadLine = Integer.parseInt(linesStr);
File f = new File(filepath);
String execName = "batchFilter2_" + interfaceName + "_" + transactionName;
ExecutorService exec = ThreadPoolFactory.getFixedExecutor(threadNum, execName);
// 文件总行数
int lineCount = getLinesOfFile(f);
// 记录文件当前行数
int curLine = 1;
List<Future<ResultMsg>> resultMsgList = new ArrayList<Future<ResultMsg>>();
@Override
public void execute(Context context) {
propertySaveToContext();
String threadnumStr = (String) context.getVariable("threadnum");
String linesStr = (String) context.getVariable("lines");
String interfaceName = (String) context.getVariable("call_interface");
String transactionName = (String) context.getVariable("call_transaction");
String filepath = (String) context.getVariable("filepath");
int threadNum = Integer.parseInt(threadnumStr);
int eachReadLine = Integer.parseInt(linesStr);
File f = new File(filepath);
String execName = "batchFilter2_" + interfaceName + "_" + transactionName;
ExecutorService exec = ThreadPoolFactory.getFixedExecutor(threadNum, execName);
// 文件总行数
int lineCount = getLinesOfFile(f);
// 记录文件当前行数
int curLine = 1;
List<Future<ResultMsg>> resultMsgList = new ArrayList<Future<ResultMsg>>();
while (curLine <= lineCount) {
int endLine = curLine + eachReadLine;
int subThreadReadCount = eachReadLine;
if (endLine > lineCount)
subThreadReadCount = lineCount - curLine + 1;
Future<ResultMsg> futureResultMsg = exec.submit(new CallableClient(interfaceName, transactionName, new Object[] { filepath, curLine,
subThreadReadCount }));
curLine = endLine;
resultMsgList.add(futureResultMsg);
}
while (curLine <= lineCount) {
int endLine = curLine + eachReadLine;
int subThreadReadCount = eachReadLine;
if (endLine > lineCount) {
subThreadReadCount = lineCount - curLine + 1;
}
Future<ResultMsg> futureResultMsg = exec.submit(new CallableClient(interfaceName, transactionName, new Object[]{filepath, curLine,
subThreadReadCount}));
curLine = endLine;
resultMsgList.add(futureResultMsg);
}
context.setObject(resultMsgList);
ThreadPoolFactory.shutdown(execName);
}
context.setObject(resultMsgList);
// exec.shutdown();
ThreadPoolFactory.shutdown(execName);
@Override
public Object getFieldValue(IFieldDef fieldDef) {
return null;
}
/*
* propertySaveToContext(); String threadnumStr = (String)
* context.getVariable("threadnum"); String linesStr = (String)
* context.getVariable("lines"); String interfaceName = (String)
* context.getVariable("call_interface"); String transactionName =
* (String) context.getVariable("call_transaction"); int threadNum =
* Integer.parseInt(threadnumStr); int eachReadLine =
* Integer.parseInt(linesStr); List<String> contentList; Object object =
* context.getObject(); if (object instanceof List) { contentList =
* (List<String>) object; } else { String filepath = (String)
* context.getVariable("filepath"); try { contentList =
* IOUtils.readLines(new FileInputStream(filepath)); } catch
* (FileNotFoundException e) { throw new InterfaceException("00601",
* "file [" + filepath + "] not found.", e); } catch (IOException e) {
* throw new InterfaceException("00602", "file [" + filepath +
* "] read error.", e); } } ExecutorService exec =
* Executors.newFixedThreadPool(threadNum); int lineCount =
* contentList.size(); int curLine = 0; List<Future<ResultMsg>>
* resultMsgList = new ArrayList<Future<ResultMsg>>(); int count = 0;
* while (curLine < lineCount) { List<String> list = new
* ArrayList<String>(); for (int i = 0; i < eachReadLine; i++) { if
* (curLine >= lineCount) break; list.add(contentList.get(curLine));
* curLine++; } Future<ResultMsg> futureResultMsg = exec.submit(new
* CallableClient(interfaceName, transactionName, new Object[]{list,
* count * eachReadLine + 1})); count++;
* resultMsgList.add(futureResultMsg); }
* context.setObject(resultMsgList); exec.shutdown();
*/
/**
* 快速计算文件的行数
**/
public int getLinesOfFile(File f) {
int lines = 0;
long fileLength = f.length();
LineNumberReader rf = null;
try {
rf = new LineNumberReader(new FileReader(f));
if (rf != null) {
rf.skip(fileLength);
lines = rf.getLineNumber() + 1;
logger.info("文件行数为:" + lines);
rf.close();
}
// logger.info( LOG_FLAG +
// "BatchFiter[批处理] wish to finish running" + LOG_FLAG);
}
@Override
public Object getFieldValue(IFieldDef fieldDef) {
return null;
}
/**
* 快速计算文件的行数
* **/
public int getLinesOfFile(File f) {
int lines = 0;
long fileLength = f.length();
LineNumberReader rf = null;
try {
rf = new LineNumberReader(new FileReader(f));
if (rf != null) {
rf.skip(fileLength);
lines = rf.getLineNumber() + 1;
logger.info( "文件行数为:" + lines);
rf.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
rf.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return lines;
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
try {
rf.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
return lines;
}
}
......@@ -3,7 +3,6 @@ package com.brilliance.eibs.core.service.instance.impl;
import com.brilliance.eibs.core.exception.InterfaceException;
import com.brilliance.eibs.core.model.IFieldDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.core.service.instance.impl.AbsFilter;
import com.brilliance.eibs.util.StringUtil;
import java.beans.BeanInfo;
......@@ -20,16 +19,13 @@ public class BeanFilter extends AbsFilter {
private Class<?> typeClass;
private Object object;
@Override
public void execute(Context context) {
// logger.info( LOG_FLAG + "BeanFilter is begining to runnig." +
// LOG_FLAG);
context.setCurrentInstance(this);
String type = getFilterDef().getType();
if (type.equals("out")) {
// logger.info( LOG_FLAG + "BeanFilter组装" + LOG_FLAG);
String classname = getRequiredPropertyValue("classname");
try {
// typeClass = ClassPathUpdater.dynamicLoadClass(classname);
typeClass = Class.forName(classname);
} catch (Exception e) {
throw new InterfaceException("00701", "class [" + classname + "] not found.", e);
......@@ -43,20 +39,17 @@ public class BeanFilter extends AbsFilter {
super.execute(context);
if (type.equals("out")) {
context.setObject(object);
// saveToContext(filterDef.getScope(), filterDef.getTag(), object);
} else {
// logger.info( LOG_FLAG + "BeanFilter解析" + LOG_FLAG);
}
// logger.info( LOG_FLAG + "BeanFilter has finished running." +
// LOG_FLAG);
}
@Override
protected void packField(IFieldDef fieldDef, Object fieldValue) {
fieldValue = dealValueByType(fieldValue, fieldDef.getType());
String etag = (String) elParser.getExPression(context, fieldDef.getEtag(), null);
if (StringUtil.isEmpty(etag))
if (StringUtil.isEmpty(etag)) {
return;
}
BeanInfo beanInfo = null;
try {
beanInfo = Introspector.getBeanInfo(typeClass);
......@@ -78,18 +71,21 @@ public class BeanFilter extends AbsFilter {
try {
if (fieldValue instanceof List && "add".equals(context.getVariable("handle_list"))) {
List<Object> list = (List<Object>) descriptor.getReadMethod().invoke(object, (Object[]) null);
if (list != null)
if (list != null) {
list.addAll((List<Object>) fieldValue);
} else
}
} else {
descriptor.getWriteMethod().invoke(object, args);
}
} catch (Exception e) {
throw new InterfaceException("00704", "write property [" + propertyName + "] error", e);
}
break;
}
}
if (j >= propertyDescriptors.length)
if (j >= propertyDescriptors.length) {
throw new InterfaceException("00707", "property [" + etag + "] of class [" + typeClass + "] not exists.");
}
}
@Override
......
......@@ -10,126 +10,100 @@ import java.io.InputStream;
/**
* 命令行连接
*
*
* @author gechengyang
*
*/
public class CommandConnection extends AbsConnection {
private String type = "";
private String encode = "GBK";
private String type = "";
private String encode = "GBK";
@Override
public void execute(Context context) {
// logger.info( LOG_FLAG + "CommandConnection is running." +
// LOG_FLAG);
encode = connectionDef.getEncoding();
@Override
public void execute(Context context) {
encode = connectionDef.getEncoding();
context.setCurrentInstance(this);
String command = getPropertyValue("command", false);
// 是否有返回值
type = getPropertyValue("type", true);
int success = 1;
if (!StringUtil.isEmpty(command)) {
success = executeShell(command, context);
logger.info("shell [" + command + "] invoke " + (success == 0 ? true : false));
}
if (success == 0) {
context.getResultMsg().setSuccess(true);
} else {
context.getResultMsg().setSuccess(false);
}
}
context.setCurrentInstance(this);
// String command = connectionDef.getProperty("command").getValue();
// /command=getPropertyValue(command, false);
String command = getPropertyValue("command", false);
// 是否有返回值
type = getPropertyValue("type", true);
int success = 1;
if (!StringUtil.isEmpty(command)) {
success = executeShell(command, context);
logger.info( "shell [" + command + "] invoke " + (success == 0 ? true : false));
}
if (success == 0)
context.getResultMsg().setSuccess(true);
else
context.getResultMsg().setSuccess(false);
// logger.info( LOG_FLAG +
// "CommandConnection has finished running." + LOG_FLAG);
}
public int executeShell(String shellCommand, Context context) {
shellCommand = shellCommand.trim();
shellCommand = shellCommand.replaceAll("\\s+", " ");
logger.info("[command is]" + shellCommand);
int success = 0;
StringBuffer stringBuffer = new StringBuffer();
InputStream in = null;
InputStream errorin = null;
int a = 0;
try {
Process pid = null;
Runtime runtime = Runtime.getRuntime();
if (shellCommand.contains("*")) {
logger.debug("[command include *]");
pid = runtime.exec(new String[]{"/bin/sh", "-c", shellCommand}, null, null);
} else if (!shellCommand.contains("|")) {
String[] cmdarray = shellCommand.split("\\ ");
logger.info("[exec is ready]" + CommonFunctionUtils.toJson(cmdarray));
pid = runtime.exec(cmdarray);
} else {
pid = runtime.exec(new String[]{"/bin/sh", "-c", shellCommand}, null, null);
}
// 执行Shell命令
if (pid != null) {
in = pid.getInputStream();
errorin = pid.getErrorStream();
stringBuffer.append(loadStream(in) + "\r\n");
String error = loadStream(errorin);
logger.info("error=" + error);
stringBuffer.append(error + "\r\n");
a = pid.waitFor();
logger.info("pid flag=" + a);
} else {
stringBuffer.append("没有pid\r\n");
}
} catch (Exception ioe) {
logger.info("ioeerror", ioe);
success = 1;
stringBuffer.append("执行Shell命令时发生异常:\r\n").append(ioe.getMessage()).append("\r\n");
throw new InterfaceException("00801", "read file error.", ioe);
} finally {
String shellMsg = stringBuffer.toString().trim();
logger.info("shell mssage:" + shellMsg);
if (!StringUtil.isEmpty(type)) {
type = type.toLowerCase();
if (type.equals("string")) {
context.setObject(shellMsg);
} else if (type.equals("int")) {
context.setObject(Integer.parseInt(shellMsg));
}
}
IOUtils.closeQuietly(in);
IOUtils.closeQuietly(errorin);
}
return success;
}
public int executeShell(String shellCommand, Context context) {
shellCommand = shellCommand.trim();
shellCommand = shellCommand.replaceAll("\\s+", " ");
logger.info( "[command is]" + shellCommand);
int success = 0;
StringBuffer stringBuffer = new StringBuffer();
// 格式化日期时间,记录日志时使用
// DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss ");
InputStream in = null;
InputStream errorin = null;
int a = 0;
try {
// stringBuffer.append(dateFormat.format(new
// Date())).append("准备执行Shell命令 ").append(shellCommand).append(" \r\n");
Process pid = null;
Runtime runtime = Runtime.getRuntime();
if (shellCommand.contains("*")) {
logger.debug( "[command include *]");
pid = runtime.exec(new String[] { "/bin/sh", "-c", shellCommand }, null, null);
} else if (!shellCommand.contains("|")) {
String[] cmdarray = shellCommand.split("\\ ");// 空格
logger.info( "[exec is ready]" + CommonFunctionUtils.toJson(cmdarray));
pid = runtime.exec(cmdarray);
} else {
pid = runtime.exec(new String[] { "/bin/sh", "-c", shellCommand }, null, null);
}
// 执行Shell命令
if (pid != null) {
// stringBuffer.append("进程号:").append(pid.toString()).append("\r\n");
in = pid.getInputStream();
errorin = pid.getErrorStream();
stringBuffer.append(loadStream(in) + "\r\n");
String error = loadStream(errorin);
logger.info( "error=" + error);
stringBuffer.append(error + "\r\n");
a = pid.waitFor();
logger.info( "pid flag=" + a);
} else {
stringBuffer.append("没有pid\r\n");
}
} catch (Exception ioe) {
logger.info( "ioeerror", ioe);
success = 1;
// stringBuffer.append("执行Shell命令时发生异常:\r\n").append(ioe.getMessage()).append("\r\n");
stringBuffer.append("执行Shell命令时发生异常:\r\n").append(ioe.getMessage()).append("\r\n");
throw new InterfaceException("00801", "read file error.", ioe);
} finally {
String shellMsg = stringBuffer.toString().trim();
logger.info( "shell mssage:" + shellMsg);
if (!StringUtil.isEmpty(type)) {
type = type.toLowerCase();
if (type.equals("string")) {
context.setObject(shellMsg);
} else if (type.equals("int")) {
context.setObject(Integer.parseInt(shellMsg));
}
}
IOUtils.closeQuietly(in);
IOUtils.closeQuietly(errorin);
}
/*
* if (a != 0) throw new InterfaceException("00801", "error accured");
*/
return success;
}
public String loadStream(InputStream in) {
String str = "";
try {
str = IOUtils.toString(in, encode);
} catch (Exception e) {
logger.error( "loadStream", e);
// TODO: handle exception
} finally {
IOUtils.closeQuietly(in);
}
/*
* in = new BufferedInputStream(in); StringBuffer buffer = new
* StringBuffer(); try { while ((ptr = in.read()) != -1) {
* buffer.append((char) ptr); } } catch (IOException e) { // TODO
* Auto-generated catch block e.printStackTrace(); } return
* buffer.toString();
*/
return str;
}
public String loadStream(InputStream in) {
String str = "";
try {
str = IOUtils.toString(in, encode);
} catch (Exception e) {
logger.error("loadStream", e);
} finally {
IOUtils.closeQuietly(in);
}
return str;
}
}
......@@ -19,8 +19,7 @@ public class ExcelTempateFilter extends AbsFilter {
context.setCurrentInstance(this);
String type = getType();
if (!WRITE_FLG.equals(type)) // type="out"
{
if (!WRITE_FLG.equals(type)) {
throw new InterfaceException("03401", "不支持的操作类型");
}
Object content = context.getObject();
......@@ -39,23 +38,20 @@ public class ExcelTempateFilter extends AbsFilter {
@Override
public Object getFieldValue(IFieldDef fieldDef) throws Exception {
// TODO Auto-generated method stub
return null;
}
@Override
protected void packField(IFieldDef fieldDef, Object fieldValue) {
String etag = elParser.getExPression(context, fieldDef.getEtag(), null).toString();
boolean defaultStyle = true;
if (!StringUtil.isEmpty(fieldDef.getTag())) {
defaultStyle = false;
}
String[] targetIndexs = etag.split(",");
if (targetIndexs.length < 2) {
throw new InterfaceException("03404", "etag坐标数据格式错误");
}
int yIndex = 0;
int xIndex = 0;
try {
......
......@@ -2,7 +2,6 @@ package com.brilliance.eibs.core.service.instance.impl;
import com.brilliance.eibs.core.exception.InterfaceException;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.core.service.instance.impl.AbsConnection;
import com.brilliance.eibs.util.StringUtil;
import org.apache.commons.io.IOUtils;
......@@ -11,160 +10,144 @@ import java.util.List;
/**
* 文件连接,提供对文件的读取保存功能
*
* @author hujun
*
* @author hujun
*/
public class FileConnection extends AbsConnection {
private static final String EXCEL_SUFFIX = "excel";
private static final String XLS_EXCEL_SUFFIX = "xls";
private static final String XLSX_EXCEL_SUFFIX = "xlsx";
@Override
public void execute(Context context) {
// logger.debug( LOG_FLAG + "FileConnection is running ." +
// LOG_FLAG);
context.setCurrentInstance(this);
// 获取操作类型
String type = getType();
// 获取文件路径
String path = getPropertyValue(PATH_SYMBOL);
if (isRead(type)) {
logger.debug( "读取文件[" + path + "].");
try {
doRead(path);
} catch (IOException e) {
throw new InterfaceException("00801", "read file error.", e);
}
}
if (isWrite(type)) {
logger.debug( "保存文件[" + path + "].");
try {
doWrite(path);
} catch (IOException e) {
throw new InterfaceException("00802", "write to file error.", e);
}
}
// logger.debug( LOG_FLAG + "FileConnection is finished." +
// LOG_FLAG);
}
@Override
public void execute(Context context) {
context.setCurrentInstance(this);
// 获取操作类型
String type = getType();
// 获取文件路径
String path = getPropertyValue(PATH_SYMBOL);
if (isRead(type)) {
logger.debug("读取文件[" + path + "].");
try {
doRead(path);
} catch (IOException e) {
throw new InterfaceException("00801", "read file error.", e);
}
}
if (isWrite(type)) {
logger.debug("保存文件[" + path + "].");
try {
doWrite(path);
} catch (IOException e) {
throw new InterfaceException("00802", "write to file error.", e);
}
}
}
private boolean isRead(String type) {
return READ_FLG.equals(type);
}
private boolean isRead(String type) {
return READ_FLG.equals(type);
}
private boolean isWrite(String type) {
return WRITE_FLG.equals(type);
}
private boolean isWrite(String type) {
return WRITE_FLG.equals(type);
}
private void doRead(String path) throws IOException {
Object content = read(path);
// logger.debug("Read the contents is " + content + ".");
// 读取文件写入上下文
context.setObject(content);
}
private void doRead(String path) throws IOException {
Object content = read(path);
// 读取文件写入上下文
context.setObject(content);
}
private void doWrite(String path) throws IOException {
// 获取需要写入的文件
File file = new File(path);
boolean append = Boolean.valueOf(getPropertyValue("append", true));
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
if (!append || !file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
throw new InterfaceException("00803", "create new file error.", e);
}
}
Object obj = context.getObject();
if (obj == null || obj.toString().equals("")) {
logger.warn( "No content to write.");
}
// logger.trace("Write the contents is " +
// System.getProperty("line.separator") + obj.toString());
write(file, obj, append);
}
private void doWrite(String path) throws IOException {
// 获取需要写入的文件
File file = new File(path);
boolean append = Boolean.valueOf(getPropertyValue("append", true));
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
if (!append || !file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
throw new InterfaceException("00803", "create new file error.", e);
}
}
Object obj = context.getObject();
if (obj == null || obj.toString().equals("")) {
logger.warn("No content to write.");
}
write(file, obj, append);
}
/**
* 文件写入操作
*
* @param path
* 文件路徑
* @param content
* 文件内容
* @throws IOException
*/
private void write(File file, Object content, boolean isAppend) throws IOException {
FileOutputStream fos = new FileOutputStream(file, isAppend);
try {
if (content instanceof byte[]) {
logger.info( "file 类型:二进制文件");
IOUtils.write((byte[]) content, fos);
logger.info( "文件长度是=" + ((byte[]) content).length);
return;
}
PrintWriter pw = null;
try {
pw = new PrintWriter(new OutputStreamWriter(fos, connectionDef.getEncoding()), true);
pw.print(content);
pw.flush();
} finally {
IOUtils.closeQuietly(pw);
}
} finally {
IOUtils.closeQuietly(fos);
}
}
/**
* 文件写入操作
*
* @param file 文件路徑
* @param content 文件内容
* @param isAppend
* @throws IOException
*/
private void write(File file, Object content, boolean isAppend) throws IOException {
FileOutputStream fos = new FileOutputStream(file, isAppend);
try {
if (content instanceof byte[]) {
logger.info("file 类型:二进制文件");
IOUtils.write((byte[]) content, fos);
logger.info("文件长度是=" + ((byte[]) content).length);
return;
}
PrintWriter pw = null;
try {
pw = new PrintWriter(new OutputStreamWriter(fos, connectionDef.getEncoding()), true);
pw.print(content);
pw.flush();
} finally {
IOUtils.closeQuietly(pw);
}
} finally {
IOUtils.closeQuietly(fos);
}
}
/**
* 文件读取操作
*
* @param path
* 文件路徑
* @return 文件内容
* @throws IOException
*/
private Object read(String path) throws IOException {
BufferedReader br = null;
FileInputStream fis = null;
try {
String type = getPropertyValue("type", true);
fis = new FileInputStream(path);
if ("list".equals(type)) {
List<String> list = IOUtils.readLines(fis, connectionDef.getEncoding());
IOUtils.closeQuietly(fis);
String filterLines = getPropertyValue("removeLines", true);
if (!StringUtil.isEmpty(filterLines)) {
String[] lines = filterLines.split(",");
for (String line : lines) {
list.remove(Integer.parseInt(line));
}
}
return list;
}
if ("byte".equals(type)) {
File file = new File(path);
int length = (int) file.length();
byte[] bytes = new byte[length];
IOUtils.readFully(fis, bytes);
return bytes;
}
/**
* 文件读取操作
*
* @param path 文件路徑
* @return 文件内容
* @throws IOException
*/
private Object read(String path) throws IOException {
BufferedReader br = null;
FileInputStream fis = null;
try {
String type = getPropertyValue("type", true);
fis = new FileInputStream(path);
if ("list".equals(type)) {
List<String> list = IOUtils.readLines(fis, connectionDef.getEncoding());
IOUtils.closeQuietly(fis);
String filterLines = getPropertyValue("removeLines", true);
if (!StringUtil.isEmpty(filterLines)) {
String[] lines = filterLines.split(",");
for (String line : lines) {
list.remove(Integer.parseInt(line));
}
}
return list;
}
if ("byte".equals(type)) {
File file = new File(path);
int length = (int) file.length();
byte[] bytes = new byte[length];
IOUtils.readFully(fis, bytes);
return bytes;
}
StringBuffer s = new StringBuffer();
br = new BufferedReader(new InputStreamReader(fis, connectionDef.getEncoding()));
int tempchar;
while ((tempchar = br.read()) != -1) {
s.append((char) tempchar);
}
return s.toString();
} finally {
IOUtils.closeQuietly(fis);
IOUtils.closeQuietly(br);
}
}
StringBuffer s = new StringBuffer();
br = new BufferedReader(new InputStreamReader(fis, connectionDef.getEncoding()));
int tempchar;
while ((tempchar = br.read()) != -1) {
s.append((char) tempchar);
}
return s.toString();
} finally {
IOUtils.closeQuietly(fis);
IOUtils.closeQuietly(br);
}
}
}
......@@ -2,7 +2,6 @@ package com.brilliance.eibs.core.service.instance.impl;
import com.brilliance.eibs.core.model.IFieldDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.core.service.instance.impl.AbsFilter;
import com.brilliance.eibs.util.StringUtil;
import java.io.UnsupportedEncodingException;
......@@ -12,109 +11,99 @@ import java.util.Map;
/**
* Fix报文处理
*
* @author 葛成洋
*
* @author 葛成洋
*/
public class FixFilter extends AbsFilter {
String encoding;
String FIX_VERSION_TAG = "8";
String FIX_VERSION_VALUE = "";
String BODY_LEN_TAG = "9";
String BODY_LEN_VALUE = "";
String FIX_TAIL_TAG = "10";
String FIX_TAIL_VALUE = "";
Map<String, Object> map = new LinkedHashMap<String, Object>();
public void execute(Context context) {
// logger.info( LOG_FLAG + "FixFilter is begining to runnig." +
// LOG_FLAG);
context.setCurrentInstance(this);
encoding = filterDef.getEncoding();
String type = getFilterDef().getType();
if (type.equals("in")) {
// logger.info( LOG_FLAG + "Fix报文解析" + LOG_FLAG);
Object object = context.getObject();
String str = null;
if (object instanceof byte[]) {
try {
str = new String((byte[]) object, encoding);
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
str = (String) context.getObject();
}
Map<String, Object> map = new HashMap<String, Object>();
char c = 0x01;
String splitSym = String.valueOf(c);
String[] tags = str.split(splitSym);
for (String tag : tags) {
if (!StringUtil.isEmpty(tag)) {
String keys[] = tag.split("=");
map.put(keys[0], keys[1]);
}
}
context.setObject(map);
} else {
// logger.info( LOG_FLAG + "FixFilter定长报文组装" + LOG_FLAG);
}
super.execute(context);
// 组装完后,存入上下文resource的object
if (type.equals("out")) {
StringBuffer sb = new StringBuffer();
char c = 0x01;
// sb.append(FIX_VERSION_TAG + "=" + FIX_VERSION_VALUE).append(c);
for (String key : map.keySet()) {
sb.append(key + "=" + map.get(key)).append(c);
}
try {
BODY_LEN_VALUE = String.valueOf(sb.toString().getBytes(encoding).length);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
StringBuffer allSb = new StringBuffer();
allSb.append(FIX_VERSION_TAG + "=" + FIX_VERSION_VALUE).append(c);
allSb.append(BODY_LEN_TAG + "=" + BODY_LEN_VALUE).append(c);
allSb.append(sb);
allSb.append(FIX_TAIL_TAG + "=" + FIX_TAIL_VALUE).append(c);
context.setObject(allSb.toString());
}
logger.info( "FixFilter is finished");
}
protected void packField(IFieldDef fieldDef, Object fieldValue) {
String etag = fieldDef.getEtag();
if (StringUtil.isEmpty(etag))
return;
etag = elParser.getExPression(context, etag, null).toString();
if (etag.equals(FIX_VERSION_TAG)) {
FIX_VERSION_VALUE = fieldValue.toString();
} else if (etag.equals(FIX_TAIL_TAG)) {
FIX_TAIL_VALUE = fieldValue.toString();
} else if (etag.equals(BODY_LEN_TAG)) {
return;
} else {
map.put(etag, fieldValue.toString());
}
}
@Override
public Object getFieldValue(IFieldDef fieldDef) {
// String etag = elParser.getExPression(context, fieldDef.getEtag(),
// null).toString();
return null;
}
String encoding;
String FIX_VERSION_TAG = "8";
String FIX_VERSION_VALUE = "";
String BODY_LEN_TAG = "9";
String BODY_LEN_VALUE = "";
String FIX_TAIL_TAG = "10";
String FIX_TAIL_VALUE = "";
Map<String, Object> map = new LinkedHashMap<String, Object>();
@Override
public void execute(Context context) {
context.setCurrentInstance(this);
encoding = filterDef.getEncoding();
String type = getFilterDef().getType();
if (type.equals("in")) {
Object object = context.getObject();
String str = null;
if (object instanceof byte[]) {
try {
str = new String((byte[]) object, encoding);
} catch (UnsupportedEncodingException e) {
logger.error(e.getMessage(), e);
}
} else {
str = (String) context.getObject();
}
Map<String, Object> map = new HashMap<String, Object>();
char c = 0x01;
String splitSym = String.valueOf(c);
String[] tags = str.split(splitSym);
for (String tag : tags) {
if (!StringUtil.isEmpty(tag)) {
String keys[] = tag.split("=");
map.put(keys[0], keys[1]);
}
}
context.setObject(map);
} else {
}
super.execute(context);
// 组装完后,存入上下文resource的object
if (type.equals("out")) {
StringBuffer sb = new StringBuffer();
char c = 0x01;
for (String key : map.keySet()) {
sb.append(key + "=" + map.get(key)).append(c);
}
try {
BODY_LEN_VALUE = String.valueOf(sb.toString().getBytes(encoding).length);
} catch (UnsupportedEncodingException e) {
logger.error(e.getMessage(), e);
}
StringBuffer allSb = new StringBuffer();
allSb.append(FIX_VERSION_TAG + "=" + FIX_VERSION_VALUE).append(c);
allSb.append(BODY_LEN_TAG + "=" + BODY_LEN_VALUE).append(c);
allSb.append(sb);
allSb.append(FIX_TAIL_TAG + "=" + FIX_TAIL_VALUE).append(c);
context.setObject(allSb.toString());
}
logger.info("FixFilter is finished");
}
@Override
protected void packField(IFieldDef fieldDef, Object fieldValue) {
String etag = fieldDef.getEtag();
if (StringUtil.isEmpty(etag)) {
return;
}
etag = elParser.getExPression(context, etag, null).toString();
if (etag.equals(FIX_VERSION_TAG)) {
FIX_VERSION_VALUE = fieldValue.toString();
} else if (etag.equals(FIX_TAIL_TAG)) {
FIX_TAIL_VALUE = fieldValue.toString();
} else if (etag.equals(BODY_LEN_TAG)) {
return;
} else {
map.put(etag, fieldValue.toString());
}
}
@Override
public Object getFieldValue(IFieldDef fieldDef) {
return null;
}
}
\ No newline at end of file
......@@ -3,7 +3,6 @@ package com.brilliance.eibs.core.service.instance.impl;
import com.brilliance.eibs.core.exception.InterfaceException;
import com.brilliance.eibs.core.model.IFieldDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.core.service.instance.impl.AbsFilter;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
......@@ -13,128 +12,88 @@ import java.util.Map;
/**
* Json报文处理
* @author xiaoyuanzhen
*
* @author xiaoyuanzhen
*/
public class GsonFilter
extends AbsFilter
{
public class GsonFilter extends AbsFilter {
private Object jsonObject;
private String jsontype;
private Object jsonObject;
private String jsontype;
@Override
public void execute(Context context)
{
logger.info(LOG_FLAG + "GsonFilter is begining to runnig." + LOG_FLAG);
context.setCurrentInstance(this);
String type = filterDef.getType();
if (type.equals("in"))
{
logger.info(LOG_FLAG + "GsonFilter解析" + LOG_FLAG);
String json = (String) context.getObject();
jsontype = getRequiredPropertyValue("jsontype");
try
{
Object obj = jsonToObject(json, jsontype);
context.setObject(obj);
logger.debug("json to " + "[" + obj.getClass() + "] :" + obj);
}
catch (JsonSyntaxException e)
{
throw new InterfaceException("01301", e);
}
super.execute(context);
@Override
public void execute(Context context) {
logger.info(LOG_FLAG + "GsonFilter is begining to runnig." + LOG_FLAG);
context.setCurrentInstance(this);
String type = filterDef.getType();
if (type.equals("in")) {
logger.info(LOG_FLAG + "GsonFilter解析" + LOG_FLAG);
String json = (String) context.getObject();
jsontype = getRequiredPropertyValue("jsontype");
try {
Object obj = jsonToObject(json, jsontype);
context.setObject(obj);
logger.debug("json to " + "[" + obj.getClass() + "] :" + obj);
} catch (JsonSyntaxException e) {
throw new InterfaceException("01301", e);
}
super.execute(context);
} else {
logger.info(LOG_FLAG + "GsonFilter组装" + LOG_FLAG);
Object object = context.getObject();
super.execute(context);
String jsonStr = null;
Gson gson = new Gson();
jsonStr = gson.toJson(object);
context.setObject(jsonStr);
logger.debug("gson result:" + jsonStr);
}
logger.info(LOG_FLAG + "GsonFilter has finished runnig." + LOG_FLAG);
}
else
{
logger.info(LOG_FLAG + "GsonFilter组装" + LOG_FLAG);
Object object = context.getObject();
super.execute(context);
String jsonStr = null;
Gson gson = new Gson();
// jsonStr = gson.toJson(jsonObject);
jsonStr = gson.toJson(object);
context.setObject(jsonStr);
//saveToContext(getFilterDef().getScope(), getFilterDef().getTag(), context.getObject());
logger.debug("gson result:" + jsonStr);
/**
* json转化为Java对象
*
* @param json
* @return
*/
private Object jsonToObject(String json, String jsontype) {
Class<?> beantype = null;
if (json.startsWith("[") || jsontype.equals("list")) {
beantype = List.class;
} else if (jsontype.equals("map")) {
beantype = HashMap.class;
} else {
try {
beantype = Class.forName(jsontype);
} catch (ClassNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
return new Gson().fromJson(json, beantype);
}
logger.info(LOG_FLAG + "GsonFilter has finished runnig." + LOG_FLAG);
}
/**
* json转化为Java对象
* @param json
* @return
*/
private Object jsonToObject(String json, String jsontype)
{
Class<?> beantype = null;
if (json.startsWith("[") || jsontype.equals("list"))
beantype = List.class;
else if (jsontype.equals("map"))
beantype = HashMap.class;
else
{
try
{
beantype=Class.forName(jsontype);
}
catch (ClassNotFoundException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
@Override
public Object getFieldValue(IFieldDef fieldDef) {
return null;
}
return new Gson().fromJson(json, beantype);
}
//解包 处理etag
@Override
public Object getFieldValue(IFieldDef fieldDef)
{
return null;
}
/**
* 往当前组装的对象中添加值
* @param key
* @param value
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private void addElement(String key, Object value)
{
if (jsonObject instanceof Map)
{
((Map) jsonObject).put(key, value);
/**
* 往当前组装的对象中添加值
*
* @param key
* @param value
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private void addElement(String key, Object value) {
if (jsonObject instanceof Map) {
((Map) jsonObject).put(key, value);
} else if (jsonObject instanceof List) {
((List) jsonObject).add(value);
}
}
else if (jsonObject instanceof List)
{
((List) jsonObject).add(value);
}
}
//组包
@Override
protected void packField(IFieldDef fieldDef, Object fieldValue)
{
/*String etag = (String) elParser.getExPression(context, fieldDef.getEtag(), null);
Object jsonValue = null;
if (fieldValue instanceof String)
{
try
{
jsonValue = jsonToObject((String) fieldValue);
}
catch (Exception e)
{}
}
if (jsonValue != null)
{
fieldValue = jsonValue;
@Override
protected void packField(IFieldDef fieldDef, Object fieldValue) {
}
addElement(etag, fieldValue);*/
}
}
......@@ -5,7 +5,6 @@ import com.brilliance.eibs.core.model.IConnectionDef;
import com.brilliance.eibs.core.model.IPropertyDef;
import com.brilliance.eibs.core.model.impl.ConnectionDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.core.service.instance.impl.AbsConnection;
import com.brilliance.eibs.core.service.instance.plugin.ConnectionAdaptor;
import com.brilliance.eibs.core.service.instance.plugin.DBConnPool;
import com.brilliance.eibs.core.service.instance.plugin.JdbcExecutor;
......@@ -25,9 +24,8 @@ public class JdbcConnection extends AbsConnection {
public static final String SQL_SESSION = "__sqlSession";
public final String CNT = "_cnt";
@Override
public void execute(Context context) {
// logger.debug( LOG_FLAG + "JdbcConnection is running." +
// LOG_FLAG);
context.setCurrentInstance(this);
super.execute(context);
ConnectionAdaptor connectionAdator = null;
......@@ -56,29 +54,26 @@ public class JdbcConnection extends AbsConnection {
context.getResource().setCntConnection(cntConnection);
}
} catch (Exception e) {
throw new InterfaceException("01501", e);
}
try {
executor = new JdbcExecutor(context, connection);
} catch (Exception e) {
try {
if (executor != null)
if (executor != null) {
executor.close();
}
} catch (SQLException e1) {
throw new InterfaceException("01503", e1);
}
throw new InterfaceException("01502", e);
}
context.getResource().setExecutor(executor);
} else {
try {
context.setRollbackFlag(1);
connectionDefTmp.setId(connectionDefTmp.getId() + joinedId);
connection = createConnectionJoined(connectionDefTmp);
// connection.setAutoCommit(false);
context.getResource().setConnection(connection);
} catch (Exception e) {
throw new InterfaceException("01501", e);
}
......@@ -86,24 +81,21 @@ public class JdbcConnection extends AbsConnection {
executor = new JdbcExecutor(context, connection);
} catch (Exception e) {
try {
if (executor != null)
if (executor != null) {
executor.close();
}
} catch (SQLException e1) {
throw new InterfaceException("01503", e1);
}
throw new InterfaceException("01502", e);
}
context.getResource().setExecutor(executor);
}
// logger.debug( LOG_FLAG +
// "JdbcConnection has finished runnig." + LOG_FLAG);
context.getResource().setExecutor(executor);
}
private ConnectionAdaptor createConnection(IConnectionDef connectionDefintion) throws SQLException, ClassNotFoundException {
String id = connectionDefintion.getId();
ConnectionAdaptor connectionAdaptor = context.getConnectionAdaptors().get(id);
if (null == connectionAdaptor || connectionAdaptor.isClosed()) {
connectionAdaptor = new ConnectionAdaptor(DBConnPool.getConnection(connectionDefintion, LogUtil.getLogger(context)));
context.getConnectionAdaptors().put(id, connectionAdaptor);
......@@ -111,24 +103,7 @@ public class JdbcConnection extends AbsConnection {
return connectionAdaptor;
}
private Connection createConnectionJoined(IConnectionDef connectionDefintion) throws Exception {
/*
* String id = connectionDefintion.getId();
*
* ConnectionAdaptor connectionAdaptor =
* context.getConnectionAdaptors().get(id); Connection connection =
* connectionAdaptor.getConnection(); if (null == connectionAdaptor ||
* connectionAdaptor.isClosed()) { logger.info(
* "jdbcConnections has closed or is null"); JotmHelper jotmHelper =
* context.getJotmHelper(); if (jotmHelper == null) { jotmHelper = new
* JotmHelper(); jotmHelper.startTMService(); // jotmHelper.begin();
* context.setJotmHelper(jotmHelper); } connection =
* jotmHelper.getConnection(connectionDefintion);
*
* logger.info( "get jdbcConnections again");
* context.getConnectionAdaptors().put(id, new
* ConnectionAdaptor(connection, null)); } return connection;
*/
private Connection createConnectionJoined(IConnectionDef connectionDefintion) {
return null;
}
}
......@@ -14,101 +14,96 @@ import java.util.Hashtable;
/**
* 数据库连接类,使用JNDI数据源获取数据库连接
*
* @author xiaoyuanzhen
*
* @author xiaoyuanzhen
*/
public class JndiDsConnection extends AbsConnection {
/**
* 数据源
*/
private DataSource dataSource;
/**
* 是否为CNT表单独建立一个连接
*/
private boolean useCnt;
public void execute(Context context) {
// logger.info( LOG_FLAG + "JndiConnection is running." +
// LOG_FLAG);
/**
* 数据源
*/
private DataSource dataSource;
context.setCurrentInstance(this);
super.execute(context);
/**
* 是否为CNT表单独建立一个连接
*/
private boolean useCnt;
Connection connection = null;
JdbcExecutor executor = null;
String id = connectionDef.getId();
try {
useCnt = Boolean.valueOf(getPropertyValue("use_table_cnt", true));
dataSource = getDataSource();
connection = dataSource.getConnection();
connection.setAutoCommit(false);
context.getConnectionAdaptors().put(id, new ConnectionAdaptor(connection));
context.getResource().setConnection(connection);
if (useCnt) {
Connection cntConnection = createConnection(id + "_cnt");
cntConnection.setAutoCommit(false);
context.getResource().setCntConnection(cntConnection);
}
} catch (Exception e) {
throw new InterfaceException("01501", e);
}
try {
executor = new JdbcExecutor(context, connection);
} catch (Exception e) {
try {
if (executor != null)
executor.close();
} catch (SQLException e1) {
throw new InterfaceException("01503", e1);
}
throw new InterfaceException("01502", e);
}
context.getResource().setExecutor(executor);
// logger.info( LOG_FLAG + "JndiConnection finished." +
// LOG_FLAG);
}
@Override
public void execute(Context context) {
context.setCurrentInstance(this);
super.execute(context);
/**
* 获取数据源
*
* @return
* @throws Exception
*/
private DataSource getDataSource() throws Exception {
String providerUrl = getPropertyValue("provider_url", true);
String initialFactory = getPropertyValue("factory", true);
String dsName = getPropertyValue("name", false);
Hashtable<String, String> env = new Hashtable<String, String>();
Connection connection = null;
JdbcExecutor executor = null;
String id = connectionDef.getId();
try {
useCnt = Boolean.valueOf(getPropertyValue("use_table_cnt", true));
dataSource = getDataSource();
connection = dataSource.getConnection();
connection.setAutoCommit(false);
context.getConnectionAdaptors().put(id, new ConnectionAdaptor(connection));
context.getResource().setConnection(connection);
if (useCnt) {
Connection cntConnection = createConnection(id + "_cnt");
cntConnection.setAutoCommit(false);
context.getResource().setCntConnection(cntConnection);
}
} catch (Exception e) {
throw new InterfaceException("01501", e);
}
try {
executor = new JdbcExecutor(context, connection);
} catch (Exception e) {
try {
if (executor != null) {
executor.close();
}
} catch (SQLException e1) {
throw new InterfaceException("01503", e1);
}
throw new InterfaceException("01502", e);
}
context.getResource().setExecutor(executor);
}
if (!StringUtil.isEmpty(providerUrl)) {
env.put(javax.naming.Context.PROVIDER_URL, providerUrl);
}
if (!StringUtil.isEmpty(initialFactory)) {
env.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, initialFactory);
}
/**
* 获取数据源
*
* @return
* @throws Exception
*/
private DataSource getDataSource() throws Exception {
String providerUrl = getPropertyValue("provider_url", true);
String initialFactory = getPropertyValue("factory", true);
String dsName = getPropertyValue("name", false);
Hashtable<String, String> env = new Hashtable<String, String>();
InitialContext ctx = new InitialContext(env);
return (DataSource) ctx.lookup(dsName);
}
if (!StringUtil.isEmpty(providerUrl)) {
env.put(javax.naming.Context.PROVIDER_URL, providerUrl);
}
if (!StringUtil.isEmpty(initialFactory)) {
env.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, initialFactory);
}
InitialContext ctx = new InitialContext(env);
return (DataSource) ctx.lookup(dsName);
}
/**
* 获取数据库连接
*
* @param id
* @return
* @throws SQLException
*/
private Connection createConnection(String id) throws SQLException {
ConnectionAdaptor connAdaptor = context.getConnectionAdaptors().get(id);
if (connAdaptor == null || connAdaptor.isClosed()) {
Connection conn = dataSource.getConnection();
connAdaptor = new ConnectionAdaptor(conn);
context.getConnectionAdaptors().put(id, connAdaptor);
}
return connAdaptor.getConnection();
}
/**
* 获取数据库连接
*
* @param id
* @return
* @throws SQLException
*/
private Connection createConnection(String id) throws SQLException {
ConnectionAdaptor connAdaptor = context.getConnectionAdaptors().get(id);
if (connAdaptor == null || connAdaptor.isClosed()) {
Connection conn = dataSource.getConnection();
connAdaptor = new ConnectionAdaptor(conn);
context.getConnectionAdaptors().put(id, connAdaptor);
}
return connAdaptor.getConnection();
}
}
......@@ -12,184 +12,160 @@ import java.util.Date;
/**
* ResultSet结果集处理过滤器,执行该过滤器之后默认的会关闭ResultSet
*
* @author hujun
*
* @author hujun
*/
public class ResultSetFilter
extends AbsFilter
{
extends AbsFilter {
private ResultSet rs;
private ResultSet rs;
@Override
public void execute(Context context)
{
logger.info(LOG_FLAG + "ResultSetFilter数据库结果集解析处理中." + LOG_FLAG);
context.setCurrentInstance(this);
rs = (ResultSet) context.getObject();
if (rs == null)
throw new InterfaceException("01901");
JdbcExecutor executor = null;
try
{
executor = context.getResource().getExecutor();
super.execute(context);
}
finally
{
close(executor, "01902");
@Override
public void execute(Context context) {
logger.info(LOG_FLAG + "ResultSetFilter数据库结果集解析处理中." + LOG_FLAG);
context.setCurrentInstance(this);
rs = (ResultSet) context.getObject();
if (rs == null) {
throw new InterfaceException("01901");
}
JdbcExecutor executor = null;
try {
executor = context.getResource().getExecutor();
super.execute(context);
} finally {
close(executor, "01902");
}
logger.info(LOG_FLAG + "ResultSetFilter has finisehd running." + LOG_FLAG);
}
logger.info(LOG_FLAG + "ResultSetFilter has finisehd running." + LOG_FLAG);
}
/**
* 关闭ResultSet,并拋出error系统异常
*
* @param rs
* @param error
*/
private void close(JdbcExecutor executor, String error)
{
try
{
executor.closeWithoutConn();
}
catch (SQLException e)
{
throw new InterfaceException(error, "Close resultSet and preparedStatement exception occurs.", e);
/**
* 关闭ResultSet,并拋出error系统异常
*
* @param executor
* @param error
*/
private void close(JdbcExecutor executor, String error) {
try {
executor.closeWithoutConn();
} catch (SQLException e) {
throw new InterfaceException(error, "Close resultSet and preparedStatement exception occurs.", e);
}
}
}
/**
* 获取ResultSet的记录数
*
* @return
* @throws SQLException
*/
public int getSize()
throws SQLException
{
rs.last();
int num = rs.getRow();
//将游标重新复位到初始
rs.beforeFirst();
return num;
}
/**
* 获取ResultSet的记录数
*
* @return
* @throws SQLException
*/
public int getSize()
throws SQLException {
rs.last();
int num = rs.getRow();
//将游标重新复位到初始
rs.beforeFirst();
return num;
}
/**
* 获取ResultSet当前指向记录列名为s的Double类型的数据,如果数据库记录为空,返回""
*
* @param s
* @return
* @throws SQLException
*/
public String getString(String s)
throws SQLException
{
return rs.getString(s);
}
/**
* 获取ResultSet当前指向记录列名为s的Double类型的数据,如果数据库记录为空,返回""
*
* @param s
* @return
* @throws SQLException
*/
public String getString(String s)
throws SQLException {
return rs.getString(s);
}
/**
* 获取ResultSet当前指向记录列名为s的Double类型的数据,如果数据库记录为空,返回0.0
*
* @param s
* @return
* @throws SQLException
*/
public double getDouble(String s)
throws SQLException
{
return rs.getDouble(s);
}
/**
* 获取ResultSet当前指向记录列名为s的Double类型的数据,如果数据库记录为空,返回0.0
*
* @param s
* @return
* @throws SQLException
*/
public double getDouble(String s)
throws SQLException {
return rs.getDouble(s);
}
/**
* 获取ResultSet当前指向记录列名为s的Date类型的数据,默认会将java.sql.Date转换成java.util.Date
*
* @param s
* @return
* @throws SQLException
*/
public Date getDate(String s)
throws SQLException
{
java.sql.Date date = rs.getDate(s);
if (null != date)
return new Date(rs.getDate(s).getTime());
return null;
}
/**
* 获取ResultSet当前指向记录列名为s的Date类型的数据,默认会将java.sql.Date转换成java.util.Date
*
* @param s
* @return
* @throws SQLException
*/
public Date getDate(String s) throws SQLException {
java.sql.Date date = rs.getDate(s);
if (null != date) {
return new Date(rs.getDate(s).getTime());
}
return null;
}
/**
* 获取ResultSet当前指向记录列名为s的int类型的数据,如果数据库记录为空,返回0
*
* @param s
* @return
* @throws SQLException
*/
public int getInt(String s)
throws SQLException
{
return rs.getInt(s);
}
/**
* 获取ResultSet当前指向记录列名为s的int类型的数据,如果数据库记录为空,返回0
*
* @param s
* @return
* @throws SQLException
*/
public int getInt(String s) throws SQLException {
return rs.getInt(s);
}
/**
* 获取ResultSet当前指向记录列名为s的数据
* @param s
* @return
* @throws SQLException
*/
public Object getObject(String s)
throws SQLException
{
return rs.getObject(s);
}
/**
* 获取ResultSet当前指向记录列名为s的数据
*
* @param s
* @return
* @throws SQLException
*/
public Object getObject(String s) throws SQLException {
return rs.getObject(s);
}
/**
* 将ResultSet的游标下移一位
*
* @return
* @throws SQLException
*/
public boolean next()
throws SQLException
{
return rs.next();
}
/**
* 将ResultSet的游标下移一位
*
* @return
* @throws SQLException
*/
public boolean next() throws SQLException {
return rs.next();
}
@Override
public Object getFieldValue(IFieldDef fieldDef)
{
String type = fieldDef.getType();
String arg = fieldDef.getEtag();
try
{
if (FIELD_TYPE_DATE.equals(type))
return getDate(arg);
if (FIELD_TYPE_STRING.equals(type))
{
try
{
if (getString(arg) != null)
return new String(getString(arg).getBytes(), filterDef.getEncoding());
else
{
return null;
}
@Override
public Object getFieldValue(IFieldDef fieldDef) {
String type = fieldDef.getType();
String arg = fieldDef.getEtag();
try {
if (FIELD_TYPE_DATE.equals(type)) {
return getDate(arg);
}
if (FIELD_TYPE_STRING.equals(type)) {
try {
if (getString(arg) != null) {
return new String(getString(arg).getBytes(), filterDef.getEncoding());
} else {
return null;
}
} catch (UnsupportedEncodingException e) {
throw new InterfaceException("00403");
}
}
if (FIELD_TYPE_INT.equals(type)) {
return getInt(arg);
}
if (FIELD_TYPE_DOUBLE.equals(type)) {
return getDouble(arg);
}
return getObject(arg);
} catch (SQLException e) {
throw new InterfaceException("01903", "get result of [" + arg + "] error.", e);
}
catch (UnsupportedEncodingException e)
{
throw new InterfaceException("00403");
}
}
if (FIELD_TYPE_INT.equals(type))
return getInt(arg);
if (FIELD_TYPE_DOUBLE.equals(type))
return getDouble(arg);
return getObject(arg);
}
catch (SQLException e)
{
throw new InterfaceException("01903", "get result of [" + arg + "] error.", e);
}
}
}
......@@ -18,134 +18,134 @@ import java.util.Map;
*/
public class RibbonFilter extends AbsFilter {
@SuppressWarnings("unchecked")
@Override
public void execute(Context context) {
context.setCurrentInstance(this);
Map<Object, Object> params = (Map<Object, Object>) context.getObject();// 必须得从上下文中拿到服务列表
List<Server> servers = servers(params.get("server"));
String anInterface = (String) params.get("interface");
String transaction = (String) params.get("transaction");
ResultMsg value = null;
boolean flag = false;
StringBuffer errmsg = null;
Logger log = (Logger) context.getVariable(Constants.Log_VAR);
for (Server server : servers) {
try {
Map<Object, Object> tmpParams = new HashMap<Object, Object>(
params);
tmpParams.put("ip", server.getHost());
tmpParams.put("port", server.getPort());
value = new Client().call(anInterface, transaction,
new Object[] { tmpParams }, log);
if (value.isSuccess()) {
flag = true;
break;
} else {
errmsg = new StringBuffer();
errmsg.append("try call ");
errmsg.append(server);
errmsg.append(" failed ,failed message:");
errmsg.append(value.getDescription());
server.setDsp(value.getDescription());
logger.error( errmsg.toString());
}
} catch (Throwable e) {
errmsg = new StringBuffer();
errmsg.append("try call ");
errmsg.append(server);
errmsg.append(" failed ,caused by :");
errmsg.append(e.getMessage());
server.setDsp(e.getMessage());
logger.error( errmsg.toString());
}
}
if (flag) {
super.execute(context);
context.setObject(value);
} else {
throw new InterfaceException("T0001", "try call all servers " + servers
+ " failed");
}
}
@Override
public Object getFieldValue(IFieldDef iFieldDef) throws Exception {
return null;
}
private List<Server> servers(Object obj) {
return servers(requireNonNull(obj).toString());
}
private Object requireNonNull(Object obj) {
if(obj==null){
throw new RuntimeException("obj is null");
}
return obj;
}
private List<Server> servers(String str) {
requireNonNull(str);
List<Server> list = new ArrayList<Server>();
for (String s : str.split(",")) {
String[] split = s.split(":");
list.add(new Server(split[0], Integer.parseInt(split[1])));
}
return list;
}
class Server {
private String host;
private int port;
private String dsp;
public Server(String host, int port) {
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getDsp() {
return dsp;
}
public void setDsp(String dsp) {
this.dsp = dsp;
}
@Override
public String toString() {
return "[host=" + host + ", dsp=" + dsp + "]";
}
}
public static void main(String[] args) {
List<Server> lst = new ArrayList<Server>();
RibbonFilter rf = new RibbonFilter();
Server server1 = rf.new Server("127.0.0.1", 11);
server1.setDsp("xxxx");
Server server2 = rf.new Server("127.0.0.2", 22);
server2.setDsp("tyyy");
lst.add(server1);
lst.add(server2);
System.out.println(lst);
}
@SuppressWarnings("unchecked")
@Override
public void execute(Context context) {
context.setCurrentInstance(this);
// 必须得从上下文中拿到服务列表
Map<Object, Object> params = (Map<Object, Object>) context.getObject();
List<Server> servers = servers(params.get("server"));
String anInterface = (String) params.get("interface");
String transaction = (String) params.get("transaction");
ResultMsg value = null;
boolean flag = false;
StringBuffer errmsg = null;
Logger log = (Logger) context.getVariable(Constants.Log_VAR);
for (Server server : servers) {
try {
Map<Object, Object> tmpParams = new HashMap<Object, Object>(params);
tmpParams.put("ip", server.getHost());
tmpParams.put("port", server.getPort());
value = new Client().call(anInterface, transaction,
new Object[]{tmpParams}, log);
if (value.isSuccess()) {
flag = true;
break;
} else {
errmsg = new StringBuffer();
errmsg.append("try call ");
errmsg.append(server);
errmsg.append(" failed ,failed message:");
errmsg.append(value.getDescription());
server.setDsp(value.getDescription());
logger.error(errmsg.toString());
}
} catch (Throwable e) {
errmsg = new StringBuffer();
errmsg.append("try call ");
errmsg.append(server);
errmsg.append(" failed ,caused by :");
errmsg.append(e.getMessage());
server.setDsp(e.getMessage());
logger.error(errmsg.toString());
}
}
if (flag) {
super.execute(context);
context.setObject(value);
} else {
throw new InterfaceException("T0001", "try call all servers " + servers
+ " failed");
}
}
@Override
public Object getFieldValue(IFieldDef iFieldDef) throws Exception {
return null;
}
private List<Server> servers(Object obj) {
return servers(requireNonNull(obj).toString());
}
private Object requireNonNull(Object obj) {
if (obj == null) {
throw new RuntimeException("obj is null");
}
return obj;
}
private List<Server> servers(String str) {
requireNonNull(str);
List<Server> list = new ArrayList<Server>();
for (String s : str.split(",")) {
String[] split = s.split(":");
list.add(new Server(split[0], Integer.parseInt(split[1])));
}
return list;
}
class Server {
private String host;
private int port;
private String dsp;
public Server(String host, int port) {
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getDsp() {
return dsp;
}
public void setDsp(String dsp) {
this.dsp = dsp;
}
@Override
public String toString() {
return "[host=" + host + ", dsp=" + dsp + "]";
}
}
public static void main(String[] args) {
List<Server> lst = new ArrayList<Server>();
RibbonFilter rf = new RibbonFilter();
Server server1 = rf.new Server("127.0.0.1", 11);
server1.setDsp("xxxx");
Server server2 = rf.new Server("127.0.0.2", 22);
server2.setDsp("tyyy");
lst.add(server1);
lst.add(server2);
System.out.println(lst);
}
}
......@@ -4,7 +4,6 @@ import com.brilliance.eibs.core.exception.InterfaceException;
import com.brilliance.eibs.core.model.IFieldDef;
import com.brilliance.eibs.core.model.ISubfieldDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.core.service.instance.impl.AbsFilter;
import com.brilliance.eibs.core.service.instance.plugin.MsgFormTemplate;
import com.brilliance.eibs.core.service.instance.plugin.MsgFormTemplate.IMsgAssembleFilter;
import com.brilliance.eibs.core.service.instance.plugin.MsgFormTemplate.IMsgParseFilter;
......@@ -29,13 +28,11 @@ import java.util.*;
public class XmlFilter extends AbsFilter {
private Document doc;
@Override
public void execute(Context context) {
// logger.info( LOG_FLAG + "XMLFilter is begining to runnig." +
// LOG_FLAG);
context.setCurrentInstance(this);
String type = getType();
if (type.equals("in")) {
// logger.info( LOG_FLAG + "XMLFilter解析" + LOG_FLAG);
try {
// 获取传入的xml字符串,构建Document文档对象
String xmlstr = context.getObject().toString();
......@@ -44,7 +41,6 @@ public class XmlFilter extends AbsFilter {
throw new InterfaceException("02801", e);
}
} else {
// logger.info( LOG_FLAG + "XMLFilter组装" + LOG_FLAG);
String x = getPropertyValue("doc");
if (!StringUtil.isEmpty(x) && Boolean.valueOf(x)) {
try {
......@@ -60,16 +56,8 @@ public class XmlFilter extends AbsFilter {
if (type.equals("out")) {
String formatstr = format(doc);
context.getResource().setObject(formatstr);
// context.getResource().setObject(doc.asXML());
// context.addVariable("global",
// IExecutableInstance.AES_ENCRYPT_PACKAGE_SYMBOL,
// formatstr);
// saveToContext(getFilterDef().getScope(), getFilterDef().getTag(),
// context.getObject());
logger.trace(formatstr);
}
// logger.info( LOG_FLAG + "XMLFilter has finished running." +
// LOG_FLAG);
}
/**
......@@ -81,16 +69,14 @@ public class XmlFilter extends AbsFilter {
doc = DocumentHelper.createDocument();
String encoding = getFilterDef().getEncoding();
doc.setXMLEncoding(encoding);
// doc.setRootElement(DocumentHelper.createElement(root));
Map<String, String> propMap = filterDef.getCommonsPropertyMap();
if (propMap.containsKey("xmlns")) {
// modified by weicong on 2018/12/12 支持动态参数
doc.addElement(root,
(String) elParser.getExPression(context, propMap.get("xmlns"), null));
// propMap.remove("xmlns");
} else
} else {
doc.addElement(root);
}
Iterator<String> iterator = propMap.keySet().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
......@@ -105,18 +91,19 @@ public class XmlFilter extends AbsFilter {
/**
* 按指定路径和值添加xml节点
*
* @param path
* @param node
* @param value
*/
private void addPath(Node node, Object value) {
if (null == value)
if (null == value) {
value = "";
else if (!(value instanceof Node)) {
} else if (!(value instanceof Node)) {
String encoding = filterDef.getEncoding();
try {
if (null != encoding && ("ISO-8859-1".equalsIgnoreCase(encoding)
|| "ISO8859-1".equalsIgnoreCase(encoding)))
|| "ISO8859-1".equalsIgnoreCase(encoding))) {
encoding = "GBK";
}
value = new String(value.toString().getBytes(encoding), encoding);
} catch (UnsupportedEncodingException e) {
throw new InterfaceException("00403", "unsupported encoding : " + encoding, e);
......@@ -142,6 +129,7 @@ public class XmlFilter extends AbsFilter {
*
* @param fieldDef
*/
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
protected void packField(IFieldDef fieldDef, Object value) {
// ------------modified by weicong on 2016/3/16 -----------
......@@ -166,11 +154,10 @@ public class XmlFilter extends AbsFilter {
value = cache;
}
sbf.setTag(sbf.getKey());
value = ((AbsFilter) context.getCurrentInstance()).valFilter(value, sbf,
context);
} else
value = ((AbsFilter) context.getCurrentInstance()).valFilter(value, sbf,
context);
value = ((AbsFilter) context.getCurrentInstance()).valFilter(value, sbf, context);
} else {
value = ((AbsFilter) context.getCurrentInstance()).valFilter(value, sbf, context);
}
if (sbf.getSubfields().size() > 0) {
String rawetag = sbf.getEtag();
String nwetag = fieldDef.getEtag() + "(" + i + ")." + rawetag;
......@@ -181,8 +168,9 @@ public class XmlFilter extends AbsFilter {
String etag = fieldDef.getEtag() + "(" + i + ")." + sbf.getEtag();
Node node = getNode(etag);
if (node != null)
if (node != null) {
addPath(node, value);
}
}
}
}
......@@ -192,8 +180,9 @@ public class XmlFilter extends AbsFilter {
@Override
public void assembelMsg(IFieldDef fieldDef, Object fieldValue) {
Node node = getNode(path);
if (node != null)
if (node != null) {
addPath(node, fieldValue);
}
}
});
}
......@@ -214,12 +203,14 @@ public class XmlFilter extends AbsFilter {
}
createDocument(path.substring(0, index));
}
if (cur == null)
if (cur == null) {
rsnode = doc.getRootElement().addElement(lasted);
else
} else {
rsnode = cur.addElement(lasted);
if (value != null)
}
if (value != null) {
addPath(rsnode, value);
}
return rsnode;
}
......@@ -273,11 +264,11 @@ public class XmlFilter extends AbsFilter {
return mft.getFieldValue(fieldDef, new IMsgParseFilter() {
@Override
public Object parseMsg(IFieldDef fieldDef) {
if (XmlUtil.isXpath(fieldDef.getType()))
if (XmlUtil.isXpath(fieldDef.getType())) {
return XmlUtil.getXmlNodeValueByXpath(doc, etag);
else
} else {
return XmlUtil.getXmlNodeValue(doc, etag);
}
}
@SuppressWarnings("unchecked")
......@@ -306,19 +297,17 @@ public class XmlFilter extends AbsFilter {
Object val = XmlUtil.getXmlNodeValue(doc, etag);
val = ((AbsFilter) context.getCurrentInstance()).valFilter(val, sbf,
context);
if (val instanceof Map)
if (val instanceof Map) {
rsmap.putAll((Map<? extends String, ? extends Object>) val);
else
} else {
rsmap.put(sbf.getTag(), val);
}
}
}
rs.add(rsmap);
}
}
return rs;
// return XmlUtil.getElementsOfListformat(doc, etag, fieldDef,
// context);
}
});
}
......@@ -364,9 +353,6 @@ public class XmlFilter extends AbsFilter {
} else {
Element subElement = curElement.element(layers[i]);
if (subElement == null) {
// throw new InterfaceException("02802",
// "can not find sub element [" + layers[i] +
// "] in path [" + path + "]");
return 0;
}
curElement = subElement;
......@@ -429,11 +415,11 @@ public class XmlFilter extends AbsFilter {
curElement = subElement;
} else {
Element subElement = null;
if (layers[i].contains(":"))
subElement =
curElement.element(layers[i].substring(layers[i].indexOf(':') + 1));
else
if (layers[i].contains(":")) {
subElement = curElement.element(layers[i].substring(layers[i].indexOf(':') + 1));
} else {
subElement = curElement.element(layers[i]);
}
if (subElement == null) {
subElement = curElement.addElement(layers[i]);
}
......@@ -465,7 +451,6 @@ public class XmlFilter extends AbsFilter {
* @return
*/
public Node getNode(String path) {
if (doc == null) {
int index = path.indexOf('.');
if (index == -1) {
......
......@@ -33,13 +33,12 @@ public class HttpConnection extends AbsConnection {
private Map<String, Object> headMap = new HashMap<String, Object>();
/**
* @author gechengyang Http连接 满足get和post两种请求方式
* @author gechengyang
* Http连接 满足get和post两种请求方式
**/
@SuppressWarnings("unchecked")
@Override
public void execute(Context context) {
// logger.info( LOG_FLAG + "HttpConnection is Running." +
// LOG_FLAG);
context.setCurrentInstance(this);
super.execute(context);
......@@ -48,9 +47,10 @@ public class HttpConnection extends AbsConnection {
IPropertyDef headProdef = getProperty(HEAD, true);
if (headProdef != null) {
String head = headProdef.getValue();
if (!StringUtil.isEmpty(head))
if (!StringUtil.isEmpty(head)) {
headMap = (Map<String, Object>) elParser.getExPression(context, head, null);
logger.info( "head" + headMap);
}
logger.info("head" + headMap);
}
IPropertyDef isFormProdef = getProperty(ISFORM, true);
......@@ -68,10 +68,8 @@ public class HttpConnection extends AbsConnection {
HttpConnectionManagerParams params = httpConnectionManager.getParams();
params.setConnectionTimeout(getPropertyValueInt(HTTP_CONNECTIONTIMEOUT));
params.setSoTimeout(getPropertyValueInt(HTTP_SOTIMEOUT));
params.setDefaultMaxConnectionsPerHost(getPropertyValueInt(HTTP_DEFAULTCONNPERHOST));// very
// important!!
params.setMaxTotalConnections(getPropertyValueInt(HTTP_MAXCONNS));// very
// important!!
params.setDefaultMaxConnectionsPerHost(getPropertyValueInt(HTTP_DEFAULTCONNPERHOST));
params.setMaxTotalConnections(getPropertyValueInt(HTTP_MAXCONNS));
client = new HttpClient(httpConnectionManager);
// 设置编码
String conncharset = connectionDef.getEncoding();
......@@ -98,16 +96,11 @@ public class HttpConnection extends AbsConnection {
DocumentFragment df =
Html2Xml.getSourceNodeByStr(contextstr, true, responseCharset);
contextstr = Html2Xml.genToXml(df, responseCharset);
logger.info( "Html to XML =" + contextstr);
logger.info("Html to XML =" + contextstr);
}
context.setObject(contextstr);
}
((MultiThreadedHttpConnectionManager) client.getHttpConnectionManager()).shutdown();
// logger.info( LOG_FLAG +
// "HttpConnection has finished running." + LOG_FLAG);
}
public String doGet() {
......@@ -115,10 +108,6 @@ public class HttpConnection extends AbsConnection {
GetMethod getMethod = new GetMethod(url);
InputStream in = null;
// 使用系统提供的默认的恢复策略
// getMethod.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
// new DefaultHttpMethodRetryHandler(3, false));
// getMethod.getParams().setParameter(CoreProtocolPNames.HTTP_CONTENT_CHARSET,
// charset);
return getContent(getMethod, in);
}
......@@ -132,7 +121,6 @@ public class HttpConnection extends AbsConnection {
PostMethod postMethod = new PostMethod(url);
int size = prodef.getArguments().size();
// 如果参数为Map类型 Modifed By gechengyang 20150701
if (size == 1) {
Object obj =
elParser.getExPression(context, prodef.getArguments().get(0).getValue(), null);
......@@ -149,7 +137,6 @@ public class HttpConnection extends AbsConnection {
prodef.setArguments(list);
}
}
// /////////////////////////
if (size > 0) {
modeparamsid = new Object[size];
modeparamsvalue = new Object[size];
......@@ -183,7 +170,7 @@ public class HttpConnection extends AbsConnection {
String location = null;
if (locationHeader != null) {
location = locationHeader.getValue();
logger.info( "The page was redirected to:" + location);
logger.info("The page was redirected to:" + location);
doGet(location);
}
}
......@@ -193,14 +180,11 @@ public class HttpConnection extends AbsConnection {
throw new InterfaceException("01402", "Http I/O error.", e);
}
try {
// postMethod.releaseConnection();
String ret = postMethod.getResponseBodyAsString();
postMethod.releaseConnection();
return ret;
} catch (IOException e) {
throw new InterfaceException("01402", "Rcv ret error.", e);
throw new InterfaceException("01402", "Rcv ret error.", e);
}
}
......@@ -211,17 +195,11 @@ public class HttpConnection extends AbsConnection {
int statusCode = client.executeMethod(getMethod);
if (statusCode == HttpStatus.SC_OK) {
// 读取内容
in = getMethod.getResponseBodyAsStream();
String responsecharset = getMethod.getRequestCharSet();
this.responseCharset = responsecharset;
logger.info( "http response charset=" + responsecharset);
logger.info("http response charset=" + responsecharset);
StringBuffer sb = new StringBuffer();
// int a = 0;
// byte buffer[] = new byte[1024];
// while ((a = in.read(buffer)) != -1) {
// sb.append(new String(buffer, 0, a, responsecharset));
// }
InputStreamReader isr = new InputStreamReader(in, responsecharset);
char[] buffer = new char[12];
int len = 0;
......@@ -230,7 +208,7 @@ public class HttpConnection extends AbsConnection {
}
// 处理内容
str = sb.toString();
logger.info( "http filter str= " + str);
logger.info("http filter str= " + str);
}
} catch (HttpException e) {
// 发生致命的异常,可能是协议不对或者返回的内容有问题
......@@ -254,13 +232,6 @@ public class HttpConnection extends AbsConnection {
URL realUrl = new URL(url);
// 打开和URL之间的连接
URLConnection conn = realUrl.openConnection();
// 设置通用的请求属性
/*
* conn.setRequestProperty("accept", "application/json");
* conn.setRequestProperty("connection", "Keep-Alive");
* conn.setRequestProperty("Content-Type", "application/json;charset=utf-8");
* conn.setRequestProperty("Authorization", vo.getAuth());
*/
for (String key : map.keySet()) {
conn.setRequestProperty(key, map.get(key).toString());
}
......@@ -281,7 +252,7 @@ public class HttpConnection extends AbsConnection {
result += line;
}
} catch (Exception e) {
throw new InterfaceException("01402", "发送 POST 请求出现异常!", e);
throw new InterfaceException("01402", "发送 POST 请求出现异常!", e);
}
// 使用finally块来关闭输出流、输入流
finally {
......@@ -293,7 +264,7 @@ public class HttpConnection extends AbsConnection {
in.close();
}
} catch (IOException ex) {
throw new InterfaceException("01402", "关闭连接异常!", ex);
throw new InterfaceException("01402", "关闭连接异常!", ex);
}
}
return result;
......
......@@ -56,8 +56,10 @@ public class HttpsConnection extends AbsConnection {
public static final String STORE_PATH = "storepath";
public static final String STORE_PWD = "storepwd";
public static final String STORE_PRO = "storepro";
public static final String DEF_PRO = "TLSv1.2";//"SSLv3", "TLSv1", "TLSv1.1", "TLSv1.2"
/**
* "SSLv3", "TLSv1", "TLSv1.1", "TLSv1.2"
*/
public static final String DEF_PRO = "TLSv1.2";
public static final String MAPPING = "mapping";
public static final String BODY = "body";
public static final String HEAD = "head";
......@@ -67,10 +69,10 @@ public class HttpsConnection extends AbsConnection {
private Charset charset = Charset.forName("UTF-8");
private CloseableHttpClient httpClient;
@Override
public void execute(Context context) {
params = (Map<String, Object>) context.getObject();//必须得从上下文中拿到参数列表
// 必须得从上下文中拿到参数列表
params = (Map<String, Object>) context.getObject();
context.setCurrentInstance(this);
super.execute(context);
String type = getPropertyValue(TYPE, false);
......@@ -81,13 +83,13 @@ public class HttpsConnection extends AbsConnection {
String proxyHost = getPropertyValue(PROXYHOST, true);
String proxyPort = getPropertyValue(PROXYPORT, true);
String mapping = getPropertyValue(MAPPING, false);
String charsetLocal = getPropertyValue(CHARSET, true);//true,存在为null,则返回
String charsetLocal = getPropertyValue(CHARSET, true);
String httpRetry = getPropertyValue(HTTPRETRY, false);
String connectTimeout = getPropertyValue(CONNECT_TIMEOUT, false);
String readTimeout = getPropertyValue(READ_TIMEOUT, false);
String storePath = getPropertyValue(STORE_PATH, false);
String storePwd = getPropertyValue(STORE_PWD, false);
String storePro = StringUtil.isEmpty(getPropertyValue(STORE_PRO, true))? DEF_PRO :
String storePro = StringUtil.isEmpty(getPropertyValue(STORE_PRO, true)) ? DEF_PRO :
getPropertyValue(STORE_PRO, true);
HttpClientBuilder clientBuilder = HttpClientBuilder.create();
RequestConfig.Builder requestConfig = RequestConfig.custom();
......@@ -107,7 +109,7 @@ public class HttpsConnection extends AbsConnection {
try {
security = getSecurity(storePath, storePwd, storePro);
} catch (Exception e) {
logger.error( "create SSLConnectionSocketFactory failed:", e);
logger.error("create SSLConnectionSocketFactory failed:", e);
throw new InterfaceException("11001", "create SSLConnectionSocketFactory failed.", e);
}
httpClient = clientBuilder
......@@ -151,21 +153,21 @@ public class HttpsConnection extends AbsConnection {
throw new InterfaceException("11001", "HttpConnection status code is not 200 ,but is " + statuscode);
}
} catch (IOException e) {
logger.error( "url " + url, e);
logger.error("url " + url, e);
throw new InterfaceException("11001", "HttpConnection exception occurs.", e);
} finally {
if (response != null) {
try {
response.close();
} catch (IOException e) {
logger.error( "close CloseableHttpResponse failed" + url, e);
logger.error("close CloseableHttpResponse failed" + url, e);
}
}
if (httpClient != null) {
try {
httpClient.close();
} catch (IOException e) {
logger.error( "close httpClient failed" + url, e);
logger.error("close httpClient failed" + url, e);
}
}
}
......
......@@ -22,76 +22,60 @@ import org.apache.http.protocol.BasicHttpContext;
import java.net.URI;
public class RestHttpConnection
extends AbsConnection
{
/**
* @author gechengyang
*RestHttp连接 满足get和post put三种请求方式
* **/
@Override
public void execute(Context context)
{
logger.info(LOG_FLAG + "RestHttpConnection is Running." + LOG_FLAG);
context.setCurrentInstance(this);
// super.execute(context);
String username=getPropertyValue("username");
String password=getPropertyValue("lpassword");
String url=getPropertyValue("url");
String mode=getPropertyValue("mode");
//获取表单数据 TODO
// IPropertyDef prodef = getProperty("mode");
DefaultHttpClient client = new DefaultHttpClient();
StringEntity entity=null;
try
{
URI uri = new URI(url);
client.getCredentialsProvider().setCredentials(new AuthScope(uri.getHost(), uri.getPort()),
new UsernamePasswordCredentials(username, password));
HttpHost host = new HttpHost(uri.getHost(), uri.getPort(), "http");
AuthCache authCache = new BasicAuthCache();
BasicScheme basicAuth = new BasicScheme();
authCache.put(host, basicAuth);
BasicHttpContext localcontext = new BasicHttpContext();
localcontext.setAttribute(ClientContext.AUTH_CACHE, authCache);
HttpResponse response=null;
if("post".equals(mode))
{
HttpPost post = new HttpPost(url);
//发送的消息体
String sendData=(String) context.getObject();
entity = new StringEntity(sendData);
post.setEntity(entity);
response = client.execute(host, post, localcontext);
}
else if("put".equals(mode))
{
HttpPut put=new HttpPut(url);
//发送的消息体
String sendData=(String) context.getObject();
entity = new StringEntity(sendData);
put.setEntity(entity);
response = client.execute(host, put, localcontext);
}
else if("get".equals(mode))
{
HttpGet get=new HttpGet(url);
response = client.execute(host, get, localcontext);
}
String returnData= IOUtils.toString(response.getEntity().getContent());
context.setObject(returnData);
public class RestHttpConnection extends AbsConnection {
/**
* @author gechengyang
* RestHttp连接 满足get和post put三种请求方式
**/
@Override
public void execute(Context context) {
logger.info(LOG_FLAG + "RestHttpConnection is Running." + LOG_FLAG);
context.setCurrentInstance(this);
String username = getPropertyValue("username");
String password = getPropertyValue("lpassword");
String url = getPropertyValue("url");
String mode = getPropertyValue("mode");
DefaultHttpClient client = new DefaultHttpClient();
StringEntity entity = null;
try {
URI uri = new URI(url);
client.getCredentialsProvider().setCredentials(new AuthScope(uri.getHost(), uri.getPort()),
new UsernamePasswordCredentials(username, password));
HttpHost host = new HttpHost(uri.getHost(), uri.getPort(), "http");
AuthCache authCache = new BasicAuthCache();
BasicScheme basicAuth = new BasicScheme();
authCache.put(host, basicAuth);
BasicHttpContext localcontext = new BasicHttpContext();
localcontext.setAttribute(ClientContext.AUTH_CACHE, authCache);
HttpResponse response = null;
if ("post".equals(mode)) {
HttpPost post = new HttpPost(url);
//发送的消息体
String sendData = (String) context.getObject();
entity = new StringEntity(sendData);
post.setEntity(entity);
response = client.execute(host, post, localcontext);
} else if ("put".equals(mode)) {
HttpPut put = new HttpPut(url);
//发送的消息体
String sendData = (String) context.getObject();
entity = new StringEntity(sendData);
put.setEntity(entity);
response = client.execute(host, put, localcontext);
} else if ("get".equals(mode)) {
HttpGet get = new HttpGet(url);
response = client.execute(host, get, localcontext);
}
String returnData = IOUtils.toString(response.getEntity().getContent());
context.setObject(returnData);
} catch (Exception e) {
throw new InterfaceException("01401", "HTTP error.", e);
}
logger.info(LOG_FLAG + "RestHttpConnection has finished running." + LOG_FLAG);
}
catch (Exception e)
{
throw new InterfaceException("01401", "HTTP error.", e);
}
logger.info(LOG_FLAG + "RestHttpConnection has finished running." + LOG_FLAG);
}
}
}
......@@ -47,7 +47,7 @@ public class HttpsService extends AbsServer {
@Override
public void run() {
logger.info( LOG_FLAG + "starting HttpService[id:" + serviceDef.getId() + "]." + LOG_FLAG);
logger.info(LOG_FLAG + "starting HttpService[id:" + serviceDef.getId() + "]." + LOG_FLAG);
try {
String ksttyp = getPropertyValue(Key_Store_Type, Def_Key_Store_Type);
String kmftyp = getPropertyValue(Key_Manager_Factory, Def_Key_Manager_Factory);
......@@ -77,13 +77,13 @@ public class HttpsService extends AbsServer {
@Override
public void handle(HttpExchange httpExchange) {
String dsturi = httpExchange.getRequestURI().getPath();
Headers header=httpExchange.getRequestHeaders();
Headers header = httpExchange.getRequestHeaders();
try {
String requestBody = IOUtils.toString(httpExchange.getRequestBody(), encode);
for (Map.Entry<String, String[]> entry : handleMap.entrySet()) {
String orguri = entry.getKey();
if (dsturi.equals(orguri)) {
Object rtnVal = new Client().call(entry.getValue()[0], entry.getValue()[1], new Object[]{requestBody,header}).getContent();
Object rtnVal = new Client().call(entry.getValue()[0], entry.getValue()[1], new Object[]{requestBody, header}).getContent();
if (rtnVal instanceof String) {
response(httpExchange, (String) rtnVal, encode);
} else {
......@@ -92,18 +92,18 @@ public class HttpsService extends AbsServer {
return;
}
}
logger.warn( "当前请求没有匹配的交易处理:" + dsturi);
logger.warn("当前请求没有匹配的交易处理:" + dsturi);
} catch (Exception e) {
logger.error( "当前请求处理异常:" + dsturi, e);
logger.error("当前请求处理异常:" + dsturi, e);
}
}
});
server.start();
} catch (Exception e) {
logger.error( "https server start failed:" + e.getMessage(), e);
logger.error("https server start failed:" + e.getMessage(), e);
throw new InterfaceException("01410", "https server start failed", e);
}
logger.info( LOG_FLAG + "HttpsService[id:" + serviceDef.getId() + "] is started." + LOG_FLAG);
logger.info(LOG_FLAG + "HttpsService[id:" + serviceDef.getId() + "] is started." + LOG_FLAG);
}
private Map<String, String[]> checkConfigValidity(String uri, String interfaceName, String transactionName) {
......@@ -130,15 +130,15 @@ public class HttpsService extends AbsServer {
private SSLContext getSslContext(String storePath, String storePwd, String ksttyp, String kmftyp) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException, KeyManagementException {
KeyStore ks = KeyStore.getInstance(ksttyp);
//载入证书
// 载入证书
ks.load(new FileInputStream(storePath), storePwd.toCharArray());
//建立一个密钥管理工厂
// 建立一个密钥管理工厂
KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmftyp);
//初始化工厂
// 初始化工厂
kmf.init(ks, storePwd.toCharArray());
//建立证书实体
// 建立证书实体
SSLContext sslContext = SSLContext.getInstance(Def_SSL_Context);
//初始化证书
// 初始化证书
sslContext.init(kmf.getKeyManagers(), null, null);
return sslContext;
}
......@@ -155,14 +155,14 @@ public class HttpsService extends AbsServer {
os.write(rtnmsg.getBytes(encode));
os.flush();
} catch (Exception e) {
logger.error( "response occure exception:" + e.getMessage(), e);
logger.error("response occure exception:" + e.getMessage(), e);
throw new InterfaceException("01410", "response occure exception", e);
} finally {
if (os != null) {
try {
os.close();
} catch (IOException e) {
logger.error( "OutputStream close occure exception:" + e.getMessage(), e);
logger.error("OutputStream close occure exception:" + e.getMessage(), e);
throw new InterfaceException("01410", "OutputStream close occure exception", e);
}
}
......
......@@ -6,7 +6,7 @@ import com.brilliance.eibs.util.StringUtil;
/**
* FTP连接
*
*
* @author gechengyang
**/
public class FtpConnection2 extends AbsConnection {
......@@ -14,8 +14,9 @@ public class FtpConnection2 extends AbsConnection {
String encoding;
FtpClient client;
@Override
public void execute(Context context) {
logger.info( LOG_FLAG + "FtpConnection is Running." + LOG_FLAG);
logger.info(LOG_FLAG + "FtpConnection is Running." + LOG_FLAG);
encoding = connectionDef.getEncoding();
context.setCurrentInstance(this);
// 获取超时时间
......@@ -33,26 +34,25 @@ public class FtpConnection2 extends AbsConnection {
boolean deleteFlag = Boolean.valueOf(getPropertyValue("delete", true));
String filenameRegex = getPropertyValue("filenameregex", true);
String passiveModeStr = getPropertyValue("passivemode", true);
boolean passiveMode = StringUtil.isEmpty(passiveModeStr)?false:Boolean.valueOf(passiveModeStr);
boolean passiveMode = StringUtil.isEmpty(passiveModeStr) ? false : Boolean.valueOf(passiveModeStr);
client = new FtpClient(url, port, connectTimeout);
client.login(username, password,passiveMode);
client.login(username, password, passiveMode);
try {
if (isDownload(type)) {
logger.debug( "FTP begin to Download.");
logger.debug("FTP begin to Download.");
client.download(ctlPath, ctlFilenameregex, remotepath, localpath, filenameRegex,
arc, deleteFlag);
context.setObject(client.getFilenames());
}
if (isUpload(type)) {
logger.debug( "FTP begin to Upload.");
logger.debug("FTP begin to Upload.");
client.upload(localpath, remotepath, filenameRegex, arc);
}
} finally {
client.disConnect();
}
logger.info( "FtpConnection is finished.");
logger.info("FtpConnection is finished.");
}
private boolean isDownload(String type) {
......
......@@ -20,9 +20,8 @@ public class FtpConnection_beta extends AbsConnection {
String encoding = "";
FTPClient ftp = null;
@Override
public void execute(Context context) {
// logger.info( LOG_FLAG + "FtpConnection is Running." +
// LOG_FLAG);
encoding = connectionDef.getEncoding();
context.setCurrentInstance(this);
String username = getPropertyValue("username");
......@@ -85,7 +84,6 @@ public class FtpConnection_beta extends AbsConnection {
close(ftpExcutor);
return;
}
ftp.setDataTimeout(receiveTimeout);
try {
ftp.setSoTimeout(soTimeout);
......@@ -95,21 +93,7 @@ public class FtpConnection_beta extends AbsConnection {
if (isDownload(type)) {
logger.debug("FTP begin to Download.");
try {
// 解决下载文件夹时只有一个文件下载,备份出错问题 不用特殊判断,直接调用downloadDirFiles即可
// TODO modified by yangrui 20150917
// FTPFile[] ftpFiles = ftp.listFiles(new
// String(remotepath.getBytes(encoding), encoding));
// if (ftpFiles.length == 1 && ftpFiles[0].isFile())
// {
// String remote = remotepath.substring(0,
// remotepath.lastIndexOf("/") + 1);
// ftpExcutor.downSingleFile(ftpFiles[0].getName(), remote,
// localpath, arc, fileDirectorName);
// return;
// }
ftpExcutor.downloadDirFiles(fileName, remotepath, localpath, arc, fileDirectorName);
} catch (IOException e) {
close(ftpExcutor);
throw new InterfaceException("01204", "ftp download error:", e);
......
......@@ -9,6 +9,7 @@ import javax.jms.*;
public class JmsFilter extends AbsFilter {
private MessageProducer producer;
@Override
public void execute(final Context context) {
logger.info("Running JmsFilter.");
context.setCurrentInstance(this);
......@@ -16,6 +17,7 @@ public class JmsFilter extends AbsFilter {
MessageConsumer comsumer = (MessageConsumer) context.getObject();
try {
comsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
try {
if (m instanceof TextMessage) { // 接收文本消息
......@@ -41,21 +43,18 @@ public class JmsFilter extends AbsFilter {
}
handmessage();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error(e.getMessage(), e);
}
}
});
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error(e.getMessage(), e);
}
}
@Override
public Object getFieldValue(IFieldDef fieldDef) {
// TODO Auto-generated method stub
return null;
}
......
......@@ -10,70 +10,46 @@ import java.util.Hashtable;
/**
* mq连接器
*
* @author hujun
*
* @author hujun
*/
public class MQConnection extends AbsConnection {
@Override
public void execute(Context context) {
logger.info( LOG_FLAG + "MQConnection is Running." + LOG_FLAG);
context.setCurrentInstance(this);
try {
// MQEnvironment.port = getPropertyValueInt(PORT_SYMBOL);
// logger.debug("mq port=" + MQEnvironment.port);
// MQEnvironment.hostname = getPropertyValue(IP_SYMBOL);
// logger.debug("mq ip=" + MQEnvironment.hostname);
// MQEnvironment.channel = getPropertyValue(MQ_CHANNEL_SYMBOL);
// MQEnvironment.CCSID = getPropertyValueInt(MQ_CCSID_SYMBOL);
// String userId= getPropertyValue("userID",true);
int port = getPropertyValueInt(PORT_SYMBOL);
logger.debug( "mq port=" + port);
String hostname = getPropertyValue(IP_SYMBOL);
logger.debug( "mq ip=" + hostname);
String channel = getPropertyValue(MQ_CHANNEL_SYMBOL);
int CCSID = getPropertyValueInt(MQ_CCSID_SYMBOL);
String userId = getPropertyValue("userID", true);
String password = getPropertyValue("password", true);
// if(StringUtil.isEmpty(userId))
// {
// MQEnvironment.userID =userId;
// }
logger.debug( "mq cssid=" + CCSID);
String managerName = getPropertyValue(MQ_QUEUE_MANAGER_SYMBOL);
// addProp2Context(MQ_QUEUE_SYMBOL);
addProp2Context("transaction", MQ_QUEUE_SYMBOL);
Object obj[] = new Object[3];
Hashtable<String, Object> options = new Hashtable<String, Object>();
options.put("hostname", hostname);
options.put("port", port);
options.put("channel", channel);
options.put("CCSID", CCSID);
if (!StringUtil.isEmpty(userId)) {
options.put("userID", userId);
}
if (!StringUtil.isEmpty(password)) {
options.put("password", password);
}
MQQueueManager manager=new MQQueueManager(managerName, options);
obj[0] = manager;
obj[1] = getPropertyValue(MQ_QUEUE_SYMBOL, true);
obj[2] = getPropertyValue(MQ_RECEIVERTIMEOUT, true);
// context.setObject(manager);
context.setObject(obj);
// int openOptions;
// boolean istrue=true;
// if (istrue)
// openOptions = MQConstants.MQOO_OUTPUT |
// MQConstants.MQOO_FAIL_IF_QUIESCING;
// else
// // openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF |
// MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;
// openOptions = CMQC.MQOO_INPUT_AS_Q_DEF | CMQC.MQOO_INQUIRE;
// manager.accessQueue(queueName, openOptions);
} catch (MQException e) {
throw new InterfaceException("01701", e);
}
logger.info( LOG_FLAG + "MQConnection is finished." + LOG_FLAG);
}
@Override
public void execute(Context context) {
logger.info(LOG_FLAG + "MQConnection is Running." + LOG_FLAG);
context.setCurrentInstance(this);
try {
int port = getPropertyValueInt(PORT_SYMBOL);
logger.debug("mq port=" + port);
String hostname = getPropertyValue(IP_SYMBOL);
logger.debug("mq ip=" + hostname);
String channel = getPropertyValue(MQ_CHANNEL_SYMBOL);
int CCSID = getPropertyValueInt(MQ_CCSID_SYMBOL);
String userId = getPropertyValue("userID", true);
String password = getPropertyValue("password", true);
logger.debug("mq cssid=" + CCSID);
String managerName = getPropertyValue(MQ_QUEUE_MANAGER_SYMBOL);
addProp2Context("transaction", MQ_QUEUE_SYMBOL);
Object obj[] = new Object[3];
Hashtable<String, Object> options = new Hashtable<String, Object>();
options.put("hostname", hostname);
options.put("port", port);
options.put("channel", channel);
options.put("CCSID", CCSID);
if (!StringUtil.isEmpty(userId)) {
options.put("userID", userId);
}
if (!StringUtil.isEmpty(password)) {
options.put("password", password);
}
MQQueueManager manager = new MQQueueManager(managerName, options);
obj[0] = manager;
obj[1] = getPropertyValue(MQ_QUEUE_SYMBOL, true);
obj[2] = getPropertyValue(MQ_RECEIVERTIMEOUT, true);
context.setObject(obj);
} catch (MQException e) {
throw new InterfaceException("01701", e);
}
logger.info(LOG_FLAG + "MQConnection is finished." + LOG_FLAG);
}
}
......@@ -10,75 +10,81 @@ import java.util.Properties;
public class RocketMQOpsProducerConnection extends AbsConnection {
@Override
public void execute(Context context) {
logger.info( LOG_FLAG
+ "RocketMQOpsProducerConnection is Running." + LOG_FLAG);
context.setCurrentInstance(this);
Object obj = context.getObject();
if (null != obj) {
Properties producerProperties = new Properties();
Producer producer = null;
String Producer_ID = ""; // 生产者ID
String ACCESS_KEY = ""; // 阿里云身份验证AccessKey
String SECRET_KEY = ""; // 阿里云身份验证SecretKey
String ONS_ADDR = ""; // 消息队列服务器地址
String NAMESRV_ADDR = ""; // 消息队列服务器地址
String TOPIC; // 消息主题
String TAG; // 消息标签
String charset;// 字符集
charset = getPropertyValue("charset", "utf-8");
Producer_ID = getPropertyValue("GroupId");
TOPIC = getPropertyValue("topic");
TAG = getPropertyValue("tag");
if (StringUtil.isEmpty(Producer_ID) || StringUtil.isEmpty(TOPIC)
|| StringUtil.isEmpty(TAG)) {
throw new IllegalArgumentException(
"[GroupId],[topic],[tag]三个属性必须配置");
}
ACCESS_KEY = getPropertyValue("AccessKey");
SECRET_KEY = getPropertyValue("SecretKey");
ONS_ADDR = getPropertyValue("ONSAddr","");
NAMESRV_ADDR = getPropertyValue("NamesrvAddr","");
producerProperties.setProperty(PropertyKeyConst.GROUP_ID,
Producer_ID);
producerProperties.setProperty(PropertyKeyConst.AccessKey,
ACCESS_KEY);
producerProperties.setProperty(PropertyKeyConst.SecretKey,
SECRET_KEY);
if (!StringUtil.isEmpty(ONS_ADDR)) {
producerProperties.put(PropertyKeyConst.ONSAddr, ONS_ADDR);
} else if (!StringUtil.isEmpty(NAMESRV_ADDR)) {
producerProperties.put(PropertyKeyConst.NAMESRV_ADDR,
NAMESRV_ADDR);
} else {
logger.error( LOG_FLAG + "不支持的接入方式." + LOG_FLAG);
throw new IllegalAccessError("不支持的接入方式.");
}
try {
producer = ONSFactory.createProducer(producerProperties);
producer.start();
Message message = new Message(TOPIC, TAG, obj.toString()
.getBytes(charset));
SendResult sendResult = producer.send(message);
assert sendResult != null;
context.setObject("SEND_OK");
logger.info(
new Date() + " Send mq message success! Topic is:"
+ TOPIC + " Tag is: " + TAG + " msgId is: "
+ sendResult.getMessageId());
} catch (Exception e) {
logger.error( new Date()
+ " Send mq message failed! Topic is:" + TOPIC
+ " Tag is: " + TAG);
context.setObject("SEND_FAILED");
throw new InterfaceException("03501", e);
} finally {
if (producer != null)
producer.shutdown();
}
}
logger.info( LOG_FLAG
+ "RocketMQOpsProducerConnection is finished." + LOG_FLAG);
}
@Override
public void execute(Context context) {
logger.info(LOG_FLAG
+ "RocketMQOpsProducerConnection is Running." + LOG_FLAG);
context.setCurrentInstance(this);
Object obj = context.getObject();
if (null != obj) {
Properties producerProperties = new Properties();
Producer producer = null;
// 生产者ID
String Producer_ID = "";
// 阿里云身份验证AccessKey
String ACCESS_KEY = "";
// 阿里云身份验证SecretKey
String SECRET_KEY = "";
// 消息队列服务器地址
String ONS_ADDR = "";
// 消息队列服务器地址
String NAMESRV_ADDR = "";
String TOPIC; // 消息主题
String TAG; // 消息标签
String charset;// 字符集
charset = getPropertyValue("charset", "utf-8");
Producer_ID = getPropertyValue("GroupId");
TOPIC = getPropertyValue("topic");
TAG = getPropertyValue("tag");
if (StringUtil.isEmpty(Producer_ID) || StringUtil.isEmpty(TOPIC)
|| StringUtil.isEmpty(TAG)) {
throw new IllegalArgumentException(
"[GroupId],[topic],[tag]三个属性必须配置");
}
ACCESS_KEY = getPropertyValue("AccessKey");
SECRET_KEY = getPropertyValue("SecretKey");
ONS_ADDR = getPropertyValue("ONSAddr", "");
NAMESRV_ADDR = getPropertyValue("NamesrvAddr", "");
producerProperties.setProperty(PropertyKeyConst.GROUP_ID,
Producer_ID);
producerProperties.setProperty(PropertyKeyConst.AccessKey,
ACCESS_KEY);
producerProperties.setProperty(PropertyKeyConst.SecretKey,
SECRET_KEY);
if (!StringUtil.isEmpty(ONS_ADDR)) {
producerProperties.put(PropertyKeyConst.ONSAddr, ONS_ADDR);
} else if (!StringUtil.isEmpty(NAMESRV_ADDR)) {
producerProperties.put(PropertyKeyConst.NAMESRV_ADDR,
NAMESRV_ADDR);
} else {
logger.error(LOG_FLAG + "不支持的接入方式." + LOG_FLAG);
throw new IllegalAccessError("不支持的接入方式.");
}
try {
producer = ONSFactory.createProducer(producerProperties);
producer.start();
Message message = new Message(TOPIC, TAG, obj.toString()
.getBytes(charset));
SendResult sendResult = producer.send(message);
assert sendResult != null;
context.setObject("SEND_OK");
logger.info(
new Date() + " Send mq message success! Topic is:"
+ TOPIC + " Tag is: " + TAG + " msgId is: "
+ sendResult.getMessageId());
} catch (Exception e) {
logger.error(new Date()
+ " Send mq message failed! Topic is:" + TOPIC
+ " Tag is: " + TAG);
context.setObject("SEND_FAILED");
throw new InterfaceException("03501", e);
} finally {
if (producer != null) {
producer.shutdown();
}
}
}
logger.info(LOG_FLAG
+ "RocketMQOpsProducerConnection is finished." + LOG_FLAG);
}
}
......@@ -17,7 +17,7 @@ public class RocketMQProducerConnection extends AbsConnection {
@Override
public void execute(Context context) {
logger.info( LOG_FLAG + "RocketMQProducerConnection is Running." + LOG_FLAG);
logger.info(LOG_FLAG + "RocketMQProducerConnection is Running." + LOG_FLAG);
context.setCurrentInstance(this);
DefaultMQProducer producer = null;
......@@ -25,7 +25,8 @@ public class RocketMQProducerConnection extends AbsConnection {
String groupName = getPropertyValue("groupName");
String serverAddress = getPropertyValue("serverAddress");
String topic = getPropertyValue("topic");
String subExpression = getPropertyValue("subExpression", "");;
String subExpression = getPropertyValue("subExpression", "");
;
// Instantiate with a producer group name.
producer = new DefaultMQProducer(groupName);
......@@ -36,15 +37,16 @@ public class RocketMQProducerConnection extends AbsConnection {
Object obj = context.getObject();
if (null != obj) {
logger.debug( "send_message="+obj);
logger.debug("send_message=" + obj);
String encoding = connectionDef.getEncoding();
Message msg =
new Message(topic /* Topic */,
new Message(topic,
(obj.toString()).getBytes(StringUtil.isEmpty(encoding)
? RemotingHelper.DEFAULT_CHARSET
: encoding)); /* Message body */
if (!StringUtil.isEmpty(subExpression))
if (!StringUtil.isEmpty(subExpression)) {
msg.setTags(subExpression);
}
// Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
context.setObject(sendResult.getSendStatus());
......@@ -63,6 +65,6 @@ public class RocketMQProducerConnection extends AbsConnection {
} finally {
producer.shutdown();
}
logger.info( LOG_FLAG + "RocketMQProducerConnection is finished." + LOG_FLAG);
logger.info(LOG_FLAG + "RocketMQProducerConnection is finished." + LOG_FLAG);
}
}
......@@ -23,7 +23,7 @@ public class RocketMQServer extends AbsServer {
@Override
public void run() {
logger.info( LOG_FLAG + "RocketMQServer starting ." + LOG_FLAG);
logger.info(LOG_FLAG + "RocketMQServer starting ." + LOG_FLAG);
final String interfaceName = getRequiredPropertyValue("interfaceName");
final String transactionName = getRequiredPropertyValue("transactionName");
......@@ -45,32 +45,30 @@ public class RocketMQServer extends AbsServer {
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
ConsumeConcurrentlyContext context) {
Client client = new Client();
client.call(interfaceName, transactionName, new Object[] {msgs});
client.call(interfaceName, transactionName, new Object[]{msgs});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch the consumer instance.
consumer.start();
} catch (MQClientException e) {
logger.error( LOG_FLAG + "RocketMQServer error." + LOG_FLAG, e);
logger.error(LOG_FLAG + "RocketMQServer error." + LOG_FLAG, e);
close();
}
logger.info( LOG_FLAG + "RocketMQServer is finished." + LOG_FLAG);
logger.info(LOG_FLAG + "RocketMQServer is finished." + LOG_FLAG);
}
@Override
public void close() {
if (null != consumer)
if (null != consumer) {
consumer.shutdown();
}
}
}
......@@ -22,17 +22,28 @@ public class RocketMQServerOns extends AbsServer {
public void run() {
logger.info(LOG_FLAG + "RocketMQOnsServer starting ." + LOG_FLAG);
Properties properties;
String CONSUMER_ID = ""; //消费者ID
String ACCESS_KEY = ""; //阿里云身份验证AccessKey
String SECRET_KEY = ""; //阿里云身份验证SecretKey
String ONS_ADDR = ""; //消息队列服务器地址
String NAMESRV_ADDR = ""; //消息队列服务器地址
String[] TOPIC; //消息主题组
String[] TAG; //消息标签组
int CONSUME_THREAD_NUM = 0; //消费线程数
int MAX_RECONSUME_TIMES = 0; //消息消费失败的最大重试次数
int CONSUME_TIMEOUT = 0; //每条消息消费的最大超时时间
int SUSPEND_TIME_MILLIS = 0; //消息消费失败的重试间隔时间(只适用于顺序消息)
// 消费者ID
String CONSUMER_ID = "";
// 阿里云身份验证AccessKey
String ACCESS_KEY = "";
// 阿里云身份验证SecretKey
String SECRET_KEY = "";
// 消息队列服务器地址
String ONS_ADDR = "";
// 消息队列服务器地址
String NAMESRV_ADDR = "";
// 消息主题组
String[] TOPIC;
// 消息标签组
String[] TAG;
// 消费线程数
int CONSUME_THREAD_NUM = 0;
// 消息消费失败的最大重试次数
int MAX_RECONSUME_TIMES = 0;
// 每条消息消费的最大超时时间
int CONSUME_TIMEOUT = 0;
// 消息消费失败的重试间隔时间(只适用于顺序消息)
int SUSPEND_TIME_MILLIS = 0;
String interfaceName = getRequiredPropertyValue("interfaceName");
String transactionName = getRequiredPropertyValue("transactionName");
......@@ -48,16 +59,16 @@ public class RocketMQServerOns extends AbsServer {
MAX_RECONSUME_TIMES = getPropertyValue("MaxReconsumeTimes", 1);
CONSUME_TIMEOUT = getPropertyValue("ConsumeTimeout", 15);
SUSPEND_TIME_MILLIS = getPropertyValue("SuspendTimeMillis", 3000);
//校验topic和tag数量是否一致,支持多消息订阅
// 校验topic和tag数量是否一致,支持多消息订阅
checkTopicAndTag(TOPIC, TAG);
properties = new Properties();
//消息消费者ID
// 消息消费者ID
properties.put(PropertyKeyConst.GROUP_ID, CONSUMER_ID);
//阿里云身份验证AccessKey
// 阿里云身份验证AccessKey
properties.put(PropertyKeyConst.AccessKey, ACCESS_KEY);
//阿里云身份验证secretKey
// 阿里云身份验证secretKey
properties.put(PropertyKeyConst.SecretKey, SECRET_KEY);
//消息队列服务器地址
// 消息队列服务器地址
if (!StringUtil.isEmpty(ONS_ADDR)) {
properties.put(PropertyKeyConst.ONSAddr, ONS_ADDR);
} else if (!StringUtil.isEmpty(NAMESRV_ADDR)) {
......@@ -66,14 +77,14 @@ public class RocketMQServerOns extends AbsServer {
logger.error(LOG_FLAG + "不支持的接入方式." + LOG_FLAG);
throw new IllegalAccessError("不支持的接入方式.");
}
//设置 Consumer 实例的消费线程数,默认:64 (订阅 消费者)
// 设置 Consumer 实例的消费线程数,默认:64 (订阅 消费者)
properties.put(PropertyKeyConst.ConsumeThreadNums, CONSUME_THREAD_NUM);
//设置消息消费失败的最大重试次数,默认:16 (订阅 消费者)
// 设置消息消费失败的最大重试次数,默认:16 (订阅 消费者)
properties.put(PropertyKeyConst.MaxReconsumeTimes, MAX_RECONSUME_TIMES);
//设置每条消息消费的最大超时时间,超过设置时间则被视为消费失败, (订阅 消费者)
//等下次重新投递再次消费。每个业务需要设置一个合理的值,单位(分钟)。默认:15
// 设置每条消息消费的最大超时时间,超过设置时间则被视为消费失败, (订阅 消费者)
// 等下次重新投递再次消费。每个业务需要设置一个合理的值,单位(分钟)。默认:15
properties.put(PropertyKeyConst.ConsumeTimeout, CONSUME_TIMEOUT);
//只适用于顺序消息,设置消息消费失败的重试间隔时间 单位(毫秒) (订阅 消费者)
// 只适用于顺序消息,设置消息消费失败的重试间隔时间 单位(毫秒) (订阅 消费者)
properties.put(PropertyKeyConst.SuspendTimeMillis, SUSPEND_TIME_MILLIS);
try {
logger.info("开始创建Consumer...");
......@@ -81,8 +92,9 @@ public class RocketMQServerOns extends AbsServer {
for (int i = 0; i < TOPIC.length; i++) {
String tp = TOPIC[i];
String tg = TAG[i];
if (StringUtil.isEmpty(tp))
if (StringUtil.isEmpty(tp)) {
continue;
}
logger.info("订阅的Topic" + i + ":" + tp);
logger.info("订阅的Tag" + i + ":" + tg);
consumer.subscribe(tp, tg, new MessageListenerImpl(interfaceName, transactionName, context, logger));
......@@ -104,8 +116,9 @@ public class RocketMQServerOns extends AbsServer {
@Override
public void close() {
if (null != consumer)
if (null != consumer) {
consumer.shutdown();
}
}
}
......@@ -132,7 +145,6 @@ class MessageListenerImpl implements MessageListener {
String msgId = message.getMsgID();
try {
topicTagStr = message.getTopic() + "," + message.getTag();
logger.info("msgId=" + msgId + "当前Topic+Tag=" + topicTagStr);
// 查询消息消费处理
Client client = new Client();
......
......@@ -6,46 +6,48 @@ import com.brilliance.eibs.lsocket.AliveClient;
/**
* @author gechengyang socket客户端长连接
* **/
**/
public class AliveSocketConnection extends AbsConnection {
@Override
public void execute(Context context) {
logger.info( "Running SocketClientConnection.");
context.setCurrentInstance(this);
super.execute(context);
setTimeOut();
AliveClient aclient = null;
try {
setTimeOut();
String ip = getPropertyValue(IP_SYMBOL);
int port = getPropertyValueInt(PORT_SYMBOL);
String heartbeat = getPropertyValue(HEAR_BEAT, true);
String content = (String) context.getObject();
aclient = AliveClient.getInstance(ip, port, connectTimeout, logger);
aclient.setHeartbeart(heartbeat);
aclient.put(content);
} catch (Exception e) {
throw new InterfaceException("00014", "SocketClientConnection exception occurs.", e);
}
logger.info( "SocketClientConnection is finished.");
}
@Override
public void execute(Context context) {
logger.info("Running SocketClientConnection.");
context.setCurrentInstance(this);
super.execute(context);
setTimeOut();
AliveClient aclient = null;
try {
setTimeOut();
String ip = getPropertyValue(IP_SYMBOL);
int port = getPropertyValueInt(PORT_SYMBOL);
String heartbeat = getPropertyValue(HEAR_BEAT, true);
String content = (String) context.getObject();
aclient = AliveClient.getInstance(ip, port, connectTimeout, logger);
aclient.setHeartbeart(heartbeat);
aclient.put(content);
} catch (Exception e) {
throw new InterfaceException("00014", "SocketClientConnection exception occurs.", e);
}
logger.info("SocketClientConnection is finished.");
}
/**
* 设置超时时间,包括连接超时,返回超时
*/
protected void setTimeOut() {
setConnectTimeOut();
}
/**
* 设置超时时间,包括连接超时,返回超时
*/
@Override
protected void setTimeOut() {
setConnectTimeOut();
}
/**
* 获取连接超时时间,如果为空,则使用默认时间
*/
protected void setConnectTimeOut() {
int connectTimeout = getPropertyValueInt(CONNECT_TIMEOUT_SYMBOL, true);
if (connectTimeout != 0) {
this.connectTimeout = connectTimeout;
}
logger.debug( "Connect timeout = " + this.connectTimeout);
}
/**
* 获取连接超时时间,如果为空,则使用默认时间
*/
@Override
protected void setConnectTimeOut() {
int connectTimeout = getPropertyValueInt(CONNECT_TIMEOUT_SYMBOL, true);
if (connectTimeout != 0) {
this.connectTimeout = connectTimeout;
}
logger.debug("Connect timeout = " + this.connectTimeout);
}
}
......@@ -15,69 +15,63 @@ import java.util.Map.Entry;
/**
* 201908 weicong 修改
*
*/
public class MinaClientConnection extends AbsConnection {
private MinaNioClient client;
private String charset;
private MinaNioClient client;
private String charset;
@Override
public void execute(Context context) {
// logger.info( LOG_FLAG + "MinaClientConnection is Running." +
// LOG_FLAG);
charset = getPropertyValue("charset", Charset.defaultCharset().name());
MinaSocketClientAdaptor adaptor =null;
try {
setTimeOut();
context.setCurrentInstance(this);
int port = getPropertyValueInt(PORT_SYMBOL);
logger.info( "socket port=" + port);
String hostname = getPropertyValue(IP_SYMBOL);
logger.info( "socket hostname=" + hostname);
Map<String, String> props=new HashMap<String, String>();
for(Entry<String, IPropertyDef> entry:this.connectionDef.getPropertyMap().entrySet()){
props.put(entry.getKey(), entry.getValue().getValue());
}
client = new MinaNioClient(hostname, port,props);
client.init();
client.setTimeout(connectTimeout==0?1000:connectTimeout);
adaptor = new MinaSocketClientAdaptor(client,charset,receiveTimeout==0?2000:receiveTimeout);
client.loadPloyHandler(new NioProxyHandler(adaptor));
client.connect();
adaptor.setSession(client.getSession());
context.getResource().setSocketProxy(adaptor);
} catch (Exception e) {
if(adaptor!=null){
adaptor.close();
}
throw new InterfaceException("02102", e);
}
// logger.info( LOG_FLAG + "MinaClientConnection is finished." +
// LOG_FLAG);
}
@Override
public void execute(Context context) {
charset = getPropertyValue("charset", Charset.defaultCharset().name());
MinaSocketClientAdaptor adaptor = null;
try {
setTimeOut();
context.setCurrentInstance(this);
int port = getPropertyValueInt(PORT_SYMBOL);
logger.info("socket port=" + port);
String hostname = getPropertyValue(IP_SYMBOL);
logger.info("socket hostname=" + hostname);
Map<String, String> props = new HashMap<String, String>();
for (Entry<String, IPropertyDef> entry : this.connectionDef.getPropertyMap().entrySet()) {
props.put(entry.getKey(), entry.getValue().getValue());
}
client = new MinaNioClient(hostname, port, props);
client.init();
client.setTimeout(connectTimeout == 0 ? 1000 : connectTimeout);
adaptor = new MinaSocketClientAdaptor(client, charset, receiveTimeout == 0 ? 2000 : receiveTimeout);
client.loadPloyHandler(new NioProxyHandler(adaptor));
client.connect();
adaptor.setSession(client.getSession());
context.getResource().setSocketProxy(adaptor);
} catch (Exception e) {
if (adaptor != null) {
adaptor.close();
}
throw new InterfaceException("02102", e);
}
}
private static class NioProxyHandler extends NioProcessHandler {
private static class NioProxyHandler extends NioProcessHandler {
private MinaSocketClientAdaptor adaptor;
private MinaSocketClientAdaptor adaptor;
private NioProxyHandler(MinaSocketClientAdaptor adaptor) {
this.adaptor = adaptor;
}
private NioProxyHandler(MinaSocketClientAdaptor adaptor) {
this.adaptor = adaptor;
}
@Override
protected void process(IoSession session, Object message) {
adaptor.setMessage(message);
}
@Override
protected void process(IoSession session, Object message) {
adaptor.setMessage(message);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
super.exceptionCaught(session, cause);
adaptor.close();
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
super.exceptionCaught(session, cause);
adaptor.close();
}
}
}
}
......@@ -12,14 +12,14 @@ import java.net.Socket;
/**
* ProxyFilter 通信代理
*
* @author gechengyang
*
* @author gechengyang
*/
public class ProxyFilter extends AbsFilter {
SocketProxy socketp = null;
SocketProxy midsp = null;
@Override
public void execute(Context context) {
logger.info(LOG_FLAG + "ProxyFilter is running" + LOG_FLAG);
context.setCurrentInstance(this);
......@@ -29,24 +29,25 @@ public class ProxyFilter extends AbsFilter {
logger.info(LOG_FLAG + "ProxyFilter has finished running" + LOG_FLAG);
}
@Override
protected void packField(IFieldDef fieldDef, Object fieldValue) {
}
@Override
public Object getFieldValue(IFieldDef fieldDef) {
return null;
}
public void run() {
Object obj[] = (Object[]) context.getResource().getObject();
if (obj == null) return;
if (obj == null) {
return;
}
socketp = (SocketProxy) obj[0];
Socket socket = socketp.getSocket();
midsp = (SocketProxy) obj[1];
Socket mids = midsp.getSocket();
try {
byte[] buffer = new byte[1024];
int a = -1;
logger.info("proxyFilter belongs to " + context.getVariable("transactionName"));
......@@ -57,7 +58,6 @@ public class ProxyFilter extends AbsFilter {
} catch (Exception e) {
logger.error("socket error:", e);
throw new InterfaceException("02201");
} finally {
logger.info("[" + context.getVariable("transactionName") + "]sokcet is closed nomally");
IOUtils.closeQuietly(socket);
......@@ -65,16 +65,9 @@ public class ProxyFilter extends AbsFilter {
try {
logger.info("socketProxy Map:" + SocketManagerFactoryUtil.getSocketProxyMap());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error(e.getMessage(), e);
}
/*
* if (socketp!=null&&SocketClientConnection.ali
*
* veProxys.containsKey(socketp)) SocketClientConnection.aliveProxys.remove(socketp);
*/
}
}
}
......@@ -13,108 +13,109 @@ import java.net.SocketTimeoutException;
/**
* Socket客户端连接器
*
*
* @author gechengyang
*/
public class ShortSocketClientConnection extends AbsConnection {
@Override
public void execute(Context context) {
// logger.info( LOG_FLAG + "SocketClientConnection is Running."
// + LOG_FLAG);
context.setCurrentInstance(this);
ISocketProxy socketProxy = null;
try {
setTimeOut();
int port = getPropertyValueInt(PORT_SYMBOL);
logger.info( "socket port=" + port);
String ip = getPropertyValue(IP_SYMBOL);
logger.info( "socket ip=" + ip);
boolean has_head = true;// 是否有报文头
String head_len_type = "";// 报文头长度是二进制还是十进制
int head_len = 0;// 报文头长度
boolean is_contain_head_len = false;// 报文头长度是否包含报文头
int fill_len = 0;// 默认为0
boolean is_contain_fill_len = false;// 报文头长度是否包含报文头和报文体之间数据的长度 true
@Override
public void execute(Context context) {
context.setCurrentInstance(this);
ISocketProxy socketProxy = null;
try {
setTimeOut();
int port = getPropertyValueInt(PORT_SYMBOL);
logger.info("socket port=" + port);
String ip = getPropertyValue(IP_SYMBOL);
logger.info("socket ip=" + ip);
// 是否有报文头
boolean has_head = true;
// 报文头长度是二进制还是十进制
String head_len_type = "";
// 报文头长度
int head_len = 0;
// 报文头长度是否包含报文头
boolean is_contain_head_len = false;
// 默认为0
int fill_len = 0;
// 报文头长度是否包含报文头和报文体之间数据的长度 true
boolean is_contain_fill_len = false;
has_head = getPropertyValueBoolean(IServerInstance.HAS_CONTAIN_HEAD, true);
head_len_type = getPropertyValue(IServerInstance.HEAD_LEN_TYPE, IServerInstance.HEAD_LEN_TYPE_10);
head_len = getPropertyValue(IServerInstance.HEAD_LEN, 0);
is_contain_head_len = getPropertyValueBoolean(IServerInstance.IS_CONTAIN_HEAD_LEN, false);
fill_len = getPropertyValue(IServerInstance.FILL_LEN, 0);
is_contain_fill_len = getPropertyValueBoolean(IServerInstance.IS_CONTAIN_FILL_LEN, false);
has_head = getPropertyValueBoolean(IServerInstance.HAS_CONTAIN_HEAD, true);
head_len_type = getPropertyValue(IServerInstance.HEAD_LEN_TYPE, IServerInstance.HEAD_LEN_TYPE_10);
head_len = getPropertyValue(IServerInstance.HEAD_LEN, 0);
is_contain_head_len = getPropertyValueBoolean(IServerInstance.IS_CONTAIN_HEAD_LEN, false);
fill_len = getPropertyValue(IServerInstance.FILL_LEN, 0);
is_contain_fill_len = getPropertyValueBoolean(IServerInstance.IS_CONTAIN_FILL_LEN, false);
if (has_head) {
if (has_head) {
int headLen = 0;
byte[] headLenBytes = null;
Object obj = context.getObject();
byte[] sendData = null;
if (obj instanceof String) {
sendData = ((String) obj).getBytes(connectionDef.getEncoding());
} else if (obj instanceof byte[]) {
sendData = (byte[]) obj;
}
headLen = sendData.length;
if (is_contain_head_len) {
headLen = headLen + head_len;
}
if (fill_len != 0) {
if (!is_contain_fill_len) {
headLen = headLen + fill_len;
}
}
if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_10)) {
headLenBytes = String.format("%0" + head_len + "d", headLen).getBytes();
} else if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_2)) {
headLenBytes = CommonFunctionUtils.intToBytes(headLen);
}
socketProxy = new SocketProxy(new InetSocketAddress(ip, port), connectTimeout, receiveTimeout, connectionDef.getEncoding(), context);
socketProxy.send(headLenBytes);
socketProxy.send(sendData);
int headLen = 0;
byte[] headLenBytes = null;
Object obj = context.getObject();
byte[] sendData = null;
if (obj instanceof String) {
sendData = ((String) obj).getBytes(connectionDef.getEncoding());
} else if (obj instanceof byte[]) {
sendData = (byte[]) obj;
}
headLen = sendData.length;
if (is_contain_head_len) {
headLen = headLen + head_len;
}
if (fill_len != 0) {
if (!is_contain_fill_len) {
headLen = headLen + fill_len;
}
}
if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_10)) {
headLenBytes = String.format("%0" + head_len + "d", headLen).getBytes();
} else if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_2)) {
headLenBytes = CommonFunctionUtils.intToBytes(headLen);
}
socketProxy = new SocketProxy(new InetSocketAddress(ip, port), connectTimeout, receiveTimeout, connectionDef.getEncoding(), context);
socketProxy.send(headLenBytes);
socketProxy.send(sendData);
headLenBytes = new byte[head_len];
IOUtils.readFully(socketProxy.getSocket().getInputStream(), headLenBytes);
if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_10)) {
headLen = Integer.parseInt(new String(headLenBytes));
} else if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_2)) {
headLen = CommonFunctionUtils.bytesToInt(headLenBytes);
}
if (is_contain_head_len) {
headLen = headLen - head_len;
}
if (fill_len != 0) {
if (!is_contain_fill_len) {
headLen = headLen + fill_len;
}
}
byte[] databuffer = new byte[headLen];
IOUtils.readFully(socketProxy.getSocket().getInputStream(), databuffer);
context.setObject(databuffer);
} else {
headLenBytes = new byte[head_len];
IOUtils.readFully(socketProxy.getSocket().getInputStream(), headLenBytes);
if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_10)) {
headLen = Integer.parseInt(new String(headLenBytes));
} else if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_2)) {
headLen = CommonFunctionUtils.bytesToInt(headLenBytes);
}
if (is_contain_head_len) {
headLen = headLen - head_len;
}
if (fill_len != 0) {
if (!is_contain_fill_len) {
headLen = headLen + fill_len;
}
}
byte[] databuffer = new byte[headLen];
IOUtils.readFully(socketProxy.getSocket().getInputStream(), databuffer);
context.setObject(databuffer);
} else {
}
} catch (SocketTimeoutException e) {
closeSocket(socketProxy);
throw new InterfaceException("02101", e);
} catch (Exception e) {
closeSocket(socketProxy);
throw new InterfaceException("02102", e);
} finally {
closeSocket(socketProxy);
}
}
}
} catch (SocketTimeoutException e) {
closeSocket(socketProxy);
throw new InterfaceException("02101", e);
} catch (Exception e) {
closeSocket(socketProxy);
throw new InterfaceException("02102", e);
} finally {
closeSocket(socketProxy);
}
// logger.info( LOG_FLAG + "SocketClientConnection is finished."
// + LOG_FLAG);
}
/**
* 关闭socket连接
*
* @param proxy
*/
private void closeSocket(ISocketProxy proxy) {
if (proxy != null)
proxy.close();
}
/**
* 关闭socket连接
*
* @param proxy
*/
private void closeSocket(ISocketProxy proxy) {
if (proxy != null) {
proxy.close();
}
}
}
......@@ -15,104 +15,96 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* Socket客户端连接器
*
*
* @author hujun ------------modified by weicong on 2016/4/01 -----------
* 兼容sslsocket
* 兼容sslsocket
*/
public class SocketClientConnection extends AbsConnection {
/**
* 保存socket长连接
*/
public static Map<SocketIdentity, ISocketProxy> aliveProxys = new ConcurrentHashMap<SocketIdentity, ISocketProxy>();
/**
* 保存socket长连接
*/
public static Map<SocketIdentity, ISocketProxy> aliveProxys = new ConcurrentHashMap<SocketIdentity, ISocketProxy>();
/**
* @author hujun
*
* 用于标记标识socket长连接
*
*/
private static class SocketIdentity {
private String interfaceId;
private String transactionName;
private String ref;
/**
* @author hujun
* <p>
* 用于标记标识socket长连接
*/
private static class SocketIdentity {
private String interfaceId;
private String transactionName;
private String ref;
private SocketIdentity(String interfaceId, String transactionName, String ref) {
this.interfaceId = interfaceId;
this.transactionName = transactionName;
this.ref = ref;
}
private SocketIdentity(String interfaceId, String transactionName, String ref) {
this.interfaceId = interfaceId;
this.transactionName = transactionName;
this.ref = ref;
}
private static SocketIdentity bulid(String interfaceId, String transactionName, String ref) {
return new SocketIdentity(interfaceId, transactionName, ref);
}
private static SocketIdentity bulid(String interfaceId, String transactionName, String ref) {
return new SocketIdentity(interfaceId, transactionName, ref);
}
@Override
public boolean equals(Object obj) {
boolean b = EqualsBuilder.reflectionEquals(this, obj);
return b;
}
@Override
public boolean equals(Object obj) {
boolean b = EqualsBuilder.reflectionEquals(this, obj);
return b;
}
@Override
public int hashCode() {
return HashCodeBuilder.reflectionHashCode(this);
}
@Override
public int hashCode() {
return HashCodeBuilder.reflectionHashCode(this);
}
}
}
@Override
public void execute(Context context) {
// logger.info( LOG_FLAG + "SocketClientConnection is Running."
// + LOG_FLAG);
context.setCurrentInstance(this);
ISocketProxy socketProxy = null;
try {
setTimeOut();
final int port = getPropertyValueInt(PORT_SYMBOL);
logger.info( "socket port=" + port);
final String ip = getPropertyValue(IP_SYMBOL);
logger.info( "socket ip=" + ip);
String alive = getPropertyValue(ALIVE_SYMBOL, true);
@Override
public void execute(Context context) {
context.setCurrentInstance(this);
ISocketProxy socketProxy = null;
try {
setTimeOut();
final int port = getPropertyValueInt(PORT_SYMBOL);
logger.info("socket port=" + port);
final String ip = getPropertyValue(IP_SYMBOL);
logger.info("socket ip=" + ip);
String alive = getPropertyValue(ALIVE_SYMBOL, true);
SocketIdentity socketId = SocketIdentity.bulid(context.getResource().getInterfaceName(), context.getResource().getTransactionName(),
getIDef().getRef());
// 长连接池中获取
if (!StringUtil.isEmpty(alive) && Boolean.valueOf(alive)) {
/*
* synchronized (lock) {
*/
socketProxy = aliveProxys.get(socketId);
if (null == socketProxy || !socketProxy.isConnected()) {
socketProxy = new SocketProxy(new InetSocketAddress(ip, port), connectTimeout, receiveTimeout, connectionDef.getEncoding(),
context);
aliveProxys.put(socketId, socketProxy);
}
/* } */
} else {
if (null == socketProxy) {
socketProxy = new SocketProxy(new InetSocketAddress(ip, port), connectTimeout, receiveTimeout, connectionDef.getEncoding(),
context);
}
}
SocketIdentity socketId = SocketIdentity.bulid(context.getResource().getInterfaceName(), context.getResource().getTransactionName(),
getIDef().getRef());
// 长连接池中获取
if (!StringUtil.isEmpty(alive) && Boolean.valueOf(alive)) {
socketProxy = aliveProxys.get(socketId);
if (null == socketProxy || !socketProxy.isConnected()) {
socketProxy = new SocketProxy(new InetSocketAddress(ip, port), connectTimeout, receiveTimeout, connectionDef.getEncoding(),
context);
aliveProxys.put(socketId, socketProxy);
}
} else {
if (null == socketProxy) {
socketProxy = new SocketProxy(new InetSocketAddress(ip, port), connectTimeout, receiveTimeout, connectionDef.getEncoding(),
context);
}
}
context.getResource().setSocketProxy(socketProxy);
} catch (SocketTimeoutException e) {
closeSocket(socketProxy);
throw new InterfaceException("02101", e);
} catch (Exception e) {
closeSocket(socketProxy);
throw new InterfaceException("02102", e);
}
context.getResource().setSocketProxy(socketProxy);
} catch (SocketTimeoutException e) {
closeSocket(socketProxy);
throw new InterfaceException("02101", e);
} catch (Exception e) {
closeSocket(socketProxy);
throw new InterfaceException("02102", e);
}
// logger.info( LOG_FLAG + "SocketClientConnection is finished."
// + LOG_FLAG);
}
}
/**
* 关闭socket连接
*
* @param proxy
*/
private void closeSocket(ISocketProxy proxy) {
if (proxy != null)
proxy.close();
}
/**
* 关闭socket连接
*
* @param proxy
*/
private void closeSocket(ISocketProxy proxy) {
if (proxy != null) {
proxy.close();
}
}
}
......@@ -14,87 +14,86 @@ import java.net.SocketException;
public class ChannelServer extends AbsServer {
private AsyncChannelServer server;
private ChannelListner listener;
public ChannelServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info( LOG_FLAG + "ChannelServer starting ." + LOG_FLAG);
int port = Integer.valueOf(getRequiredPropertyValue("port"));
int destPort = Integer.valueOf(getRequiredPropertyValue("destPort"));
String destIP = getRequiredPropertyValue("destIP");
String requestInterfaceName = getPropertyValue("request_interfaceName");
String requestTransactionId = getPropertyValue("request_transactionName");
String responseInterfaceName = getPropertyValue("response_interfaceName");
String responseTransactionId = getPropertyValue("response_transactionName");
String length = getPropertyValue("length");
if (StringUtil.isEmpty(length)
|| ((StringUtil.isEmpty(requestInterfaceName) || StringUtil.isEmpty(requestTransactionId)) && (StringUtil
.isEmpty(responseInterfaceName) || StringUtil.isEmpty(responseTransactionId))))
server = new AsyncChannelServer(port, destPort, destIP);
else
server = new AsyncFixedChannelServer(port, destPort, destIP, Integer.valueOf(length), requestInterfaceName, requestTransactionId,
responseInterfaceName, responseTransactionId);
listener = new ChannelListner();
Thread t = new Thread(listener);
t.setName("Asynchronous Channel '" + serviceDef.getId() + "' Server Listener");
t.setDaemon(true);
t.start();
try {
server.startup();
} catch (IOException e) {
if ((e instanceof SocketException) && e.getMessage().equals("socket closed"))
;
else {
throw new InterfaceException("02301", e);
}
}
logger.info( LOG_FLAG + "ChannelServer is finished." + LOG_FLAG);
}
@Override
public void close() {
listener.interrupted();
server.close();
}
private class ChannelListner extends Detect {
private long interval = 60000;
private ChannelListner() {
}
private ChannelListner(long interval) {
this.interval = interval;
}
@Override
protected void exception() {
interrupted();
}
@Override
protected void handle() throws IOException {
int size = server.getPool().size();
logger.info( "Connections size in [" + serviceDef.getId() + "] channel : " + size);
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
logger.error( e.getMessage(),e);
}
}
@Override
protected void close() {
logger.info( "ChannelListner of [" + serviceDef.getId() + "] channel is closed.");
}
}
private AsyncChannelServer server;
private ChannelListner listener;
public ChannelServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info(LOG_FLAG + "ChannelServer starting ." + LOG_FLAG);
int port = Integer.valueOf(getRequiredPropertyValue("port"));
int destPort = Integer.valueOf(getRequiredPropertyValue("destPort"));
String destIP = getRequiredPropertyValue("destIP");
String requestInterfaceName = getPropertyValue("request_interfaceName");
String requestTransactionId = getPropertyValue("request_transactionName");
String responseInterfaceName = getPropertyValue("response_interfaceName");
String responseTransactionId = getPropertyValue("response_transactionName");
String length = getPropertyValue("length");
if (StringUtil.isEmpty(length)
|| ((StringUtil.isEmpty(requestInterfaceName)
|| StringUtil.isEmpty(requestTransactionId))
&& (StringUtil.isEmpty(responseInterfaceName) || StringUtil.isEmpty(responseTransactionId)))) {
server = new AsyncChannelServer(port, destPort, destIP);
} else {
server = new AsyncFixedChannelServer(port, destPort, destIP, Integer.valueOf(length), requestInterfaceName, requestTransactionId,
responseInterfaceName, responseTransactionId);
}
listener = new ChannelListner();
Thread t = new Thread(listener);
t.setName("Asynchronous Channel '" + serviceDef.getId() + "' Server Listener");
t.setDaemon(true);
t.start();
try {
server.startup();
} catch (IOException e) {
if ((e instanceof SocketException) && e.getMessage().equals("socket closed"))
;
else {
throw new InterfaceException("02301", e);
}
}
logger.info(LOG_FLAG + "ChannelServer is finished." + LOG_FLAG);
}
@Override
public void close() {
listener.interrupted();
server.close();
}
private class ChannelListner extends Detect {
private long interval = 60000;
private ChannelListner() {
}
private ChannelListner(long interval) {
this.interval = interval;
}
@Override
protected void exception() {
interrupted();
}
@Override
protected void handle() throws IOException {
int size = server.getPool().size();
logger.info("Connections size in [" + serviceDef.getId() + "] channel : " + size);
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
@Override
protected void close() {
logger.info("ChannelListner of [" + serviceDef.getId() + "] channel is closed.");
}
}
}
......@@ -23,7 +23,7 @@ public class LinkServer extends AbsServer {
@Override
public void run() {
logger.info(LOG_FLAG + "LinkServer starting ." + LOG_FLAG);
String links = getRequiredPropertyValue(LINKS_SYMBOL);// 127.0.0.1:10001:10002
String links = getRequiredPropertyValue(LINKS_SYMBOL);
int headTagLen = Integer.valueOf(getRequiredPropertyValue(HEAD_TAG_LEN_SYMBOL));
String heartbeat = getPropertyValue(HEARTBEAT_SYMBOL);
String fin = getPropertyValue(FIN_SYMBOL);
......@@ -52,8 +52,9 @@ public class LinkServer extends AbsServer {
@Override
public void close() {
for (LinkDispatch dispatch : dispatchs)
for (LinkDispatch dispatch : dispatchs) {
dispatch.interrupted();
}
executorService.shutdownNow();
}
......
......@@ -24,14 +24,34 @@ public class ShortSocketServer extends AbsServer {
private int timeout;
private IServerMode mode;
private boolean has_head = true;// 是否有报文头
private String head_len_type = IServerInstance.HEAD_LEN_TYPE_10;// 报文头长度是二进制还是十进制
private int head_len = 0;// 报文头长度
private boolean is_contain_head_len = false;// 报文头长度是否包含报文头
private int fill_len = 0;// 默认为0
private boolean is_contain_fill_len = false;// 报文头长度是否包含报文头和报文体之间数据的长度 true
// false
private int body_offset = 0;// 报文体位移量 默认为0, -1为减去1个长度,+1为增加1个长度
/**
* 是否有报文头
*/
private boolean has_head = true;
/**
* 报文头长度是二进制还是十进制
*/
private String head_len_type = IServerInstance.HEAD_LEN_TYPE_10;
/**
* 报文头长度
*/
private int head_len = 0;
/**
* 报文头长度是否包含报文头
*/
private boolean is_contain_head_len = false;
/**
* 默认为0
*/
private int fill_len = 0;
/**
* 报文头长度是否包含报文头和报文体之间数据的长度 true
*/
private boolean is_contain_fill_len = false;
/**
* 报文体位移量 默认为0, -1为减去1个长度,+1为增加1个长度
*/
private int body_offset = 0;
private String encoding = "UTF-8";
......
......@@ -11,64 +11,53 @@ import java.net.DatagramSocket;
/**
* udpserver
*
* @author gechengyang
*
* @author gechengyang
*/
public class UdpServer extends AbsServer {
private boolean isOpen = true;
public UdpServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info( LOG_FLAG + "UdpServer starting ." + LOG_FLAG);
try {
// 服务发布的端口
int port = Integer.valueOf(getRequiredPropertyValue(PORT_SYMBOL));
int buffersize = Integer.valueOf(getRequiredPropertyValue(BUFFER_SIZE));
logger.info( "socketserver port=" + port);
String interfaceName = getRequiredPropertyValue("interfaceName");
String transName = getRequiredPropertyValue("transactionName");
DatagramSocket server = new DatagramSocket(port);
logger.info( "server start at port: " + port);
while (isOpen) {
DatagramPacket dataPacket = null;
byte buffer[] = new byte[buffersize];
dataPacket = new DatagramPacket(buffer, buffer.length);
server.receive(dataPacket);
String revs = new String(buffer, 0, dataPacket.getLength());
logger.info( "dataPackegt.length=" + dataPacket.getLength());
logger.info( "server recive:" + revs);
byte[] b = new byte[dataPacket.getLength()];
System.arraycopy(buffer, 0, b, 0, dataPacket.getLength());
// new Thread(new UdpHandle(interfaceName, transName, revs,
// dataPacket, server)).start();
new Thread(new UdpHandle(interfaceName, transName, b, dataPacket, server)).start();
// byte[] sndinfo = "1234567".getBytes();
// DatagramPacket dataPacket1 = new DatagramPacket(sndinfo,
// sndinfo.length, dataPacket.getAddress(),
// dataPacket.getPort());
// server.send(dataPacket1);
// server.close();
}
close();
} catch (Exception e) {
throw new InterfaceException("03001", e);
} finally {
}
logger.info( LOG_FLAG + "UdpServer is finished." + LOG_FLAG);
}
@Override
public void close() {
}
private boolean isOpen = true;
public UdpServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info(LOG_FLAG + "UdpServer starting ." + LOG_FLAG);
try {
// 服务发布的端口
int port = Integer.valueOf(getRequiredPropertyValue(PORT_SYMBOL));
int buffersize = Integer.valueOf(getRequiredPropertyValue(BUFFER_SIZE));
logger.info("socketserver port=" + port);
String interfaceName = getRequiredPropertyValue("interfaceName");
String transName = getRequiredPropertyValue("transactionName");
DatagramSocket server = new DatagramSocket(port);
logger.info("server start at port: " + port);
while (isOpen) {
DatagramPacket dataPacket = null;
byte buffer[] = new byte[buffersize];
dataPacket = new DatagramPacket(buffer, buffer.length);
server.receive(dataPacket);
String revs = new String(buffer, 0, dataPacket.getLength());
logger.info("dataPackegt.length=" + dataPacket.getLength());
logger.info("server recive:" + revs);
byte[] b = new byte[dataPacket.getLength()];
System.arraycopy(buffer, 0, b, 0, dataPacket.getLength());
new Thread(new UdpHandle(interfaceName, transName, b, dataPacket, server)).start();
}
close();
} catch (Exception e) {
throw new InterfaceException("03001", e);
} finally {
}
logger.info(LOG_FLAG + "UdpServer is finished." + LOG_FLAG);
}
@Override
public void close() {
}
}
......@@ -10,81 +10,69 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
public abstract class BaseChartBuilder
implements IChartBuilder
{
public abstract class BaseChartBuilder implements IChartBuilder {
private ChartModel model;
private ChartModel model;
public BaseChartBuilder( ChartModel model )
{
this.model = model;
}
protected ChartModel getModel()
{
return model;
}
protected void setModel(ChartModel model)
{
this.model = model;
}
public BaseChartBuilder(ChartModel model) {
this.model = model;
}
@Override
public byte[] generate()
throws IOException
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try
{
desolveMessyCode();
JFreeChart chart = createChart();
ChartUtilities.writeChartAsPNG(baos, chart, model.getWidth(), model.getHeight());
baos.close();
protected ChartModel getModel() {
return model;
}
catch (IOException e)
{
throw new IOException("Image create exception.", e);
protected void setModel(ChartModel model) {
this.model = model;
}
return baos.toByteArray();
}
@Override
public byte[] generate() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
desolveMessyCode();
JFreeChart chart = createChart();
ChartUtilities.writeChartAsPNG(baos, chart, model.getWidth(), model.getHeight());
baos.close();
} catch (IOException e) {
throw new IOException("Image create exception.", e);
}
return baos.toByteArray();
}
@Override
public void fillData()
{
Map<?, ?> data = getModel().getDatas();
if (null != data)
for (Map.Entry<?, ?> entry : data.entrySet())
fillSingleData(entry);
}
@Override
public void fillData() {
Map<?, ?> data = getModel().getDatas();
if (null != data) {
for (Map.Entry<?, ?> entry : data.entrySet()) {
fillSingleData(entry);
}
}
}
protected boolean isNumeric(Object o)
{
return o instanceof Integer || o instanceof Double || o instanceof Float;
}
protected boolean isNumeric(Object o) {
return o instanceof Integer || o instanceof Double || o instanceof Float;
}
protected boolean isNumericArray(Object o)
{
return o instanceof Integer[] || o instanceof Double[] || o instanceof Float[];
}
protected boolean isNumericArray(Object o) {
return o instanceof Integer[] || o instanceof Double[] || o instanceof Float[];
}
protected abstract void fillSingleData(Map.Entry<?, ?> entry);
protected abstract void fillSingleData(Map.Entry<?, ?> entry);
protected abstract JFreeChart createChart();
protected abstract JFreeChart createChart();
private void desolveMessyCode()
{
StandardChartTheme standardChartTheme = new StandardChartTheme("CN");
String font="宋体";
// 设置标题字体
standardChartTheme.setExtraLargeFont(new Font(font, Font.PLAIN, 20));
// 设置图例的字体
standardChartTheme.setRegularFont(new Font(font, Font.PLAIN, 12));
// 设置轴向的字体
standardChartTheme.setLargeFont(new Font(font, Font.PLAIN, 12));
// 应用主题样式
ChartFactory.setChartTheme(standardChartTheme);
}
private void desolveMessyCode() {
StandardChartTheme standardChartTheme = new StandardChartTheme("CN");
String font = "宋体";
// 设置标题字体
standardChartTheme.setExtraLargeFont(new Font(font, Font.PLAIN, 20));
// 设置图例的字体
standardChartTheme.setRegularFont(new Font(font, Font.PLAIN, 12));
// 设置轴向的字体
standardChartTheme.setLargeFont(new Font(font, Font.PLAIN, 12));
// 应用主题样式
ChartFactory.setChartTheme(standardChartTheme);
}
}
......@@ -4,30 +4,23 @@ import org.apache.commons.lang.reflect.ConstructorUtils;
/**
* @author hujun
*
* <p>
* 图表生成器工厂
*/
public class ChartFactory
{
public class ChartFactory {
public static final String CHART_IMPL_PACKAGE = "com.brilliance.eibs.core.service.chart.impl";
public static final String CHART_IMPL_CLASS_SUFFIX = "ChartBuilder";
public static final String CHART_IMPL_PACKAGE = "com.brilliance.eibs.core.service.chart.impl";
public static final String CHART_IMPL_CLASS_SUFFIX = "ChartBuilder";
public static IChartBuilder produce(String type, ChartModel model)
{
try
{
Class<?> clazz = Class.forName(CHART_IMPL_PACKAGE + "." + firstLetterUpper(type) + CHART_IMPL_CLASS_SUFFIX);
return (IChartBuilder) ConstructorUtils.invokeConstructor(clazz, new Object[]{model});
}
catch (Exception e)
{
throw new IllegalArgumentException("Get a " + type.toLowerCase() + " chart generator failure.", e);
public static IChartBuilder produce(String type, ChartModel model) {
try {
Class<?> clazz = Class.forName(CHART_IMPL_PACKAGE + "." + firstLetterUpper(type) + CHART_IMPL_CLASS_SUFFIX);
return (IChartBuilder) ConstructorUtils.invokeConstructor(clazz, new Object[]{model});
} catch (Exception e) {
throw new IllegalArgumentException("Get a " + type.toLowerCase() + " chart generator failure.", e);
}
}
}
private static String firstLetterUpper(String s)
{
return s.replaceFirst(s.substring(0, 1), s.substring(0, 1).toUpperCase());
}
private static String firstLetterUpper(String s) {
return s.replaceFirst(s.substring(0, 1), s.substring(0, 1).toUpperCase());
}
}
......@@ -4,70 +4,58 @@ import java.util.Map;
/**
* @author hujun
*
* <p>
* 图表数据模型
*/
public class ChartModel
{
private String title;
private String[] label;
private int height = 280;
private int width = 500;
private Map<?, ?> datas;
public ChartModel( String title )
{
this.title = title;
}
public int getHeight()
{
return height;
}
public void setHeight(int height)
{
this.height = height;
}
public int getWidth()
{
return width;
}
public void setWidth(int width)
{
this.width = width;
}
public Map<?, ?> getDatas()
{
return datas;
}
public String getTitle()
{
return title;
}
public void setTitle(String title)
{
this.title = title;
}
public String[] getLabel()
{
return label;
}
public void setLabel(String[] label)
{
this.label = label;
}
public void setDatas(Map<?, ?> datas)
{
this.datas = datas;
}
public class ChartModel {
private String title;
private String[] label;
private int height = 280;
private int width = 500;
private Map<?, ?> datas;
public ChartModel(String title) {
this.title = title;
}
public int getHeight() {
return height;
}
public void setHeight(int height) {
this.height = height;
}
public int getWidth() {
return width;
}
public void setWidth(int width) {
this.width = width;
}
public Map<?, ?> getDatas() {
return datas;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String[] getLabel() {
return label;
}
public void setLabel(String[] label) {
this.label = label;
}
public void setDatas(Map<?, ?> datas) {
this.datas = datas;
}
}
......@@ -4,13 +4,11 @@ import java.io.IOException;
/**
* @author hujun
*
* <p>
* 图表生成器接口
*/
public interface IChartBuilder
{
byte[] generate()
throws IOException;
public interface IChartBuilder {
byte[] generate() throws IOException;
void fillData();
void fillData();
}
......@@ -4,7 +4,6 @@ import com.brilliance.eibs.core.service.chart.BaseChartBuilder;
import com.brilliance.eibs.core.service.chart.ChartModel;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.axis.CategoryAxis;
import org.jfree.chart.plot.CategoryPlot;
import org.jfree.chart.renderer.category.BarRenderer;
import org.jfree.chart.renderer.category.CategoryItemRenderer;
......@@ -12,56 +11,47 @@ import org.jfree.data.category.DefaultCategoryDataset;
import java.util.Map.Entry;
public class BarChartBuilder
extends BaseChartBuilder
{
public class BarChartBuilder extends BaseChartBuilder {
private DefaultCategoryDataset dataset = new DefaultCategoryDataset();
private DefaultCategoryDataset dataset = new DefaultCategoryDataset();
public BarChartBuilder( ChartModel model )
{
super(model);
fillData();
}
@Override
protected JFreeChart createChart()
{
String[] labels = getModel().getLabel();
JFreeChart localJFreeChart = ChartFactory.createBarChart3D(getModel().getTitle(), labels != null && labels.length >= 1 ? labels[0]
: null, labels != null && labels.length == 2 ? labels[1] : null, dataset);
CategoryPlot localCategoryPlot = (CategoryPlot) localJFreeChart.getPlot();
CategoryAxis localCategoryAxis = localCategoryPlot.getDomainAxis();
// localCategoryAxis.setTickLabelFont(new Font("宋体",Font.BOLD, 20));
//localCategoryAxis.setCategoryLabelPositions(CategoryLabelPositions.createUpRotationLabelPositions(0.3926990816987241D));
CategoryItemRenderer localCategoryItemRenderer = localCategoryPlot.getRenderer();
localCategoryItemRenderer.setBaseItemLabelsVisible(true);
BarRenderer localBarRenderer = (BarRenderer) localCategoryItemRenderer;
localBarRenderer.setItemMargin(0.2D);
return localJFreeChart;
}
@Override
protected void fillSingleData(Entry<?, ?> entry)
{
Object value = entry.getValue();
Object key = entry.getKey();
if (!isNumeric(value) || !(key instanceof String[] || key instanceof String))
throw new IllegalArgumentException("Data type of bar chart is wrong.");
if (key instanceof String)
{
String category = key == null ? "" : (String) key;
dataset.addValue(Double.valueOf(value.toString()), "", category);
public BarChartBuilder(ChartModel model) {
super(model);
fillData();
}
if (key instanceof String[])
{
String[] category = (String[]) key;
if (category.length != 2)
throw new IllegalArgumentException("Data number of bar chart is wrong.");
dataset.addValue(Double.valueOf(value.toString()), category[1] == null ? "" : category[1], category[0] == null ? "" : category[0]);
@Override
protected JFreeChart createChart() {
String[] labels = getModel().getLabel();
JFreeChart localJFreeChart = ChartFactory.createBarChart3D(getModel().getTitle(), labels != null && labels.length >= 1 ? labels[0]
: null, labels != null && labels.length == 2 ? labels[1] : null, dataset);
CategoryPlot localCategoryPlot = (CategoryPlot) localJFreeChart.getPlot();
CategoryItemRenderer localCategoryItemRenderer = localCategoryPlot.getRenderer();
localCategoryItemRenderer.setBaseItemLabelsVisible(true);
BarRenderer localBarRenderer = (BarRenderer) localCategoryItemRenderer;
localBarRenderer.setItemMargin(0.2D);
return localJFreeChart;
}
}
@Override
protected void fillSingleData(Entry<?, ?> entry) {
Object value = entry.getValue();
Object key = entry.getKey();
if (!isNumeric(value) || !(key instanceof String[] || key instanceof String)) {
throw new IllegalArgumentException("Data type of bar chart is wrong.");
}
if (key instanceof String) {
String category = key == null ? "" : (String) key;
dataset.addValue(Double.valueOf(value.toString()), "", category);
}
if (key instanceof String[]) {
String[] category = (String[]) key;
if (category.length != 2) {
throw new IllegalArgumentException("Data number of bar chart is wrong.");
}
dataset.addValue(Double.valueOf(value.toString()), category[1] == null ? "" : category[1], category[0] == null ? "" : category[0]);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment