摘要

从可配置项、获取连接、Failover三个方面,简单介绍了四种JmsConnectionFactory。并对其中部分类做了性能测试。

ActiveMQConnectionFactory

这个类有两个同名类:org.apache.activemq.spring.ActiveMQConnectionFactoryorg.apache.activemq.ActiveMQConnectionFactory

前者是spring配置文件中,注解默认使用的类。实际上,它直接继承自后者,在后者的基础上只针对Spring注入时的beanName做了一些扩展,功能完全没变。所以下面讨论的各种功能、特性同时适用于这两个类。

可配置项

作为一个基础性的工厂类,它的可配置项非常多。参见以下代码。这还不包括它从父类中继承的可配置项。各配置项的具体含义各位可以自己去找,不一一说明。

配置项

虽然可配置项很多,但是THREAD系统中实际配置的只有两个:brokerURL和redeliveryPolicyMap。

获取链接

使用这个工厂来获取连接,每次都会调用到这个方法上:

protected ActiveMQConnection createActiveMQConnection(
    Transport transport, JMSStatsImpl stats) throws Exception {
    ActiveMQConnection connection 
        = newActiveMQConnection(transport,
                getClientIdGenerator(),
                getConnectionIdGenerator(), 
                tats);
    returnconnection;
}

很明显的,它每次都会去创建一个新的连接。这样做的效率明显是非常低下的,在后续会有性能对比。

Failover策略

由于ActiveMQConnectionFactory每次都会创建一个新的连接,因此当MQ集群全部可用时,它会随机地连接到集群中不同的服务上去。在一定程度上来说,这也是一种“负载均衡”策略了。

当MQ集群中有部分服务不可用时,已连接到这些服务上的JMS连接会抛出异常,然后自动重连到另一台服务上。只要集群中还有服务可用,那么“抛异常、重新连接”的操作对应用来说是透明的:消息仍然能正常的发送/消费。全宕机了另当别论。相关日志是这样的:

2017-07-11 16:59:39.778 WARN 7196 — [0.1:61616@50246] o.a.a.t.failover.FailoverTransport : Transport (tcp://127.0.0.1:61616) failed , attempting to automatically reconnect: {}java.net.SocketException: Connection resetat java.net.SocketInputStream.read(SocketInputStream.java:210) ~[na:1.8.0_121]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_121]
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619) ~[activemq-client-5.14.5.jar:5.14.5]
at java.io.DataInputStream.readInt(DataInputStream.java:387) ~[na:1.8.0_121]
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.14.5.jar:5.14.5]
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.14.5.jar:5.14.5]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
2017-07-11 16:59:39.791 INFO 7196 — [ActiveMQ Task-2] o.a.a.t.failover.FailoverTransport : Successfully reconnected to tcp://10.255.33.108:61616

如果宕机的服务又恢复了,那么ActiveMQConnectionFactory在创建新连接时,会尝试去连接这些服务。

PooledConnectionFactory

顾名思义,PooledConnectionFactory是一个“池化”连接工厂。它会把一些JMS资源(Connection/Session/Producer)放到资源池中进行复用。

不过,Connection是以用户名+密码为key值放入Pool中的。对于同一个用户(null也是一样),实际只有一个Connection。

另外,虽然他会把Producer放到资源池中,但并不会把Consumer也放进来。

与ActiveMQConnectionFactory类似,PooledConnectionFactory同样有两个同名类:org.apache.activemq.pool.PooledConnectionFactoryorg.apache.activemq.jms.pool.PooledConnectionFactory。并且,前者也用于一个Spring配置注解:,前者也继承自后者……

可配置项

PooledConnectionFactory需要两项配置:实际生成链接的工厂,以及链接池。

实际生成的工厂可以通过构造方法传入(指定一个ActiveMQConnectionFactory、或者指定一个 brokerURL)、也可以用setter配置。

连接池的配置较少,主要是下面这些:

需要注意的是,有些set方法会对应到一个成员变量(如setIdleTimeout方法会给this.idleTimeout赋值);有些则直接操作连接池(如setTimeBetweenExpirationCheckMillis方法,直接将timeBetweenExpirationCheckMillis写入连接池中)。

另外,有些配置项与其它资源池的配置含义不同(如setMaxConnections,参见蛋疼的PooledConnectionFactory(activemq-pool)中的介绍)。

获取连接

PooledConnectionFactory创建链接的方法如下:

@Override
public synchronized Connection createConnection(String userName, String password) throws JMSException {
    if (stopped.get()) {
        LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");        
        return null;    
    }    
    ConnectionPool connection = null;    
    ConnectionKey key = new ConnectionKey(userName, password);    
    // This will either return an existing non-expired ConnectionPool or it    
    // will create a new one to meet the demand.    
    if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) {        
        try {            
            connectionsPool.addObject(key);            
            connection = mostRecentlyCreated.getAndSet(null);            
            connection.incrementReferenceCount();        
        } catch (Exception e) {            
            throw createJmsException("Error while attempting to add new Connection to the pool", e);        
        }    
    } else {        
        try {            
            // We can race against other threads returning the connection when there is an            
            // expiration or idle timeout.  We keep pulling out ConnectionPool instances until            
            // we win and get a non-closed instance and then increment the reference count            
            // under lock to prevent another thread from triggering an expiration check and            
            // pulling the rug out from under us.            
            while (connection == null) {                
                connection = connectionsPool.borrowObject(key);                
                synchronized (connection) {                    
                    if (connection.getConnection() != null) {                        
                        connection.incrementReferenceCount();                        
                            break;                    
                    }                    
                    // Return the bad one to the pool and let if get destroyed as normal.                    
                    connectionsPool.returnObject(key, connection);                    
                    connection = null;                
                }            
            }        
        } catch (Exception e) {            
            throw createJmsException("Error while attempting to retrieve a connection from the pool", e);        
        }        
        try {            
            connectionsPool.returnObject(key, connection);        
        } catch (Exception e) {            
            throw createJmsException("Error when returning connection to the pool", e);        
        }    
    }    
    return newPooledConnection(connection);
}

代码有点绕(绕的一个原因是所谓的ConnectionPool其实叫PooledConnection或者SessionPool更合适,因为其中实际是一个Connection和若干个Session;另一个原因是实际的connection并没有存放在成员变量connectionsPool中,而是放在了mostRecentlyCreated里)。不过代码中的注释说明已经说明了重点:

要么获取一个未过期的链接;要么创建一个新连接
// This will either return an existing non-expired ConnectionPool or it
// will create a new one to meet the demand.

另外值得一说的是,虽然PooledConnectionFactory会用资源池来暂存Connection、Session,以及Producer(根据javadoc,但没找到缓存producter的代码位置),但它并不缓存Consumers。

Failover策略

使用这个链接工厂,在初始化时它只连上MQ集群中的一台服务。也就是说,由这个链接工厂获取的所有Connection/Session/Producer/Consumer,都会连到这一台服务上。这有可能造成MQ集群压力过于集中以及资源浪费。

如果已经连上的服务宕机了,会重连到另一台服务上,同样的,收、发消息的应用是不受影响的。这应该是因为它底层仍然是ActiveMQConnectionFactory的缘故。

但是宕机的服务重启后,不会主动尝试重连,除非已连上的服务又关闭了。

SingleConnectionFactory

同样顾名思义,SingleConnectionFactory只提供一个Connection。当然,由于一个Connection可以打开多个Session/Productor/Consumer,后面这些资源并不保证唯一。

与以上两个连接工厂不同,SingleConnectionFactory不是apache.activemq提供的类,而是spring提供的。这使得它并不局限于ActiveMQ,而能用于任何遵循java.jms相关接口、规范的消息服务。

可配置项

可能跟功能少有关,SingleConnectionFactory的可配置项也很少,就这么几个:

获取连接

虽然singleconnectionfactory类中有一个单例的Connection成员变量,但它并不会直接把这个变量提供给调用方,而是通过Java动态代理,返回一个SharedConnectionInvocationHandler:

protected Connection getSharedConnectionProxy(Connection target) {
    List<Class<?>> classes = new ArrayList<Class<?>>(3);
    classes.add(Connection.class);
    if (target instanceof QueueConnection) {
        classes.add(QueueConnection.class);
    }
    if (target instanceof TopicConnection) {
        classes.add(TopicConnection.class);
    }
    return (Connection) Proxy.newProxyInstance(
            Connection.class.getClassLoader(),
            classes.toArray(new Class<?>[classes.size()]),
            new SharedConnectionInvocationHandler());
}
 
private class SharedConnectionInvocationHandler implements InvocationHandler {
    private ExceptionListener localExceptionListener;
    private boolean locallyStarted = false;
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getName().equals("equals")) {
            Object other = args[0];
            if (proxy == other) {
                return true;
            }
            if (other == null || !Proxy.isProxyClass(other.getClass())) {
                return false;
            }
            InvocationHandler otherHandler = Proxy.getInvocationHandler(other);
            return (otherHandler instanceof SharedConnectionInvocationHandler &&
                    factory() == ((SharedConnectionInvocationHandler) otherHandler).factory());
        }
        else if (method.getName().equals("hashCode")) {
            // Use hashCode of containing SingleConnectionFactory.
            return System.identityHashCode(factory());
        }
        else if (method.getName().equals("toString")) {
            return "Shared JMS Connection: " + getConnection();
        }
        else if (method.getName().equals("setClientID")) {
            // Handle setClientID method: throw exception if not compatible.
            String currentClientId = getConnection().getClientID();
            if (currentClientId != null && currentClientId.equals(args[0])) {
                return null;
            }
            else {
                throw new javax.jms.IllegalStateException(
                        "setClientID call not supported on proxy for shared Connection. " +
                        "Set the 'clientId' property on the SingleConnectionFactory instead.");
            }
        }
        else if (method.getName().equals("setExceptionListener")) {
            // Handle setExceptionListener method: add to the chain.
            synchronized (connectionMonitor) {
                if (aggregatedExceptionListener != null) {
                    ExceptionListener listener = (ExceptionListener) args[0];
                    if (listener != this.localExceptionListener) {
                        if (this.localExceptionListener != null) {
                            aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
                        }
                        if (listener != null) {
                            aggregatedExceptionListener.delegates.add(listener);
                        }
                        this.localExceptionListener = listener;
                    }
                    return null;
                }
                else {
                    throw new javax.jms.IllegalStateException(
                            "setExceptionListener call not supported on proxy for shared Connection. " +
                            "Set the 'exceptionListener' property on the SingleConnectionFactory instead. " +
                            "Alternatively, activate SingleConnectionFactory's 'reconnectOnException' feature, " +
                            "which will allow for registering further ExceptionListeners to the recovery chain.");
                }
            }
        }
        else if (method.getName().equals("getExceptionListener")) {
            synchronized (connectionMonitor) {
                if (this.localExceptionListener != null) {
                    return this.localExceptionListener;
                }
                else {
                    return getExceptionListener();
                }
            }
        }
        else if (method.getName().equals("start")) {
            localStart();
            return null;
        }
        else if (method.getName().equals("stop")) {
            localStop();
            return null;
        }
        else if (method.getName().equals("close")) {
            localStop();
            synchronized (connectionMonitor) {
                if (this.localExceptionListener != null) {
                    if (aggregatedExceptionListener != null) {
                        aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
                    }
                    this.localExceptionListener = null;
                }
            }
            return null;
        }
        else if (method.getName().equals("createSession") || method.getName().equals("createQueueSession") ||
                method.getName().equals("createTopicSession")) {
            // Default: JMS 2.0 createSession() method
            Integer mode = Session.AUTO_ACKNOWLEDGE;
            if (args != null) {
                if (args.length == 1) {
                    // JMS 2.0 createSession(int) method
                    mode = (Integer) args[0];
                }
                else if (args.length == 2) {
                    // JMS 1.1 createSession(boolean, int) method
                    boolean transacted = (Boolean) args[0];
                    Integer ackMode = (Integer) args[1];
                    mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
                }
            }
            Session session = getSession(getConnection(), mode);
            if (session != null) {
                if (!method.getReturnType().isInstance(session)) {
                    String msg = "JMS Session does not implement specific domain: " + session;
                    try {
                        session.close();
                    }
                    catch (Throwable ex) {
                        logger.trace("Failed to close newly obtained JMS Session", ex);
                    }
                    throw new javax.jms.IllegalStateException(msg);
                }
                return session;
            }
        }
        try {
            return method.invoke(getConnection(), args);
        }
        catch (InvocationTargetException ex) {
            throw ex.getTargetException();
        }
    }
    private void localStart() throws JMSException {
        synchronized (connectionMonitor) {
            if (!this.locallyStarted) {
                this.locallyStarted = true;
                if (startedCount == 0 && connection != null) {
                    connection.start();
                }
                startedCount++;
            }
        }
    }
    private void localStop() throws JMSException {
        synchronized (connectionMonitor) {
            if (this.locallyStarted) {
                this.locallyStarted = false;
                if (startedCount == 1 && connection != null) {
                    connection.stop();
                }
                if (startedCount > 0) {
                    startedCount--;
                }
            }
        }
    }
    private SingleConnectionFactory factory() {
        return SingleConnectionFactory.this;
    }
}

SharedConnectionInvocationHandler通过反射的方式,实现了Connection接口下的一些重要方法。并且,它屏蔽掉了Connection.close()方法,确保这个链接不会关闭。

Failover策略

实话说我没测试SingleConnectionFactory的Failover策略……因为这个工厂一般只适用于测试、或者某些“独立”系统,一般不会在线上使用。

#CachingConnectionFactory 与PooledConnectionFactory相似,CachingConnectionFactory也会缓存和复用一些资源;不同的是,CachingConnectionFactory除了Session/Producer之外,还会缓存Consumer。(但并不缓存Connection,见下)

另外,CachingConnectionFactory和SingleConnectionFactory一样,也是由Spring提供的。实际上,CachingConnectionFactory就直接继承自后者。也正因为继承自后者,CachingConnectionFactory中只有一个Connection。

可配置项

由于继承自SingleConnectionFactory,因此CachingConnectionFactory也有父类的可配置项。除了那些之外,它还有几个自己的配置项:

这些配置中,值得注意的是sessionCacheSize:它的默认值是1。换句话说,默认情况下CachingConnectionFactory只会缓存一个Session。为了获取较好的并发效率,建议设置一个合适的值。

需要的session数量超出sessionCacheSize值时,CachingConnectionFactory会创建新的Session,并用动态代理包装后返回回去。

获取连接

CachingConnectionFactory没有重写SingleConnectionFactory中获取连接的方法,因此它也是通过动态代理生成了一个SharedConnectionInvocationHandler。 CachingConnectionFactory重写的是getSession(Connection,Integer)方法。在这个方法中,CachingConnectionFactory会管理Session/Producer/Consumer的相关缓存,并把实际Session包装为一个动态代理。代码有点多,这里就不贴出来了。

Failover策略

CachingConnectionFactory的Failover策略与PooledConnectionFactory基本上是一样的。

对比

第一次测试是我用本地的junit+本地ActiveMQ,所以性能数值的绝对值就不要太在意了,关注下对比吧。

测试了发送10000条数据的用时,单位毫秒。

工厂第一次第二次第三次平均值
ActiveMQConnectionFactory104080--104080
PooledConnectionFactory20216197701760519197
CachingConnectionFactory(缓存容量1)17395193102021218972

第二次是用我本地tomcat+测试ActiveMQ,测试了一个线上的批量试算接口。测试结果是这样的:

工厂用时
ActiveMQConnectionFactory327114
CachingConnectionFactory(缓存容量100)57665