文章目录
- 需要做项目迁移时,例如laravel迁移至hyperf时,因为基本上都是一步一步迁移的,仍有例如支付回调等依旧在laravel框架中进行消费的情况。需要接管处理消息的queue进行数据格式改造,利用构造同样命名空间的job去进行投递,他会序列化数据,可以debug一下内容哦,然后投递至rabbitMQ后,laravel进行消费就好啦。其中hyperf的版本背景为2.1
- 在app下建立Job目录为例,大家可以根据情况来 在Job目录下建立Job.php,复制以下代码<?phpdeclare(strict_types=1);namespace AppJob;/** * Class Job * @package AppJob */ class Job { protected job; publicconnection; public queue; publicdelay;/** * Job constructor. */ public function __invoke() { this->job = null;this->connection = null; this->queue = null;this->delay = null; }/** * Set the desired delay for the job. * * @param DateTime|int|null delay * @returnthis */ public function delay(delay) {this->delay = delay;returnthis; }/** * Set the desired queue for the job. * * @param string|null queue * @returnthis */ public function onQueue(queue) {this->queue = queue;returnthis; }/** * Set the desired connection for the job. * * @param string|null connection * @returnthis */ public function onConnection(connection) {this->connection = connection;returnthis; }}接管Producer.php,继续创建Producer.php,复制以下代码进去<?phpdeclare(strict_types=1);namespace AppJob;use HyperfAmqpConnection; use HyperfAmqpMessageProducerMessageInterface; use PhpAmqpLibMessageAMQPMessage; use PhpAmqpLibWireAMQPTable; use HyperfAmqpBuilder;/** * 生产者 * Class Producer * @package AppJob */ class Producer extends Builder { public exchange_type; publicexchange_passive; public exchange_durable; publicexchange_auto_delete;public queue_passive; publicqueue_durable; public queue_exclusive; publicqueue_auto_delete; public queue_nowait;public function checkExchange(channel, producerMessage) {exchange = producerMessage->getExchange();queue = producerMessage->getQueue();routingKey = producerMessage->getRoutingKey();ttl = producerMessage->getTtl();this->exchange_type = env('RABBITMQ_EXCHANGE_TYPE', 'direct'); this->exchange_passive = env('RABBITMQ_EXCHANGE_PASSIVE', false);this->exchange_durable = env('RABBITMQ_EXCHANGE_DURABLE', true); this->exchange_auto_delete = env('RABBITMQ_EXCHANGE_PASSIVE', false);this->queue_passive = env('RABBITMQ_QUEUE_PASSIVE', false); this->queue_durable = env('RABBITMQ_QUEUE_DURABLE', true);this->queue_exclusive = env('RABBITMQ_QUEUE_EXCLUSIVE', false); this->queue_auto_delete = env('RABBITMQ_QUEUE_AUTODELETE', false);//定义交换器channel->exchange_declare(exchange,this->exchange_type, this->exchange_passive,this->exchange_durable, this->exchange_auto_delete);//定义队列channel->queue_declare(queue,this->queue_passive, this->queue_durable,this->queue_exclusive, this->queue_auto_delete); //绑定队列到交换器上channel->queue_bind(queue,exchange, routingKey);if (ttl > 0) { // delayExchange = 'delayed_exchange_' .exchange; // delayQueue = 'delayed_queue_' .queue . '_' . ttl; //delayRoutingKey = routingKey .ttl; delayExchange =exchange; delayQueue =queue . '_deferred_' . ttl;delayRoutingKey = delayQueue; //定义延迟交换器channel->exchange_declare(delayExchange,this->exchange_type, this->exchange_passive,this->exchange_durable, this->exchange_auto_delete);//定义延迟队列channel->queue_declare(delayQueue,this->queue_passive, this->queue_durable,this->queue_exclusive, this->queue_auto_delete, false, new AMQPTable(array( "x-dead-letter-exchange" =>exchange, "x-dead-letter-routing-key" => routingKey, "x-message-ttl" =>ttl * 1000, ))); //绑定延迟队列到交换器上 channel->queue_bind(delayQueue, delayExchange,delayRoutingKey);producerMessage->setExchange(delayExchange); producerMessage->setRoutingKey(delayRoutingKey); } }/** * @param ProducerMessageInterface producerMessage * @paramroutingKey * @param exchange * @param boolconfirm * @param int timeout * @return bool * @throws Exception * @throws Throwable */ public function produce(ProducerMessageInterfaceproducerMessage, routingKey,exchange, bool confirm = false, inttimeout = 5): bool { return retry(1, function () use (exchange,routingKey, producerMessage,confirm, timeout) { returnthis->produceMessage(producerMessage,routingKey, exchange,confirm, timeout); }); }/** * @param ProducerMessageInterfaceproducerMessage * @param routingKey * @paramexchange * @param bool confirm * @param inttimeout * @return bool * @throws Throwable */ private function produceMessage(ProducerMessageInterface producerMessage,routingKey, exchange, boolconfirm = false, int timeout = 5) {result = false;this->injectMessageProperty(producerMessage, routingKey,exchange);delay =producerMessage->getTtl(); if (delay>0) {message = new AMQPMessage(producerMessage->payload(), array_merge(producerMessage->getProperties(), [ 'expiration' => delay * 1000, ])); } else {message = new AMQPMessage(producerMessage->payload(),producerMessage->getProperties()); } // message = new AMQPMessage(producerMessage->payload(), producerMessage->getProperties());pool = this->getConnectionPool(producerMessage->getPoolName()); /** @var Connection connection */connection = pool->get(); if (confirm) { channel =connection->getConfirmChannel(); } else { channel =connection->getChannel(); } channel->set_ack_handler(function () use (&result) { result = true; });try { // 检测交换机和队列this->checkExchange(channel,producerMessage);channel->basic_publish(message, producerMessage->getExchange(),producerMessage->getRoutingKey()); channel->wait_for_pending_acks_returns(timeout); } catch (Throwable exception) { // Reconnect the connection before release.connection->reconnect(); throw exception; } finally {connection->release(); }return confirm ?result : true; }private function injectMessageProperty(ProducerMessageInterface producerMessage,routingKey, exchange) {producerMessage->setRoutingKey(routingKey);producerMessage->setExchange($exchange); } }接管ProducerMessage.php,继续创建ProducerMessage.php,复制以下代码进去<?phpdeclare(strict_types=1);namespace AppJob;use HyperfAmqpConstants; use HyperfAmqpMessageMessage; use HyperfAmqpMessageProducerMessageInterface;/** * 生产消息 * Class Job * @package AppJob */ abstract class ProducerMessage extends Message implements ProducerMessageInterface { /** * @var string */ protected payload = '';/** * @var string */ protectedroutingKey = '';/** * @var array */ protected properties = [ 'content_type' => 'text/plain', 'delivery_mode' => Constants::DELIVERY_MODE_PERSISTENT, ];public function getProperties(): array { returnthis->properties; }public function setPayload(data): self {this->payload = data; returnthis; }public function payload(): string { return this->serialize(); }public function serialize(): string { return json_encode(this->payload); }/** * @var integer 延迟时间(秒) */ protected ttl = 0;public function setTtl(ttl) { this->ttl =ttl; return this; }public function getTtl() { returnthis->ttl; }protected queue = 'default';public function setQueue(name) { this->queue =name;return this; }public function getQueue() { returnthis->queue; } }序列化数据,创建SerializeJobData.php,复制以下代码进去<?phpdeclare(strict_types=1);namespace AppJob; use HyperfUtilsStr;/** * 序列化队列数据 * Class SerializeJobData * @package AppJob */ class SerializeJobData extends ProducerMessage { public function __construct(job) { // 设置不同 poolthis->poolName = 'default'; /** * 当驱动为redis时 * use IlluminateSupportStr; * 'id' => Str::random(32),'attempts' => 0, */ if (env('QUEUE_DRIVER', 'rabbitmq') == 'rabbitmq') { this->payload = [ 'displayName' => get_class(job), 'job' => 'IlluminateQueueCallQueuedHandler@call', 'maxTries' => isset(job->tries) ?job->tries : null, 'timeout' => isset(job->timeout) ?job->timeout : null, 'data' => [ 'commandName' => get_class(job), 'command' => serialize(clonejob) ] ]; } else { this->payload = [ 'displayName' => get_class(job), 'job' => 'IlluminateQueueCallQueuedHandler@call', 'maxTries' => isset(job->tries) ?job->tries : null, 'timeout' => isset(job->timeout) ?job->timeout : null, 'data' => [ 'commandName' => get_class(job), 'command' => serialize(clonejob) ], 'id' => Str::random(32), 'attempts' => 0 ]; }} }创建助手函数 注意我的内容哦 按需修改if (!function_exists('producerPushData')) { /** * 投递信息 * @param ProducerMessageInterface message 消息 * @param stringroutingKey 默认 default * @param string exchange 所投入的queue * @param boolconfirm 是否需要确认 * @param int timeout 超时时间 * @return bool * @throws Throwable */ function producerPushData(message, routingKey = 'default',exchange = '', bool confirm = false, inttimeout = 5) { exchange = !empty(exchange) ? exchange : env('RABBITMQ_EXCHANGE_NAME', 'sweetheart'); return make(Producer::class)->produce(message, routingKey,exchange, confirm,timeout); } }使用方式 注意需要和laravel/lumen 保持同样的命名空间哦
需要做项目迁移时,例如laravel迁移至hyperf时,因为基本上都是一步一步迁移的,仍有例如支付回调等依旧在laravel框架中进行消费的情况。需要接管处理消息的queue进行数据格式改造,利用构造同样命名空间的job去进行投递,他会序列化数据,可以debug一下内容哦,然后投递至rabbitMQ后,laravel进行消费就好啦。其中hyperf的版本背景为2.1
- 在app下建立Job目录为例,大家可以根据情况来
- 在Job目录下建立Job.php,复制以下代码
<?php
declare(strict_types=1);
namespace AppJob;
/**
* Class Job
* @package AppJob
*/
class Job
{
protected job;
publicconnection;
public queue;
publicdelay;
/**
* Job constructor.
*/
public function __invoke()
{
this->job = null;this->connection = null;
this->queue = null;this->delay = null;
}
/**
* Set the desired delay for the job.
*
* @param DateTime|int|null delay
* @returnthis
*/
public function delay(delay)
{this->delay = delay;
returnthis;
}
/**
* Set the desired queue for the job.
*
* @param string|null queue
* @returnthis
*/
public function onQueue(queue)
{this->queue = queue;
returnthis;
}
/**
* Set the desired connection for the job.
*
* @param string|null connection
* @returnthis
*/
public function onConnection(connection)
{this->connection = connection;
returnthis;
}
}
- 接管Producer.php,继续创建Producer.php,复制以下代码进去
<?php
declare(strict_types=1);
namespace AppJob;
use HyperfAmqpConnection;
use HyperfAmqpMessageProducerMessageInterface;
use PhpAmqpLibMessageAMQPMessage;
use PhpAmqpLibWireAMQPTable;
use HyperfAmqpBuilder;
/**
* 生产者
* Class Producer
* @package AppJob
*/
class Producer extends Builder
{
public exchange_type;
publicexchange_passive;
public exchange_durable;
publicexchange_auto_delete;
public queue_passive;
publicqueue_durable;
public queue_exclusive;
publicqueue_auto_delete;
public queue_nowait;
public function checkExchange(channel, producerMessage)
{exchange = producerMessage->getExchange();queue = producerMessage->getQueue();routingKey = producerMessage->getRoutingKey();ttl = producerMessage->getTtl();this->exchange_type = env('RABBITMQ_EXCHANGE_TYPE', 'direct');
this->exchange_passive = env('RABBITMQ_EXCHANGE_PASSIVE', false);this->exchange_durable = env('RABBITMQ_EXCHANGE_DURABLE', true);
this->exchange_auto_delete = env('RABBITMQ_EXCHANGE_PASSIVE', false);this->queue_passive = env('RABBITMQ_QUEUE_PASSIVE', false);
this->queue_durable = env('RABBITMQ_QUEUE_DURABLE', true);this->queue_exclusive = env('RABBITMQ_QUEUE_EXCLUSIVE', false);
this->queue_auto_delete = env('RABBITMQ_QUEUE_AUTODELETE', false);
//定义交换器channel->exchange_declare(exchange,this->exchange_type, this->exchange_passive,this->exchange_durable, this->exchange_auto_delete);
//定义队列channel->queue_declare(queue,this->queue_passive, this->queue_durable,this->queue_exclusive, this->queue_auto_delete);
//绑定队列到交换器上channel->queue_bind(queue,exchange, routingKey);
if (ttl > 0) {
// delayExchange = 'delayed_exchange_' .exchange;
// delayQueue = 'delayed_queue_' .queue . '_' . ttl;
//delayRoutingKey = routingKey .ttl;
delayExchange =exchange;
delayQueue =queue . '_deferred_' . ttl;delayRoutingKey = delayQueue;
//定义延迟交换器channel->exchange_declare(delayExchange,this->exchange_type, this->exchange_passive,this->exchange_durable, this->exchange_auto_delete);
//定义延迟队列channel->queue_declare(delayQueue,this->queue_passive, this->queue_durable,this->queue_exclusive, this->queue_auto_delete, false, new AMQPTable(array(
"x-dead-letter-exchange" =>exchange,
"x-dead-letter-routing-key" => routingKey,
"x-message-ttl" =>ttl * 1000,
)));
//绑定延迟队列到交换器上
channel->queue_bind(delayQueue, delayExchange,delayRoutingKey);
producerMessage->setExchange(delayExchange);
producerMessage->setRoutingKey(delayRoutingKey);
}
}
/**
* @param ProducerMessageInterface producerMessage
* @paramroutingKey
* @param exchange
* @param boolconfirm
* @param int timeout
* @return bool
* @throws Exception
* @throws Throwable
*/
public function produce(ProducerMessageInterfaceproducerMessage, routingKey,exchange, bool confirm = false, inttimeout = 5): bool
{
return retry(1, function () use (exchange,routingKey, producerMessage,confirm, timeout) {
returnthis->produceMessage(producerMessage,routingKey, exchange,confirm, timeout);
});
}
/**
* @param ProducerMessageInterfaceproducerMessage
* @param routingKey
* @paramexchange
* @param bool confirm
* @param inttimeout
* @return bool
* @throws Throwable
*/
private function produceMessage(ProducerMessageInterface producerMessage,routingKey, exchange, boolconfirm = false, int timeout = 5)
{result = false;
this->injectMessageProperty(producerMessage, routingKey,exchange);
delay =producerMessage->getTtl();
if (delay>0) {message = new AMQPMessage(producerMessage->payload(), array_merge(producerMessage->getProperties(), [
'expiration' => delay * 1000,
]));
} else {message = new AMQPMessage(producerMessage->payload(),producerMessage->getProperties());
}
// message = new AMQPMessage(producerMessage->payload(), producerMessage->getProperties());pool = this->getConnectionPool(producerMessage->getPoolName());
/** @var Connection connection */connection = pool->get();
if (confirm) {
channel =connection->getConfirmChannel();
} else {
channel =connection->getChannel();
}
channel->set_ack_handler(function () use (&result) {
result = true;
});
try {
// 检测交换机和队列this->checkExchange(channel,producerMessage);
channel->basic_publish(message, producerMessage->getExchange(),producerMessage->getRoutingKey());
channel->wait_for_pending_acks_returns(timeout);
} catch (Throwable exception) {
// Reconnect the connection before release.connection->reconnect();
throw exception;
} finally {connection->release();
}
return confirm ?result : true;
}
private function injectMessageProperty(ProducerMessageInterface producerMessage,routingKey, exchange)
{producerMessage->setRoutingKey(routingKey);producerMessage->setExchange($exchange);
}
}
- 接管ProducerMessage.php,继续创建ProducerMessage.php,复制以下代码进去
<?php
declare(strict_types=1);
namespace AppJob;
use HyperfAmqpConstants;
use HyperfAmqpMessageMessage;
use HyperfAmqpMessageProducerMessageInterface;
/**
* 生产消息
* Class Job
* @package AppJob
*/
abstract class ProducerMessage extends Message implements ProducerMessageInterface
{
/**
* @var string
*/
protected payload = '';
/**
* @var string
*/
protectedroutingKey = '';
/**
* @var array
*/
protected properties
= [
'content_type' => 'text/plain',
'delivery_mode' => Constants::DELIVERY_MODE_PERSISTENT,
];
public function getProperties(): array
{
returnthis->properties;
}
public function setPayload(data): self
{this->payload = data;
returnthis;
}
public function payload(): string
{
return this->serialize();
}
public function serialize(): string
{
return json_encode(this->payload);
}
/**
* @var integer 延迟时间(秒)
*/
protected ttl = 0;
public function setTtl(ttl)
{
this->ttl =ttl;
return this;
}
public function getTtl()
{
returnthis->ttl;
}
protected queue = 'default';
public function setQueue(name)
{
this->queue =name;
return this;
}
public function getQueue()
{
returnthis->queue;
}
}
- 序列化数据,创建SerializeJobData.php,复制以下代码进去
<?php
declare(strict_types=1);
namespace AppJob;
use HyperfUtilsStr;
/**
* 序列化队列数据
* Class SerializeJobData
* @package AppJob
*/
class SerializeJobData extends ProducerMessage
{
public function __construct(job)
{
// 设置不同 poolthis->poolName = 'default';
/**
* 当驱动为redis时
* use IlluminateSupportStr;
* 'id' => Str::random(32),'attempts' => 0,
*/
if (env('QUEUE_DRIVER', 'rabbitmq') == 'rabbitmq') {
this->payload = [
'displayName' => get_class(job),
'job' => 'IlluminateQueueCallQueuedHandler@call',
'maxTries' => isset(job->tries) ?job->tries : null,
'timeout' => isset(job->timeout) ?job->timeout : null,
'data' => [
'commandName' => get_class(job),
'command' => serialize(clonejob)
]
];
} else {
this->payload = [
'displayName' => get_class(job),
'job' => 'IlluminateQueueCallQueuedHandler@call',
'maxTries' => isset(job->tries) ?job->tries : null,
'timeout' => isset(job->timeout) ?job->timeout : null,
'data' => [
'commandName' => get_class(job),
'command' => serialize(clonejob)
],
'id' => Str::random(32),
'attempts' => 0
];
}
}
}
- 创建助手函数 注意我的内容哦 按需修改
if (!function_exists('producerPushData')) {
/**
* 投递信息
* @param ProducerMessageInterface message 消息
* @param stringroutingKey 默认 default
* @param string exchange 所投入的queue
* @param boolconfirm 是否需要确认
* @param int timeout 超时时间
* @return bool
* @throws Throwable
*/
function producerPushData(message, routingKey = 'default',exchange = '', bool confirm = false, inttimeout = 5)
{
exchange = !empty(exchange) ? exchange : env('RABBITMQ_EXCHANGE_NAME', 'sweetheart');
return make(Producer::class)->produce(message, routingKey,exchange, confirm,timeout);
}
}
- 使用方式 注意需要和laravel/lumen 保持同样的命名空间哦
<?php
declare(strict_types=1);
namespace AppJob;
/**
* Class Job
* @package AppJob
*/
class Job
{
protected job;
publicconnection;
public queue;
publicdelay;
/**
* Job constructor.
*/
public function __invoke()
{
this->job = null;this->connection = null;
this->queue = null;this->delay = null;
}
/**
* Set the desired delay for the job.
*
* @param DateTime|int|null delay
* @returnthis
*/
public function delay(delay)
{this->delay = delay;
returnthis;
}
/**
* Set the desired queue for the job.
*
* @param string|null queue
* @returnthis
*/
public function onQueue(queue)
{this->queue = queue;
returnthis;
}
/**
* Set the desired connection for the job.
*
* @param string|null connection
* @returnthis
*/
public function onConnection(connection)
{this->connection = connection;
returnthis;
}
}
<?php
declare(strict_types=1);
namespace AppJob;
use HyperfAmqpConnection;
use HyperfAmqpMessageProducerMessageInterface;
use PhpAmqpLibMessageAMQPMessage;
use PhpAmqpLibWireAMQPTable;
use HyperfAmqpBuilder;
/**
* 生产者
* Class Producer
* @package AppJob
*/
class Producer extends Builder
{
public exchange_type;
publicexchange_passive;
public exchange_durable;
publicexchange_auto_delete;
public queue_passive;
publicqueue_durable;
public queue_exclusive;
publicqueue_auto_delete;
public queue_nowait;
public function checkExchange(channel, producerMessage)
{exchange = producerMessage->getExchange();queue = producerMessage->getQueue();routingKey = producerMessage->getRoutingKey();ttl = producerMessage->getTtl();this->exchange_type = env('RABBITMQ_EXCHANGE_TYPE', 'direct');
this->exchange_passive = env('RABBITMQ_EXCHANGE_PASSIVE', false);this->exchange_durable = env('RABBITMQ_EXCHANGE_DURABLE', true);
this->exchange_auto_delete = env('RABBITMQ_EXCHANGE_PASSIVE', false);this->queue_passive = env('RABBITMQ_QUEUE_PASSIVE', false);
this->queue_durable = env('RABBITMQ_QUEUE_DURABLE', true);this->queue_exclusive = env('RABBITMQ_QUEUE_EXCLUSIVE', false);
this->queue_auto_delete = env('RABBITMQ_QUEUE_AUTODELETE', false);
//定义交换器channel->exchange_declare(exchange,this->exchange_type, this->exchange_passive,this->exchange_durable, this->exchange_auto_delete);
//定义队列channel->queue_declare(queue,this->queue_passive, this->queue_durable,this->queue_exclusive, this->queue_auto_delete);
//绑定队列到交换器上channel->queue_bind(queue,exchange, routingKey);
if (ttl > 0) {
// delayExchange = 'delayed_exchange_' .exchange;
// delayQueue = 'delayed_queue_' .queue . '_' . ttl;
//delayRoutingKey = routingKey .ttl;
delayExchange =exchange;
delayQueue =queue . '_deferred_' . ttl;delayRoutingKey = delayQueue;
//定义延迟交换器channel->exchange_declare(delayExchange,this->exchange_type, this->exchange_passive,this->exchange_durable, this->exchange_auto_delete);
//定义延迟队列channel->queue_declare(delayQueue,this->queue_passive, this->queue_durable,this->queue_exclusive, this->queue_auto_delete, false, new AMQPTable(array(
"x-dead-letter-exchange" =>exchange,
"x-dead-letter-routing-key" => routingKey,
"x-message-ttl" =>ttl * 1000,
)));
//绑定延迟队列到交换器上
channel->queue_bind(delayQueue, delayExchange,delayRoutingKey);
producerMessage->setExchange(delayExchange);
producerMessage->setRoutingKey(delayRoutingKey);
}
}
/**
* @param ProducerMessageInterface producerMessage
* @paramroutingKey
* @param exchange
* @param boolconfirm
* @param int timeout
* @return bool
* @throws Exception
* @throws Throwable
*/
public function produce(ProducerMessageInterfaceproducerMessage, routingKey,exchange, bool confirm = false, inttimeout = 5): bool
{
return retry(1, function () use (exchange,routingKey, producerMessage,confirm, timeout) {
returnthis->produceMessage(producerMessage,routingKey, exchange,confirm, timeout);
});
}
/**
* @param ProducerMessageInterfaceproducerMessage
* @param routingKey
* @paramexchange
* @param bool confirm
* @param inttimeout
* @return bool
* @throws Throwable
*/
private function produceMessage(ProducerMessageInterface producerMessage,routingKey, exchange, boolconfirm = false, int timeout = 5)
{result = false;
this->injectMessageProperty(producerMessage, routingKey,exchange);
delay =producerMessage->getTtl();
if (delay>0) {message = new AMQPMessage(producerMessage->payload(), array_merge(producerMessage->getProperties(), [
'expiration' => delay * 1000,
]));
} else {message = new AMQPMessage(producerMessage->payload(),producerMessage->getProperties());
}
// message = new AMQPMessage(producerMessage->payload(), producerMessage->getProperties());pool = this->getConnectionPool(producerMessage->getPoolName());
/** @var Connection connection */connection = pool->get();
if (confirm) {
channel =connection->getConfirmChannel();
} else {
channel =connection->getChannel();
}
channel->set_ack_handler(function () use (&result) {
result = true;
});
try {
// 检测交换机和队列this->checkExchange(channel,producerMessage);
channel->basic_publish(message, producerMessage->getExchange(),producerMessage->getRoutingKey());
channel->wait_for_pending_acks_returns(timeout);
} catch (Throwable exception) {
// Reconnect the connection before release.connection->reconnect();
throw exception;
} finally {connection->release();
}
return confirm ?result : true;
}
private function injectMessageProperty(ProducerMessageInterface producerMessage,routingKey, exchange)
{producerMessage->setRoutingKey(routingKey);producerMessage->setExchange($exchange);
}
}
<?php
declare(strict_types=1);
namespace AppJob;
use HyperfAmqpConstants;
use HyperfAmqpMessageMessage;
use HyperfAmqpMessageProducerMessageInterface;
/**
* 生产消息
* Class Job
* @package AppJob
*/
abstract class ProducerMessage extends Message implements ProducerMessageInterface
{
/**
* @var string
*/
protected payload = '';
/**
* @var string
*/
protectedroutingKey = '';
/**
* @var array
*/
protected properties
= [
'content_type' => 'text/plain',
'delivery_mode' => Constants::DELIVERY_MODE_PERSISTENT,
];
public function getProperties(): array
{
returnthis->properties;
}
public function setPayload(data): self
{this->payload = data;
returnthis;
}
public function payload(): string
{
return this->serialize();
}
public function serialize(): string
{
return json_encode(this->payload);
}
/**
* @var integer 延迟时间(秒)
*/
protected ttl = 0;
public function setTtl(ttl)
{
this->ttl =ttl;
return this;
}
public function getTtl()
{
returnthis->ttl;
}
protected queue = 'default';
public function setQueue(name)
{
this->queue =name;
return this;
}
public function getQueue()
{
returnthis->queue;
}
}
<?php
declare(strict_types=1);
namespace AppJob;
use HyperfUtilsStr;
/**
* 序列化队列数据
* Class SerializeJobData
* @package AppJob
*/
class SerializeJobData extends ProducerMessage
{
public function __construct(job)
{
// 设置不同 poolthis->poolName = 'default';
/**
* 当驱动为redis时
* use IlluminateSupportStr;
* 'id' => Str::random(32),'attempts' => 0,
*/
if (env('QUEUE_DRIVER', 'rabbitmq') == 'rabbitmq') {
this->payload = [
'displayName' => get_class(job),
'job' => 'IlluminateQueueCallQueuedHandler@call',
'maxTries' => isset(job->tries) ?job->tries : null,
'timeout' => isset(job->timeout) ?job->timeout : null,
'data' => [
'commandName' => get_class(job),
'command' => serialize(clonejob)
]
];
} else {
this->payload = [
'displayName' => get_class(job),
'job' => 'IlluminateQueueCallQueuedHandler@call',
'maxTries' => isset(job->tries) ?job->tries : null,
'timeout' => isset(job->timeout) ?job->timeout : null,
'data' => [
'commandName' => get_class(job),
'command' => serialize(clonejob)
],
'id' => Str::random(32),
'attempts' => 0
];
}
}
}
if (!function_exists('producerPushData')) {
/**
* 投递信息
* @param ProducerMessageInterface message 消息
* @param stringroutingKey 默认 default
* @param string exchange 所投入的queue
* @param boolconfirm 是否需要确认
* @param int timeout 超时时间
* @return bool
* @throws Throwable
*/
function producerPushData(message, routingKey = 'default',exchange = '', bool confirm = false, inttimeout = 5)
{
exchange = !empty(exchange) ? exchange : env('RABBITMQ_EXCHANGE_NAME', 'sweetheart');
return make(Producer::class)->produce(message, routingKey,exchange, confirm,timeout);
}
}
<?php
declare(strict_types=1);
use AppJobJob;
/**
* Class TestJob
*/
class TestJob extends Job
{
/**
* @var
*/
protected data;
/**
* TestJob constructor.
* @paramdata
*/
public function __construct(data)
{this->data = $data;
}
/**
* 处理逻辑
*/
public function __handle()
{
}
}
use AppJobSerializeJobData;
use TestJob;
data = [];job = new TestJob(data);
producerPushData((new SerializeJobData(job)));
<?php
declare(strict_types=1);
use AppJobJob;
/**
* Class TestJob
*/
class TestJob extends Job
{
/**
* @var
*/
protected data;
/**
* TestJob constructor.
* @paramdata
*/
public function __construct(data)
{this->data = $data;
}
/**
* 处理逻辑
*/
public function __handle()
{
}
}
use AppJobSerializeJobData;
use TestJob;
data = [];job = new TestJob(data);
producerPushData((new SerializeJobData(job)));
我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻: https://cloud.tencent.com/developer/support-plan?invite_code=2ck0eg7nv76sk



