Skip to content

Commit b8f98ee

Browse files
committed
WIP
1 parent e541fa9 commit b8f98ee

File tree

7 files changed

+152
-104
lines changed

7 files changed

+152
-104
lines changed

bin/resque

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
#!/usr/bin/env php
21
<?php
32

43
// Find and initialize Composer

demo/job.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ class PHP_Job
33
{
44
public function perform()
55
{
6-
sleep(120);
7-
fwrite(STDOUT, 'Hello!');
6+
fwrite(STDOUT, 'Start job! -> ');
7+
sleep(1);
8+
fwrite(STDOUT, 'Job ended!' . PHP_EOL);
89
}
910
}
1011
?>

demo/queue.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,6 @@
1414
),
1515
);
1616

17-
$jobId = Resque::enqueue('default', $argv[1], $args, true);
17+
$jobId = Resque::enqueue($argv[1], $argv[2], $args, true);
1818
echo "Queued job ".$jobId."\n\n";
1919
?>

lib/Resque.php

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,21 +114,40 @@ public static function push($queue, $item)
114114
* @param string $queue The name of the queue to fetch an item from.
115115
* @return array Decoded item from the queue.
116116
*/
117-
public static function pop($queue, $interval = null)
117+
public static function pop($queue)
118118
{
119-
if($interval == null) {
120-
$item = self::redis()->lpop('queue:' . $queue);
121-
} else {
122-
$item = self::redis()->blpop('queue:' . $queue, $interval ? $interval : Resque::DEFAULT_INTERVAL);
123-
}
119+
$item = self::redis()->lpop('queue:' . $queue);
124120

125121
if(!$item) {
126122
return;
127123
}
128124

129-
return json_decode($interval == 0 ? $item : $item[1], true);
125+
return json_decode($item, true);
130126
}
131127

128+
/**
129+
* Pop an item off the end of the specified queue, decode it and
130+
* return it.
131+
*
132+
* @param string $queue The name of the queue to fetch an item from.
133+
* @return array Decoded item from the queue.
134+
*/
135+
public static function blpop($queues, $interval = null)
136+
{
137+
$list = array();
138+
foreach($queues AS $queue) {
139+
$list[] = 'queue:' . $queue;
140+
}
141+
142+
$item = self::redis()->blpop($list, $interval ? (int)$interval : Resque::DEFAULT_INTERVAL);
143+
144+
if(!$item) {
145+
return;
146+
}
147+
148+
return json_decode($item[1], true);
149+
}
150+
132151
/**
133152
* Return the size (number of pending jobs) of the specified queue.
134153
*

lib/Resque/Job.php

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,22 +73,41 @@ public static function create($queue, $class, $args = null, $monitor = false)
7373
return $id;
7474
}
7575

76-
/**
77-
* Find the next available job from the specified queue and return an
78-
* instance of Resque_Job for it.
79-
*
80-
* @param string $queue The name of the queue to check for a job in.
81-
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
82-
*/
83-
public static function reserve($queue, $interval = null)
84-
{
85-
$payload = Resque::pop($queue, $interval);
86-
if(!is_array($payload)) {
87-
return false;
88-
}
76+
/**
77+
* Find the next available job from the specified queue and return an
78+
* instance of Resque_Job for it.
79+
*
80+
* @param string $queue The name of the queue to check for a job in.
81+
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
82+
*/
83+
public static function reserve($queue)
84+
{
85+
$payload = Resque::pop($queue);
86+
if(!is_array($payload)) {
87+
return false;
88+
}
8989

90-
return new Resque_Job($queue, $payload);
91-
}
90+
return new Resque_Job($queue, $payload);
91+
}
92+
93+
/**
94+
* Find the next available job from the specified queue and return an
95+
* instance of Resque_Job for it.
96+
*
97+
* @param string $queue The name of the queue to check for a job in.
98+
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
99+
*/
100+
public static function reserveBlocking($queues, $interval = null)
101+
{
102+
$payload = Resque::blpop($queues, $interval);
103+
if(!is_array($payload)) {
104+
return false;
105+
}
106+
107+
var_dump($payload);
108+
109+
return new Resque_Job($payload->queue, $payload);
110+
}
92111

93112
/**
94113
* Update the status of the current job.

lib/Resque/Redis.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,13 @@ public function __construct($server, $database = null)
143143
*/
144144
public function __call($name, $args) {
145145
if(in_array($name, $this->keyCommands)) {
146-
$args[0] = self::$defaultNamespace . $args[0];
146+
if(is_array($args[0])) {
147+
foreach($args[0] AS $i => $v) {
148+
$args[0][$i] = self::$defaultNamespace . $v;
149+
}
150+
} else {
151+
$args[0] = self::$defaultNamespace . $args[0];
152+
}
147153
}
148154
try {
149155
return $this->driver->__call($name, $args);

lib/Resque/Worker.php

Lines changed: 81 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -152,70 +152,53 @@ public function work($interval = Resque::DEFAULT_INTERVAL)
152152
$this->updateProcLine('Starting');
153153
$this->startup();
154154

155-
while(true) {
156-
if($this->shutdown) {
157-
break;
158-
}
155+
while($job = $this->reserveBlocking($interval)) {
156+
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
159157

160-
// Attempt to find and reserve a job
161-
$job = false;
162-
if(!$this->paused) {
163-
$job = $this->reserve($interval);
164-
}
158+
$this->log('got ' . $job);
159+
Resque_Event::trigger('beforeFork', $job);
160+
$this->workingOn($job);
165161

166-
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
162+
$this->child = Resque::fork();
167163

168-
if(!$job) {
169-
// For an interval of 0, break now - helps with unit testing etc
170-
if($interval == 0) {
171-
break;
164+
// Forked and we're the child. Run the job.
165+
if ($this->child === 0 || $this->child === false) {
166+
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
167+
$this->updateProcLine($status);
168+
$this->log($status, self::LOG_VERBOSE);
169+
$this->perform($job);
170+
if ($this->child === 0) {
171+
exit(0);
172172
}
173+
}
173174

174-
// If no job was found, we sleep for $interval before continuing and checking again
175-
if($this->paused) {
176-
$this->updateProcLine('Paused');
177-
usleep($interval * 1000000); //it's paused, so don't hog redis with requests.
175+
if($this->child > 0) {
176+
// Parent process, sit and wait
177+
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
178+
$this->updateProcLine($status);
179+
$this->log($status, self::LOG_VERBOSE);
180+
181+
// Wait until the child process finishes before continuing
182+
pcntl_wait($status);
183+
$exitStatus = pcntl_wexitstatus($status);
184+
if($exitStatus !== 0) {
185+
$job->fail(new Resque_Job_DirtyExitException(
186+
'Job exited with exit code ' . $exitStatus
187+
));
178188
}
179-
180-
continue;
181189
}
182190

183-
$this->log('got ' . $job);
184-
Resque_Event::trigger('beforeFork', $job);
185-
$this->workingOn($job);
186-
187-
$this->child = Resque::fork();
188-
189-
// Forked and we're the child. Run the job.
190-
if ($this->child === 0 || $this->child === false) {
191-
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
192-
$this->updateProcLine($status);
193-
$this->log($status, self::LOG_VERBOSE);
194-
$this->perform($job);
195-
if ($this->child === 0) {
196-
exit(0);
197-
}
198-
}
199-
200-
if($this->child > 0) {
201-
// Parent process, sit and wait
202-
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
203-
$this->updateProcLine($status);
204-
$this->log($status, self::LOG_VERBOSE);
205-
206-
// Wait until the child process finishes before continuing
207-
pcntl_wait($status);
208-
$exitStatus = pcntl_wexitstatus($status);
209-
if($exitStatus !== 0) {
210-
$job->fail(new Resque_Job_DirtyExitException(
211-
'Job exited with exit code ' . $exitStatus
212-
));
213-
}
214-
}
191+
$this->child = null;
192+
$this->doneWorking();
215193

216-
$this->child = null;
217-
$this->doneWorking();
218-
}
194+
if($this->shutdown) {
195+
break;
196+
}
197+
198+
if($this->paused) {
199+
break;
200+
}
201+
}
219202

220203
$this->unregisterWorker();
221204
}
@@ -241,28 +224,49 @@ public function perform(Resque_Job $job)
241224
$this->log('done ' . $job);
242225
}
243226

244-
/**
245-
* Attempt to find a job from the top of one of the queues for this worker.
246-
*
247-
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
248-
*/
249-
public function reserve($interval = null)
250-
{
251-
$queues = $this->queues();
252-
if(!is_array($queues)) {
253-
return;
254-
}
255-
foreach($queues as $queue) {
256-
$this->log('Checking ' . $queue . ' with interval ' . $interval, self::LOG_VERBOSE);
257-
$job = Resque_Job::reserve($queue, $interval);
258-
if($job) {
259-
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
260-
return $job;
261-
}
262-
}
263-
264-
return false;
265-
}
227+
/**
228+
* Attempt to find a job from the top of one of the queues for this worker.
229+
*
230+
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
231+
*/
232+
public function reserve()
233+
{
234+
$queues = $this->queues();
235+
if(!is_array($queues)) {
236+
return;
237+
}
238+
foreach($queues as $queue) {
239+
$this->log('Checking ' . $queue, self::LOG_VERBOSE);
240+
$job = Resque_Job::reserve($queue);
241+
if($job) {
242+
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
243+
return $job;
244+
}
245+
}
246+
247+
return false;
248+
}
249+
250+
/**
251+
* Attempt to find a job from the top of one of the queues for this worker.
252+
*
253+
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
254+
*/
255+
public function reserveBlocking($interval = null)
256+
{
257+
$queues = $this->queues();
258+
if(!is_array($queues)) {
259+
return;
260+
}
261+
262+
$job = Resque_Job::reserveBlocking($queues, $interval);
263+
if($job) {
264+
$this->log('Found job on ' . $job->queue, self::LOG_VERBOSE);
265+
return $job;
266+
}
267+
268+
return false;
269+
}
266270

267271
/**
268272
* Return an array containing all of the queues that this worker should use

0 commit comments

Comments
 (0)