php rabbitmq的开发体验(三)
2021/6/15 20:36:57
本文主要是介绍php rabbitmq的开发体验(三),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、前言
在上一篇rabbitmq开发体验(二),我们正式的用我们php来操作消息队列的生产和消费,并利用的rabbitmq的高级特性来进行ack确认机制,幂等性,限流机制,重回机制,ttl,死信队列(相当于失败消息的回收站)。已经可以正常的使用,但消息消费异常问题罗列以下。
1、自动ack机制会导致消息丢失的问题;
简要代码如下,设置消息自动ack,这种情况下,MQ只要确认消息发送成功,无须等待应答就会丢弃消息,
这会导致客户端还未处理完时,出异常或断电了,导致消息丢失的后果。解决方法就是把代码里的true,改成false,并在消息处理完后发ack响应。
注:自动ack还有个弊端,只要队列不空,RabbitMQ会源源不断的把消息推送给客户端,而不管客户端能否消费的完。
$this->channel->basic_consume( $this->query_name, '', //customer_tag false, //no_local true, //no_ack 消息自动ack false, //exclusive 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接 false, //nowait
2、自动ack机制会导致消息丢失的问题;
为了解决问题1,做了改进,简要代码如下:
$this->channel->basic_consume( $this->query_name, '', //customer_tag false, //no_local false, //no_ack 关闭自动ack,手工发送ack false, //exclusive 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接 false, //nowait
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //手动在成功消费后发送ack
先处理消息,完成后,再做ack响应,失败就不做ack响应,这样消息会储存在MQ的Unacked消息里,不会丢失,看起来没啥问题,
但是有一次,callback触发了一个bug,导致所有消息都抛出异常,然后队列的Unacked消息数暴涨,导致MQ响应越来越慢,甚至崩溃的问题。
原因是如果MQ没得到ack响应,这些消息会堆积在Unacked消息里,不会抛弃,直至客户端断开重连时,才变回ready;
如果Consumer客户端不断开连接,这些Unacked消息,永远不会变回ready状态,Unacked消息多了,占用内存越来越大,就会异常了。
解决办法就是及时去ack消息了。
3、启用nack机制后,导致的死循环;
为了解决问题2,再调整一下代码,简要代码如下:
catch (Exception $e){ $this->writeLog('runtime/vm_exception.log',$e->getMessage()); //发送nack信息应答当前消息处理异常 第三个参数是否重回队列 默认false不重回队列 $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true); }
嗯,改成这模样总没问题了吧,正常就ack,不正常就nack,并等下一次重新消费。
果然,又出问题了,这回又是callback出异常了,但是故障现象是Ready的消息猛增,一直不见减少。
原因是出异常后,把消息塞回队列头部,下一步又消费这条会出异常的消息,又出错,塞回队列……
进入了死循环了,当然新的消息不会消费,导致堆积了……
我的解决方案:
$retry = $this->getRetryCount($msg); try { $routingKey = $this->getOrigRoutingKey($msg); $subMessage = new SubMessage($msg, $routingKey , [ 'retry_count' => $retry, // 重试次数 ]); $this->subscribe($subMessage); } catch (\Exception $ex) { $this->writeLog('runtime/vm_consume_failed.log', '消费失败!' . $ex->getMessage() . $msg->getBody()); if ($retry > 3) { // 超过最大重试次数,消息无法处理 $publishFailed($msg); return; } // 消息处理失败,稍后重试 $publishRetry($msg); }
/** * 获取消息重试次数 * @param AMQPMessage $msg * @return int */ protected function getRetryCount($msg) { $retry = 0; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-death'][0]['count'])) { $retry = $headers['x-death'][0]['count']; } } return (int)$retry; }
// 发起延时重试 $publishRetry = function ($msg) use ($queueName,$exchangeRetryName) { /** @var AMQPTable $headers */ if ($msg->has('application_headers')) { $headers = $msg->get('application_headers'); } else { $headers = new AMQPTable(); } $headers->set('x-orig-routing-key', $this->getOrigRoutingKey($msg)); $properties = $msg->get_properties(); $properties['application_headers'] = $headers; $newMsg = new AMQPMessage($msg->getBody(), $properties); $this->channel->basic_publish( $newMsg, $exchangeRetryName, $queueName ); //发送ack信息应答当前消息处理完成 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); };
/** * 声明重试队列 */ private function declareRetryQueue() { $this->channel->queue_declare($this->query_retry_name, false, true, false, false, false,new AMQPTable(array( 'x-dead-letter-exchange' => $this->exchange_name, 'x-dead-letter-routing-key' => $this->query_name, 'x-message-ttl' => 3 * 1000, ))); $this->channel->queue_bind($this->query_retry_name, $this->exchange_retry_name, $this->query_name); }
这篇关于php rabbitmq的开发体验(三)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-19php8的协程和hyperf的协程有什么区别?-icode9专业技术文章分享
- 2024-12-19php8 的fiber是什么?-icode9专业技术文章分享
- 2024-12-05怎么在php8,1 里面开启 debug?-icode9专业技术文章分享
- 2024-12-05怎么在php8,1 里面开启 debug?-icode9专业技术文章分享
- 2024-11-29使用PHP 将ETH账户的资产汇集到一个账户
- 2024-11-23怎么实现安卓+php 热更新方案?-icode9专业技术文章分享
- 2024-11-22PHP 中怎么实现判断多个值是否为空、null 或者为 false?-icode9专业技术文章分享
- 2024-11-11开源 PHP 商城项目 CRMEB 二次开发和部署教程
- 2024-11-09怎么使用php在kaufland平台刊登商品?-icode9专业技术文章分享
- 2024-11-05PHP的抽象类和接口是什么,有什么区别-icode9专业技术文章分享