Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
B
be-esb-plugin
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
be-esb-ecosystem-maven
be-esb-plugin
Commits
75256457
Commit
75256457
authored
Sep 19, 2022
by
WeiCong
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
优化jms服务
parent
3fc85bd2
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
209 additions
and
185 deletions
+209
-185
JmsListeningServer.java
...n/java/com/brilliance/eibs/server/JmsListeningServer.java
+209
-185
No files found.
be-esb-plugin-jms/src/main/java/com/brilliance/eibs/server/JmsListeningServer.java
View file @
75256457
...
...
@@ -9,7 +9,6 @@ import com.brilliance.eibs.util.StringUtil;
import
org.apache.commons.codec.binary.Base64
;
import
org.apache.commons.lang.math.NumberUtils
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.BeanWrapper
;
import
org.springframework.beans.BeanWrapperImpl
;
import
org.springframework.beans.PropertyValue
;
import
org.springframework.jms.config.DefaultJmsListenerContainerFactory
;
...
...
@@ -23,190 +22,215 @@ import org.springframework.jms.support.converter.SimpleMessageConverter;
import
javax.jms.ConnectionFactory
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Objects
;
public
class
JmsListeningServer
extends
AbsServer
{
static
final
String
DESTINATION_TYPE_QUEUE
=
"queue"
;
static
final
String
DESTINATION_TYPE_TOPIC
=
"topic"
;
static
final
String
DESTINATION_TYPE_DURABLE_TOPIC
=
"durableTopic"
;
static
final
String
DESTINATION_TYPE_SHARED_TOPIC
=
"sharedTopic"
;
static
final
String
DESTINATION_TYPE_SHARED_DURABLE_TOPIC
=
"sharedDurableTopic"
;
final
static
MessageConverter
messageConverter
=
new
SimpleMessageConverter
();
private
final
Map
<
String
,
DefaultMessageListenerContainer
>
containerCache
=
new
HashMap
<
String
,
DefaultMessageListenerContainer
>();
public
JmsListeningServer
(
Context
context
,
IServiceDef
serviceDef
,
ApacheEL
elParser
)
{
super
(
context
,
serviceDef
,
elParser
);
}
@Override
public
void
run
()
{
logger
.
info
(
LOG_FLAG
+
"JmsListeningServer is starting ."
+
LOG_FLAG
);
ConnectionFactory
connectionFactory
;
try
{
String
connectionFactoryClaz
=
getRequiredPropertyValue
(
"connectionFactory"
);
String
[]
destinations
=
getRequiredPropertyValue
(
"destination"
)
.
split
(
","
);
String
[]
interfaceNames
=
getRequiredPropertyValue
(
"interfaceName"
)
.
split
(
","
);
String
[]
transactionNames
=
getRequiredPropertyValue
(
"transactionName"
).
split
(
","
);
Class
<?>
connectionFactoryClazz
=
Class
.
forName
(
connectionFactoryClaz
);
connectionFactory
=
(
ConnectionFactory
)
BeanUtils
.
instantiate
(
connectionFactoryClazz
);
BeanWrapper
bw
=
new
BeanWrapperImpl
(
connectionFactory
);
Map
<
String
,
String
>
arguments
=
((
ServiceDef
)
this
.
serviceDef
)
.
getPropertyArguments
(
"connectionFactory"
);
String
username
=
null
;
String
password
=
null
;
if
(!
arguments
.
isEmpty
())
{
// 连接工厂设置必要属性,动态设置
for
(
Map
.
Entry
<
String
,
String
>
entry
:
arguments
.
entrySet
())
{
String
propertyName
=
entry
.
getKey
();
Object
originalValue
=
entry
.
getValue
();
if
(
"username"
.
equalsIgnoreCase
(
propertyName
)){
username
=(
String
)
originalValue
;
}
else
if
(
"password"
.
equalsIgnoreCase
(
propertyName
)){
password
=(
String
)
originalValue
;
password
=
new
String
(
Base64
.
decodeBase64
(
password
));
}
else
{
Object
convertedValue
=
originalValue
;
convertedValue
=
((
BeanWrapperImpl
)
bw
).
convertForProperty
(
convertedValue
,
propertyName
);
PropertyValue
pv
=
new
PropertyValue
(
propertyName
,
convertedValue
);
bw
.
setPropertyValue
(
pv
);
}
}
}
DefaultJmsListenerContainerFactory
factory
=
new
DefaultJmsListenerContainerFactory
();
if
(
username
!=
null
){
UserCredentialsConnectionFactoryAdapter
ucfa
=
new
UserCredentialsConnectionFactoryAdapter
();
ucfa
.
setTargetConnectionFactory
(
connectionFactory
);
ucfa
.
setUsername
(
username
);
ucfa
.
setPassword
(
password
);
factory
.
setConnectionFactory
(
ucfa
);
}
else
{
factory
.
setConnectionFactory
(
connectionFactory
);
}
factory
.
setMessageConverter
(
messageConverter
);
// 设置本地事务性
factory
.
setSessionTransacted
(
true
);
// 设置监听类型(队列/主题)
initDestinationType
(
factory
);
// 设置jms资源缓存级别
String
cache
;
if
(
StringUtil
.
isEmpty
(
getPropertyValue
(
"cache"
)))
{
cache
=
"CACHE_SESSION"
;
}
else
{
cache
=
"CACHE_"
+
getPropertyValue
(
"cache"
).
toUpperCase
();
}
factory
.
setCacheLevelName
(
cache
);
// 设置消费者线程数
String
concurrency
=
getPropertyValue
(
"concurrency"
);
if
(!
StringUtil
.
isEmpty
(
concurrency
))
{
factory
.
setConcurrency
(
concurrency
);
}
// 设置每个消费者处理多少消息
String
prefetch
=
getPropertyValue
(
"prefetch"
);
factory
.
setMaxMessagesPerTask
(
NumberUtils
.
toInt
(
prefetch
,
5
));
// 设置收取消息超时时间(毫秒)
String
receiveTimeout
=
getPropertyValue
(
"receiveTimeout"
);
if
(!
StringUtil
.
isEmpty
(
receiveTimeout
))
{
factory
.
setReceiveTimeout
(
NumberUtils
.
toLong
(
receiveTimeout
));
}
for
(
int
i
=
0
;
i
<
destinations
.
length
;
i
++)
{
String
destination
=
destinations
[
i
];
if
(!
containerCache
.
containsKey
(
destination
))
{
// 设置监听端点
SimpleJmsListenerEndpoint
endpoint
=
new
SimpleJmsListenerEndpoint
();
MessageListenerAdapter
messageListener
=
new
MessageListenerAdapter
();
messageListener
.
setDefaultListenerMethod
(
"handle"
);
messageListener
.
setDelegate
(
new
MessageHandler
(
interfaceNames
[
i
],
transactionNames
[
i
]));
endpoint
.
setMessageListener
(
messageListener
);
endpoint
.
setDestination
(
destination
);
// 创建jms监听容器
DefaultMessageListenerContainer
container
=
factory
.
createListenerContainer
(
endpoint
);
// 初始化jms监听容器
container
.
initialize
();
// 启动jms监听容器
container
.
start
();
containerCache
.
put
(
destination
,
container
);
}
}
}
catch
(
Throwable
e
)
{
logger
.
error
(
LOG_FLAG
+
"JmsListeningServer start error."
+
LOG_FLAG
,
e
);
close
();
}
logger
.
info
(
LOG_FLAG
+
"JmsListeningServer start is finished."
+
LOG_FLAG
);
}
private
void
initDestinationType
(
DefaultJmsListenerContainerFactory
factory
)
{
String
destinationType
=
getPropertyValue
(
"destinationType"
);
boolean
pubSubDomain
=
false
;
boolean
subscriptionDurable
=
false
;
boolean
subscriptionShared
=
false
;
if
(
DESTINATION_TYPE_SHARED_DURABLE_TOPIC
.
equals
(
destinationType
))
{
pubSubDomain
=
true
;
subscriptionDurable
=
true
;
subscriptionShared
=
true
;
}
else
if
(
DESTINATION_TYPE_SHARED_TOPIC
.
equals
(
destinationType
))
{
pubSubDomain
=
true
;
subscriptionShared
=
true
;
}
else
if
(
DESTINATION_TYPE_DURABLE_TOPIC
.
equals
(
destinationType
))
{
pubSubDomain
=
true
;
subscriptionDurable
=
true
;
}
else
if
(
DESTINATION_TYPE_TOPIC
.
equals
(
destinationType
))
{
pubSubDomain
=
true
;
}
else
if
(
destinationType
==
null
||
""
.
equals
(
destinationType
)
||
DESTINATION_TYPE_QUEUE
.
equals
(
destinationType
))
{
// the default: queue
}
else
{
// 给出警告,依旧使用默认队列类型作为监听模式
logger
.
error
(
LOG_FLAG
+
"Invalid listener container 'destination-type': only "
+
"\"queue\", \"topic\", \"durableTopic\", \"sharedTopic\", \"sharedDurableTopic\" supported."
+
LOG_FLAG
);
}
factory
.
setPubSubDomain
(
pubSubDomain
);
factory
.
setSubscriptionDurable
(
subscriptionDurable
);
factory
.
setSubscriptionShared
(
subscriptionShared
);
}
@Override
public
void
close
()
{
for
(
Map
.
Entry
<
String
,
DefaultMessageListenerContainer
>
item:
containerCache
.
entrySet
()){
DefaultMessageListenerContainer
container
=
item
.
getValue
();
if
(
container
!=
null
)
{
container
.
shutdown
();
container
=
null
;
}
}
containerCache
.
clear
();
}
class
MessageHandler
{
private
String
interfaceName
;
private
String
transactionName
;
public
MessageHandler
(
String
interfaceName
,
String
transactionName
)
{
this
.
interfaceName
=
interfaceName
;
this
.
transactionName
=
transactionName
;
}
public
void
handle
(
Object
message
)
{
Client
client
=
new
Client
();
client
.
call
(
interfaceName
,
transactionName
,
new
Object
[]
{
message
});
}
}
static
final
String
DESTINATION_TYPE_QUEUE
=
"queue"
;
static
final
String
DESTINATION_TYPE_TOPIC
=
"topic"
;
static
final
String
DESTINATION_TYPE_DURABLE_TOPIC
=
"durableTopic"
;
static
final
String
DESTINATION_TYPE_SHARED_TOPIC
=
"sharedTopic"
;
static
final
String
DESTINATION_TYPE_SHARED_DURABLE_TOPIC
=
"sharedDurableTopic"
;
final
static
MessageConverter
messageConverter
=
new
SimpleMessageConverter
();
private
final
Map
<
String
,
DefaultMessageListenerContainer
>
containerCache
=
new
HashMap
<
String
,
DefaultMessageListenerContainer
>();
public
JmsListeningServer
(
Context
context
,
IServiceDef
serviceDef
,
ApacheEL
elParser
)
{
super
(
context
,
serviceDef
,
elParser
);
}
@Override
public
void
run
()
{
logger
.
info
(
LOG_FLAG
+
"JmsListeningServer is starting ."
+
LOG_FLAG
);
ConnectionFactory
connectionFactory
;
try
{
String
connectionFactoryClaz
=
getRequiredPropertyValue
(
"connectionFactory"
);
String
[]
destinations
=
getRequiredPropertyValue
(
"destination"
)
.
split
(
","
);
String
[]
interfaceNames
=
getRequiredPropertyValue
(
"interfaceName"
)
.
split
(
","
);
String
[]
transactionNames
=
getRequiredPropertyValue
(
"transactionName"
).
split
(
","
);
Class
<?>
connectionFactoryClazz
=
Class
.
forName
(
connectionFactoryClaz
);
connectionFactory
=
(
ConnectionFactory
)
BeanUtils
.
instantiateClass
(
connectionFactoryClazz
);
BeanWrapperImpl
bw
=
new
BeanWrapperImpl
(
connectionFactory
);
Map
<
String
,
String
>
arguments
=
((
ServiceDef
)
this
.
serviceDef
)
.
getPropertyArguments
(
"connectionFactory"
);
String
username
=
null
;
String
password
=
null
;
if
(!
arguments
.
isEmpty
())
{
// 连接工厂设置必要属性,动态设置
for
(
Map
.
Entry
<
String
,
String
>
entry
:
arguments
.
entrySet
())
{
String
propertyName
=
entry
.
getKey
();
Object
originalValue
=
entry
.
getValue
();
if
(
"username"
.
equalsIgnoreCase
(
propertyName
))
{
username
=
(
String
)
originalValue
;
}
else
if
(
"password"
.
equalsIgnoreCase
(
propertyName
))
{
password
=
(
String
)
originalValue
;
password
=
new
String
(
Base64
.
decodeBase64
(
password
));
}
else
{
Object
convertedValue
=
originalValue
;
if
(
"hostName"
.
equalsIgnoreCase
(
propertyName
))
{
String
ip
=
getServer
(
entry
.
getValue
(),
true
);
convertedValue
=
bw
.
convertForProperty
(
ip
,
propertyName
);
}
else
if
(
"port"
.
equalsIgnoreCase
(
propertyName
))
{
String
port
=
getServer
(
entry
.
getValue
(),
false
);
convertedValue
=
bw
.
convertForProperty
(
port
,
propertyName
);
}
else
{
convertedValue
=
bw
.
convertForProperty
(
convertedValue
,
propertyName
);
}
PropertyValue
pv
=
new
PropertyValue
(
propertyName
,
convertedValue
);
bw
.
setPropertyValue
(
pv
);
}
}
}
DefaultJmsListenerContainerFactory
factory
=
new
DefaultJmsListenerContainerFactory
();
if
(
username
!=
null
)
{
UserCredentialsConnectionFactoryAdapter
ucfa
=
new
UserCredentialsConnectionFactoryAdapter
();
ucfa
.
setTargetConnectionFactory
(
connectionFactory
);
ucfa
.
setUsername
(
username
);
ucfa
.
setPassword
(
password
);
factory
.
setConnectionFactory
(
ucfa
);
}
else
{
factory
.
setConnectionFactory
(
connectionFactory
);
}
factory
.
setMessageConverter
(
messageConverter
);
// 设置本地事务性
factory
.
setSessionTransacted
(
true
);
// 设置监听类型(队列/主题)
initDestinationType
(
factory
);
// 设置jms资源缓存级别
String
cache
;
if
(
StringUtil
.
isEmpty
(
getPropertyValue
(
"cache"
)))
{
cache
=
"CACHE_SESSION"
;
}
else
{
cache
=
"CACHE_"
+
getPropertyValue
(
"cache"
).
toUpperCase
();
}
factory
.
setCacheLevelName
(
cache
);
// 设置消费者线程数
String
concurrency
=
getPropertyValue
(
"concurrency"
);
if
(!
StringUtil
.
isEmpty
(
concurrency
))
{
factory
.
setConcurrency
(
concurrency
);
}
// 设置每个消费者处理多少消息
String
prefetch
=
getPropertyValue
(
"prefetch"
);
factory
.
setMaxMessagesPerTask
(
NumberUtils
.
toInt
(
prefetch
,
5
));
// 设置收取消息超时时间(毫秒)
String
receiveTimeout
=
getPropertyValue
(
"receiveTimeout"
);
if
(!
StringUtil
.
isEmpty
(
receiveTimeout
))
{
factory
.
setReceiveTimeout
(
NumberUtils
.
toLong
(
receiveTimeout
));
}
for
(
int
i
=
0
;
i
<
destinations
.
length
;
i
++)
{
String
destination
=
destinations
[
i
];
if
(!
containerCache
.
containsKey
(
destination
))
{
// 设置监听端点
SimpleJmsListenerEndpoint
endpoint
=
new
SimpleJmsListenerEndpoint
();
MessageListenerAdapter
messageListener
=
new
MessageListenerAdapter
();
messageListener
.
setDefaultListenerMethod
(
"handle"
);
messageListener
.
setDelegate
(
new
MessageHandler
(
interfaceNames
[
i
],
transactionNames
[
i
]));
endpoint
.
setMessageListener
(
messageListener
);
endpoint
.
setDestination
(
destination
);
// 创建jms监听容器
DefaultMessageListenerContainer
container
=
factory
.
createListenerContainer
(
endpoint
);
// 初始化jms监听容器
container
.
initialize
();
// 启动jms监听容器
container
.
start
();
containerCache
.
put
(
destination
,
container
);
}
}
}
catch
(
Throwable
e
)
{
logger
.
error
(
LOG_FLAG
+
"JmsListeningServer start error."
+
LOG_FLAG
,
e
);
close
();
}
logger
.
info
(
LOG_FLAG
+
"JmsListeningServer start is finished."
+
LOG_FLAG
);
}
private
String
getServer
(
String
hostName
,
boolean
isIp
)
{
if
(
hostName
.
indexOf
(
":"
)
==
-
1
)
{
return
hostName
;
}
Objects
.
requireNonNull
(
hostName
);
int
indexOf
=
hostName
.
indexOf
(
"-"
);
int
index
=
0
;
String
servers
=
hostName
;
if
(
indexOf
!=
-
1
)
{
index
=
Integer
.
parseInt
(
hostName
.
substring
(
indexOf
+
1
));
servers
=
hostName
.
substring
(
0
,
indexOf
);
}
String
[]
strings
=
servers
.
split
(
","
);
return
isIp
?
strings
[
index
].
substring
(
0
,
strings
[
index
].
indexOf
(
":"
))
:
strings
[
index
].
substring
(
strings
[
index
].
indexOf
(
":"
)
+
1
);
}
private
void
initDestinationType
(
DefaultJmsListenerContainerFactory
factory
)
{
String
destinationType
=
getPropertyValue
(
"destinationType"
);
boolean
pubSubDomain
=
false
;
boolean
subscriptionDurable
=
false
;
boolean
subscriptionShared
=
false
;
if
(
DESTINATION_TYPE_SHARED_DURABLE_TOPIC
.
equals
(
destinationType
))
{
pubSubDomain
=
true
;
subscriptionDurable
=
true
;
subscriptionShared
=
true
;
}
else
if
(
DESTINATION_TYPE_SHARED_TOPIC
.
equals
(
destinationType
))
{
pubSubDomain
=
true
;
subscriptionShared
=
true
;
}
else
if
(
DESTINATION_TYPE_DURABLE_TOPIC
.
equals
(
destinationType
))
{
pubSubDomain
=
true
;
subscriptionDurable
=
true
;
}
else
if
(
DESTINATION_TYPE_TOPIC
.
equals
(
destinationType
))
{
pubSubDomain
=
true
;
}
else
if
(
destinationType
==
null
||
""
.
equals
(
destinationType
)
||
DESTINATION_TYPE_QUEUE
.
equals
(
destinationType
))
{
// the default: queue
}
else
{
// 给出警告,依旧使用默认队列类型作为监听模式
logger
.
error
(
LOG_FLAG
+
"Invalid listener container 'destination-type': only "
+
"\"queue\", \"topic\", \"durableTopic\", \"sharedTopic\", \"sharedDurableTopic\" supported."
+
LOG_FLAG
);
}
factory
.
setPubSubDomain
(
pubSubDomain
);
factory
.
setSubscriptionDurable
(
subscriptionDurable
);
factory
.
setSubscriptionShared
(
subscriptionShared
);
}
@Override
public
void
close
()
{
for
(
Map
.
Entry
<
String
,
DefaultMessageListenerContainer
>
item
:
containerCache
.
entrySet
())
{
DefaultMessageListenerContainer
container
=
item
.
getValue
();
if
(
container
!=
null
)
{
container
.
shutdown
();
container
=
null
;
}
}
containerCache
.
clear
();
}
class
MessageHandler
{
private
String
interfaceName
;
private
String
transactionName
;
public
MessageHandler
(
String
interfaceName
,
String
transactionName
)
{
this
.
interfaceName
=
interfaceName
;
this
.
transactionName
=
transactionName
;
}
public
void
handle
(
Object
message
)
{
Client
client
=
new
Client
();
client
.
call
(
interfaceName
,
transactionName
,
new
Object
[]{
message
});
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment