基于AMQP的分布式系统消息可靠传输方案的设计与实现

2024-01-19 04:12魏连秋张义红张建光邢春燕
数字通信世界 2023年12期
关键词:生产者交换机路由

魏连秋,张义红,张建光,马 倩,邢春燕,刘 伟

(1.衡水学院数学与计算机科学系,河北 衡水 053000;2.衡水学院马克思主义学院,河北 衡水 053000)

消息传输过程中,可能会出现网络异常、程序异常、机器异常等各种突发情况,无法保证消息正常传输。一条消息从生产到消费的整个过程中有生产者生产、消息代理存储、消费者消费三个过程需要保证消息的可靠性。通过对消息传输流程进行分析,发现若能保证生产过程可靠性、消息代理存储过程可靠性、消费过程可靠性,则基本可以实现消息传输的可靠性。本文通过对现有消息中间件实现机制的分析,在遵循AMQP协议的基础上,结合实际业务应用场景,权衡消息的可靠传输,对消息生产过程、消息代理存储过程、消息消费过程三个过程进行分析,提出一种分布式系统中消息可靠性传输方案,实现消息传输的高可靠性[1]。

1 消息生产可靠性

在生产者发出消息之后可能会出现网络拥塞、服务器宕机等突发问题,从而导致消息的丢失。如果不对消息生产过程的可靠性加以设计,生产者将无法感知消息是否已正常送达消息代理,进而将导致消息的丢失。如果消息在生产过程中,能感知到传输过程中消息传输失败,生产者可以进行重新传输等后续动作以确保消息生产过程的可靠性。因此,在消息生产过程中,设计一种高级发布确认方案以保证消息生产可靠性是十分必要的。

1.1 publisher confirm机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失,这种机制要给每个消息指定一个唯一ID,消息发送到MQ以后,会返回一个结果给发送者,表示消息是否发送成功。

publisher发送消息的配置为publisher-confirmtype。这里支持两种类型:一种是simple:同步等待confirm结果,直到超时;另一种是correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback。相对而言,异步确认性价比较高,因而在此采用correlated:异步回调。

(1)消息成功发送到交换机,如图1所示返回ack。

图1 消息成功投递到交换机

(2)消息未成功投递到交换机,如图2所示返回nack。

图2 消息未成功发送到交换机

在仅开启publisher confirm机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认信息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃了。这显然不是我们所希望的,于是提出mandatory+publish-return方案。

1.2 mandatory+publish-return方案

设置mandatory参数可以在当消息传输过程中不可达目的地时将消息返回给生产者。可以使用如图3所示方案,当把mandatory参数设置为true时,如果交换机无法将消息进行路由时,会将该消息返回给生产者,当把该参数设置为false时,如果发现消息无法进行路由,则直接丢弃。

图3 mandatory+publish-return方案

在只使用mandatory参数的情况下,如果交换机无法将消息进行路由时会提示Returned message but no callback available,说明设置mandatory参数后,如果消息无法被路由,则会返回给生产者,此时是通过回调的方式进行的,但由于还没有回调函数,所以生产者需要设置相应的回调函数才能接受该消息。此时就需要设置生产者回执:publisher-return,实现一个ReturnCallback接口,这样当消息传输到交换机而没有路由到队列时,就返回ACK及路由失败原因。此时再运行则Returned message but no callback available提示消息,而可以看到生产者接收到了被退回的消息,并带上了消息被退回的原因:NO_ROUTE。

有了mandatory+publisher-return方案,我们获得了对无法路由消息的感知能力,此时我们可以通过日志的形式得到反馈并手动重新处理。但实际上,这样做无形中增加了生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想增加生产者的复杂性,又不丢失消息,怎么办?在RabbitMQ中,有一种备份交换机的机制存在,可以较好地解决这一问题。

2 消息消费可靠性

消息消费可靠性是指在分布式系统中,保证消息在传输和处理过程中不会丢失或重复,为了实现可靠的消息消费,一般采取如下措施。

2.1 消息确认机制

RabbitMQ为了能够把消息正确发送给消费者,提供了消息确认机制,即告诉RabbitMQ消息已经收到,使用消息确认机制可以使消费者成功处理消息后再从队列中删除该消息。

消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理该消息。若出现这样的场景:RabbitMQ传输消息给消费者,消费者获取消息后返回ACK,RabbitMQ删除消息,此时消费者突然宕机消息未成功处理,这样,消息就丢失了。因此消费者返回ACK的时机非常重要。一般RabbitMQ允许配置三种确认模式:①none:关闭ACK,RabbitMQ假定消费者获取消息后会成功处理,消息传输后立即被删除,此种模式下消息传输是不可靠的,可能丢失,因而不可取;②auto:自动ACK,由spring监测listener代码是否出现异常,没有异常则返回ACK。此种模式类似于事务机制;③manual:手动ACK,需要在业务代码结束后,手动调用API发送ACK,这种模式需要根据具体情况,判断什么时候发送ACK,使用较为复杂。综合考虑推荐使用默认的auto模式[2]。

2.2 消费者端限流

当消息数量变得过多或处理速度变慢时,可以使用消费者端限流来控制消息的传递速率。通过限制消息的最大数量或大小避免消费者宕机,从而提高消费者的可靠性。

3 方案设计与实现

为了保障分布式系统中消息可靠性传输,笔者设计了如下解决方案。

生产者发送消息给交换机,若不能发送成功则利用publisher confirm机制重新传输,若交换机不能成功将消息路由到队列则利用备份交换机机制将消息传输到指定队列进行消费;若消费者不能成功消费消息则利用延迟重试机制在指定次数内进行循环延迟重试;若循环结束仍旧不能成功消费则利用死信交换机将消息传输到死信队列进行消费。其中的交换机、队列设置均设为持久的,消息视具体情况而定,使用如图4所示传输方案,这样可最大限度保证消息可靠性传输与消费。

图4 发布确认+带重试次数延迟消费的可靠性传输方案

生产端核心代码如下:

4 结束语

在消费端对消息的处理逻辑中,还应考虑幂等性,即对同一条消息多次处理所产生的结果应该是相同的,这样出现重复处理的情况也不会对系统产生影响。另外,在分布式环境中,我们还可以采用副本集群模式,即在RabbitMQ集群中,多个节点会同步存储队列中的消息,如果某个节点发生故障,其他节点可以接替它继续处理数据,从而防止消息丢失。通过以上解决方案的合理选择和组合,可以提高RabbitMQ性能和可靠性,从而确保分布式系统的稳定性和可用性。

猜你喜欢
生产者交换机路由
1月巴西生产者价格指数上涨3.92%
2019德国IF设计大奖
修复损坏的交换机NOS
探究路由与环路的问题
家禽福利的未来:生产者能期待什么?
使用链路聚合进行交换机互联
一场大风带给生产者的思考
PoE交换机雷击浪涌防护设计
PRIME和G3-PLC路由机制对比
罗克韦尔自动化交换机Allen-Bradley ArmorStratix 5700