Commit e8a25b29 by s_guodong

代码整理

parent 1689dc97
...@@ -13,113 +13,111 @@ import java.net.Socket; ...@@ -13,113 +13,111 @@ import java.net.Socket;
public class ShortSocketHandle implements Runnable { public class ShortSocketHandle implements Runnable {
private Object[] args; private Object[] args;
private Socket socket; private Socket socket;
private String interfaceName; private String interfaceName;
private String transName; private String transName;
private boolean has_head = true;// 是否有报文头 private boolean has_head = true;// 是否有报文头
private String head_len_type = IServerInstance.HEAD_LEN_TYPE_10;// 报文头长度是二进制还是十进制 private String head_len_type = IServerInstance.HEAD_LEN_TYPE_10;// 报文头长度是二进制还是十进制
private int head_len = 0;// 报文头长度 private int head_len = 0;// 报文头长度
private boolean is_contain_head_len = false;// 报文头长度是否包含报文头 private boolean is_contain_head_len = false;// 报文头长度是否包含报文头
private int fill_len = 0;// 默认为0 private int fill_len = 0;// 默认为0
private boolean is_contain_fill_len = false;// 报文头长度是否包含报文头和报文体之间数据的长度 true private boolean is_contain_fill_len = false;// 报文头长度是否包含报文头和报文体之间数据的长度 true
private int body_offset = 0;// 报文体位移量 默认为0, -1为减去1个长度,+1为增加1个长度 private int body_offset = 0;// 报文体位移量 默认为0, -1为减去1个长度,+1为增加1个长度
private String encoding = "UTF-8"; private String encoding = "UTF-8";
private Logger logger; private Logger logger;
public ShortSocketHandle(Socket socket, String interfaceName, String transName, boolean has_head, String head_len_type, int head_len, public ShortSocketHandle(Socket socket, String interfaceName, String transName, boolean has_head, String head_len_type, int head_len,
boolean is_contain_head_len, int fill_len, boolean is_contain_fill_len, int body_offset, Object[] args, String encoding, Logger logger) { boolean is_contain_head_len, int fill_len, boolean is_contain_fill_len, int body_offset, Object[] args, String encoding, Logger logger) {
this.socket = socket; this.socket = socket;
this.interfaceName = interfaceName; this.interfaceName = interfaceName;
this.transName = transName; this.transName = transName;
this.has_head = has_head; this.has_head = has_head;
this.head_len_type = head_len_type; this.head_len_type = head_len_type;
this.head_len = head_len; this.head_len = head_len;
this.is_contain_head_len = is_contain_head_len; this.is_contain_head_len = is_contain_head_len;
this.fill_len = fill_len; this.fill_len = fill_len;
this.is_contain_fill_len = is_contain_fill_len; this.is_contain_fill_len = is_contain_fill_len;
this.body_offset = body_offset; this.body_offset = body_offset;
this.args = args; this.args = args;
this.encoding = encoding; this.encoding = encoding;
this.logger = logger; this.logger = logger;
} }
@Override @Override
public void run() { public void run() {
if (has_head) { if (has_head) {
handleWithHead(); handleWithHead();
} else { } else {
handleWithNoHead(); handleWithNoHead();
} }
} }
public void handleWithHead() { public void handleWithHead() {
byte[] headLenBytes = new byte[head_len];
byte[] headLenBytes = new byte[head_len]; int headLen = 0;
int headLen = 0;
int returnLen = 0;
int returnLen = 0; try {
try { IOUtils.readFully(socket.getInputStream(), headLenBytes);
IOUtils.readFully(socket.getInputStream(), headLenBytes); if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_10)) {
if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_10)) { headLen = Integer.parseInt(new String(headLenBytes));
headLen = Integer.parseInt(new String(headLenBytes)); } else if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_2)) {
} else if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_2)) { headLen = CommonFunctionUtils.bytesToInt(headLenBytes);
headLen = CommonFunctionUtils.bytesToInt(headLenBytes); }
} if (is_contain_head_len) {
if (is_contain_head_len) { headLen = headLen - head_len;
headLen = headLen - head_len; }
} if (fill_len != 0) {
if (fill_len != 0) { if (!is_contain_fill_len) {
if (!is_contain_fill_len) { headLen = headLen + fill_len;
headLen = headLen + fill_len; }
} }
}
headLen = headLen + body_offset;
headLen = headLen + body_offset; byte[] databuffer = new byte[headLen];
byte[] databuffer = new byte[headLen]; IOUtils.readFully(socket.getInputStream(), databuffer);
IOUtils.readFully(socket.getInputStream(), databuffer); Context context = new Context();
// logger.debug("socket接收"); context.addVariable("transaction", "__val", databuffer);
Context context = new Context(); ResultMsg resultMsg = new Client().call(context, interfaceName, transName, args);
context.addVariable("transaction", "__val", databuffer); byte[] returnBytes = null;
ResultMsg resultMsg = new Client().call(context, interfaceName, transName, args); Object obj = resultMsg.getContent();
byte[] returnBytes = null; if (obj instanceof String) {
Object obj = resultMsg.getContent(); returnBytes = ((String) obj).getBytes(encoding);
if (obj instanceof String) {
returnBytes = ((String) obj).getBytes(encoding); } else {
returnBytes = (byte[]) obj;
} else { }
returnBytes = (byte[]) obj;
} returnLen = returnBytes.length;
returnLen = returnBytes.length; byte[] header = null;
byte[] header = null; if (is_contain_head_len) {
returnLen = returnLen + head_len;
if (is_contain_head_len) { }
returnLen = returnLen + head_len; if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_10)) {
} header = String.format("%0" + head_len + "d", returnLen).getBytes();
if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_10)) { } else if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_2)) {
header = String.format("%0" + head_len + "d", returnLen).getBytes(); header = CommonFunctionUtils.intToBytes(returnLen);
} else if (head_len_type.equals(IServerInstance.HEAD_LEN_TYPE_2)) { }
header = CommonFunctionUtils.intToBytes(returnLen);
} IOUtils.write(header, socket.getOutputStream());
IOUtils.write(returnBytes, socket.getOutputStream());
IOUtils.write(header, socket.getOutputStream()); } catch (IOException e) {
IOUtils.write(returnBytes, socket.getOutputStream()); // TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) { logger.error(e.getMessage(), e);
// TODO Auto-generated catch block IOUtils.closeQuietly(socket);
e.printStackTrace(); } finally {
IOUtils.closeQuietly(socket); IOUtils.closeQuietly(socket);
} finally { }
IOUtils.closeQuietly(socket);
} }
} public void handleWithNoHead() {
public void handleWithNoHead() { }
}
} }
...@@ -21,203 +21,157 @@ import java.util.concurrent.ExecutorService; ...@@ -21,203 +21,157 @@ import java.util.concurrent.ExecutorService;
/** /**
* @author hujun * @author hujun
* * <p>
* socket原生模式 ------------modified by weicong on 2016/4/01 ----------- * socket原生模式 ------------modified by weicong on 2016/4/01 -----------
* 兼容sslsocketserver * 兼容sslsocketserver
*/ */
public class SocketServerMode implements IServerMode { public class SocketServerMode implements IServerMode {
private List<String> trustlist = new ArrayList<String>(); // 信任的IP列表 private List<String> trustlist = new ArrayList<String>(); // 信任的IP列表
private boolean isOpen = true; // socket服务是否打开 private boolean isOpen = true; // socket服务是否打开
private ServerSocket serverSocket; // socket服务对象 private ServerSocket serverSocket; // socket服务对象
private Socket socket; private Socket socket;
private String trustips; // 信任的IP列表 private String trustips; // 信任的IP列表
private int port; private int port;
private int queueLen; private int queueLen;
private int size; private int size;
private String delayTime; private String delayTime;
private String interfaceName; private String interfaceName;
private String transName; private String transName;
private int timeout; private int timeout;
private String expression; private String expression;
private Object[] args; private Object[] args;
private String serverId; private String serverId;
private Logger logger; private Logger logger;
private String mode = ""; private String mode = "";
private boolean has_head = true;// 是否有报文头 private boolean has_head = true;// 是否有报文头
private String head_len_type = "10";// 报文头长度是二进制还是十进制 private String head_len_type = "10";// 报文头长度是二进制还是十进制
private int head_len = 0;// 报文头长度 private int head_len = 0;// 报文头长度
private boolean is_contain_head_len = false;// 报文头长度是否包含报文头 private boolean is_contain_head_len = false;// 报文头长度是否包含报文头
private int fill_len = 0;// 默认为0 private int fill_len = 0;// 默认为0
private boolean is_contain_fill_len = false;// 报文头长度是否包含报文头和报文体之间数据的长度 true private boolean is_contain_fill_len = false;// 报文头长度是否包含报文头和报文体之间数据的长度 true
// false // false
private int body_offset = 0;// 报文体位移量 默认为0, -1为减去1个长度,+1为增加1个长度 private int body_offset = 0;// 报文体位移量 默认为0, -1为减去1个长度,+1为增加1个长度
private String encoding = "UTF-8"; private String encoding = "UTF-8";
public SocketServerMode(String mode, String trustips, int port, int queueLen, int size, String delayTime, String interfaceName, String transName, public SocketServerMode(String mode, String trustips, int port, int queueLen, int size, String delayTime, String interfaceName, String transName,
int timeout, String expression, Object[] args, String serverId, Logger logger, boolean has_head, String head_len_type, int head_len, int timeout, String expression, Object[] args, String serverId, Logger logger, boolean has_head, String head_len_type, int head_len,
boolean is_contain_head_len, int fill_len, boolean is_contain_fill_len, int body_offset, String encoding) { boolean is_contain_head_len, int fill_len, boolean is_contain_fill_len, int body_offset, String encoding) {
this.mode = mode; this.mode = mode;
this.trustips = trustips; this.trustips = trustips;
this.port = port; this.port = port;
this.queueLen = queueLen; this.queueLen = queueLen;
this.size = size; this.size = size;
this.delayTime = delayTime; this.delayTime = delayTime;
this.interfaceName = interfaceName; this.interfaceName = interfaceName;
this.transName = transName; this.transName = transName;
this.timeout = timeout; this.timeout = timeout;
this.expression = expression; this.expression = expression;
this.args = args; this.args = args;
this.serverId = serverId; this.serverId = serverId;
this.logger = logger; this.logger = logger;
this.has_head = has_head; this.has_head = has_head;
this.head_len_type = head_len_type; this.head_len_type = head_len_type;
this.head_len = head_len; this.head_len = head_len;
this.is_contain_head_len = is_contain_head_len; this.is_contain_head_len = is_contain_head_len;
this.fill_len = fill_len; this.fill_len = fill_len;
this.is_contain_fill_len = is_contain_fill_len; this.is_contain_fill_len = is_contain_fill_len;
this.body_offset = body_offset; this.body_offset = body_offset;
this.encoding = encoding; this.encoding = encoding;
} }
public SocketServerMode(String mode, String trustips, int port, int queueLen, int size, String delayTime, String interfaceName, String transName, public SocketServerMode(String mode, String trustips, int port, int queueLen, int size, String delayTime, String interfaceName, String transName,
int timeout, String expression, Object[] args, String serverId, Logger logger) { int timeout, String expression, Object[] args, String serverId, Logger logger) {
this.mode = mode; this.mode = mode;
this.trustips = trustips; this.trustips = trustips;
this.port = port; this.port = port;
this.queueLen = queueLen; this.queueLen = queueLen;
this.size = size; this.size = size;
this.delayTime = delayTime; this.delayTime = delayTime;
this.interfaceName = interfaceName; this.interfaceName = interfaceName;
this.transName = transName; this.transName = transName;
this.timeout = timeout; this.timeout = timeout;
this.expression = expression; this.expression = expression;
this.args = args; this.args = args;
this.serverId = serverId; this.serverId = serverId;
this.logger = logger; this.logger = logger;
} }
/*
* public static class Builder { private Logger logger;
*
* private String trustips; // 信任的IP列表 private int port; private int
* queueLen; private int size; private String delayTime; private String
* interfaceName; private String transName; private int timeout; private
* String expression; private Object[] args; private String serverId;
*
* public Builder(int port, int queueLen, int size) { this.port = port;
* this.queueLen = queueLen; this.size = size; }
*
* public Builder setTrustips(String trustips) { this.trustips = trustips;
* return this; }
*
* public Builder setDelayTime(String delayTime) { this.delayTime =
* delayTime; return this; }
*
* public Builder setInterfaceName(String interfaceName) {
* this.interfaceName = interfaceName; return this; }
*
* public Builder setExpression(String expression) { this.expression =
* expression; return this; }
*
* public Builder setTimeout(int timeout) { this.timeout = timeout; return
* this; }
*
* public Builder setTransName(String transName) { this.transName =
* transName; return this; }
*
* public Builder setArgs(Object[] args) { this.args = args; return this; }
*
* public Builder setServerId(String serverId) { this.serverId = serverId;
* return this; }
*
* public Builder setLogger(Logger logger) { this.logger = logger; return
* this; }
*
* public SocketServerMode build() { return new SocketServerMode(this); }
*
* }
*/
@Override @Override
public void startup() { public void startup() {
try { try {
serverSocket = new ServerSocket(port, queueLen); serverSocket = new ServerSocket(port, queueLen);
logger.info(IServerInstance.LOG_FLAG + "SocketServer started ." + IServerInstance.LOG_FLAG); logger.info(IServerInstance.LOG_FLAG + "SocketServer started ." + IServerInstance.LOG_FLAG);
// 获取上下文中的线程池大小 // 获取上下文中的线程池大小
String name = "socketServer_" + interfaceName + "_" + transName; String name = "socketServer_" + interfaceName + "_" + transName;
ExecutorService executorService = ThreadPoolFactory.getFixedExecutor(size, name); ExecutorService executorService = ThreadPoolFactory.getFixedExecutor(size, name);
logger.info("server start at port: " + port); logger.info("server start at port: " + port);
while (isOpen) { while (isOpen) {
socket = serverSocket.accept(); socket = serverSocket.accept();
socket.setSoTimeout(timeout); socket.setSoTimeout(timeout);
String ip = socket.getInetAddress().getHostAddress(); String ip = socket.getInetAddress().getHostAddress();
logger.debug("Client socket from " + socket.getInetAddress() + " : " + socket.getPort() + ", timeout=" + timeout); logger.debug("Client socket from " + socket.getInetAddress() + " : " + socket.getPort() + ", timeout=" + timeout);
if (!StringUtil.isEmpty(delayTime)) { if (!StringUtil.isEmpty(delayTime)) {
Thread.sleep(Long.parseLong(delayTime)); Thread.sleep(Long.parseLong(delayTime));
} }
getTrustIp(trustips); getTrustIp(trustips);
if ("*".equals(trustips) || trustlist.contains(ip)) { if ("*".equals(trustips) || trustlist.contains(ip)) {
if (mode.equals(SocketProxyFactry.ORG_SOCKET_TYPE)) { if (mode.equals(SocketProxyFactry.ORG_SOCKET_TYPE)) {
if (DateUtil.isDateValid(expression)) { if (DateUtil.isDateValid(expression)) {
executorService.submit(new SocketClient(interfaceName, transName, socket, args)); executorService.submit(new SocketClient(interfaceName, transName, socket, args));
// counter.incConnectionCount(); } else {
// logger.info("Connection count: " + logger.debug("time is invalid.");
// counter.getConnCount()); IOUtils.closeQuietly(socket);
// logger.debug("result:" + }
// future.get().getContent()); } else {
} else { executorService.submit(new ShortSocketHandle(socket, interfaceName, transName, has_head, head_len_type, head_len,
logger.debug("time is invalid."); is_contain_head_len, fill_len, is_contain_fill_len, body_offset, args, encoding, logger));
IOUtils.closeQuietly(socket); }
} } else {
} else { logger.debug("[" + ip + "] is not trusted ip. ");
executorService.submit(new ShortSocketHandle(socket, interfaceName, transName, has_head, head_len_type, head_len, IOUtils.closeQuietly(socket);
is_contain_head_len, fill_len, is_contain_fill_len, body_offset, args, encoding, logger)); }
} }
} else { close();
logger.debug("[" + ip + "] is not trusted ip. "); } catch (Exception e) {
IOUtils.closeQuietly(socket); if ((e instanceof SocketException) && e.getMessage().equals("socket closed"))
} ;
} else {
close(); throw new InterfaceException("02301", e);
} catch (Exception e) { }
if ((e instanceof SocketException) && e.getMessage().equals("socket closed")) } finally {
; if (socket != null && !socket.isClosed())
else { IOUtils.closeQuietly(socket);
throw new InterfaceException("02301", e); }
}
} finally {
if (socket != null && !socket.isClosed())
IOUtils.closeQuietly(socket);
}
} }
@Override @Override
public void close() { public void close() {
this.isOpen = false; this.isOpen = false;
try { try {
if (serverSocket != null && !serverSocket.isClosed()) { if (serverSocket != null && !serverSocket.isClosed()) {
IOUtils.closeQuietly(serverSocket); IOUtils.closeQuietly(serverSocket);
logger.info("SocketServer[id:" + serverId + "] is closed."); logger.info("SocketServer[id:" + serverId + "] is closed.");
} }
} catch (Exception e) { } catch (Exception e) {
throw new InterfaceException("02303", e); throw new InterfaceException("02303", e);
} }
} }
/** /**
* 获取允许 * 获取允许
* *
* @param trustips * @param trustips
*/ */
private void getTrustIp(String trustips) { private void getTrustIp(String trustips) {
String[] trusts = trustips.split(","); String[] trusts = trustips.split(",");
for (String trust : trusts) for (String trust : trusts)
this.trustlist.add(trust); this.trustlist.add(trust);
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment