hyperf框架使用rabbitMQ生产消息至laravel/lumen进行消费

文章目录

项目迁移(例如从 laravel 迁移到 hyperf)通常是一步一步进行的,因此迁移期间仍会有一部分队列任务(例如支付回调)继续在 laravel 框架中消费。这时需要在 hyperf 中接管消息的投递:按 laravel 队列的数据格式改造消息体,并构造与 laravel 端相同命名空间的 job 类进行投递。投递时 job 会被序列化后写入消息体(可以 debug 看一下具体内容),消息进入 rabbitMQ 后,由 laravel 端照常消费即可。本文使用的 hyperf 版本为 2.1。

  1. 在app下建立Job目录为例,大家可以根据情况来
  2. 在Job目录下建立Job.php,复制以下代码
<?php

declare(strict_types=1);

namespace App\Job;

/**
 * Base class for jobs that are produced here and consumed elsewhere.
 *
 * Instances are serialized with PHP's serialize(), so the declared
 * properties become part of the message body.
 * NOTE(review): the property set presumably mirrors the properties that
 * Laravel's queue traits add to a job - confirm against the consumer side.
 *
 * @package App\Job
 */
class Job
{
    /** @var mixed|null Underlying queue job; always null on the producer side. */
    protected $job;

    /** @var string|null Name of the queue connection the job is sent to. */
    public $connection;

    /** @var string|null Name of the queue the job is sent to. */
    public $queue;

    /** @var \DateTime|int|null Delay before the job becomes available. */
    public $delay;

    /**
     * Reset all queue-related state back to null.
     *
     * This is the __invoke magic method, so it runs when the object is
     * called like a function ($job()), not when it is constructed. The
     * properties already default to null on a fresh instance.
     */
    public function __invoke()
    {
        $this->job = null;
        $this->connection = null;
        $this->queue = null;
        $this->delay = null;
    }

    /**
     * Set the desired delay for the job.
     *
     * @param  \DateTime|int|null  $delay
     * @return $this
     */
    public function delay($delay)
    {
        $this->delay = $delay;

        return $this;
    }

    /**
     * Set the desired queue for the job.
     *
     * @param  string|null  $queue
     * @return $this
     */
    public function onQueue($queue)
    {
        $this->queue = $queue;

        return $this;
    }

    /**
     * Set the desired connection for the job.
     *
     * @param  string|null  $connection
     * @return $this
     */
    public function onConnection($connection)
    {
        $this->connection = $connection;

        return $this;
    }
}
  3. 接管 Producer:在 Job 目录下继续创建 Producer.php,复制以下代码进去
<?php

declare(strict_types=1);

namespace App\Job;

use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Connection;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Throwable;

/**
 * Producer that declares the exchange and queue (plus a per-TTL delay
 * queue when the message has a TTL) before publishing a message.
 *
 * @package App\Job
 */
class Producer extends Builder
{
    // Exchange declaration flags, loaded from the environment in checkExchange().
    public $exchange_type;
    public $exchange_passive;
    public $exchange_durable;
    public $exchange_auto_delete;

    // Queue declaration flags, loaded from the environment in checkExchange().
    public $queue_passive;
    public $queue_durable;
    public $queue_exclusive;
    public $queue_auto_delete;
    // Declared but never assigned or read inside this class.
    public $queue_nowait;

    /**
     * Declare the exchange and queue for a message and bind them together.
     *
     * When the message TTL is greater than zero, a dedicated delay queue is
     * declared as well and the message is re-targeted at it. That queue
     * dead-letters into the real exchange/routing key once the TTL expires.
     *
     * @param \PhpAmqpLib\Channel\AMQPChannel $channel         channel used for the declarations
     * @param ProducerMessage                 $producerMessage message providing exchange, queue, routing key and TTL
     */
    public function checkExchange($channel, $producerMessage)
    {
        $exchange   = $producerMessage->getExchange();
        $queue      = $producerMessage->getQueue();
        $routingKey = $producerMessage->getRoutingKey();
        $ttl        = $producerMessage->getTtl();

        $this->exchange_type    = env('RABBITMQ_EXCHANGE_TYPE', 'direct');
        $this->exchange_passive = env('RABBITMQ_EXCHANGE_PASSIVE', false);
        $this->exchange_durable = env('RABBITMQ_EXCHANGE_DURABLE', true);
        // The auto-delete flag has its own variable; reading the PASSIVE
        // variable here would tie the two flags together. Both spellings
        // are accepted. NOTE(review): confirm which one the consumer's
        // queue configuration uses.
        $this->exchange_auto_delete = env(
            'RABBITMQ_EXCHANGE_AUTODELETE',
            env('RABBITMQ_EXCHANGE_AUTODEL', false)
        );

        $this->queue_passive     = env('RABBITMQ_QUEUE_PASSIVE', false);
        $this->queue_durable     = env('RABBITMQ_QUEUE_DURABLE', true);
        $this->queue_exclusive   = env('RABBITMQ_QUEUE_EXCLUSIVE', false);
        $this->queue_auto_delete = env('RABBITMQ_QUEUE_AUTODELETE', false);

        // Declare the exchange.
        $channel->exchange_declare(
            $exchange,
            $this->exchange_type,
            $this->exchange_passive,
            $this->exchange_durable,
            $this->exchange_auto_delete
        );

        // Declare the queue.
        $channel->queue_declare(
            $queue,
            $this->queue_passive,
            $this->queue_durable,
            $this->queue_exclusive,
            $this->queue_auto_delete
        );

        // Bind the queue to the exchange.
        $channel->queue_bind($queue, $exchange, $routingKey);

        if ($ttl > 0) {
            // The delay queue lives on the same exchange and is routed by its own name.
            $delayExchange   = $exchange;
            $delayQueue      = $queue . '_deferred_' . $ttl;
            $delayRoutingKey = $delayQueue;

            // Declare the delay exchange.
            $channel->exchange_declare(
                $delayExchange,
                $this->exchange_type,
                $this->exchange_passive,
                $this->exchange_durable,
                $this->exchange_auto_delete
            );

            // Declare the delay queue; expired messages are dead-lettered
            // to the real exchange and routing key. TTL is in seconds,
            // RabbitMQ expects milliseconds.
            $channel->queue_declare(
                $delayQueue,
                $this->queue_passive,
                $this->queue_durable,
                $this->queue_exclusive,
                $this->queue_auto_delete,
                false,
                new AMQPTable([
                    'x-dead-letter-exchange'    => $exchange,
                    'x-dead-letter-routing-key' => $routingKey,
                    'x-message-ttl'             => $ttl * 1000,
                ])
            );

            // Bind the delay queue to the exchange.
            $channel->queue_bind($delayQueue, $delayExchange, $delayRoutingKey);

            $producerMessage->setExchange($delayExchange);
            $producerMessage->setRoutingKey($delayRoutingKey);
        }
    }

    /**
     * Publish a message, retrying once on failure.
     *
     * @param ProducerMessageInterface $producerMessage message to publish
     * @param string                   $routingKey      routing key to publish with
     * @param string                   $exchange        exchange to publish to
     * @param bool                     $confirm         whether to use a confirm channel and report the broker ack
     * @param int                      $timeout         seconds to wait for pending acks
     * @return bool
     * @throws Throwable
     */
    public function produce(ProducerMessageInterface $producerMessage, $routingKey, $exchange, bool $confirm = false, int $timeout = 5): bool
    {
        return retry(1, function () use ($exchange, $routingKey, $producerMessage, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $routingKey, $exchange, $confirm, $timeout);
        });
    }

    /**
     * Build the AMQP message, make sure its exchange/queue exist and publish it.
     *
     * @param ProducerMessageInterface $producerMessage message to publish
     * @param string                   $routingKey      routing key to publish with
     * @param string                   $exchange        exchange to publish to
     * @param bool                     $confirm         whether to use a confirm channel
     * @param int                      $timeout         seconds to wait for pending acks
     * @return bool the ack result when $confirm is true, otherwise always true
     * @throws Throwable
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, $routingKey, $exchange, bool $confirm = false, int $timeout = 5)
    {
        $result = false;

        $this->injectMessageProperty($producerMessage, $routingKey, $exchange);

        $properties = $producerMessage->getProperties();
        $delay = $producerMessage->getTtl();
        if ($delay > 0) {
            // Per-message expiration in milliseconds; AMQP defines this
            // property as a short string.
            $properties = array_merge($properties, [
                'expiration' => (string) ($delay * 1000),
            ]);
        }
        $message = new AMQPMessage($producerMessage->payload(), $properties);

        $pool = $this->getConnectionPool($producerMessage->getPoolName());
        /** @var Connection $connection */
        $connection = $pool->get();
        if ($confirm) {
            $channel = $connection->getConfirmChannel();
        } else {
            $channel = $connection->getChannel();
        }
        $channel->set_ack_handler(function () use (&$result) {
            $result = true;
        });

        try {
            // Make sure the exchange and queue exist (may re-target the
            // message at a delay queue).
            $this->checkExchange($channel, $producerMessage);

            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        } finally {
            $connection->release();
        }

        return $confirm ? $result : true;
    }

    /**
     * Copy the routing key and exchange onto the message.
     *
     * @param ProducerMessageInterface $producerMessage
     * @param string                   $routingKey
     * @param string                   $exchange
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage, $routingKey, $exchange)
    {
        $producerMessage->setRoutingKey($routingKey);
        $producerMessage->setExchange($exchange);
    }
}
  4. 接管 ProducerMessage:继续创建 ProducerMessage.php,复制以下代码进去
<?php

declare(strict_types=1);

namespace App\Job;

use Hyperf\Amqp\Constants;
use Hyperf\Amqp\Message\Message;
use Hyperf\Amqp\Message\ProducerMessageInterface;

/**
 * Base producer message: carries a JSON-encoded payload plus the queue
 * name and an optional delay (TTL) used when publishing.
 *
 * @package App\Job
 */
abstract class ProducerMessage extends Message implements ProducerMessageInterface
{
    /**
     * Raw payload; JSON-encoded by serialize().
     *
     * @var array|string
     */
    protected $payload = '';

    /**
     * @var string
     */
    protected $routingKey = '';

    /**
     * AMQP message properties passed to the published message.
     *
     * @var array
     */
    protected $properties
        = [
            'content_type' => 'text/plain',
            'delivery_mode' => Constants::DELIVERY_MODE_PERSISTENT,
        ];

    /**
     * @var int Delay in seconds; 0 means no delay.
     */
    protected $ttl = 0;

    /**
     * @var string Name of the queue the message is meant for.
     */
    protected $queue = 'default';

    public function getProperties(): array
    {
        return $this->properties;
    }

    /**
     * Replace the raw payload.
     *
     * @param mixed $data
     * @return $this
     */
    public function setPayload($data): self
    {
        $this->payload = $data;

        return $this;
    }

    /**
     * Message body as it is sent to the broker.
     */
    public function payload(): string
    {
        return $this->serialize();
    }

    /**
     * JSON-encode the raw payload.
     *
     * @throws \RuntimeException when the payload cannot be JSON-encoded
     */
    public function serialize(): string
    {
        $encoded = json_encode($this->payload);
        if ($encoded === false) {
            // json_encode() signals failure with false, which would
            // otherwise surface as an unhelpful return-type error.
            throw new \RuntimeException('Unable to JSON-encode message payload: ' . json_last_error_msg());
        }

        return $encoded;
    }

    /**
     * Set the delay in seconds.
     *
     * @param int $ttl
     * @return $this
     */
    public function setTtl($ttl)
    {
        $this->ttl = $ttl;

        return $this;
    }

    /**
     * @return int Delay in seconds.
     */
    public function getTtl()
    {
        return $this->ttl;
    }

    /**
     * Set the target queue name.
     *
     * @param string $name
     * @return $this
     */
    public function setQueue($name)
    {
        $this->queue = $name;

        return $this;
    }

    /**
     * @return string Target queue name.
     */
    public function getQueue()
    {
        return $this->queue;
    }
}
  5. 序列化数据:创建 SerializeJobData.php,复制以下代码进去
<?php

declare(strict_types=1);

namespace App\Job;

use Hyperf\Utils\Str;

/**
 * Wraps a job object in the queue payload structure built below
 * (displayName / job / maxTries / timeout / data).
 *
 * @package App\Job
 */
class SerializeJobData extends ProducerMessage
{
    /**
     * @param object $job job instance to serialize into the payload
     */
    public function __construct($job)
    {
        // Connection pool to publish through; change to use a different pool.
        $this->poolName = 'default';

        $jobClass = get_class($job);

        $payload = [
            'displayName' => $jobClass,
            'job' => 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries' => $job->tries ?? null,
            'timeout' => $job->timeout ?? null,
            'data' => [
                'commandName' => $jobClass,
                'command' => serialize(clone $job),
            ],
        ];

        // For any driver other than rabbitmq (e.g. redis) the payload
        // additionally carries a random id and an attempts counter.
        if (env('QUEUE_DRIVER', 'rabbitmq') !== 'rabbitmq') {
            $payload['id'] = Str::random(32);
            $payload['attempts'] = 0;
        }

        $this->payload = $payload;
    }
}
  6. 创建助手函数(注意:函数里的默认值是我项目中的内容,请按需修改)
if (!function_exists('producerPushData')) {
    /**
     * Publish a message through App\Job\Producer.
     *
     * @param \Hyperf\Amqp\Message\ProducerMessageInterface $message message to publish
     * @param string $routingKey routing key, defaults to 'default'
     * @param string $exchange   exchange name; when empty, RABBITMQ_EXCHANGE_NAME
     *                           from the environment is used (default 'sweetheart')
     * @param bool   $confirm    whether to wait for a broker confirmation
     * @param int    $timeout    timeout in seconds
     * @return bool
     * @throws \Throwable
     */
    function producerPushData($message, $routingKey = 'default', $exchange = '', bool $confirm = false, int $timeout = 5)
    {
        $exchange = !empty($exchange) ? $exchange : env('RABBITMQ_EXCHANGE_NAME', 'sweetheart');

        // Fully qualified so the helper works regardless of the namespace
        // (or lack of one) of the file it is pasted into.
        return make(\App\Job\Producer::class)->produce($message, $routingKey, $exchange, $confirm, $timeout);
    }
}

  7. 使用方式(注意:job 类需要和 laravel/lumen 端保持同样的命名空间)

<?php

declare(strict_types=1);

use App\Job\Job;

/**
 * Example job. The class is serialized by name, so it is declared under
 * the same (here: global) namespace as its counterpart on the consuming side.
 */
class TestJob extends Job
{
    /**
     * @var mixed Data carried by the job.
     */
    protected $data;

    /**
     * @param mixed $data data carried by the job
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    /**
     * Job logic. Intentionally empty in this example.
     *
     * Named handle() rather than a "__"-prefixed name, because PHP
     * reserves double-underscore method names for magic methods.
     */
    public function handle()
    {
    }
}
use App\Job\SerializeJobData;
// Only needed when this snippet lives in a namespaced file; in a file
// without a namespace this import has no effect and can be dropped.
use TestJob;

// Build the job, wrap it in the queue payload and publish it.
$data = [];
$job = new TestJob($data);
producerPushData(new SerializeJobData($job));

我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻: https://cloud.tencent.com/developer/support-plan?invite_code=2ck0eg7nv76sk

zhaohao

大家好,欢迎来到赵豪博客!赵豪,94年生人,PHP程序员一枚,因为对PHP开发有着相对比较浓厚的兴趣,所以现在从事着PHP程序员的工作。 今天再次开通这个博客,这里将记录我的职业生涯的点点滴滴,感谢来访与关注!如果我的博客能给您带来一些帮助那真是一件非常荣幸的事情~

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

微信扫一扫

微信扫一扫

微信扫一扫,分享到朋友圈

hyperf框架使用rabbitMQ生产消息至laravel/lumen进行消费
返回顶部

显示

忘记密码?

显示

显示

获取验证码

Close