如何保证消息中间件100%消息投递成功?;

如何保证消息中间件100%消息投递成功

前言

消息中间件,RabbitMQ,RocketMQ,Kafka等,引入消息中间件提升业务的抗并发性,流量削峰,业务解耦。

订单服务 ——>(投递消息) MQ ——>(监听消息) 物流服务

分析问题

小伙伴们对此会有些疑问,订单服务发起消息服务,返回成功不就成功了吗?如下面的伪代码

1
2
3
4
5
6
7
8
9
public boolean sendOrderMessage(){
rabbitTemplate template = new rabbitTemplate();
try{
template.send(message);
}catch{
return false;
}
return true;
}

上面代码中,一般发送消息就是这么写的,小伙伴们觉得有什么问题吗?
如果MQ服务器突然宕机了会出现什么情况?是不是我们订单服务发过去的消息全部没有了吗?是的,一般MQ中间件为了提高系统的吞吐量会把消息保存在内存中,如果不作其他处理,MQ服务器一旦宕机,消息将全部丢失。这个是业务不允许的,造成很大的影响。

持久化

有经验的小伙伴会说,我知道一个方法就是把消息持久化,RabbitMQ中发消息的时候会有个durable参数可以设置,设置为true,就会持久化。

这样的话MQ服务器即使宕机,重启后磁盘文件中有消息的存储,这样就不会丢失了吧。是的这样就一定概率的保障了消息不丢失。
但还会有个场景,就是消息刚刚保存到MQ内存中,但还没有来得及更新到磁盘文件中,突然宕机了。(我靠,这个时间这么短,也会出现,概率太低了吧),这个场景在持续的大量消息投递的过程中,会很常见。
那怎么办?我们如何作才能保障一定会持久化到磁盘上面呢?

confirm机制

上面问题出现在,没有人告诉我们持久化是否成功。好在很多MQ有回调通知的特性,RabbitMQ就有confirm机制来通知我们是否持久化成功?


confirm机制的原理:

  • 消息生产者把消息发送给MQ,如果接收成功,MQ会返回一个ack消息给生产者;
  • 如果消息接收不成功,MQ会返回一个nack消息给生产者;
    我们看一下confirm的机制,试想一下,如果我们生产者每发一条消息,都要MQ持久化到磁盘中,然后再发起ack或nack的回调。这样的话是不是我们MQ的吞吐量很不高,因为每次都要把消息持久化到磁盘中。写入磁盘这个动作是很慢的。这个在高并发场景下是不能够接受的,吞吐量太低了。
    所以MQ持久化磁盘真实的实现,是通过异步调用处理的,他是有一定的机制,如:等到有几千条消息的时候,会一次性的刷盘到磁盘上面。而不是每来一条消息,就刷盘一次。
    所以comfirm机制其实是一个异步监听的机制,是为了保证系统的高吞吐量,这样就导致了还是不能够100%保障消息不丢失,因为即使加上了confirm机制,消息在MQ内存中还没有刷盘到磁盘就宕机了,还是没法处理。
    说了这么多,还是没法确保,那怎么办呢???

消息提前持久化 + 定时任务

其实本质的原因是无法确定是否持久化?那我们是不是可以自己让消息持久化呢?答案是可以的,我们的方案再一步的演化。

  • 订单服务在投递消息之前,先持久化到redis或者DB中,建议redis,性能考虑,消息状态为发送中
  • confirm 机制监听消息是否发送成功,发送ack成功则删除redis中的消息;
  • 如果nack不成功的消息,这个可以根据自身的业务选择是否重发此消息。也可以删除此消息,由自己的业务决定;
  • 加了个定时任务,来拉取隔一定时间了,消息状态还是为发送中的,这个状态就表明,订单服务是没有收到ack成功消息。
  • 定时任务会作补偿性的投递消息,这个时候如果MQ回调ack成功接收了,再把Redis中此消息删除。
    这样的机制其实就是一个补偿机制,我不管MQ有没有真正的接收到,只要我的Redis中的消息状态也是为【发送中】,就表示此消息没有正确成功投递。再启动定时任务去监控,发起补偿投递,业务方需要能接受消息重复
    当然定时任务那边我们还可以加上一个补偿的次数,如果大于3次,还是没有收到ack消息,那就直接把消息的状态设置为【失败】,由人工去排查到底是为什么?
    这样的话方案就比较完美了,保障了100%的消息不丢失(当然不包含磁盘也坏了,可以做主从方案)。
    不过这样的方案,就会有可能发送多次相同的消息,很有可能MQ已经收到了消息,就是ack消息回调时出现网络故障,没有让生产者收到。那就要要求消费者一定在消费的时候保障幂等性!

    幂等性

限流算法

限流详解

限流算法

令牌桶算法

  • 描述:存放固定容量令牌的桶,按照固定速率往桶里添加令牌
  • 假设限制2r/s,则按照500毫秒的固定速率往桶中=添加令牌
  • 桶中最多存放b个令牌,桶满时,新加的令牌会被丢弃或拒绝

漏桶算法

  • 固定容量的漏桶,按照常量固定速率流出水滴
  • 如果桶是空的不需要流出水滴
  • 可以以任意的速率流入水底要漏桶
  • 如果流入的水滴超出了桶的容量,则流入的水滴溢出(被丢弃),而漏桶的容量是不变的

令牌桶是按照固定速率往桶中添加令牌,请求是否处理需要看桶中的令牌数是否足够,当令牌数减少为0时,则拒绝新的请求


漏桶按照固定速率流出请求,流入请求速率任意,当流入的请求数累计到漏桶容量时,则新流入的请求被拒绝。


漏桶限制的是常量流出速率,流出的速率是一个固定值,从而可一平滑 突发流入速率


令牌桶允许一定程度的突发,而漏桶主要是平滑流入速度,对于相同的参数的到的限流效果是一样的


应用级限流

  • 限制总并发数/连接数/请求数
  • 限制总资源数:(数据库连接数,线程数,使用池化)
  • 限制接口的总并发数(TPS)/请求数(QPS)
  • 限制接口的时间窗口请求数
  • 平滑限制接口的请求数

分布式限流

  • nginx+lua
  • redis+lua
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×