Skip to content

Commit 132531e

Browse files
committed
WIP
1 parent ff0d2bc commit 132531e

File tree

7 files changed

+152
-104
lines changed

7 files changed

+152
-104
lines changed

bin/resque

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
#!/usr/bin/env php
21
<?php
32

43
// Find and initialize Composer

demo/job.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ class PHP_Job
33
{
44
public function perform()
55
{
6-
sleep(120);
7-
fwrite(STDOUT, 'Hello!');
6+
fwrite(STDOUT, 'Start job! -> ');
7+
sleep(1);
8+
fwrite(STDOUT, 'Job ended!' . PHP_EOL);
89
}
910
}
1011
?>

demo/queue.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,6 @@
1414
),
1515
);
1616

17-
$jobId = Resque::enqueue('default', $argv[1], $args, true);
17+
$jobId = Resque::enqueue($argv[1], $argv[2], $args, true);
1818
echo "Queued job ".$jobId."\n\n";
1919
?>

lib/Resque.php

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,21 +110,40 @@ public static function push($queue, $item)
110110
* @param string $queue The name of the queue to fetch an item from.
111111
* @return array Decoded item from the queue.
112112
*/
113-
public static function pop($queue, $interval = null)
113+
public static function pop($queue)
114114
{
115-
if($interval == null) {
116-
$item = self::redis()->lpop('queue:' . $queue);
117-
} else {
118-
$item = self::redis()->blpop('queue:' . $queue, $interval ? $interval : Resque::DEFAULT_INTERVAL);
119-
}
115+
$item = self::redis()->lpop('queue:' . $queue);
120116

121117
if(!$item) {
122118
return;
123119
}
124120

125-
return json_decode($interval == 0 ? $item : $item[1], true);
121+
return json_decode($item, true);
126122
}
127123

124+
/**
125+
* Pop an item off the end of the specified queue, decode it and
126+
* return it.
127+
*
128+
* @param string $queue The name of the queue to fetch an item from.
129+
* @return array Decoded item from the queue.
130+
*/
131+
public static function blpop($queues, $interval = null)
132+
{
133+
$list = array();
134+
foreach($queues AS $queue) {
135+
$list[] = 'queue:' . $queue;
136+
}
137+
138+
$item = self::redis()->blpop($list, $interval ? (int)$interval : Resque::DEFAULT_INTERVAL);
139+
140+
if(!$item) {
141+
return;
142+
}
143+
144+
return json_decode($item[1], true);
145+
}
146+
128147
/**
129148
* Return the size (number of pending jobs) of the specified queue.
130149
*

lib/Resque/Job.php

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -71,22 +71,41 @@ public static function create($queue, $class, $args = null, $monitor = false)
7171
return $id;
7272
}
7373

74-
/**
75-
* Find the next available job from the specified queue and return an
76-
* instance of Resque_Job for it.
77-
*
78-
* @param string $queue The name of the queue to check for a job in.
79-
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
80-
*/
81-
public static function reserve($queue, $interval = null)
82-
{
83-
$payload = Resque::pop($queue, $interval);
84-
if(!is_array($payload)) {
85-
return false;
86-
}
74+
/**
75+
* Find the next available job from the specified queue and return an
76+
* instance of Resque_Job for it.
77+
*
78+
* @param string $queue The name of the queue to check for a job in.
79+
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
80+
*/
81+
public static function reserve($queue)
82+
{
83+
$payload = Resque::pop($queue);
84+
if(!is_array($payload)) {
85+
return false;
86+
}
8787

88-
return new Resque_Job($queue, $payload);
89-
}
88+
return new Resque_Job($queue, $payload);
89+
}
90+
91+
/**
92+
* Find the next available job from the specified queue and return an
93+
* instance of Resque_Job for it.
94+
*
95+
* @param string $queue The name of the queue to check for a job in.
96+
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
97+
*/
98+
public static function reserveBlocking($queues, $interval = null)
99+
{
100+
$payload = Resque::blpop($queues, $interval);
101+
if(!is_array($payload)) {
102+
return false;
103+
}
104+
105+
var_dump($payload);
106+
107+
return new Resque_Job($payload->queue, $payload);
108+
}
90109

91110
/**
92111
* Update the status of the current job.

lib/Resque/Redis.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,13 @@ public function __construct($server, $database = null)
143143
*/
144144
public function __call($name, $args) {
145145
if(in_array($name, $this->keyCommands)) {
146-
$args[0] = self::$defaultNamespace . $args[0];
146+
if(is_array($args[0])) {
147+
foreach($args[0] AS $i => $v) {
148+
$args[0][$i] = self::$defaultNamespace . $v;
149+
}
150+
} else {
151+
$args[0] = self::$defaultNamespace . $args[0];
152+
}
147153
}
148154
try {
149155
return $this->driver->__call($name, $args);

lib/Resque/Worker.php

Lines changed: 81 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -152,70 +152,53 @@ public function work($interval = Resque::DEFAULT_INTERVAL)
152152
$this->updateProcLine('Starting');
153153
$this->startup();
154154

155-
while(true) {
156-
if($this->shutdown) {
157-
break;
158-
}
155+
while($job = $this->reserveBlocking($interval)) {
156+
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
159157

160-
// Attempt to find and reserve a job
161-
$job = false;
162-
if(!$this->paused) {
163-
$job = $this->reserve($interval);
164-
}
158+
$this->log('got ' . $job);
159+
Resque_Event::trigger('beforeFork', $job);
160+
$this->workingOn($job);
165161

166-
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
162+
$this->child = Resque::fork();
167163

168-
if(!$job) {
169-
// For an interval of 0, break now - helps with unit testing etc
170-
if($interval == 0) {
171-
break;
164+
// Forked and we're the child. Run the job.
165+
if ($this->child === 0 || $this->child === false) {
166+
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
167+
$this->updateProcLine($status);
168+
$this->log($status, self::LOG_VERBOSE);
169+
$this->perform($job);
170+
if ($this->child === 0) {
171+
exit(0);
172172
}
173+
}
173174

174-
// If no job was found, we sleep for $interval before continuing and checking again
175-
if($this->paused) {
176-
$this->updateProcLine('Paused');
177-
usleep($interval * 1000000); //it's paused, so don't hog redis with requests.
175+
if($this->child > 0) {
176+
// Parent process, sit and wait
177+
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
178+
$this->updateProcLine($status);
179+
$this->log($status, self::LOG_VERBOSE);
180+
181+
// Wait until the child process finishes before continuing
182+
pcntl_wait($status);
183+
$exitStatus = pcntl_wexitstatus($status);
184+
if($exitStatus !== 0) {
185+
$job->fail(new Resque_Job_DirtyExitException(
186+
'Job exited with exit code ' . $exitStatus
187+
));
178188
}
179-
180-
continue;
181189
}
182190

183-
$this->log('got ' . $job);
184-
Resque_Event::trigger('beforeFork', $job);
185-
$this->workingOn($job);
186-
187-
$this->child = Resque::fork();
188-
189-
// Forked and we're the child. Run the job.
190-
if ($this->child === 0 || $this->child === false) {
191-
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
192-
$this->updateProcLine($status);
193-
$this->log($status, self::LOG_VERBOSE);
194-
$this->perform($job);
195-
if ($this->child === 0) {
196-
exit(0);
197-
}
198-
}
199-
200-
if($this->child > 0) {
201-
// Parent process, sit and wait
202-
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
203-
$this->updateProcLine($status);
204-
$this->log($status, self::LOG_VERBOSE);
205-
206-
// Wait until the child process finishes before continuing
207-
pcntl_wait($status);
208-
$exitStatus = pcntl_wexitstatus($status);
209-
if($exitStatus !== 0) {
210-
$job->fail(new Resque_Job_DirtyExitException(
211-
'Job exited with exit code ' . $exitStatus
212-
));
213-
}
214-
}
191+
$this->child = null;
192+
$this->doneWorking();
215193

216-
$this->child = null;
217-
$this->doneWorking();
218-
}
194+
if($this->shutdown) {
195+
break;
196+
}
197+
198+
if($this->paused) {
199+
break;
200+
}
201+
}
219202

220203
$this->unregisterWorker();
221204
}
@@ -241,28 +224,49 @@ public function perform(Resque_Job $job)
241224
$this->log('done ' . $job);
242225
}
243226

244-
/**
245-
* Attempt to find a job from the top of one of the queues for this worker.
246-
*
247-
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
248-
*/
249-
public function reserve($interval = null)
250-
{
251-
$queues = $this->queues();
252-
if(!is_array($queues)) {
253-
return;
254-
}
255-
foreach($queues as $queue) {
256-
$this->log('Checking ' . $queue . ' with interval ' . $interval, self::LOG_VERBOSE);
257-
$job = Resque_Job::reserve($queue, $interval);
258-
if($job) {
259-
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
260-
return $job;
261-
}
262-
}
263-
264-
return false;
265-
}
227+
/**
228+
* Attempt to find a job from the top of one of the queues for this worker.
229+
*
230+
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
231+
*/
232+
public function reserve()
233+
{
234+
$queues = $this->queues();
235+
if(!is_array($queues)) {
236+
return;
237+
}
238+
foreach($queues as $queue) {
239+
$this->log('Checking ' . $queue, self::LOG_VERBOSE);
240+
$job = Resque_Job::reserve($queue);
241+
if($job) {
242+
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
243+
return $job;
244+
}
245+
}
246+
247+
return false;
248+
}
249+
250+
/**
251+
* Attempt to find a job from the top of one of the queues for this worker.
252+
*
253+
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
254+
*/
255+
public function reserveBlocking($interval = null)
256+
{
257+
$queues = $this->queues();
258+
if(!is_array($queues)) {
259+
return;
260+
}
261+
262+
$job = Resque_Job::reserveBlocking($queues, $interval);
263+
if($job) {
264+
$this->log('Found job on ' . $job->queue, self::LOG_VERBOSE);
265+
return $job;
266+
}
267+
268+
return false;
269+
}
266270

267271
/**
268272
* Return an array containing all of the queues that this worker should use

0 commit comments

Comments
 (0)