陈瑶 李洋磊
摘 要:本文分析了ActiveMQ数据传输的底层原理,以解决数据突发洪峰时期的队列数据积压问题。利用增加并发消费者、调整消息预取值、批量消息确认等参数,实现了传输性能的多倍提升。最后还根据业务运行出现过的问题,优化了服务端的配置,加强了薄弱环节的监控,提升了系统的稳定性。
关键词:ActiveMQ;民航数据传输;数据传输框架
中图分类号:TP368.5 文献标识码:A 文章编号:2096-4706(2019)16-0128-04
Abstract:This paper has analyzed the underlying principle of ActiveMQ data transmission,to solve the data backlog problem during the peak period of data. By adding concurrent consumers,adjusting message prefetch values and batch message validation parameters,the transmission performance is improved many times. Finally,according to the problems in the operation of the business,the configuration of the server is optimized,the monitoring of weak links is strengthened,and the stability of the system is improved.
Keywords:ActiveMQ;civil aviation data transfer;data transfer framework
0 引 言
为保证数据及时、可靠传输,选用了中间件ActiveMQ,设计了一套通用的数据传输框架。该框架现阶段主要应用于民航气象数据的传输,为南方航空、华南国际商务航空等用户提供高效、可靠的数据传输服务。该系统投入使用后,数据传输的及时性增强、可靠性提高,得到了用户单位的认可。
在几年的运行过程中,也发现了许多可以改进的地方。该框架现阶段传输的民航气象数据,具有在整点及半点数据量较大的特点,框架承载的数据量越来越多,用户数也在增加。在数据量突发洪峰的情况下,如果某个用户的数据获取能力较差,ActiveMQ的数据消费能力被阻塞,容易产生大量的数据等待确认、大量数据重发的情况,严重影响数据的吞吐量,导致数据积压,也给ActiveMQ服务器的运行带来潜在的风险。
为解决该问题,对系统运行中的数据进行分析,提出适用于不同场景的消息队列优化过程。
1 ActiveMQ消息的传送机制
如图1所示,ActiveMQ的消息由生产者(即数据发送 端)發出后,会被ActiveMQ的Broker保存,消费者(即数据接收端)已经在Broker上注册,Broker会确保消息被发送给这些消费者,确保消息已经送达后,该消息才会被删除。
ActiveMQ的消息传送机制如图1所示,通过消费者正常接收到消息后,返回一个确认接收状态的消息——ACK消息给Broker,如果层次较为复杂,则会一层一层的返回ACK消息。
ActiveMQ中的ACK消息有以下几种类型,定义在字段ACK_TYPE中,如表1所示。
从ACK_TYPE的值可以看出,在ActiveMQ中,消息确认的频率是可以由开发者选择的。可以消费一条消息返回一条确认消息,也可以选择另外一种模式——延时确认。在消费者成功消费消息后,不立即返回ACK,而是等到这些ACK消息的条数积攒到某个阈值时,返回一个ACK消息把他们全部确认。
从这个定义也可看出,延时确认具有更好的性能。特别是在网络拥堵的时期,N条消息只会有1条ACK消息,相比N条消息N条ACK返回,大大减轻了网络负荷。但这样的确认机制也存在一定的弊端,如果消费端出现异常,无法正常返回ACK,会导致N条消息重发,反而会造成网络负担。
并且大量消息如果不得到及时的确认,Broker需保存这些消息,并将他们放置于队列中排队等待确认,这将消耗Broker服务器的内存、硬盘等资源,如果该服务器的性能低下,将给Broker的运行带来潜在的风险。
所以需要分析运行的实际情况,根据已有的资源进行灵活的配置、调优。
2 问题定位及分析
利用现有的框架和数据传输模式,模拟测试数据突发洪峰时的数据吞吐量,从分析消息包处理耗时入手,进行各个参数的调优。首先在生产端生成大量的消息,以在测试数据突发洪峰时期,每个消费者的消息处理速度、消息积压数[2]。如表2所示。
从表2的数据可以看出,消费端的消费能力存在的差距,消费能力差的客户端在突发数据洪峰时容易发生数据积压。两个消费者的网络状态类似,可以排除因网络原因导致的消息积压。通过进一步分析消费能力弱的消费端,研究其消息处理流程,发现其接收到消息后还需进行串行处理,处理过程更加复杂,导致消息处理更加耗时,消息返回ACK的时间也更长,导致了Broker需等待这个更慢的消费者。
针对消费端处理速度存在瓶颈的问题,设想通过提高消费端的处理速度入手。消费端的消息处理流程为业务需要,无法精简处理流程来加快消费速度。那还有其他什么手段可以增加消费端对消息的消费速度?既然现存的串行等待的时间无法缩短,那是否可以通过并行多个消费者程序来提高效率?
3 增加并发消费者方案测试
拟通过增加并发消费者的方式,看是否能提高消息处理的速度。要使用并发消费者[3],可修改框架中Spring的JMS配置,增加多个Listener实例。配置项为Simple Message Listener Container[4],可以配置固定的实例个数,也可以配置一个实例数的区间,这样消费者可以根据消息的压力情况动态调整并发数。
配置文件:
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connec tionFactory"/>
<property name="destinationName" value="${jms.queue.name}"/>
<property name="messageListener" ref="message Receiver"/>
<property name="concurrency" value="10-20"/>
或者:
<property name="concurrentConsumers" value= "20"/>
</bean>
测试结果如表3所示。
4 消费者优化
通过测试发现,增加并行消费者后,消息的消费速度出现明显的提升。但消费者数目大于10以后,消息处理速度不再提升,在多个消费者中,有些消费者很忙碌,需要处理大量的消息,有些消费者很空闲。为什么会出现这样的情况?
在目标URI的定义中,有一个prefetchSize[5]参数值可配置,如下代码所示:
String queueURI = "queueForGuest?customer.prefetchSize=100";
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(queue URI);
prefetchSize参数定义了一次有多少条消息推送给消费者,若在代码中没有指定prefetchSize参数值,系统将给其一个默认值,如表4所示。
从这个默认值中可以看出,使用默认值无法与现实需求吻合。如果消费者处理消息的能力很差,一次推送1000个消息给消费者,无疑会造成消费者端的拥堵。如果消费者端性能好、处理速度快,可配置较高的prefetchSize值[7]。
而本系统中消费端航空公司2的消费者就是一个慢消费者,消费消息的速度慢,如果使用默认prefetchSize值1000,一次将1000个消息推送给该消费端,剩下的推送给该消费端并行的消费者,就出现了上文中的情况,部分消费者要处理的消息很多,消费能力差,消费速度慢,这些消费者特别忙碌,甚至出现拥堵现象。而其他并行的消费者没有消息需要处理。
所以尝试将该消费端的prefetchSize值进行调整,提升消费端整体的性能。
为了测试系统中现有消费者的适合的prefetchSize值大小,将prefetchSize值分别配置为100、1000,并进行了对比分析测试,测试结果如表5所示。
将queuePrefetch参数修改为100后,消费端Consumer2并行且忙碌的消费者数量增加,消费10000条消息减少。
除了批量获取多个消息可以使性能提高,批量确认多个消息也将使性能大大提升,ACK的模式有很多种,如ActiveMQ消息的传送机制表中的说明,其中效率最高的机制为optimizeAcknowledge模式,当有prefetchSize的65%个消息被正确消费后,消费端将返回一条ACK消息,并批量确认这些消息。
这样的模式虽然效率高,但若消费端出现异常,未正常返回这些消息的ACK,Broker将重发这些消息,这样的模式适用于高吞吐量、对重复消息有容错能力的系统。观察系统运行时这样的异常情况较少,且在消费端均做了重复消息处理,同时本系统现应用于传输气象报文,对实时性要求很高,提高吞吐量对系统的运行意义重大。故这种高效模式适用于本系统。
将原来的逐条消息ACK改为optimizeAcknowledge模式后,消费端、Broker端的资源消耗降低,处理速度提高,测试结果如表6所示。
5 Broker优化
从表2的测试结果可以看出,不同的消费端的消费速度差异较大,系统运行中同时存在快消费者和慢消费者,在现场运行的过程中也多次发现这样的问题,某个消费者的消费能力较慢,不能及时消费消息或者返回ACK,导致Broker必须在内存中保存这些消息,增加了內存的消耗,消息积压过多时,需要将内存的消息写入到磁盘中,增加了Broker端的磁盘I/O消耗。如果情况进一步严重,Broker将阻塞生产者,迫使其降低生产消息的速率甚至不生产消息。
一个慢的消费者不仅给Broker端的运行带来了巨大的潜在风险,还有可能导致快的消费者也无法正常获取消息。这是在运行环境中必须高度重视的一个问题。
保证系统运行的稳定至关重要,但与此同时,即使用户是慢消费者,保证他们及时获取到数据也很重要,如何满足这个矛盾的需求,主要从以下三个方面进行了优化:
(1)关闭producerFlowControl,即使有慢消费者,先保证消息生产及快消费者消费的速度,保证消息传输不会因为慢消费者而终止。
(2)捕获Broker资源消耗异常,及时进行干预、优化。
在默认情况下,producerFlowControl是开启的,在这种模式下,如果消费者消费能力差,Broker将降低消息的生产,以保证消费端不会由于消息拥堵而资源耗尽,该模式为调节Broker来配合慢消费端。
如果选用该模式,消息的生产者也可以进行一些异常的处理,可以进行异常告警,并且生产者可以在等待设定的时间后进行重试,避免由于失败而使发送消息的请求立即被阻塞,生产者变成假死的状态。
(3)监控Broker资源使用情况,监控消费者消费情况,及时发现慢消费者,对异常及时进行干预、优化。监控每个消费者消费消息的情况,主要监控參数为消费者是否掉线、阻塞的消息数、等待确认的消息数、进队列消息数、出队列消息数等。
6 结 论
通过各个参数的调优,传输系统数据积压的问题得到了解决,消息传输的性能得到了提高,消息传输的速度提升情况如表7所示。
表7 优化后传输时间变化及传输效率提升情况
通过方案调整及参数优化,系统的性能及稳定性都得到了较大的提高,达到了预期目标。基于中间件ActiveMQ的调优方法还有很多,例如消息传送优先级、虚拟通道、分布式网络、roker集群等,在进一步的研究工作中可从这些方面进一步提高性能及系统稳定性。
参考文献:
[1] APACHE software foundation. ActiveMQ [EB/OL].http://activemq.apache.org/index.html,2018-09-10.
[2] 王鹏,从波,李国杰,等.基于ActiveMQ消息总线的性能测试方法 [J].测试技术学报,2019,33(2):147-152.
[3] 周聪.基于改进的Active MQ的通信模型的设计和实现 [D].长春:吉林大学,2017.
[4] Spring AMQP. Spring [EB/OL].https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.html,2019-01-01.
[5] Bruce Snyder,Dejan Bosanac,Rob Davies. Introduction to Apache ActiveMQ Green Paper from Active MQ in action [M].London:Manning,2017:20-23.
[6] Bruce Snyder,Dejan Bosanac,Rob Davies. Active MQ inaction [M].London:Manning,2005:4-5.
[7] 庞佳丽.分布式系统中基于中间件的异步通信可靠性研究 [D].杭州:浙江工业大学,2017.
作者简介:陈瑶(1987.04-),女,汉族,湖南湘潭人,工
程师,硕士,研究方向:数据传输框架;李洋磊(1983.01-),男,汉族,河南洛阳人,工程师,硕士,研究方向:民航气象信息系统设备维护。