Commit 1689dc97 by s_guodong

服务迁移

parent 6088b120
......@@ -8,7 +8,6 @@
<artifactId>be-esb-core</artifactId>
<version>2.0</version>
<properties>
<file.encoding>UTF-8</file.encoding>
<maven.compiler.source>8</maven.compiler.source>
......@@ -58,6 +57,7 @@
<active.broker.version>5.9.0</active.broker.version>
<hessian.version>4.0.66</hessian.version>
<snmp4j.version>2.5.0</snmp4j.version>
<bcprov-jdk15on.version>1.70</bcprov-jdk15on.version>
<!-- 日志 -->
<slf4j-api_version>1.7.25</slf4j-api_version>
<jul-to-slf4j_version>1.7.30</jul-to-slf4j_version>
......@@ -139,24 +139,9 @@
<version>${jms.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>${ibm.mq.allclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-client.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>${ons-client.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.jms.version}</version>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>${bcprov-jdk15on.version}</version>
</dependency>
<!--es-->
<dependency>
......@@ -215,17 +200,6 @@
<artifactId>cxf-bundle</artifactId>
<version>${cxf.version}</version>
</dependency>
<!--httpServer中-->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
<!--htm2xml-->
<dependency>
<groupId>net.sourceforge.nekohtml</groupId>
......@@ -255,16 +229,6 @@
<artifactId>hessian</artifactId>
<version>${hessian.version}</version>
</dependency>
<dependency>
<groupId>org.snmp4j</groupId>
<artifactId>snmp4j</artifactId>
<version>${snmp4j.version}</version>
</dependency>
<dependency>
<groupId>org.snmp4j</groupId>
<artifactId>snmp4j-agent</artifactId>
<version>${snmp4j.version}</version>
</dependency>
<!--模板引擎-->
<dependency>
<groupId>org.apache.velocity</groupId>
......@@ -359,13 +323,6 @@
<scope>system</scope>
<systemPath>${pom.basedir}/lib/keyczar-0.71g-090613.jar</systemPath>
</dependency>
<dependency>
<groupId>com.tibco</groupId>
<artifactId>tibjms</artifactId>
<version>6.3.0</version>
<scope>system</scope>
<systemPath>${pom.basedir}/lib/tibjms.jar</systemPath>
</dependency>
</dependencies>
......
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.exception.InterfaceException;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.server.channel.server.impl.AsyncChannelServer;
import com.brilliance.eibs.server.channel.server.impl.AsyncFixedChannelServer;
import com.brilliance.eibs.server.connection.Detect;
import com.brilliance.eibs.util.StringUtil;
import java.io.IOException;
import java.net.SocketException;
public class ChannelServer extends AbsServer {
private AsyncChannelServer server;
private ChannelListner listener;
public ChannelServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info( LOG_FLAG + "ChannelServer starting ." + LOG_FLAG);
int port = Integer.valueOf(getRequiredPropertyValue("port"));
int destPort = Integer.valueOf(getRequiredPropertyValue("destPort"));
String destIP = getRequiredPropertyValue("destIP");
String requestInterfaceName = getPropertyValue("request_interfaceName");
String requestTransactionId = getPropertyValue("request_transactionName");
String responseInterfaceName = getPropertyValue("response_interfaceName");
String responseTransactionId = getPropertyValue("response_transactionName");
String length = getPropertyValue("length");
if (StringUtil.isEmpty(length)
|| ((StringUtil.isEmpty(requestInterfaceName) || StringUtil.isEmpty(requestTransactionId)) && (StringUtil
.isEmpty(responseInterfaceName) || StringUtil.isEmpty(responseTransactionId))))
server = new AsyncChannelServer(port, destPort, destIP);
else
server = new AsyncFixedChannelServer(port, destPort, destIP, Integer.valueOf(length), requestInterfaceName, requestTransactionId,
responseInterfaceName, responseTransactionId);
listener = new ChannelListner();
Thread t = new Thread(listener);
t.setName("Asynchronous Channel '" + serviceDef.getId() + "' Server Listener");
t.setDaemon(true);
t.start();
try {
server.startup();
} catch (IOException e) {
if ((e instanceof SocketException) && e.getMessage().equals("socket closed"))
;
else {
throw new InterfaceException("02301", e);
}
}
logger.info( LOG_FLAG + "ChannelServer is finished." + LOG_FLAG);
}
@Override
public void close() {
listener.interrupted();
server.close();
}
private class ChannelListner extends Detect {
private long interval = 60000;
private ChannelListner() {
}
private ChannelListner(long interval) {
this.interval = interval;
}
@Override
protected void exception() {
interrupted();
}
@Override
protected void handle() throws IOException {
int size = server.getPool().size();
logger.info( "Connections size in [" + serviceDef.getId() + "] channel : " + size);
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
logger.error( e.getMessage(),e);
}
}
@Override
protected void close() {
logger.info( "ChannelListner of [" + serviceDef.getId() + "] channel is closed.");
}
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.bean.DynamicCompiler;
import com.brilliance.eibs.core.exception.InterfaceException;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.util.ClassPathUpdater;
import com.brilliance.eibs.util.StringUtil;
import org.apache.commons.io.IOUtils;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import javax.servlet.Servlet;
import javax.tools.DiagnosticCollector;
import javax.tools.JavaFileObject;
import java.io.File;
import java.io.FileOutputStream;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.net.URLDecoder;
/**
* http服务
*
* @author xiaoyuanzhen
*
*/
public class HttpServer extends AbsServer {
private Server server;
public HttpServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info( LOG_FLAG + "starting HttpService[id:" + serviceDef.getId() + "]." + LOG_FLAG);
try {
// String ip=getRequiredPropertyValue(IP_SYMBOL);
String port = getRequiredPropertyValue(PORT_SYMBOL);
String uri = getRequiredPropertyValue(URI_SYMBOL);
String classesname = getRequiredPropertyValue(CLASS_SYMBOL);
String[] clsarray = classesname.split(",");
// 初始化参数
String initparam = getPropertyValue(INITPARA);
String interfaceNam = getPropertyValue("interfaceName");
if (serviceDef.getId().startsWith("_")) {
Server server = new Server(Integer.parseInt(port));
ServletContextHandler context1 = new ServletContextHandler(ServletContextHandler.SESSIONS);
context1.setContextPath("/");
context1.setResourceBase(".");
context1.setClassLoader(Thread.currentThread().getContextClassLoader());
Constructor c1 = Class.forName(clsarray[0]).getDeclaredConstructor(String.class, String.class, Object[].class);
String transactionsNames = getPropertyValue("transactionName");
Servlet b = (Servlet) c1.newInstance(interfaceNam, transactionsNames, args);
context1.addServlet(new ServletHolder(b), uri);
ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(new Handler[] { context1 });
server.setHandler(contexts);
server.start();
server.join();
return;
}
if (serviceDef.getId().startsWith("$")) {
Server server = new Server(Integer.parseInt(port));
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");
context.setResourceBase(".");
context.setClassLoader(Thread.currentThread().getContextClassLoader());
String interfaceNames[] = getPropertyValue("interfaceName").split(",");
String transactionsNames[] = getPropertyValue("transactionName").split(",");
String[] uris = getRequiredPropertyValue(URI_SYMBOL).split(",");
server.setHandler(context);
for (int i = 0; i < clsarray.length; i++) {
Constructor c1 = Class.forName(clsarray[i]).getDeclaredConstructor(String.class, String.class, Object[].class);
Servlet b = (Servlet) c1.newInstance(interfaceNames[i], transactionsNames[i], args);
context.addServlet(new ServletHolder(b), uris[i]);
}
server.start();
server.join();
return;
}
// 动态生成
if (!StringUtil.isEmpty(interfaceNam)) {
String interfaceNames[] = getPropertyValue("interfaceName").split(",");
String transactionsNames[] = getPropertyValue("transactionName").split(",");
String[] uris = getRequiredPropertyValue(URI_SYMBOL).split(",");
URL url = Thread.currentThread().getContextClassLoader().getResource("classes/BaseServlet.base");
String basestr = IOUtils.toString(url);
for (int i = 0; i < interfaceNames.length; i++) {
String clsnam = clsarray[i];
String c = clsnam.substring(clsnam.lastIndexOf(".") + 1);
String s = basestr.replaceAll("BaseServlet", c).replace("base1", interfaceNames[i]).replace("base2", transactionsNames[i])
.replace("//", "");
String path = Thread.currentThread().getContextClassLoader().getResource("").toURI().getPath() + "classes/classes/" + c + ".java";
String spath = Thread.currentThread().getContextClassLoader().getResource("").toURI().getPath() + "classes/classes/";
File f = new File(spath);
if (!f.exists())
f.mkdirs();
FileOutputStream fos = new FileOutputStream(path);
fos.write(s.getBytes());
fos.close();
}
generateBeans();
server = new Server(Integer.parseInt(port));
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");
server.setHandler(context);
for (int i = 0; i < clsarray.length; i++) {
String clsnam = clsarray[i];
// Class cls = Class.forName(clsnam);
Class cls = ClassPathUpdater.dynamicLoadClass(clsnam);
// Object object = cls.newInstance();
context.addServlet(new ServletHolder(cls), uris[i]);
// context.addServlet(new ServletHolder, pathSpec)
}
server.start();
server.join();
}
// 静态调用API启动Servlet服务.
else {
server = new Server(Integer.parseInt(port));
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");
server.setHandler(context);
String initarray[] = null;
if (!StringUtil.isEmpty(initparam))
initarray = initparam.split("#");
for (int i = 0; i < clsarray.length; i++) {
String clsnam = clsarray[i];
String param = initarray[i];
Class cls = Class.forName(clsnam);
ServletHolder holder = new ServletHolder(cls);
if (!StringUtil.isEmpty(param)) {
String m[] = param.split(",");
for (int j = 0; j < m.length; j++) {
String s[] = m[j].split("\\|");
holder.setInitParameter(s[0], s[1]);
}
// logger.info(this.context, context,
// "init Parameters of[" + this.getClass().getName() +
// "] is:" + holder.getInitParameters());
}
context.addServlet(holder, uri);
}
server.start();
server.join();
}
} catch (Exception e) {
logger.error( "jetty服务初始化异常:"+e.getMessage(),e);
throw new InterfaceException("01410", e);
}
logger.info( LOG_FLAG + "HttpService[id:" + serviceDef.getId() + "] is started." + LOG_FLAG);
}
@Override
public void close() {
if (server != null)
try {
server.stop();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// server.destroy();
}
/**
* 判断是否生成过class,如果没生成自动生成
*/
private void generateBeans() {
// String beanProps[] =
// getRequiredPropertyValue(CLASS_SYMBOL).split(",");
// if (!StringUtil.isEmpty(beanProp) &&
// !generatedMap.containsKey(beanProp))
// if (!StringUtil.isEmpty(beanProp) )
// {
// / generatedMap.put(beanProp, true);
logger.info( "Creating HttpServervlet classes.");
DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>();
logger.info( "Creating HttpServervlet classes1.");
DynamicCompiler dynamicCompilerUtil = new DynamicCompiler();
boolean compilerResult;
try {
/*
* String s =
* URLDecoder.decode(Thread.currentThread().getContextClassLoader
* ().getResource("").toURI().getPath() + "classes", "utf-8"); File
* f = new File(s); if (!f.exists()) f.mkdirs(); compilerResult =
* dynamicCompilerUtil.compiler("UTF-8",
* System.getProperty("java.class.path"), s, "",
* ClassPathUpdater.addClasspath(""), diagnostics);
*/
String s = URLDecoder.decode(Thread.currentThread().getContextClassLoader().getResource("").toURI().getPath() + "classes", "utf-8");
File f = new File(s);
if (!f.exists())
f.mkdirs();
String dirs = ClassPathUpdater.addClasspath("classes");
File dirfile = new File(dirs);
if (!dirfile.exists())
dirfile.mkdirs();
compilerResult = dynamicCompilerUtil.compiler("UTF-8", System.getProperty("java.class.path"), s, "", dirs, diagnostics);
logger.info( "----编译开始-----");
if (compilerResult) {
logger.info( "----编译成功-----");
} else {
logger.info( "+++++编译失败++++++");
}
} catch (Exception e) {
logger.info( "Creating HttpServervlet classes error.");
e.printStackTrace();
// throw new InterfaceException("01411", e);
}
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.exception.InterfaceException;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.el.CommonFunctionUtils;
import com.brilliance.eibs.main.Client;
import com.sun.net.httpserver.*;
import org.apache.commons.io.IOUtils;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.security.*;
import java.security.cert.CertificateException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class HttpsService extends AbsServer {
public static final String Key_Store_Type = "key_store_type";
public static final String Def_Key_Store_Type = "JKS";
public static final String Key_Manager_Factory = "key_manager_factory";
public static final String Def_Key_Manager_Factory = "SunX509";
public static final String Def_SSL_Context = "SSL";
public static final String INTERFACENAME = "interfaceName";
public static final String TRANSACTIONNAME = "transactionName";
public static final String STOREPATH = "storepath";
public static final String STOREPWD = "storepwd";
public static final String ENCODE = "encode";
public static final String DEF_ENCODE = "UTF-8";
public static final String ROOT_PATH = "/";
public static final int Def_BACKLOG = 1024;
public static final String DELIMITER = ",";
public HttpsService(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info( LOG_FLAG + "starting HttpService[id:" + serviceDef.getId() + "]." + LOG_FLAG);
try {
String ksttyp = getPropertyValue(Key_Store_Type, Def_Key_Store_Type);
String kmftyp = getPropertyValue(Key_Manager_Factory, Def_Key_Manager_Factory);
final String encode = getPropertyValue(ENCODE, DEF_ENCODE);
int backlog = getPropertyValue(REQUEST_QUEUE_LEN_SYMBOL, Def_BACKLOG);
int port = Integer.parseInt(getRequiredPropertyValue(PORT_SYMBOL));
String uri = getRequiredPropertyValue(URI_SYMBOL);
String interfaceName = getRequiredPropertyValue(INTERFACENAME);
String transactionName = getRequiredPropertyValue(TRANSACTIONNAME);
final Map<String, String[]> handleMap = checkConfigValidity(uri, interfaceName, transactionName);
String storepath = getRequiredPropertyValue(STOREPATH);
String storepwd = getRequiredPropertyValue(STOREPWD);
SSLContext sslContext = getSslContext(storepath, storepwd, ksttyp, kmftyp);
HttpsServer server = HttpsServer.create(new InetSocketAddress(port), backlog);
server.setHttpsConfigurator(new HttpsConfigurator(sslContext) {
@Override
public void configure(HttpsParameters params) {
SSLContext c = getSSLContext();
SSLParameters sslparams = c.getDefaultSSLParameters();
params.setSSLParameters(sslparams);
}
});
server.setExecutor(new ThreadPoolExecutor(8, 200, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(backlog)));
server.createContext(ROOT_PATH, new HttpHandler() {
@Override
public void handle(HttpExchange httpExchange) {
String dsturi = httpExchange.getRequestURI().getPath();
Headers header=httpExchange.getRequestHeaders();
try {
String requestBody = IOUtils.toString(httpExchange.getRequestBody(), encode);
for (Map.Entry<String, String[]> entry : handleMap.entrySet()) {
String orguri = entry.getKey();
if (dsturi.equals(orguri)) {
Object rtnVal = new Client().call(entry.getValue()[0], entry.getValue()[1], new Object[]{requestBody,header}).getContent();
if (rtnVal instanceof String) {
response(httpExchange, (String) rtnVal, encode);
} else {
response(httpExchange, CommonFunctionUtils.toJson(rtnVal), encode);
}
return;
}
}
logger.warn( "当前请求没有匹配的交易处理:" + dsturi);
} catch (Exception e) {
logger.error( "当前请求处理异常:" + dsturi, e);
}
}
});
server.start();
} catch (Exception e) {
logger.error( "https server start failed:" + e.getMessage(), e);
throw new InterfaceException("01410", "https server start failed", e);
}
logger.info( LOG_FLAG + "HttpsService[id:" + serviceDef.getId() + "] is started." + LOG_FLAG);
}
private Map<String, String[]> checkConfigValidity(String uri, String interfaceName, String transactionName) {
String[] uris = uri.split(DELIMITER);
String[] interfaceNames = interfaceName.split(DELIMITER);
String[] transactionNames = transactionName.split(DELIMITER);
if (uris.length == 0) {
throw new InterfaceException("01410", "parameter [uri,interfaceName,transactionName] cannot be empty");
}
if (uris.length != interfaceNames.length || uris.length != transactionNames.length) {
throw new InterfaceException("01410", "The number of three parameters does not match:[uri,interfaceName,transactionName]");
}
Map<String, String[]> rs = new HashMap<String, String[]>();
for (int i = 0; i < uris.length; i++) {
rs.put(uris[i], new String[]{interfaceNames[i], transactionNames[i]});
}
return rs;
}
@Override
public void close() {
}
private SSLContext getSslContext(String storePath, String storePwd, String ksttyp, String kmftyp) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException, KeyManagementException {
KeyStore ks = KeyStore.getInstance(ksttyp);
//载入证书
ks.load(new FileInputStream(storePath), storePwd.toCharArray());
//建立一个密钥管理工厂
KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmftyp);
//初始化工厂
kmf.init(ks, storePwd.toCharArray());
//建立证书实体
SSLContext sslContext = SSLContext.getInstance(Def_SSL_Context);
//初始化证书
sslContext.init(kmf.getKeyManagers(), null, null);
return sslContext;
}
private void response(HttpExchange xchg, String rtnmsg, String encode) {
Headers resheader = xchg.getResponseHeaders();
resheader.set("Content-type", "text/plain;charset=utf-8");
resheader.set("Connection", "Keep-Alive");
OutputStream os = null;
try {
int length = rtnmsg.getBytes(encode).length;
xchg.sendResponseHeaders(200, length);
os = xchg.getResponseBody();
os.write(rtnmsg.getBytes(encode));
os.flush();
} catch (Exception e) {
logger.error( "response occure exception:" + e.getMessage(), e);
throw new InterfaceException("01410", "response occure exception", e);
} finally {
if (os != null) {
try {
os.close();
} catch (IOException e) {
logger.error( "OutputStream close occure exception:" + e.getMessage(), e);
throw new InterfaceException("01410", "OutputStream close occure exception", e);
}
}
}
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.main.Client;
import com.brilliance.eibs.util.LogUtil;
import org.slf4j.Logger;
/**
* @author hujun
*
* 初始化“服务”,用于需要初次加载的内容, 比如长连接用于首先向对方发起请求,建立链路
*/
public class InitServer extends AbsServer {
public InitServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info( LOG_FLAG + "InitServer starting ." + LOG_FLAG);
final String interfaceName = getRequiredPropertyValue("interfaceName");
final String transName = getRequiredPropertyValue("transactionName");
/*
* new Thread(new Runnable() {
*
* @Override public void run() {
*/
new Client().call(interfaceName, transName, null);
/*
* } }).start();
*/
logger.info( LOG_FLAG + "InitServer is finished." + LOG_FLAG);
}
@Override
public void close() {
// nothing to do
// not nessesary
}
public void call(String interfaceName, String transName) {
/*
* try { CommonFunctionUtils.sleep(10000); } catch (InterruptedException
* e) { // TODO Auto-generated catch block e.printStackTrace(); }
*/
Logger log = LogUtil.getTrnLog(interfaceName, transName);
// log.info("init....");
new Client().call(interfaceName, transName, null, log);
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.model.impl.ServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.main.Client;
import com.brilliance.eibs.util.StringUtil;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.math.NumberUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
import org.springframework.beans.PropertyValue;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerEndpoint;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.SimpleMessageConverter;
import javax.jms.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class JmsListeningServer extends AbsServer {
static final String DESTINATION_TYPE_QUEUE = "queue";
static final String DESTINATION_TYPE_TOPIC = "topic";
static final String DESTINATION_TYPE_DURABLE_TOPIC = "durableTopic";
static final String DESTINATION_TYPE_SHARED_TOPIC = "sharedTopic";
static final String DESTINATION_TYPE_SHARED_DURABLE_TOPIC = "sharedDurableTopic";
final static MessageConverter messageConverter = new SimpleMessageConverter();
private final Map<String, DefaultMessageListenerContainer> containerCache = new HashMap<String, DefaultMessageListenerContainer>();
public JmsListeningServer(Context context, IServiceDef serviceDef,
ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info( LOG_FLAG + "JmsListeningServer is starting ."
+ LOG_FLAG);
ConnectionFactory connectionFactory;
try {
String connectionFactoryClaz = getRequiredPropertyValue("connectionFactory");
String[] destinations = getRequiredPropertyValue("destination")
.split(",");
String[] interfaceNames = getRequiredPropertyValue("interfaceName")
.split(",");
String[] transactionNames = getRequiredPropertyValue(
"transactionName").split(",");
Class<?> connectionFactoryClazz = Class
.forName(connectionFactoryClaz);
connectionFactory = (ConnectionFactory) BeanUtils
.instantiate(connectionFactoryClazz);
BeanWrapper bw = new BeanWrapperImpl(connectionFactory);
Map<String, String> arguments = ((ServiceDef) this.serviceDef)
.getPropertyArguments("connectionFactory");
String username=null;
String password=null;
if (!arguments.isEmpty()) {
// 连接工厂设置必要属性,动态设置
for (Map.Entry<String, String> entry : arguments.entrySet()) {
String propertyName = entry.getKey();
Object originalValue = entry.getValue();
if("username".equalsIgnoreCase(propertyName)){
username=(String) originalValue;
}else if("password".equalsIgnoreCase(propertyName)){
password=(String) originalValue;
password=new String(Base64.decodeBase64(password));
}else{
Object convertedValue = originalValue;
convertedValue = ((BeanWrapperImpl) bw).convertForProperty(
convertedValue, propertyName);
PropertyValue pv = new PropertyValue(propertyName,
convertedValue);
bw.setPropertyValue(pv);
}
}
}
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
if(username!=null){
UserCredentialsConnectionFactoryAdapter ucfa=new UserCredentialsConnectionFactoryAdapter();
ucfa.setTargetConnectionFactory(connectionFactory);
ucfa.setUsername(username);
ucfa.setPassword(password);
factory.setConnectionFactory(ucfa);
}else{
factory.setConnectionFactory(connectionFactory);
}
factory.setMessageConverter(messageConverter);
// 设置本地事务性
factory.setSessionTransacted(true);
// 设置监听类型(队列/主题)
initDestinationType(factory);
// 设置jms资源缓存级别
String cache;
if (StringUtil.isEmpty(getPropertyValue("cache"))) {
cache = "CACHE_SESSION";
} else {
cache = "CACHE_" + getPropertyValue("cache").toUpperCase();
}
factory.setCacheLevelName(cache);
// 设置消费者线程数
String concurrency = getPropertyValue("concurrency");
if (!StringUtil.isEmpty(concurrency)) {
factory.setConcurrency(concurrency);
}
// 设置每个消费者处理多少消息
String prefetch = getPropertyValue("prefetch");
factory.setMaxMessagesPerTask(NumberUtils.toInt(prefetch, 5));
// 设置收取消息超时时间(毫秒)
String receiveTimeout = getPropertyValue("receiveTimeout");
if (!StringUtil.isEmpty(receiveTimeout)) {
factory.setReceiveTimeout(NumberUtils.toLong(receiveTimeout));
}
for (int i=0;i<destinations.length;i++) {
String destination=destinations[i];
if (!containerCache.containsKey(destination)) {
// 设置监听端点
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
MessageListenerAdapter messageListener = new MessageListenerAdapter();
messageListener.setDefaultListenerMethod("handle");
messageListener.setDelegate(new MessageHandler(
interfaceNames[i], transactionNames[i]));
endpoint.setMessageListener(messageListener);
endpoint.setDestination(destination);
// 创建jms监听容器
DefaultMessageListenerContainer container = factory
.createListenerContainer(endpoint);
// 初始化jms监听容器
container.initialize();
// 启动jms监听容器
container.start();
containerCache.put(destination, container);
}
}
} catch (Throwable e) {
logger.error( LOG_FLAG + "JmsListeningServer start error."
+ LOG_FLAG, e);
close();
}
logger.info( LOG_FLAG + "JmsListeningServer start is finished."
+ LOG_FLAG);
}
private void initDestinationType(DefaultJmsListenerContainerFactory factory) {
String destinationType = getPropertyValue("destinationType");
boolean pubSubDomain = false;
boolean subscriptionDurable = false;
boolean subscriptionShared = false;
if (DESTINATION_TYPE_SHARED_DURABLE_TOPIC.equals(destinationType)) {
pubSubDomain = true;
subscriptionDurable = true;
subscriptionShared = true;
} else if (DESTINATION_TYPE_SHARED_TOPIC.equals(destinationType)) {
pubSubDomain = true;
subscriptionShared = true;
} else if (DESTINATION_TYPE_DURABLE_TOPIC.equals(destinationType)) {
pubSubDomain = true;
subscriptionDurable = true;
} else if (DESTINATION_TYPE_TOPIC.equals(destinationType)) {
pubSubDomain = true;
} else if (destinationType == null || "".equals(destinationType)
|| DESTINATION_TYPE_QUEUE.equals(destinationType)) {
// the default: queue
} else {
// 给出警告,依旧使用默认队列类型作为监听模式
logger.error(
LOG_FLAG
+ "Invalid listener container 'destination-type': only "
+ "\"queue\", \"topic\", \"durableTopic\", \"sharedTopic\", \"sharedDurableTopic\" supported."
+ LOG_FLAG);
}
factory.setPubSubDomain(pubSubDomain);
factory.setSubscriptionDurable(subscriptionDurable);
factory.setSubscriptionShared(subscriptionShared);
}
@Override
public void close() {
for(Map.Entry<String, DefaultMessageListenerContainer> item: containerCache.entrySet()){
DefaultMessageListenerContainer container=item.getValue();
if (container != null) {
container.shutdown();
container = null;
}
}
containerCache.clear();
}
class MessageHandler {
private String interfaceName;
private String transactionName;
public MessageHandler(String interfaceName, String transactionName) {
this.interfaceName = interfaceName;
this.transactionName = transactionName;
}
public void handle(Object message) {
Client client = new Client();
client.call(interfaceName, transactionName, new Object[] { message });
}
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.server.link_v2_0.*;
import com.brilliance.eibs.util.LogUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class LinkServer extends AbsServer {
private List<LinkDispatch> dispatchs = new ArrayList<LinkDispatch>();
private ExecutorService executorService;
public LinkServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info(LOG_FLAG + "LinkServer starting ." + LOG_FLAG);
String links = getRequiredPropertyValue(LINKS_SYMBOL);// 127.0.0.1:10001:10002
int headTagLen = Integer.valueOf(getRequiredPropertyValue(HEAD_TAG_LEN_SYMBOL));
String heartbeat = getPropertyValue(HEARTBEAT_SYMBOL);
String fin = getPropertyValue(FIN_SYMBOL);
long interval = Long.valueOf(getPropertyValue(INTERVAL_SYMBOL));
long detect = Long.valueOf(getPropertyValue(DETECT_SYMBOL));
int timeout = Integer.valueOf(getPropertyValue(TIMEOUT_SYMBOL));
String interfaceName = getRequiredPropertyValue("interfaceName");
String transcationId = getRequiredPropertyValue("transactionName");
List<LinkData> datas = LinkParser.createSimplexLinkDatas(links);
executorService = Executors.newFixedThreadPool(datas.size());
for (LinkData data : datas) {
Link link = new SimplexLink(data, LogUtil.getLogger(context));
link.setTimeout(timeout);
Heartbeat hb = new Heartbeat(interval, heartbeat.getBytes(), link);
ReceiveDetect receiveDetect = new ReceiveDetect(detect, link);
link.setHeartbeat(hb);
link.setReceiveDetect(receiveDetect);
LinkDispatch dispatch = new LinkDispatch(link, interfaceName, transcationId);
dispatch.setHeadTagLen(headTagLen);
dispatch.setFin(fin.getBytes());
dispatchs.add(dispatch);
executorService.submit(dispatch);
}
logger.info(LOG_FLAG + "LinkServer is finished." + LOG_FLAG);
}
@Override
public void close() {
for (LinkDispatch dispatch : dispatchs)
dispatch.interrupted();
executorService.shutdownNow();
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.main.Client;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketMQServer extends AbsServer {
private DefaultMQPushConsumer consumer;
public RocketMQServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info( LOG_FLAG + "RocketMQServer starting ." + LOG_FLAG);
final String interfaceName = getRequiredPropertyValue("interfaceName");
final String transactionName = getRequiredPropertyValue("transactionName");
String groupName = getRequiredPropertyValue("groupName");
String serverAddress = getRequiredPropertyValue("serverAddress");
String topic = getRequiredPropertyValue("topic");
String subExpression = getPropertyValue("subExpression", "*");
// boolean isPush = getPropertyValue("push",false);
try {
// Instantiate with specified consumer group name.
consumer = new DefaultMQPushConsumer(groupName);
// Specify name server addresses.
consumer.setNamesrvAddr(serverAddress);
// Subscribe one more more topics to consume.
consumer.subscribe(topic, subExpression);
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
Client client = new Client();
client.call(interfaceName, transactionName, new Object[] {msgs});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch the consumer instance.
consumer.start();
} catch (MQClientException e) {
logger.error( LOG_FLAG + "RocketMQServer error." + LOG_FLAG, e);
close();
}
logger.info( LOG_FLAG + "RocketMQServer is finished." + LOG_FLAG);
}
@Override
public void close() {
if (null != consumer)
consumer.shutdown();
}
}
package com.brilliance.eibs.server;
import com.aliyun.openservices.ons.api.*;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.main.Client;
import com.brilliance.eibs.util.StringUtil;
import org.slf4j.Logger;
import java.util.Properties;
public class RocketMQServerOns extends AbsServer {
private Consumer consumer = null;
public RocketMQServerOns(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info(LOG_FLAG + "RocketMQOnsServer starting ." + LOG_FLAG);
Properties properties;
String CONSUMER_ID = ""; //消费者ID
String ACCESS_KEY = ""; //阿里云身份验证AccessKey
String SECRET_KEY = ""; //阿里云身份验证SecretKey
String ONS_ADDR = ""; //消息队列服务器地址
String NAMESRV_ADDR = ""; //消息队列服务器地址
String[] TOPIC; //消息主题组
String[] TAG; //消息标签组
int CONSUME_THREAD_NUM = 0; //消费线程数
int MAX_RECONSUME_TIMES = 0; //消息消费失败的最大重试次数
int CONSUME_TIMEOUT = 0; //每条消息消费的最大超时时间
int SUSPEND_TIME_MILLIS = 0; //消息消费失败的重试间隔时间(只适用于顺序消息)
String interfaceName = getRequiredPropertyValue("interfaceName");
String transactionName = getRequiredPropertyValue("transactionName");
CONSUMER_ID = getRequiredPropertyValue("GroupId");
ACCESS_KEY = getPropertyValue("AccessKey");
SECRET_KEY = getPropertyValue("SecretKey");
ONS_ADDR = getPropertyValue("ONSAddr");
NAMESRV_ADDR = getPropertyValue("NamesrvAddr");
TOPIC = getRequiredPropertyValue("topic").split(",");
TAG = getRequiredPropertyValue("tag").split(",");
CONSUME_THREAD_NUM = getPropertyValue("ConsumeThreadNums", 5);
MAX_RECONSUME_TIMES = getPropertyValue("MaxReconsumeTimes", 1);
CONSUME_TIMEOUT = getPropertyValue("ConsumeTimeout", 15);
SUSPEND_TIME_MILLIS = getPropertyValue("SuspendTimeMillis", 3000);
//校验topic和tag数量是否一致,支持多消息订阅
checkTopicAndTag(TOPIC, TAG);
properties = new Properties();
//消息消费者ID
properties.put(PropertyKeyConst.GROUP_ID, CONSUMER_ID);
//阿里云身份验证AccessKey
properties.put(PropertyKeyConst.AccessKey, ACCESS_KEY);
//阿里云身份验证secretKey
properties.put(PropertyKeyConst.SecretKey, SECRET_KEY);
//消息队列服务器地址
if (!StringUtil.isEmpty(ONS_ADDR)) {
properties.put(PropertyKeyConst.ONSAddr, ONS_ADDR);
} else if (!StringUtil.isEmpty(NAMESRV_ADDR)) {
properties.put(PropertyKeyConst.NAMESRV_ADDR, NAMESRV_ADDR);
} else {
logger.error(LOG_FLAG + "不支持的接入方式." + LOG_FLAG);
throw new IllegalAccessError("不支持的接入方式.");
}
//设置 Consumer 实例的消费线程数,默认:64 (订阅 消费者)
properties.put(PropertyKeyConst.ConsumeThreadNums, CONSUME_THREAD_NUM);
//设置消息消费失败的最大重试次数,默认:16 (订阅 消费者)
properties.put(PropertyKeyConst.MaxReconsumeTimes, MAX_RECONSUME_TIMES);
//设置每条消息消费的最大超时时间,超过设置时间则被视为消费失败, (订阅 消费者)
//等下次重新投递再次消费。每个业务需要设置一个合理的值,单位(分钟)。默认:15
properties.put(PropertyKeyConst.ConsumeTimeout, CONSUME_TIMEOUT);
//只适用于顺序消息,设置消息消费失败的重试间隔时间 单位(毫秒) (订阅 消费者)
properties.put(PropertyKeyConst.SuspendTimeMillis, SUSPEND_TIME_MILLIS);
try {
logger.info("开始创建Consumer...");
consumer = ONSFactory.createConsumer(properties);
for (int i = 0; i < TOPIC.length; i++) {
String tp = TOPIC[i];
String tg = TAG[i];
if (StringUtil.isEmpty(tp))
continue;
logger.info("订阅的Topic" + i + ":" + tp);
logger.info("订阅的Tag" + i + ":" + tg);
consumer.subscribe(tp, tg, new MessageListenerImpl(interfaceName, transactionName, context, logger));
}
consumer.start();
} catch (Exception e) {
logger.error(LOG_FLAG + "RocketMQOnsServer error." + LOG_FLAG, e);
close();
}
logger.info(LOG_FLAG + "RocketMQOnsServer is finished." + LOG_FLAG);
}
private void checkTopicAndTag(String[] topic, String[] tag) {
if (topic.length != tag.length) {
throw new IllegalArgumentException("消息主题【TOPIC】和消息标签【TAG】数量不一致!消息主题有" + topic.length + "个,消息标签有" + tag.length);
}
}
@Override
public void close() {
if (null != consumer)
consumer.shutdown();
}
}
class MessageListenerImpl implements MessageListener {
private String interfaceName;
private String transactionName;
private Context context;
private Logger logger;
public MessageListenerImpl(String interfaceName, String transactionName, Context context, Logger logger) {
super();
this.interfaceName = interfaceName;
this.transactionName = transactionName;
this.context = context;
this.logger = logger;
}
@Override
public Action consume(Message message, ConsumeContext ctx) {
String topicTagStr = "";
String msgId = message.getMsgID();
try {
topicTagStr = message.getTopic() + "," + message.getTag();
logger.info("msgId=" + msgId + "当前Topic+Tag=" + topicTagStr);
// 查询消息消费处理
Client client = new Client();
client.call(interfaceName, transactionName, new Object[]{message.getBody(), msgId});
} catch (Exception e) {
logger.error("查询消息监听类异常:", e);
}
return Action.CommitMessage;
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.frame.SocketProxyFactry;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.server.socket.IServerMode;
import com.brilliance.eibs.server.socket.impl.SocketServerMode;
import com.brilliance.eibs.util.LogUtil;
/**
* socket服务
*
* @author gechengyang
*/
public class ShortSocketServer extends AbsServer {
private int port;
private int queueLen;
private int size;
private String delayTime;
private String interfaceName;
private String transName;
private int timeout;
private IServerMode mode;
private boolean has_head = true;// 是否有报文头
private String head_len_type = IServerInstance.HEAD_LEN_TYPE_10;// 报文头长度是二进制还是十进制
private int head_len = 0;// 报文头长度
private boolean is_contain_head_len = false;// 报文头长度是否包含报文头
private int fill_len = 0;// 默认为0
private boolean is_contain_fill_len = false;// 报文头长度是否包含报文头和报文体之间数据的长度 true
// false
private int body_offset = 0;// 报文体位移量 默认为0, -1为减去1个长度,+1为增加1个长度
private String encoding = "UTF-8";
public ShortSocketServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info(LOG_FLAG + "SocketServer starting ." + LOG_FLAG);
// 服务发布的端口
port = Integer.valueOf(getRequiredPropertyValue(PORT_SYMBOL));
logger.info("socketserver port=" + port);
// 等待队列的长度
queueLen = Integer.valueOf(getRequiredPropertyValue(REQUEST_QUEUE_LEN_SYMBOL));
size = Integer.valueOf(getRequiredPropertyValue(THREAD_POOL_SIZE_SYMBOL));
String trustips = getPropertyValue(TRUST_IP_SYMBOL);
// 延迟启动时间
delayTime = getPropertyValue(DELAY_TIME);
String type = getPropertyValue(TYPE);
interfaceName = getRequiredPropertyValue("interfaceName");
transName = getRequiredPropertyValue("transactionName");
timeout = Integer.valueOf(getRequiredPropertyValue(RECEIVE_TIMEOUT_SYMBOL));
has_head = getPropertyValue(HAS_CONTAIN_HEAD, true);
head_len_type = getPropertyValue(HEAD_LEN_TYPE, HEAD_LEN_TYPE_10);
head_len = getPropertyValue(HEAD_LEN, 0);
is_contain_head_len = getPropertyValue(IS_CONTAIN_HEAD_LEN, false);
fill_len = getPropertyValue(FILL_LEN, 0);
is_contain_fill_len = getPropertyValue(IS_CONTAIN_FILL_LEN, false);
body_offset = getPropertyValue(BODY_OFFSET, 0);
encoding = getPropertyValue(ENCODING, "UTF-8");
if (SocketProxyFactry.MINA_TYPE.equals(type)) {
} else if (SocketProxyFactry.ORG_SOCKET_TYPE.equals(type)) {
mode = new SocketServerMode(SocketProxyFactry.SHORT_SOCKET_TYPE, trustips, port, queueLen, size, delayTime, interfaceName, transName,
timeout, serviceDef.getExpression(), args, serviceDef.getId(), LogUtil.getLogger(context), has_head, head_len_type,
head_len, is_contain_head_len, fill_len, is_contain_fill_len, body_offset, encoding);
}
mode.startup();
logger.info(LOG_FLAG + "SocketServer is finished." + LOG_FLAG);
}
/**
* 关闭ServerSocket
*/
@Override
public void close() {
mode.close();
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.exception.InterfaceException;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.snmp.server.SnmpEngine;
/**
* SnmpServer
*
* @author gechengyang
*/
public class SnmpServer extends AbsServer {
private org.eclipse.jetty.server.Server server;
public SnmpServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info(LOG_FLAG + "starting SnmpService[id:" + serviceDef.getId() + "]." + LOG_FLAG);
try {
String adress = getRequiredPropertyValue("url");
String timer = getRequiredPropertyValue("time");
int time = Integer.parseInt(timer);
String oids = getRequiredPropertyValue("oids");
String interfaceName = getRequiredPropertyValue("interfaceName");
String transactionName = getRequiredPropertyValue("transactionName");
SnmpEngine engine = new SnmpEngine(adress, time, oids, interfaceName, transactionName, logger);
engine.run();
} catch (Exception e) {
e.printStackTrace();
new InterfaceException("05001", e);
}
logger.info(LOG_FLAG + "SnmpService[id:" + serviceDef.getId() + "] is started." + LOG_FLAG);
}
@Override
public void close() {
if (server != null)
try {
server.stop();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// server.destroy();
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.frame.SocketProxyFactry;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.server.socket.IServerMode;
import com.brilliance.eibs.server.socket.impl.MinaNioMode;
import com.brilliance.eibs.server.socket.impl.SocketServerMode;
import com.brilliance.eibs.util.LogUtil;
/**
* socket服务
*
* @author xiaoyuanzhen ------------modified by weicong on 2016/4/01 -----------
*/
public class SocketServer extends AbsServer {
private int port;
private int queueLen;
private int size;
private String delayTime;
private String interfaceName;
private String transName;
private int timeout;
private IServerMode mode;
public SocketServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info(LOG_FLAG + "SocketServer starting ." + LOG_FLAG);
// 服务发布的端口
port = Integer.valueOf(getRequiredPropertyValue(PORT_SYMBOL));
logger.info("socketserver port=" + port);
// 等待队列的长度
queueLen = Integer.valueOf(getRequiredPropertyValue(REQUEST_QUEUE_LEN_SYMBOL));
size = Integer.valueOf(getRequiredPropertyValue(THREAD_POOL_SIZE_SYMBOL));
String trustips = getPropertyValue(TRUST_IP_SYMBOL);
// 延迟启动时间
delayTime = getPropertyValue(DELAY_TIME);
String type = getPropertyValue(TYPE);
interfaceName = getRequiredPropertyValue("interfaceName");
transName = getRequiredPropertyValue("transactionName");
timeout = Integer.valueOf(getRequiredPropertyValue(RECEIVE_TIMEOUT_SYMBOL));
if (SocketProxyFactry.MINA_TYPE.equals(type)) {
mode = new MinaNioMode.Builder(port, queueLen).setProps(serviceDef.getPropertyMap()).setInterfaceName(interfaceName)
.setServerId(serviceDef.getId()).setTimeout(timeout).setTransName(transName).setTrustips(trustips)
.setLogger(LogUtil.getLogger(context)).build();
} else
/*
* mode = new SocketServerMode.Builder(port, queueLen,
* size).setArgs(args)
* .setDelayTime(delayTime).setExpression(serviceDef.getExpression())
* .setInterfaceName(interfaceName).setServerId(serviceDef.getId())
* .setTimeout(timeout).setTransName(transName)
* .setTrustips(trustips).setLogger
* (BeaLogFactory.getBeaLog(context)).build();
*/ {
mode = new SocketServerMode(SocketProxyFactry.ORG_SOCKET_TYPE, trustips, port, queueLen, size, delayTime, interfaceName, transName,
timeout, serviceDef.getExpression(), args, serviceDef.getId(), LogUtil.getLogger(context));
}
mode.startup();
logger.info(LOG_FLAG + "SocketServer is finished." + LOG_FLAG);
}
/**
* 关闭ServerSocket
*/
@Override
public void close() {
mode.close();
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.server.connection.Detect;
import com.brilliance.eibs.server.connection.FilterChain;
import com.brilliance.eibs.server.connection.impl.TransFilter;
import com.brilliance.eibs.server.terminal.Terminal;
import com.brilliance.eibs.server.terminal.TerminalPool;
import com.brilliance.eibs.server.terminal.impl.AsyncFixedSocketTerminal;
import java.io.IOException;
public class TerminalServer extends AbsServer {
private Terminal terminal;
private TerminalListner listener;
public TerminalServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info(LOG_FLAG + "TerminalServer starting ." + LOG_FLAG);
int port = Integer.valueOf(getRequiredPropertyValue("port"));
String ip = getRequiredPropertyValue("ip");
String interfaceName = getRequiredPropertyValue("interfaceName");
String transactionId = getRequiredPropertyValue("transactionName");
int length = Integer.valueOf(getRequiredPropertyValue("length"));
FilterChain chain = new FilterChain();
chain.addResponseFilter(new TransFilter(interfaceName, transactionId));
terminal = new AsyncFixedSocketTerminal(ip, port, length, chain);
TerminalPool.addTerminal(serviceDef.getId(), terminal);
listener = new TerminalListner();
Thread t = new Thread(listener);
t.setName("Asynchronous Terminal '" + serviceDef.getId() + "' Server Listener");
t.setDaemon(true);
t.start();
terminal.startup();
logger.info(LOG_FLAG + "TerminalServer is finished." + LOG_FLAG);
}
@Override
public void close() {
listener.interrupted();
terminal.close();
TerminalPool.removeTerminal(terminal);
}
private class TerminalListner extends Detect {
private long interval = 60000;
private TerminalListner() {
}
private TerminalListner(long interval) {
this.interval = interval;
}
@Override
protected void exception() {
interrupted();
}
@Override
protected void handle() throws IOException {
try {
Thread.sleep(interval);
logger.info("[" + serviceDef.getId() + "] terminal status : " + terminal.isConnected());
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
@Override
protected void close() {
logger.info("TerminalListner of [" + serviceDef.getId() + "] terminal is closed.");
}
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.main.JmsHandle;
import com.tibco.tibjms.TibjmsQueueConnectionFactory;
import javax.jms.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TibcoJmsServer extends AbsServer
{
private QueueReceiver receiver;
private ExecutorService executorService;
public TibcoJmsServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
String serverUrl = getRequiredPropertyValue(SERVER_URL);
String userName = getRequiredPropertyValue(USER_NAME);
String password = getPropertyValue(PASSWORD);
String queuename = getRequiredPropertyValue(QUEUE);
String interfaceName = getRequiredPropertyValue("interfaceName");
String transName = getRequiredPropertyValue("transactionName");
int size = Integer.valueOf(getRequiredPropertyValue(THREAD_POOL_SIZE_SYMBOL));
executorService = Executors.newFixedThreadPool(size);
try {
TibjmsQueueConnectionFactory factory = new TibjmsQueueConnectionFactory(serverUrl);
QueueConnection connection = factory.createQueueConnection(userName, password);
// QueueConnection connection2 = factory.createQueueConnection(
// userName, password);
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
// QueueSession session2 = connection.createQueueSession(false,
// Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queuename);
// Queue queue2 = session2.createQueue(queuename);
receiver = session.createReceiver(queue);
// QueueReceiver receiver2 = session.createReceiver(queue2);
connection.start();
// connection2.start();
while (true) {
Message msgMessage = receiver.receive();
if (msgMessage instanceof TextMessage) {
try {
String str = ((TextMessage) msgMessage).getText();
executorService.submit(new JmsHandle(interfaceName, transName, str));
// new Thread(new JmsHandle(interfaceName, transName,
// str)).start();
} catch (JMSException e) {
receiver.close();
}
}
}
} catch (Exception e) {
} finally {
close();
}
}
@Override
public void close() {
if (null != receiver)
try {
receiver.close();
} catch (JMSException e) {
}
if (null != executorService)
executorService.shutdown();
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.core.exception.InterfaceException;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.main.UdpHandle;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
/**
* udpserver
*
* @author gechengyang
*
*/
public class UdpServer extends AbsServer {
private boolean isOpen = true;
public UdpServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info( LOG_FLAG + "UdpServer starting ." + LOG_FLAG);
try {
// 服务发布的端口
int port = Integer.valueOf(getRequiredPropertyValue(PORT_SYMBOL));
int buffersize = Integer.valueOf(getRequiredPropertyValue(BUFFER_SIZE));
logger.info( "socketserver port=" + port);
String interfaceName = getRequiredPropertyValue("interfaceName");
String transName = getRequiredPropertyValue("transactionName");
DatagramSocket server = new DatagramSocket(port);
logger.info( "server start at port: " + port);
while (isOpen) {
DatagramPacket dataPacket = null;
byte buffer[] = new byte[buffersize];
dataPacket = new DatagramPacket(buffer, buffer.length);
server.receive(dataPacket);
String revs = new String(buffer, 0, dataPacket.getLength());
logger.info( "dataPackegt.length=" + dataPacket.getLength());
logger.info( "server recive:" + revs);
byte[] b = new byte[dataPacket.getLength()];
System.arraycopy(buffer, 0, b, 0, dataPacket.getLength());
// new Thread(new UdpHandle(interfaceName, transName, revs,
// dataPacket, server)).start();
new Thread(new UdpHandle(interfaceName, transName, b, dataPacket, server)).start();
// byte[] sndinfo = "1234567".getBytes();
// DatagramPacket dataPacket1 = new DatagramPacket(sndinfo,
// sndinfo.length, dataPacket.getAddress(),
// dataPacket.getPort());
// server.send(dataPacket1);
// server.close();
}
close();
} catch (Exception e) {
throw new InterfaceException("03001", e);
} finally {
}
logger.info( LOG_FLAG + "UdpServer is finished." + LOG_FLAG);
}
@Override
public void close() {
}
}
package com.brilliance.eibs.server;
import com.brilliance.eibs.bean.ParseConfig;
import com.brilliance.eibs.core.exception.InterfaceException;
import com.brilliance.eibs.core.model.IServiceDef;
import com.brilliance.eibs.core.service.Context;
import com.brilliance.eibs.core.service.instance.plugin.MessageInterceptor;
import com.brilliance.eibs.el.ApacheEL;
import com.brilliance.eibs.util.ClassPathUpdater;
import com.brilliance.eibs.util.Constants;
import com.brilliance.eibs.util.StringUtil;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.interceptor.LoggingInInterceptor;
import org.apache.cxf.interceptor.LoggingOutInterceptor;
import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
import org.apache.cxf.phase.Phase;
import org.apache.cxf.transport.http_jetty.JettyHTTPDestination;
import org.apache.cxf.transports.http.configuration.HTTPServerPolicy;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
/**
* webservice服务
*
* @author xiaoyuanzhen
*/
public class WSServer extends AbsServer {
private static Map<String, Boolean> generatedMap = new HashMap<String, Boolean>();
private JaxWsServerFactoryBean server;
public WSServer(Context context, IServiceDef serviceDef, ApacheEL elParser) {
super(context, serviceDef, elParser);
}
@Override
public void run() {
logger.info(LOG_FLAG + "starting WebService[id:" + serviceDef.getId() + "]." + LOG_FLAG);
try {
setReceiveTimeOut();
// 在同一个线程内只执行一次生成
String flg = getPropertyValue("compile");
if (StringUtil.isEmpty(flg)) {
flg = "true";
}
if (Boolean.valueOf(flg)) {
generateBeans();
}
String uri = getRequiredPropertyValue(URI_SYMBOL);
logger.info("webservice server url=" + uri);
String clazz = getRequiredPropertyValue(CLASS_SYMBOL);
String interfaceName = getPropertyValue("interfaceName");
String transName = getPropertyValue("transactionName");
server = new JaxWsServerFactoryBean();
Class<?> z = null;
if (Boolean.valueOf(flg)) {
z = ClassPathUpdater.dynamicLoadClass(clazz);
} else
z = Class.forName(clazz);
server.setServiceClass(z);
server.setAddress(uri);
context.addVariable(null, "cronExpression", serviceDef.getExpression());
if (!StringUtil.isEmpty(interfaceName)) {
server.getInInterceptors().add(new MessageInterceptor(context, interfaceName, transName, Phase.PRE_PROTOCOL));
server.getInInterceptors().add(new LoggingInInterceptor());
server.getOutInterceptors().add(new LoggingOutInterceptor());
}
Server endpoint = server.create();
configTimeout(endpoint);
} catch (Exception e) {
recoveryServer(server);
throw new InterfaceException("02701", e);
}
logger.info(LOG_FLAG + "WebService[id:" + serviceDef.getId() + "] is started." + LOG_FLAG);
}
private void setReceiveTimeOut() {
String rcv_timeout = getPropertyValue(RECEIVE_TIMEOUT_SYMBOL);
if (rcv_timeout != null) {
int receiveTimeout = Integer.valueOf(rcv_timeout);
if (receiveTimeout != 0) {
this.receiveTimeout = receiveTimeout;
}
logger.debug("Receive timeout = " + this.receiveTimeout);
}
}
/**
* 设置返回超时
*
* @param service
*/
private void configTimeout(Server server) {
JettyHTTPDestination destination = (JettyHTTPDestination) server.getDestination();
HTTPServerPolicy httpServerPolicy = new HTTPServerPolicy();
httpServerPolicy.setReceiveTimeout(receiveTimeout);
destination.setServer(httpServerPolicy);
}
/**
* 回收JaxWsServerFactoryBean
*
* @param server
*/
private void recoveryServer(JaxWsServerFactoryBean server) {
if (server != null)
server.destroy();
}
/**
* 判断是否生成过class,如果没生成自动生成
*/
public void generateBeans() {
String beanProp = getPropertyValue("beanname");
if (!StringUtil.isEmpty(beanProp) && !generatedMap.containsKey(beanProp)) {
generatedMap.put(beanProp, true);
logger.info("Creating webservice classes.");
generateBeans(ClassPathUpdater.addClasspath(Constants.BEANS_PATH) + File.separator + beanProp, "");
logger.info("Creating webservice classes successfully.");
}
}
private void generateBeans(String configpath, String generatepath) {
ParseConfig config = new ParseConfig();
config.setLogger(logger);
config.generateJavaBeanAndClass(config.parse(configpath), generatepath);
}
@Override
public void close() {
server.destroy();
logger.info("webservice[id:" + serviceDef.getId() + "] is closed.");
}
}
package com.brilliance.snmp.server;
import com.brilliance.eibs.core.exception.InterfaceException;
import com.brilliance.eibs.main.Client;
import org.slf4j.Logger;
import org.snmp4j.MessageDispatcher;
import org.snmp4j.MessageDispatcherImpl;
import org.snmp4j.TransportMapping;
import org.snmp4j.agent.*;
import org.snmp4j.agent.cfg.EngineBootsCounterFile;
import org.snmp4j.agent.example.Modules;
import org.snmp4j.agent.io.DefaultMOPersistenceProvider;
import org.snmp4j.agent.io.MOInput;
import org.snmp4j.agent.io.MOInputFactory;
import org.snmp4j.agent.io.prop.PropertyMOInput;
import org.snmp4j.agent.mo.DefaultMOFactory;
import org.snmp4j.agent.mo.MOAccessImpl;
import org.snmp4j.agent.mo.MOFactory;
import org.snmp4j.agent.mo.MOScalar;
import org.snmp4j.agent.mo.util.VariableProvider;
import org.snmp4j.agent.request.Request;
import org.snmp4j.agent.request.RequestStatus;
import org.snmp4j.agent.request.SubRequest;
import org.snmp4j.agent.request.SubRequestIterator;
import org.snmp4j.log.JavaLogFactory;
import org.snmp4j.log.LogFactory;
import org.snmp4j.mp.MPv3;
import org.snmp4j.smi.*;
import org.snmp4j.transport.TransportMappings;
import org.snmp4j.util.ThreadPool;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
/**
* Created by gechengyang on 2014/6/21.
*/
public class SnmpEngine
implements VariableProvider
{
static
{
LogFactory.setLogFactory(new JavaLogFactory());
}
protected AgentConfigManager agent;
protected MOServer server;
private String configFile;
private File bootCounterFile;
private Map<String, MOScalar> map = new HashMap<String, MOScalar>();
// supported MIBs
protected Modules modules;
private int time;
private String oids;
private String interfaceName;
private String transactionName;
protected Logger logger;
public SnmpEngine( String address, int time, String oids, String interfaceName, String transactionName, Logger logger )
{
this.time = time;
this.oids = oids;
this.interfaceName = interfaceName;
this.transactionName = transactionName;
this.logger=logger;
// ????????????
configFile = "SampleAgent.cfg";
bootCounterFile = new File("SampleAgent.bc");
server = new DefaultMOServer();
MOServer[] moServers = new MOServer[]{server};
// ??config properties??
InputStream configInputStream = SnmpEngine.class.getResourceAsStream("SampleAgentConfig.properties");
// ?properties??
final Properties props = new Properties();
try
{
props.load(configInputStream);
}
catch (IOException ex)
{
ex.printStackTrace();
}
// ??config file
MOInputFactory configurationFactory = new MOInputFactory()
{
public MOInput createMOInput()
{
return new PropertyMOInput(props, SnmpEngine.this);
}
};
List<String> addressList = new ArrayList<String>();
addressList.add(address);
MessageDispatcher messageDispatcher = new MessageDispatcherImpl();
addListenAddresses(messageDispatcher, addressList);
agent = new AgentConfigManager(new OctetString(MPv3.createLocalEngineID()), messageDispatcher, null, moServers, ThreadPool.create(
"SnmpServer", 3), configurationFactory, new DefaultMOPersistenceProvider(moServers, configFile), new EngineBootsCounterFile(
bootCounterFile));
}
protected void addListenAddresses(MessageDispatcher md, List addresses)
{
for (Iterator it = addresses.iterator(); it.hasNext();)
{
Address address = GenericAddress.parse((String) it.next());
TransportMapping tm = TransportMappings.getInstance().createTransportMapping(address);
if (tm != null)
{
md.addTransportMapping(tm);
}
else
{
logger.warn("No transport mapping available for address '" + address + "'.");
}
}
}
public void run()
{
// initialize agent before registering our own modules
agent.initialize();
// this requires sysUpTime to be available.
registerMIBs();
// now continue agent setup and launch it.
agent.run();
}
protected void registerMIBs()
{
if (modules == null)
modules = new Modules(getFactory());
try
{
// dealOid();
modules.registerMOs(server, null);
Timer timer = new Timer();
timer.schedule(new TimerTask()
{
@Override
public void run()
{
logger.info("snmp server begin to update......");
//????oid
for (String key : map.keySet())
{
server.unregister(map.get(key), null);
}
map.clear();
try
{
dealOid();
}
catch (DuplicateRegistrationException e)
{
new InterfaceException("05002", e);
}
}
}, 0, time);
}
catch (DuplicateRegistrationException e)
{
new InterfaceException("05002", e);
}
}
public void dealOid()
throws DuplicateRegistrationException
{
String oid[] = oids.split(",");
for (int i = 0; i < oid.length; i++)
{
Map<String, Object> dataMap=(Map<String, Object>) new Client().call(interfaceName, transactionName, oid[i]).getContent();
if(dataMap==null)
{
logger.info("dataMap of oids is null.......... wait for next cycle!");
return;
}
//dataMap=new Client.call(); ??Map ???OID???????
for (String key : dataMap.keySet())
{
Object value =dataMap.get(key);
//???? TODO ??????????????????
key = key + ".0";
MOScalar myScalar=null;
if(value instanceof Integer)
{
myScalar = new MOScalar(new OID(key), MOAccessImpl.ACCESS_READ_CREATE, new Integer32(Integer.parseInt(String.valueOf(value))));
}
else
{
myScalar = new MOScalar(new OID(key), MOAccessImpl.ACCESS_READ_CREATE, new OctetString(String.valueOf(value)));
}
map.put(key, myScalar);
server.register(myScalar, null);
}
}
}
public Variable getVariable(String name)
{
OID oid;
OctetString context = null;
int pos = name.indexOf(':');
if (pos >= 0)
{
context = new OctetString(name.substring(0, pos));
oid = new OID(name.substring(pos + 1, name.length()));
}
else
{
oid = new OID(name);
}
final DefaultMOContextScope scope = new DefaultMOContextScope(context, oid, true, oid, true);
MOQuery query = new DefaultMOQuery(scope, false, this);
ManagedObject mo = server.lookup(query);
if (mo != null)
{
final VariableBinding vb = new VariableBinding(oid);
final RequestStatus status = new RequestStatus();
SubRequest req = new SubRequest()
{
private boolean completed;
private MOQuery query;
public boolean hasError()
{
return false;
}
public void setErrorStatus(int errorStatus)
{
status.setErrorStatus(errorStatus);
}
public int getErrorStatus()
{
return status.getErrorStatus();
}
public RequestStatus getStatus()
{
return status;
}
public MOScope getScope()
{
return scope;
}
public VariableBinding getVariableBinding()
{
return vb;
}
public Request getRequest()
{
return null;
}
public Object getUndoValue()
{
return null;
}
public void setUndoValue(Object undoInformation)
{}
public void completed()
{
completed = true;
}
public boolean isComplete()
{
return completed;
}
public void setTargetMO(ManagedObject managedObject)
{}
public ManagedObject getTargetMO()
{
return null;
}
public int getIndex()
{
return 0;
}
public void setQuery(MOQuery query)
{
this.query = query;
}
public MOQuery getQuery()
{
return query;
}
public SubRequestIterator repetitions()
{
return null;
}
public void updateNextRepetition()
{}
public Object getUserObject()
{
return null;
}
public void setUserObject(Object userObject)
{
}
};
mo.get(req);
return vb.getVariable();
}
return null;
}
protected MOFactory getFactory()
{
return DefaultMOFactory.getInstance();
}
public static void main(String[] args)
{
String address = "udp:127.0.0.1/166";
SnmpEngine sampleAgent = new SnmpEngine(address,5000,"1.3.4,2.4.5","test","test",null);
// Add all available security protocols (e.g. SHA,MD5,DES,AES,3DES,..)
//SecurityProtocols.getInstance().addDefaultProtocols();
sampleAgent.run();
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment