workerman/redis-queue
Message queue based on Redis that supports delayed message processing.
Project Address:
https://github.com/walkor/redis-queue
Installation:
composer require workerman/redis-queue
Example
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // Subscribe to queue user-1
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // Subscribe to queue user-2
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // Send a message to queue user-1 every second
    Timer::add(1, function() use ($client){
        $client->send('user-1', ['some', 'data']);
    });
};
Worker::runAll();
API
__construct (string $address, [array $options])
Create an instance
- $address: similar to redis://ip:6379; must start with redis.
- $options includes the following options:
  - auth: authentication information, default is ''
  - db: database, default is 0
  - max_attempts: number of retry attempts after consumption failure, default is 5
  - retry_seconds: retry interval in seconds, default is 5
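As a rough illustration of how these options might be passed, the sketch below merges caller overrides over the documented defaults. The helper redis_queue_options is hypothetical (it is not part of the library), and the password is a placeholder.

```php
<?php
// Hypothetical helper, not part of workerman/redis-queue: merges caller
// overrides over the default option values documented above.
function redis_queue_options(array $overrides = []): array
{
    $defaults = [
        'auth'          => '', // authentication information
        'db'            => 0,  // database index
        'max_attempts'  => 5,  // retries after a consumption failure
        'retry_seconds' => 5,  // retry interval in seconds
    ];
    // Values in $overrides win over the defaults with the same key
    return array_replace($defaults, $overrides);
}

// Inside onWorkerStart, with Workerman\RedisQueue\Client imported:
// $client = new Client('redis://127.0.0.1:6379', redis_queue_options([
//     'auth'         => 'your-password', // placeholder
//     'max_attempts' => 3,
// ]));
```

Options that are not overridden keep the default values listed above.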
Consumption failure means that the business logic throws an Exception or Error. After a consumption failure, the message is placed in the delay queue for retry. The number of retries is controlled by max_attempts, and the retry interval is jointly controlled by retry_seconds and max_attempts. For example, if max_attempts is 5 and retry_seconds is 10, the interval for the first retry is 1*10 seconds, the interval for the second retry is 2*10 seconds, and so on, until the fifth retry. If the number of retries exceeds max_attempts, the message is placed in the failed queue with the key {redis-queue}-failed (before version 1.0.5, it was redis-queue-failed).
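The retry schedule described above can be worked out with a few lines of PHP. This is only a sketch of the arithmetic, assuming the n-th retry waits n * retry_seconds as in the example; the helper retry_intervals is hypothetical and not part of the library.

```php
<?php
// Hypothetical helper: lists the wait before each retry, assuming the
// n-th retry waits n * retry_seconds.
function retry_intervals(int $max_attempts, int $retry_seconds): array
{
    $intervals = [];
    for ($attempt = 1; $attempt <= $max_attempts; $attempt++) {
        $intervals[$attempt] = $attempt * $retry_seconds;
    }
    return $intervals;
}

$intervals = retry_intervals(5, 10);
print_r($intervals);              // waits of 10, 20, 30, 40 and 50 seconds
echo array_sum($intervals), "\n"; // 150 seconds of waiting in total
```

Under these assumptions, a message that keeps failing with max_attempts 5 and retry_seconds 10 spends 150 seconds waiting in the delay queue across its retries before it is moved to the failed queue.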
send(String $queue, Mixed $data, [int $delay=0])
Send a message to the queue
- $queue: queue name, type String
- $data: the message being published, can be an array or a string, type Mixed
- $delay: delay before consumption, in seconds, default is 0, type Int
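A delayed send might look like the sketch below. The helper schedule_reminder, the queue name reminder, and the payload fields are all made up for illustration; $client is expected to be a Workerman\RedisQueue\Client instance created as in the example above.

```php
<?php
// Hypothetical helper: queues a reminder to be consumed after $delay seconds.
// $client is expected to be a Workerman\RedisQueue\Client instance; the queue
// name and the payload fields are illustrative only.
function schedule_reminder($client, int $userId, int $delay = 60): void
{
    $client->send('reminder', ['user_id' => $userId, 'type' => 'reminder'], $delay);
}

// Inside onWorkerStart, after creating $client:
// schedule_reminder($client, 42);    // delayed by 60 seconds
// schedule_reminder($client, 42, 0); // no delay
```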
subscribe(mixed $queue, callable $callback)
Subscribe to a single queue or multiple queues
- $queue: queue name, can be a string or an array containing multiple queue names
- $callback: callback function in the format function (Mixed $data), where $data is the same as $data in send($queue, $data).
unsubscribe(mixed $queue)
Unsubscribe from a single queue or multiple queues
- $queue: queue name or an array containing multiple queue names
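Subscribing to several queues and unsubscribing later could be combined as in the following sketch. The helper watch_queues is hypothetical; it only calls subscribe and unsubscribe with the array form described above, and $client is expected to be a Workerman\RedisQueue\Client instance.

```php
<?php
// Hypothetical helper: subscribes one handler to several queues and returns
// a closure that unsubscribes from the same queues when called.
function watch_queues($client, array $queues, callable $handler): callable
{
    $client->subscribe($queues, $handler);
    return function () use ($client, $queues): void {
        $client->unsubscribe($queues);
    };
}

// Inside onWorkerStart, after creating $client:
// $stop = watch_queues($client, ['user-1', 'user-2'], function ($data) {
//     var_export($data);
// });
// Timer::add(60, $stop, [], false); // stop consuming after 60 seconds, run once
```

Returning the closure keeps the queue list in one place, so the same names are used for both calls.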
Sending messages to the queue in a non-workerman environment
Some projects run under Apache or PHP-FPM and cannot use the workerman/redis-queue client. In that case, you can use a function like the following to send messages:
function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; // Before version 1.0.5, it was redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed'; // Before version 1.0.5, it was redis-queue-delayed
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    // Delayed messages go into the delayed sorted set, scored by their due time
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    // Immediate messages are pushed onto the waiting list of the queue
    return $redis->lPush($queue_waiting.$queue, $package_str);
}
Here the parameter $redis is a Redis instance. For example, with the redis extension the usage is similar to the following:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data = ['some', 'data'];
redis_queue_send($redis, $queue, $data);
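When inspecting Redis by hand, it can help to know which keys redis_queue_send writes to. The sketch below derives them from the key names given above; the helper redis_queue_keys and its $legacy flag are hypothetical and not part of the library.

```php
<?php
// Hypothetical helper: returns the Redis keys that redis_queue_send writes to.
// Pass $legacy = true for the key names used before version 1.0.5.
function redis_queue_keys(string $queue, bool $legacy = false): array
{
    $prefix = $legacy ? 'redis-queue' : '{redis-queue}';
    return [
        // List written with lPush when no delay is given
        'waiting' => $prefix . '-waiting' . $queue,
        // Sorted set written with zAdd; one key for all delayed messages
        'delayed' => $prefix . '-delayed',
    ];
}

print_r(redis_queue_keys('user-1'));
```

For the queue user-1 this gives {redis-queue}-waitinguser-1 for immediate messages and {redis-queue}-delayed for delayed ones. The queue name is appended directly to the waiting prefix, with no separator.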