摘要
从可配置项、获取连接、Failover三个方面,简单介绍了四种JmsConnectionFactory。并对其中部分类做了性能测试。
ActiveMQConnectionFactory
这个类有两个同名类:org.apache.activemq.spring.ActiveMQConnectionFactory
和org.apache.activemq.ActiveMQConnectionFactory
。
前者是spring配置文件中,注解默认使用的类。实际上,它直接继承自后者,在后者的基础上只针对Spring注入时的beanName做了一些扩展,功能完全没变。所以下面讨论的各种功能、特性同时适用于这两个类。
可配置项
作为一个基础性的工厂类,它的可配置项非常多。参见以下代码。这还不包括它从父类中继承的可配置项。各配置项的具体含义各位可以自己去找,不一一说明。
虽然可配置项很多,但是THREAD系统中实际配置的只有两个:brokerURL和redeliveryPolicyMap。
获取链接
使用这个工厂来获取连接,每次都会调用到这个方法上:
protected ActiveMQConnection createActiveMQConnection(
Transport transport, JMSStatsImpl stats) throws Exception {
ActiveMQConnection connection
= newActiveMQConnection(transport,
getClientIdGenerator(),
getConnectionIdGenerator(),
tats);
returnconnection;
}
很明显的,它每次都会去创建一个新的连接。这样做的效率明显是非常低下的,在后续会有性能对比。
Failover策略
由于ActiveMQConnectionFactory每次都会创建一个新的连接,因此当MQ集群全部可用时,它会随机地连接到集群中不同的服务上去。在一定程度上来说,这也是一种“负载均衡”策略了。
当MQ集群中有部分服务不可用时,已连接到这些服务上的JMS连接会抛出异常,然后自动重连到另一台服务上。只要集群中还有服务可用,那么“抛异常、重新连接”的操作对应用来说是透明的:消息仍然能正常的发送/消费。全宕机了另当别论。相关日志是这样的:
2017-07-11 16:59:39.778 WARN 7196 — [0.1:61616@50246] o.a.a.t.failover.FailoverTransport : Transport (tcp://127.0.0.1:61616) failed , attempting to automatically reconnect: {}java.net.SocketException: Connection resetat java.net.SocketInputStream.read(SocketInputStream.java:210) ~[na:1.8.0_121]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_121]
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619) ~[activemq-client-5.14.5.jar:5.14.5]
at java.io.DataInputStream.readInt(DataInputStream.java:387) ~[na:1.8.0_121]
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.14.5.jar:5.14.5]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
2017-07-11 16:59:39.791 INFO 7196 — [ActiveMQ Task-2] o.a.a.t.failover.FailoverTransport : Successfully reconnected to tcp://10.255.33.108:61616
如果宕机的服务又恢复了,那么ActiveMQConnectionFactory在创建新连接时,会尝试去连接这些服务。
PooledConnectionFactory
顾名思义,PooledConnectionFactory是一个“池化”连接工厂。它会把一些JMS资源(Connection/Session/Producer)放到资源池中进行复用。
不过,Connection是以用户名+密码为key值放入Pool中的。对于同一个用户(null也是一样),实际只有一个Connection。
另外,虽然他会把Producer放到资源池中,但并不会把Consumer也放进来。
与ActiveMQConnectionFactory类似,PooledConnectionFactory同样有两个同名类:org.apache.activemq.pool.PooledConnectionFactory
和org.apache.activemq.jms.pool.PooledConnectionFactory
。并且,前者也用于一个Spring配置注解:,前者也继承自后者……
可配置项
PooledConnectionFactory需要两项配置:实际生成链接的工厂,以及链接池。
实际生成的工厂可以通过构造方法传入(指定一个ActiveMQConnectionFactory、或者指定一个 brokerURL)、也可以用setter配置。
连接池的配置较少,主要是下面这些:
需要注意的是,有些set方法会对应到一个成员变量(如setIdleTimeout方法会给this.idleTimeout赋值);有些则直接操作连接池(如setTimeBetweenExpirationCheckMillis方法,直接将timeBetweenExpirationCheckMillis写入连接池中)。
另外,有些配置项与其它资源池的配置含义不同(如setMaxConnections,参见蛋疼的PooledConnectionFactory(activemq-pool)中的介绍)。
获取连接
PooledConnectionFactory创建链接的方法如下:
@Override
public synchronized Connection createConnection(String userName, String password) throws JMSException {
if (stopped.get()) {
LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
return null;
}
ConnectionPool connection = null;
ConnectionKey key = new ConnectionKey(userName, password);
// This will either return an existing non-expired ConnectionPool or it
// will create a new one to meet the demand.
if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) {
try {
connectionsPool.addObject(key);
connection = mostRecentlyCreated.getAndSet(null);
connection.incrementReferenceCount();
} catch (Exception e) {
throw createJmsException("Error while attempting to add new Connection to the pool", e);
}
} else {
try {
// We can race against other threads returning the connection when there is an
// expiration or idle timeout. We keep pulling out ConnectionPool instances until
// we win and get a non-closed instance and then increment the reference count
// under lock to prevent another thread from triggering an expiration check and
// pulling the rug out from under us.
while (connection == null) {
connection = connectionsPool.borrowObject(key);
synchronized (connection) {
if (connection.getConnection() != null) {
connection.incrementReferenceCount();
break;
}
// Return the bad one to the pool and let if get destroyed as normal.
connectionsPool.returnObject(key, connection);
connection = null;
}
}
} catch (Exception e) {
throw createJmsException("Error while attempting to retrieve a connection from the pool", e);
}
try {
connectionsPool.returnObject(key, connection);
} catch (Exception e) {
throw createJmsException("Error when returning connection to the pool", e);
}
}
return newPooledConnection(connection);
}
代码有点绕(绕的一个原因是所谓的ConnectionPool其实叫PooledConnection或者SessionPool更合适,因为其中实际是一个Connection和若干个Session;另一个原因是实际的connection并没有存放在成员变量connectionsPool中,而是放在了mostRecentlyCreated里)。不过代码中的注释说明已经说明了重点:
要么获取一个未过期的链接;要么创建一个新连接
// This will either return an existing non-expired ConnectionPool or it
// will create a new one to meet the demand.
另外值得一说的是,虽然PooledConnectionFactory会用资源池来暂存Connection、Session,以及Producer(根据javadoc,但没找到缓存producter的代码位置),但它并不缓存Consumers。
Failover策略
使用这个链接工厂,在初始化时它只连上MQ集群中的一台服务。也就是说,由这个链接工厂获取的所有Connection/Session/Producer/Consumer,都会连到这一台服务上。这有可能造成MQ集群压力过于集中以及资源浪费。
如果已经连上的服务宕机了,会重连到另一台服务上,同样的,收、发消息的应用是不受影响的。这应该是因为它底层仍然是ActiveMQConnectionFactory的缘故。
但是宕机的服务重启后,不会主动尝试重连,除非已连上的服务又关闭了。
SingleConnectionFactory
同样顾名思义,SingleConnectionFactory只提供一个Connection。当然,由于一个Connection可以打开多个Session/Productor/Consumer,后面这些资源并不保证唯一。
与以上两个连接工厂不同,SingleConnectionFactory不是apache.activemq提供的类,而是spring提供的。这使得它并不局限于ActiveMQ,而能用于任何遵循java.jms相关接口、规范的消息服务。
可配置项
可能跟功能少有关,SingleConnectionFactory的可配置项也很少,就这么几个:
获取连接
虽然singleconnectionfactory类中有一个单例的Connection成员变量,但它并不会直接把这个变量提供给调用方,而是通过Java动态代理,返回一个SharedConnectionInvocationHandler:
protected Connection getSharedConnectionProxy(Connection target) {
List<Class<?>> classes = new ArrayList<Class<?>>(3);
classes.add(Connection.class);
if (target instanceof QueueConnection) {
classes.add(QueueConnection.class);
}
if (target instanceof TopicConnection) {
classes.add(TopicConnection.class);
}
return (Connection) Proxy.newProxyInstance(
Connection.class.getClassLoader(),
classes.toArray(new Class<?>[classes.size()]),
new SharedConnectionInvocationHandler());
}
private class SharedConnectionInvocationHandler implements InvocationHandler {
private ExceptionListener localExceptionListener;
private boolean locallyStarted = false;
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName().equals("equals")) {
Object other = args[0];
if (proxy == other) {
return true;
}
if (other == null || !Proxy.isProxyClass(other.getClass())) {
return false;
}
InvocationHandler otherHandler = Proxy.getInvocationHandler(other);
return (otherHandler instanceof SharedConnectionInvocationHandler &&
factory() == ((SharedConnectionInvocationHandler) otherHandler).factory());
}
else if (method.getName().equals("hashCode")) {
// Use hashCode of containing SingleConnectionFactory.
return System.identityHashCode(factory());
}
else if (method.getName().equals("toString")) {
return "Shared JMS Connection: " + getConnection();
}
else if (method.getName().equals("setClientID")) {
// Handle setClientID method: throw exception if not compatible.
String currentClientId = getConnection().getClientID();
if (currentClientId != null && currentClientId.equals(args[0])) {
return null;
}
else {
throw new javax.jms.IllegalStateException(
"setClientID call not supported on proxy for shared Connection. " +
"Set the 'clientId' property on the SingleConnectionFactory instead.");
}
}
else if (method.getName().equals("setExceptionListener")) {
// Handle setExceptionListener method: add to the chain.
synchronized (connectionMonitor) {
if (aggregatedExceptionListener != null) {
ExceptionListener listener = (ExceptionListener) args[0];
if (listener != this.localExceptionListener) {
if (this.localExceptionListener != null) {
aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
}
if (listener != null) {
aggregatedExceptionListener.delegates.add(listener);
}
this.localExceptionListener = listener;
}
return null;
}
else {
throw new javax.jms.IllegalStateException(
"setExceptionListener call not supported on proxy for shared Connection. " +
"Set the 'exceptionListener' property on the SingleConnectionFactory instead. " +
"Alternatively, activate SingleConnectionFactory's 'reconnectOnException' feature, " +
"which will allow for registering further ExceptionListeners to the recovery chain.");
}
}
}
else if (method.getName().equals("getExceptionListener")) {
synchronized (connectionMonitor) {
if (this.localExceptionListener != null) {
return this.localExceptionListener;
}
else {
return getExceptionListener();
}
}
}
else if (method.getName().equals("start")) {
localStart();
return null;
}
else if (method.getName().equals("stop")) {
localStop();
return null;
}
else if (method.getName().equals("close")) {
localStop();
synchronized (connectionMonitor) {
if (this.localExceptionListener != null) {
if (aggregatedExceptionListener != null) {
aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
}
this.localExceptionListener = null;
}
}
return null;
}
else if (method.getName().equals("createSession") || method.getName().equals("createQueueSession") ||
method.getName().equals("createTopicSession")) {
// Default: JMS 2.0 createSession() method
Integer mode = Session.AUTO_ACKNOWLEDGE;
if (args != null) {
if (args.length == 1) {
// JMS 2.0 createSession(int) method
mode = (Integer) args[0];
}
else if (args.length == 2) {
// JMS 1.1 createSession(boolean, int) method
boolean transacted = (Boolean) args[0];
Integer ackMode = (Integer) args[1];
mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
}
}
Session session = getSession(getConnection(), mode);
if (session != null) {
if (!method.getReturnType().isInstance(session)) {
String msg = "JMS Session does not implement specific domain: " + session;
try {
session.close();
}
catch (Throwable ex) {
logger.trace("Failed to close newly obtained JMS Session", ex);
}
throw new javax.jms.IllegalStateException(msg);
}
return session;
}
}
try {
return method.invoke(getConnection(), args);
}
catch (InvocationTargetException ex) {
throw ex.getTargetException();
}
}
private void localStart() throws JMSException {
synchronized (connectionMonitor) {
if (!this.locallyStarted) {
this.locallyStarted = true;
if (startedCount == 0 && connection != null) {
connection.start();
}
startedCount++;
}
}
}
private void localStop() throws JMSException {
synchronized (connectionMonitor) {
if (this.locallyStarted) {
this.locallyStarted = false;
if (startedCount == 1 && connection != null) {
connection.stop();
}
if (startedCount > 0) {
startedCount--;
}
}
}
}
private SingleConnectionFactory factory() {
return SingleConnectionFactory.this;
}
}
SharedConnectionInvocationHandler通过反射的方式,实现了Connection接口下的一些重要方法。并且,它屏蔽掉了Connection.close()方法,确保这个链接不会关闭。
Failover策略
实话说我没测试SingleConnectionFactory的Failover策略……因为这个工厂一般只适用于测试、或者某些“独立”系统,一般不会在线上使用。
#CachingConnectionFactory 与PooledConnectionFactory相似,CachingConnectionFactory也会缓存和复用一些资源;不同的是,CachingConnectionFactory除了Session/Producer之外,还会缓存Consumer。(但并不缓存Connection,见下)
另外,CachingConnectionFactory和SingleConnectionFactory一样,也是由Spring提供的。实际上,CachingConnectionFactory就直接继承自后者。也正因为继承自后者,CachingConnectionFactory中只有一个Connection。
可配置项
由于继承自SingleConnectionFactory,因此CachingConnectionFactory也有父类的可配置项。除了那些之外,它还有几个自己的配置项:
这些配置中,值得注意的是sessionCacheSize:它的默认值是1。换句话说,默认情况下CachingConnectionFactory只会缓存一个Session。为了获取较好的并发效率,建议设置一个合适的值。
需要的session数量超出sessionCacheSize值时,CachingConnectionFactory会创建新的Session,并用动态代理包装后返回回去。
获取连接
CachingConnectionFactory没有重写SingleConnectionFactory中获取连接的方法,因此它也是通过动态代理生成了一个SharedConnectionInvocationHandler。 CachingConnectionFactory重写的是getSession(Connection,Integer)方法。在这个方法中,CachingConnectionFactory会管理Session/Producer/Consumer的相关缓存,并把实际Session包装为一个动态代理。代码有点多,这里就不贴出来了。
Failover策略
CachingConnectionFactory的Failover策略与PooledConnectionFactory基本上是一样的。
对比
第一次测试是我用本地的junit+本地ActiveMQ,所以性能数值的绝对值就不要太在意了,关注下对比吧。
测试了发送10000条数据的用时,单位毫秒。
工厂 | 第一次 | 第二次 | 第三次 | 平均值 |
---|---|---|---|---|
ActiveMQConnectionFactory | 104080 | - | - | 104080 |
PooledConnectionFactory | 20216 | 19770 | 17605 | 19197 |
CachingConnectionFactory(缓存容量1) | 17395 | 19310 | 20212 | 18972 |
第二次是用我本地tomcat+测试ActiveMQ,测试了一个线上的批量试算接口。测试结果是这样的:
工厂 | 用时 |
---|---|
ActiveMQConnectionFactory | 327114 |
CachingConnectionFactory(缓存容量100) | 57665 |