摘要
公司运维同事针对ActiveMQ提出了两个问题,其中一个是“队列长时间无人监听时,自动删除该队列”。 调研提出了三种方案。这里是相关记录和说明。
问题
运维同事对生产环境使用的ActiveMQ做了相关监控。这个监控在某个队列出现消息积压时(实际规则更复杂一些,并且正在调整)发送短信报警。运维接到短信后会通知开发负责人。开发负责人再检查系统是否在正常监听相关队列。
但是,从过往经验来看,只有一次消息积压是业务系统故障导致的;其它情况(没有统计到具体数据,大约五六次)都是业务系统已经不再监听该队列导致的。这使得我们的运维、开发同事半夜三更火急火燎检查问题,结果发现只需要删除那个队列就可以了。
尤其惹发起床气的是,由于线上ActiveMQ配置了消息持久化,这种消息积压其实并不会对ActiveMQ产生多大的影响,完全可以在第二天上班后再处理。
考虑到大家的睡眠质量和夫妻感情,在JIRA中,我们调研、讨论了三个方案。
方案一:ActiveMQ自带配置
在ActiveMQ官方提供的功能列表中,有这样一项功能:Delete Inactive Destination。它可以删除“没有未处理消息、并且没有消费者的Destination”。
配置示例
这个配置比较简单,在ActiveMQ的配置文件activemq.xml中,做如下改动即可。这里示例的是对queue的配置;topic配置是类似的。
<!-- 在这里加上schedulePeriodForDestinationPurge属性。 -->
<broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000"
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- 在这里加上gcInactiveDestinations和inactiveTimoutBeforeGC两个属性 -->
<policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
上述示例配置的含义是:这个Broker会每隔10000ms(由schedulePeriodForDestinationPurge配置指定)扫描一次标记有“gcInactiveDestinations=“true””的Queue(由于这里配置的是queue=“>”,因而实际是扫描所有Queue),将其中“没有未处理消息、并且没有消费者、并且此状态已超过30000ms(由inactiveTimoutBeforeGC配置指定)”的队列删除掉。有点晕。各配置项的具体说明如下。
配置项说明
以下三个配置项中,schedulePeriodForDestinationPurge和gcInactiveDestinations是必填配置;inactiveTimoutBeforeGC是选填配置。
schedulePeriodForDestinationPurge
这是针对Broker的配置,用于声明“扫描闲置队列的周期”,单位为毫秒。默认值为0,意为“不扫描”。
需要说明的是,这里只能配置扫描任务的启动周期、不能配置启动延迟。也就是说,配置好了之后,ActiveMQ服务启动时会立即扫描一次;然后再按照指定时间周期性扫描。
gcInactiveDestinations
这是针对Destination的配置,用于声明当Broker扫描闲置队列时,是否扫描这个Destination(由queue="xxxx"来指定)。默认值是false。
inactiveTimoutBeforeGC
这也是针对Destination的配置,用于声明这个Destination闲置多长时间后可以被删除。单位毫秒,默认时间60s。
这个配置必须在gcInactiveDestinations被设置为true的情况下才会生效。
方案分析
虽然上面介绍了这么多,但实际上,从第一句话中就可以看出这个方案无法解决我们的问题。因为我们的问题是要处理“有消息积压、但没有消费者的Destination”,而这个方案只能删除“没有未处理消息、并且没有消费者的Destination”。
除此之外,这应该算是最简单可靠的一种方案了。实际上,对大多数原生Queue来说,业务系统会同时下线其生产者与消费者。这个方案可以很好的应对这种情况。
方案二:自定义ActiveMQ插件
ActiveMQ插件(plugin),也有文档中称为拦截器(Interceptor)。二者其实是相辅相成的:配置时,我们需要一个插件;执行时,我们需要一个拦截器。
ActiveMQ官方提供了几个插件(日志、统计、时间戳等),可以参见官方说明和开发文档。我们可以参考官方示例来自定义一个插件。
配置
ActiveMQ通过解析activemq.xml中的配置,来加载一个插件的。因此我们从配置入手,逐步搞清楚插件和拦截器是如何工作的。
activemq.xml中的配置其实很简单,如下所示:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" advisorySupport="false">
<!-- 用plugins标签声明这是一个插件 -->
<plugins>
<!-- bea的语法来自spring,xml name space的声明已经说明了这一点 -->
<bean xmlns="http://www.springframework.org/schema/beans" id="linjunPlugin" class="net.loyintean.blog.jms.manage.PlugIn"/>
</plugins>
<!-- 其它配置 -->
</broker>
上述配置声明了一个插件,插件类名是net.loyintean.blog.jms.manage.PlugIn,id是linjunPlugin。
这个类必须包含在ActiveMQ的classpath路径下。我们可以自己打一个jar包,并把jar包放到ActiveMQ的lib路径下;也可以修改相关类路径。总之要保证ActiveMQ能够加载到这个类(及其依赖类)。
其实按照上面的配置,并不需要为插件配置一个id。不过,插件声明还有其它方式,有些是需要使用id的。这里不多说,可以参考开发文档。
如配置中的注释所说,声明插件所使用的标签及语法来自spring。也就是说,spring中的<property />
等其它标签,这里也是支持的。不过目前还没有找到对@Autowired等注解的支持方式。
插件
由于我使用的是spring boot,只需要加上一个spring-boot-starter-activemq
就可以引入所需依赖jar包了。不使用spring boot的话,需要引入activemq-broker-x.y.z.jar
。
根据ActiveMQ规范,插件必须实现BrokerPlugin接口。这个接口只有一个方法:Broker installPlugin(Broker broker) throws Exception
,用于在服务启动、加载插件时,获取当前启动的borker实例,并返回一个Broker实例。
例如,上文中声明的linjunPlugin代码如下:
public class PlugIn implements BrokerPlugin {
/**
* @author linjun
* @since 2017年10月30日
* @param b
* @return
* @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker)
*/
@Override
public Broker installPlugin(Broker b) {
return new RemoveDestination(b);
}
}
似乎有些莫名其妙,但从“装饰者”的角度来理解就轻松愉快了:入参broker是原生实例(当然也可能是其它插件“装饰”过的);出参则是被我们自己的插件“装饰”过的、增强版的实例。
一般来说,启动过程不会做太多处理;处理逻辑在我们的“装饰者”中——如上面代码里的RemoveDestination
。
拦截器
如上文所说,我们需要提供的是一个“装饰”过的Broker。但是Broker是一个接口,其中有超过50个方法,用于处理Broker在服务期间的各种事件(如服务启动、创建链接、消息收发、事务提交与回滚等等)。直接实现接口未免太丑陋了。ActiveMQ也考虑到了这一点,因此给我们提供了一个适配器(其实同时也是一个装饰者):BrokerFilter。它的代码如下:
public class BrokerFilter implements Broker {
// 被“装饰”的原生实例
protected final Broker next;
public BrokerFilter(Broker next) {
this.next = next;
}
// 省略其它接口方法,全部都直接委托给next处理。
}
借助这个适配器,我们可以专注的处理我们关注的事件。如我们的RemoveDestination,它只需要在服务启动时注册一个定时器,按需删除无人监听的队列即可。代码如下:
public class RemoveDestination extends BrokerFilter {
private Timer timer;
/**
* @param next
*/
public RemoveDestination(Broker next) {
super(next);
// 声明为守护线程,避免它阻塞关闭activeMQ的进程
this.timer = new Timer(true);
}
@Override
public void start() throws Exception {
super.start();
// DONE linjun 2017-11-01 改为定时调度
this.timer.schedule(new TimerTask() {
@Override
public void run() {
RemoveDestination.this.remove();
}
}, 3000, 3000);
}
private void remove() {
Map<ActiveMQDestination, Destination> destinationMap = this
.getDestinationMap();
ConnectionContext context = BrokerSupport.getConnectionContext(this);
destinationMap.entrySet().forEach(entry -> {
Destination destination = entry.getValue();
// 无人监听了
// DONE linjun 2017-11-01 只处理queue,不处理topic
if (destination.getDestinationStatistics().getConsumers()
.getCount() == 0) {
ActiveMQDestination activeMQDestination = entry.getKey();
if (activeMQDestination.isQueue()) {
try {
this.removeDestination(context,
activeMQDestination, 1);
} catch (Exception e) {
// 示例代码,不要喷我直接打印堆栈
e.printStackTrace();
}
}
}
});
}
}
除了BrokerFilter这个针对Broker事件做拦截、装饰的类之外,也有针对Destination的DestinationFilter,不赘述。
特别说明
无论是BrokerFilter还是DestinationFilter,在重写父类的某个方法时,要注意调用super中的对应方法。如RemoveDestination类在覆盖start()方法时,调用了super.start()方法。
这两个类中的每一个方法,都对应Broker或Destination的一个事件的“处理栈”。如果不调用父类方法,很可能会导致一些基础的、或关键的代码没有执行到,进而出现异常。因此,如果不是非常确定“执行到这里时必须中断当前事件”,否则一定要调用super相应方法。
方案分析
上面的代码是示例用,还可以进一步完善。但是这个方案是可以满足需求的。
不过,这个方案存在一项风险:当我们删除一个Destination时,其中所有未消费的消息也会随之被删除,即使这些消息已经做了持久化。如果有某个业务系统长时间出现故障、无法连上ActiveMQ,而ActiveMQ在此期间删除了它监听的Destination及其中消息……这个风险概率虽然小,但是影响太大。慎重起见,放弃方案二。
方案三:调整报警脚本
方案三属于运维的范畴。如JIRA中所讨论的,这个问题真正的“痛点”,并不是废弃队列,而是非紧急情况却在半夜报警。因此,由运维同事修改一下脚本,调整“没有消费者”这种问题的监控报警时间就可以了。
小结
最后选定的是方案三。方案一不能满足需求;方案二的风险较大。方案三直击痛点,干脆利落。
这件事也启示我们:做事情之前先想清楚目标,谋定而后动。
©著作权归作者所有:来自51CTO博客作者winters1224的原创作品,请联系作者获取转载授权,否则将追究法律责任 ActiveMQ队列消息积压问题调研 https://blog.51cto.com/winters1224/2049432