RabbitMQ延时队列(实现定时任务)
比如有一个未付款订单,超过一定时间后,系统自动取消订单并释放占有的商品。
可以使用spring 的 schedule 定时任务轮询数据库,但是使用这种方式会极其消耗系统内存、增加数据库压力并且存在较大的时间误差

以上问题可以使用 RabbitMQ 的消息TTL和死信 Exchange 结合,下单后,如果30分钟未支付就会关闭订单和解锁库存,不需要全表扫描

消息的TTL(Time To Live)
- 消息的TTL就是消息的存活时间
- RabbitMQ 可以对队列和消息分别设置TTL
- 对队列设置就是队列没有消费者连接着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
- 如果队列设置了,消息也设置了,那么会取最小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同队列设置)。可以通过设置消息的 expiration 字段或者 x-message-ttl 属性来设置时间,两者都是一样的效果。
Dead Letter Exchanges(DLX)
- 一个消息在满足如下条件下,会进入死信路由(路由不是队列),一个路由可以对应多个队列。
- 一个消息被 Cnsumer 拒收了,并且 reject 方法的参数里 requeue 是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/basic.nack)requeue=false
- 上面的消息的 TTL 到了,消息过期了
- 队列的长长度限制满了。排在前面的消息被丢弃或者仍在死信路由上
- Dead Letter Exchange 其实就是一种普通的 Exchange,和创建其它 exchange 没有两样。只是在某一个设置 Dead Letter Exchange 中有消息过期了,会自动触发消息的转发,发送到 Dead Letter Exchange 中去。
- 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,就可以实现一个延时队列。
可以给队列设置过期时间:

给每个消息设置过期时间

死信路由不能有消费者
实现
先创建好Queue、Exchange、Binding
使用SpringBoot容器进行创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| @Slf4j @Configuration public class AmqpMqConfig {
@Bean public Queue orderDelayQueue() { Map<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", "order-event-exchange"); args.put("x-dead-letter-routing-key", "order.release.order"); args.put("x-message-ttl", 60000); return new Queue( "order.delay.queue", true, false, false, args ); }
@Bean public Queue orderReleaseOrderQueue() { return new Queue( "order.release.order.queue", true, false, false ); }
@Bean public Exchange orderEventExchange() { return new TopicExchange("order-event-exchange", true, false); }
@Bean public Binding orderCreateOrderBinding() { return new Binding( "order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null ); }
@Bean public Binding orderReleaseOrderBinding() { return new Binding( "order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null ); } }
|
创建好后监听我们的死信队列,不能监听延时队列,因为延时队列中的消息到了过期时间没人处理后就会被当作死信,按照规则发到死信路由中,再由死信路由以指定路由键发到指定队列,然后队列将消息分发给消费者。
1 2 3 4 5 6 7 8 9
| @Component public class Listener { @RabbitListener(queues = "order.release.order.queue") public void test(Message message,String str, Channel channel) throws IOException { log.info("有订单过期啦: ====>>> {}",str); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
|
然后再创建一个控制器准备发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13
| @RestController @RequestMapping("order/order") public class OrderController {
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/amqp") public String testAmqp() { rabbitTemplate.convertAndSend("order-event-exchange","order.create.order", "Hello World"); return "successful"; } }
|
调用 OrderController.testAmqp()
就可以看到结果了