加入收藏 | 设为首页 | 会员中心 | 我要投稿 汽车网 (https://www.0577qiche.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 站长学院 > PHP教程 > 正文

PHP与RabbitMQ实现消息队列的完整代码

发布时间:2023-10-11 11:31:01 所属栏目:PHP教程 来源:
导读:本篇文章给大家带来的内容是关于PHP和RabbitMQ实现消息队列的完整代码,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细
本篇文章给大家带来的内容是关于PHP和RabbitMQ实现消息队列的完整代码,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.

php扩展地址: http://pecl.php.net/package/amqp

具体以官网为准 http://www.rabbitmq.com/getstarted.html

介绍:

config.php 配置信息

BaseMQ.php MQ基类

ProductMQ.php 生产者类

ConsumerMQ.php 消费者类

Consumer2MQ.php 消费者2

config.php

<?php 
 
return [ 
 
    //配置 
 
    'host' => [ 
 
        'host' => '127.0.0.1', 
 
        'port' => '5672', 
 
        'login' => 'guest', 
 
        'password' => 'guest', 
 
        'vhost'=>'/', 
 
    ], 
 
    //交换机 
//Cuoxin.com 
    'exchange'=>'word', 
 
    //路由 
 
    'routes' => [], 
 
]; 
BaseMQ.php

<?php 
 
/** 
 
 * Created by PhpStorm. 
 
 * User: pc 
 
 * Date: 2018/12/13 
 
 * Time: 14:11 
 
 */ 
 
 
 
namespace MyObjSummary/rabbitMQ; 
 
 
 
/** Member 
 
 *      AMQPChannel 
 
 *      AMQPConnection 
 
 *      AMQPEnvelope 
 
 *      AMQPExchange 
 
 *      AMQPQueue 
 
 * Class BaseMQ 
 
 * @package MyObjSummary/rabbitMQ 
 
 */ 
 
class BaseMQ 
 

 
    /** MQ Channel 
 
     * @var /AMQPChannel 
 
     */ 
 
    public $AMQPChannel ; 
 
 
 
    /** MQ Link 
 
     * @var /AMQPConnection 
 
     */ 
 
    public $AMQPConnection ; 
 
 
 
    /** MQ Envelope 
 
     * @var /AMQPEnvelope 
 
     */ 
 
    public $AMQPEnvelope ; 
 
 
 
    /** MQ Exchange 
 
     * @var /AMQPExchange 
 
     */ 
 
    public $AMQPExchange ; 
 
 
 
    /** MQ Queue 
 
     * @var /AMQPQueue 
 
     */ 
 
    public $AMQPQueue ; 
 
 
 
    /** conf 
 
     * @var 
 
     */ 
 
    public $conf ; 
 
 
 
    /** exchange 
 
     * @var 
 
     */ 
 
    public $exchange ; 
 
 
 
    /** link 
 
     * BaseMQ constructor. 
 
     * @throws /AMQPConnectionException 
 
     */ 
 
    public function __construct() 
 
    { 
 
        $conf =  require 'config.php' ; 
 
        if(!$conf) 
 
            throw new /AMQPConnectionException('config error!'); 
 
        $this->conf     = $conf['host'] ; 
 
        $this->exchange = $conf['exchange'] ; 
 
        $this->AMQPConnection = new /AMQPConnection($this->conf); 
 
        if (!$this->AMQPConnection->connect()) 
 
            throw new /AMQPConnectionException("Cannot connect to the broker!/n"); 
 
    } 
 
 
 
    /** 
 
     * close link 
 
     */ 
 
    public function close() 
 
    { 
 
        $this->AMQPConnection->disconnect(); 
 
    } 
 
 
 
    /** Channel 
 
     * @return /AMQPChannel 
 
     * @throws /AMQPConnectionException 
 
     */ 
 
    public function channel() 
 
    { 
 
        if(!$this->AMQPChannel) { 
 
            $this->AMQPChannel =  new /AMQPChannel($this->AMQPConnection); 
 
        } 
 
        return $this->AMQPChannel; 
 
    } 
 
 
 
    /** Exchange 
 
     * @return /AMQPExchange 
 
     * @throws /AMQPConnectionException 
 
     * @throws /AMQPExchangeException 
 
     */ 
 
    public function exchange() 
 
    { 
 
        if(!$this->AMQPExchange) { 
 
            $this->AMQPExchange = new /AMQPExchange($this->channel()); 
 
            $this->AMQPExchange->setName($this->exchange); 
 
        } 
 
        return $this->AMQPExchange ; 
 
    } 
 

    /** queue 
 
     * @return /AMQPQueue 
 
     * @throws /AMQPConnectionException 
 
     * @throws /AMQPQueueException 
 
     */ 
 
    public function queue() 
 
    { 
 
        if(!$this->AMQPQueue) { 
 
            $this->AMQPQueue = new /AMQPQueue($this->channel()); 
 
        } 
 
        return $this->AMQPQueue ; 
 
    } 
 
 
 
    /** Envelope 
 
     * @return /AMQPEnvelope 
 
     */ 
 
    public function envelope() 
 
    { 
 
        if(!$this->AMQPEnvelope) { 
 
            $this->AMQPEnvelope = new /AMQPEnvelope(); 
//Cuoxin.com 
        } 
 
        return $this->AMQPEnvelope; 
 
    } 
 

ProductMQ.php

<?php 
 
//生产者 P 
 
namespace MyObjSummary/rabbitMQ; 
 
require 'BaseMQ.php'; 
 
class ProductMQ extends BaseMQ 
 

 
    private $routes = ['hello','word']; //路由key 
 

    /** 
 
     * ProductMQ constructor. 
 
     * @throws /AMQPConnectionException 
 
     */ 
 
    public function __construct() 
 
    { 
 
       parent::__construct(); 
 
    } 
 
 
 
    /** 只控制发送成功 不接受消费者是否收到 
 
     * @throws /AMQPChannelException 
 
     * @throws /AMQPConnectionException 
 
     * @throws /AMQPExchangeException 
 
     */ 
 
    public function run() 
 
    { 
 
        //频道 
 
        $channel = $this->channel(); 
 
        //创建交换机对象 
 
        $ex = $this->exchange(); 
 
        //消息内容 
 
        $message = 'product message '.rand(1,99999); 
 
        //开始事务 
 
        $channel->startTransaction(); 
 
        $sendEd = true ; 
 
        foreach ($this->routes as $route) { 
 
            $sendEd = $ex->publish($message, $route) ; 
 
            echo "Send Message:".$sendEd."/n"; 
 
        } 
 
        if(!$sendEd) { 
 
            $channel->rollbackTransaction(); 
 
        } 
 
        $channel->commitTransaction(); //提交事务 
 
        $this->close(); 
 
        die ; 
 
    } 
 

 
try{ 
 
    (new ProductMQ())->run(); 
 
}catch (/Exception $exception){ 
 
    var_dump($exception->getMessage()) ; 
 

ConsumerMQ.php

<?php 
 
//消费者 C 
 
namespace MyObjSummary/rabbitMQ; 
 
require 'BaseMQ.php'; 
 
class ConsumerMQ extends BaseMQ 
 

 
    private  $q_name = 'hello'; //队列名 
 
    private  $route  = 'hello'; //路由key 
 
    /** 
 
     * ConsumerMQ constructor. 
 
     * @throws /AMQPConnectionException 
 
     */ 
 
    public function __construct() 
 
    { 
 
        parent::__construct(); 
 
    } 
 
    /** 接受消息 如果终止 重连时会有消息 
 
     * @throws /AMQPChannelException 
 
     * @throws /AMQPConnectionException 
 
     * @throws /AMQPExchangeException 
 
     * @throws /AMQPQueueException 
 
     */ 
 
    public function run() 
 
    { 
  
        //创建交换机 
 
        $ex = $this->exchange(); 
 
        $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 
 
        $ex->setFlags(AMQP_DURABLE); //持久化 
 
        //echo "Exchange Status:".$ex->declare()."/n"; 
 
        //创建队列 
 
        $q = $this->queue(); 
 
        //var_dump($q->declare());exit(); 
 
        $q->setName($this->q_name); 
 
        $q->setFlags(AMQP_DURABLE); //持久化 
 
        //echo "Message Total:".$q->declareQueue()."/n"; 
 
 
 
        //绑定交换机与队列,并指定路由键 
 
        echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."/n"; 
 
 
 
        //阻塞模式接收消息 
 
        echo "Message:/n"; 
 
        while(True){ 
 
            $q->consume(function ($envelope,$queue){ 
 
                $msg = $envelope->getBody(); 
 
                echo $msg."/n"; //处理消息 
 
                $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 
 
            }); 
 
            //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 
 
        } 
 
        $this->close(); 
 
    } 
 

 
try{ 
 
    (new ConsumerMQ)->run(); 
 
}catch (/Exception $exception){ 
 
    var_dump($exception->getMessage()) ; 
 

(编辑:汽车网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章