一、前言
使用第三方库进行短信发送等,为了响应速度,失败延迟重试等,决定使用消息队列。
二、 使用
php-resque 在网上有不少教程在此便不加以累赘 ,参考地址: PHP-resque使用经验总结
三、结构分析
从push与pop方法中可看出,队列中使用redis的List(列表)右侧插入数据,左侧取出数据。
1 |
|
四、 结构设计
List结构不能完成延迟执行这一功能,为此新增两个redis key 分别为SortedSet(有序集合) 与Hash(哈希表),
将队列数据放在Hash表中, 将数据的Key 放入 SortedSet 中 其中 到达可执行的时间戳为Sort ,当 Sort 小于当前时间时,则将Hash 表中的数据移动至 List 队列中右侧,排队执行
五、 代码实现
Resque_Job 中
$this->instance->queue = $this->queue;
后插入
1 | //执行次数 |
Resque_Job 中插入3个方法
1 |
|
- reidis 支持
1 | class Resque_Redis extends Redisent { |
1 | class Resque_RedisCluster extends RedisentCluster{ |
- Resque_Worker 每次判断是否有到达时间的Job
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30class Resque_Worker{
public function work($interval = 5)
{
$this->updateProcLine('Starting');
$this->startup();
while(true) {
if($this->shutdown) {
break;
}
$this->loadDelay();// 加载延迟执行
...
}
}
/**
* 延迟执行 移动队列位置
*/
public function loadDelay(){
$queues = $this->queues();
if($queues){
foreach ($queues as $queue){
$this->log('Checking Delay' . $queue, self::LOG_VERBOSE);
Resque_Job::loadDelay($queue);
}
}
}
}
class Resque 中加入3个方法
1 | class Resque{ |
- 调用
1 | //Job中失败时 |