1
1
<?php
2
+
2
3
/**
3
4
* Base Resque class.
4
5
*
@@ -10,7 +11,7 @@ class Resque
10
11
{
11
12
const VERSION = '1.2 ' ;
12
13
13
- const DEFAULT_INTERVAL = 5 ;
14
+ const DEFAULT_INTERVAL = 5 ;
14
15
15
16
/**
16
17
* @var Resque_Redis Instance of Resque_Redis that talks to redis.
@@ -28,9 +29,9 @@ class Resque
28
29
*/
29
30
protected static $ redisDatabase = 0 ;
30
31
31
- /**
32
- * @var string auth of Redis database
33
- */
32
+ /**
33
+ * @var string auth of Redis database
34
+ */
34
35
protected static $ auth ;
35
36
36
37
/**
@@ -42,7 +43,7 @@ class Resque
42
43
* and returns a Resque_Redis instance, or
43
44
* a nested array of servers with host/port pairs.
44
45
* @param int $database
45
- * @param string $auth
46
+ * @param string $auth
46
47
*/
47
48
public static function setBackend ($ server , $ database = 0 , $ auth = null )
48
49
{
@@ -70,8 +71,8 @@ public static function redis()
70
71
}
71
72
72
73
if (!empty (self ::$ auth )) {
73
- self ::$ redis ->auth (self ::$ auth );
74
- }
74
+ self ::$ redis ->auth (self ::$ auth );
75
+ }
75
76
76
77
return self ::$ redis ;
77
78
}
@@ -87,7 +88,7 @@ public static function redis()
87
88
*/
88
89
public static function fork ()
89
90
{
90
- if (!function_exists ('pcntl_fork ' )) {
91
+ if (!function_exists ('pcntl_fork ' )) {
91
92
return false ;
92
93
}
93
94
@@ -96,7 +97,7 @@ public static function fork()
96
97
self ::$ redis = null ;
97
98
98
99
$ pid = pcntl_fork ();
99
- if ($ pid === -1 ) {
100
+ if ($ pid === -1 ) {
100
101
throw new RuntimeException ('Unable to fork child worker. ' );
101
102
}
102
103
@@ -133,9 +134,9 @@ public static function push($queue, $item)
133
134
*/
134
135
public static function pop ($ queue )
135
136
{
136
- $ item = self ::redis ()->lpop ('queue: ' . $ queue );
137
+ $ item = self ::redis ()->lpop ('queue: ' . $ queue );
137
138
138
- if (!$ item ) {
139
+ if (!$ item ) {
139
140
return ;
140
141
}
141
142
@@ -149,13 +150,13 @@ public static function pop($queue)
149
150
* @param array $items
150
151
* @return integer number of deleted items
151
152
*/
152
- public static function dequeue ($ queue , $ items = Array ())
153
+ public static function dequeue ($ queue , $ items = array ())
153
154
{
154
- if (count ($ items ) > 0 ) {
155
+ if (count ($ items ) > 0 ) {
155
156
return self ::removeItems ($ queue , $ items );
156
- } else {
157
+ } else {
157
158
return self ::removeList ($ queue );
158
- }
159
+ }
159
160
}
160
161
161
162
/**
@@ -166,9 +167,9 @@ public static function dequeue($queue, $items = Array())
166
167
*/
167
168
public static function removeQueue ($ queue )
168
169
{
169
- $ num = self ::removeList ($ queue );
170
- self ::redis ()->srem ('queues ' , $ queue );
171
- return $ num ;
170
+ $ num = self ::removeList ($ queue );
171
+ self ::redis ()->srem ('queues ' , $ queue );
172
+ return $ num ;
172
173
}
173
174
174
175
/**
@@ -181,28 +182,28 @@ public static function removeQueue($queue)
181
182
*/
182
183
public static function blpop (array $ queues , $ timeout )
183
184
{
184
- $ list = array ();
185
- foreach ($ queues AS $ queue ) {
186
- $ list [] = 'queue: ' . $ queue ;
187
- }
185
+ $ list = array ();
186
+ foreach ($ queues as $ queue ) {
187
+ $ list [] = 'queue: ' . $ queue ;
188
+ }
188
189
189
- $ item = self ::redis ()->blpop ($ list , (int )$ timeout );
190
+ $ item = self ::redis ()->blpop ($ list , (int )$ timeout );
190
191
191
- if (!$ item ) {
192
- return ;
193
- }
192
+ if (!$ item ) {
193
+ return ;
194
+ }
194
195
195
- /**
196
- * Normally the Resque_Redis class returns queue names without the prefix
197
- * But the blpop is a bit different. It returns the name as prefix:queue:name
198
- * So we need to strip off the prefix:queue: part
199
- */
200
- $ queue = substr ($ item [0 ], strlen (self ::redis ()->getPrefix () . 'queue: ' ));
196
+ /**
197
+ * Normally the Resque_Redis class returns queue names without the prefix
198
+ * But the blpop is a bit different. It returns the name as prefix:queue:name
199
+ * So we need to strip off the prefix:queue: part
200
+ */
201
+ $ queue = substr ($ item [0 ], strlen (self ::redis ()->getPrefix () . 'queue: ' ));
201
202
202
- return array (
203
+ return array (
203
204
'queue ' => $ queue ,
204
205
'payload ' => json_decode ($ item [1 ], true )
205
- );
206
+ );
206
207
}
207
208
208
209
/**
@@ -239,8 +240,7 @@ public static function enqueue($queue, $class, $args = null, $trackStatus = fals
239
240
);
240
241
try {
241
242
Resque_Event::trigger ('beforeEnqueue ' , $ hookParams );
242
- }
243
- catch (Resque_Job_DontCreate $ e ) {
243
+ } catch (Resque_Job_DontCreate $ e ) {
244
244
return false ;
245
245
}
246
246
@@ -269,7 +269,7 @@ public static function reserve($queue)
269
269
public static function queues ()
270
270
{
271
271
$ queues = self ::redis ()->smembers ('queues ' );
272
- if (!is_array ($ queues )) {
272
+ if (!is_array ($ queues )) {
273
273
$ queues = array ();
274
274
}
275
275
return $ queues ;
@@ -283,7 +283,7 @@ public static function queues()
283
283
public static function items ($ queue , $ start = 0 , $ stop = -1 )
284
284
{
285
285
$ list = self ::redis ()->lrange ('queue: ' . $ queue , $ start , $ stop );
286
- if (!is_array ($ list )) {
286
+ if (!is_array ($ list )) {
287
287
$ list = array ();
288
288
}
289
289
return $ list ;
@@ -301,20 +301,20 @@ public static function items($queue, $start = 0, $stop = -1)
301
301
* @param array $items
302
302
* @return integer number of deleted items
303
303
*/
304
- private static function removeItems ($ queue , $ items = Array ())
304
+ private static function removeItems ($ queue , $ items = array ())
305
305
{
306
306
$ counter = 0 ;
307
- $ originalQueue = 'queue: ' . $ queue ;
308
- $ tempQueue = $ originalQueue . ':temp: ' . time ();
309
- $ requeueQueue = $ tempQueue . ':requeue ' ;
307
+ $ originalQueue = 'queue: ' . $ queue ;
308
+ $ tempQueue = $ originalQueue . ':temp: ' . time ();
309
+ $ requeueQueue = $ tempQueue . ':requeue ' ;
310
310
311
311
// move each item from original queue to temp queue and process it
312
312
$ finished = false ;
313
313
while (!$ finished ) {
314
314
$ string = self ::redis ()->rpoplpush ($ originalQueue , self ::redis ()->getPrefix () . $ tempQueue );
315
315
316
316
if (!empty ($ string )) {
317
- if (self ::matchItem ($ string , $ items )) {
317
+ if (self ::matchItem ($ string , $ items )) {
318
318
self ::redis ()->rpop ($ tempQueue );
319
319
$ counter ++;
320
320
} else {
@@ -328,9 +328,9 @@ private static function removeItems($queue, $items = Array())
328
328
// move back from temp queue to original queue
329
329
$ finished = false ;
330
330
while (!$ finished ) {
331
- $ string = self ::redis ()->rpoplpush ($ requeueQueue , self ::redis ()->getPrefix () .$ originalQueue );
331
+ $ string = self ::redis ()->rpoplpush ($ requeueQueue , self ::redis ()->getPrefix () . $ originalQueue );
332
332
if (empty ($ string )) {
333
- $ finished = true ;
333
+ $ finished = true ;
334
334
}
335
335
}
336
336
@@ -353,29 +353,31 @@ private static function removeItems($queue, $items = Array())
353
353
*/
354
354
private static function matchItem ($ string , $ items )
355
355
{
356
- $ decoded = json_decode ($ string , true );
356
+ $ decoded = json_decode ($ string , true );
357
357
358
- foreach ($ items as $ key => $ val ) {
358
+ foreach ($ items as $ key => $ val ) {
359
359
# class name only ex: item[0] = ['class']
360
360
if (is_numeric ($ key )) {
361
- if ($ decoded ['class ' ] == $ val ) {
361
+ if ($ decoded ['class ' ] == $ val ) {
362
362
return true ;
363
- }
363
+ }
364
364
# class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}]
365
365
} elseif (is_array ($ val )) {
366
- $ decodedArgs = (array )$ decoded ['args ' ][0 ];
367
- if ($ decoded ['class ' ] == $ key &&
368
- count ($ decodedArgs ) > 0 && count (array_diff ($ decodedArgs , $ val )) == 0 ) {
366
+ $ decodedArgs = (array )$ decoded ['args ' ][0 ];
367
+ if (
368
+ $ decoded ['class ' ] == $ key &&
369
+ count ($ decodedArgs ) > 0 && count (array_diff ($ decodedArgs , $ val )) == 0
370
+ ) {
369
371
return true ;
370
372
}
371
373
# class name with ID, example: item[0] = ['class' => 'id']
372
374
} else {
373
- if ($ decoded ['class ' ] == $ key && $ decoded ['id ' ] == $ val ) {
375
+ if ($ decoded ['class ' ] == $ key && $ decoded ['id ' ] == $ val ) {
374
376
return true ;
375
- }
377
+ }
376
378
}
377
- }
378
- return false ;
379
+ }
380
+ return false ;
379
381
}
380
382
381
383
/**
@@ -388,9 +390,9 @@ private static function matchItem($string, $items)
388
390
*/
389
391
private static function removeList ($ queue )
390
392
{
391
- $ counter = self ::size ($ queue );
392
- $ result = self ::redis ()->del ('queue: ' . $ queue );
393
- return ($ result == 1 ) ? $ counter : 0 ;
393
+ $ counter = self ::size ($ queue );
394
+ $ result = self ::redis ()->del ('queue: ' . $ queue );
395
+ return ($ result == 1 ) ? $ counter : 0 ;
394
396
}
395
397
396
398
/*
0 commit comments