forked from devsnippet/PHPResqueBundle
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPHPResque.php
More file actions
106 lines (87 loc) · 2.55 KB
/
PHPResque.php
File metadata and controls
106 lines (87 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
<?php
namespace PHPResqueBundle;
use Psr\Log\LoggerInterface;
/**
 * Configures and starts one or more Resque workers.
 *
 * Typical use: construct with a backend and a logger, adjust the settings via
 * the setters, then call daemon() to start processing.
 */
class PHPResque
{
    /**
     * Comma-separated list of queue names handed to the worker. May be
     * prefixed with "namespace:"; daemon() strips that prefix and applies it
     * as the Redis key prefix.
     *
     * @var string
     */
    private $queue = '*';

    /**
     * Verbosity mode as set through verbose(). Stored only; nothing in this
     * class reads it.
     *
     * @var mixed
     */
    private $logging = 'normal';

    /**
     * First argument passed to Resque_Worker::work().
     *
     * @var int
     */
    private $checker_interval = 5;

    /**
     * Number of worker processes daemon() starts. Always >= 1.
     *
     * @var int
     */
    private $fork_count = 1;

    /**
     * Value handed to Resque::setBackend().
     *
     * @var mixed
     */
    private $backend = '';

    /**
     * Value forwarded to Resque_Worker::setStayAlive().
     *
     * @var boolean
     */
    private $stayAlive = false;

    /**
     * Second argument passed to Resque_Worker::work().
     *
     * @var boolean
     */
    private $blocking = true;

    /**
     * @var LoggerInterface logger
     */
    private $logger;

    /**
     * @param mixed           $backend Backend passed to Resque::setBackend() when daemon() runs
     * @param LoggerInterface $logger  Logger attached to every worker this instance starts
     */
    public function __construct($backend, LoggerInterface $logger)
    {
        $this->backend = $backend;
        $this->logger = $logger;
    }

    /**
     * Sets the queue specification.
     *
     * @param string $name Comma-separated queue names, optionally prefixed with "namespace:"
     */
    public function defineQueue($name)
    {
        $this->queue = $name;
    }

    /**
     * Stores the requested logging mode.
     *
     * @param mixed $mode
     */
    public function verbose($mode)
    {
        $this->logging = $mode;
    }

    /**
     * Sets the interval passed to the worker; the value is cast to int.
     *
     * @param mixed $interval
     */
    public function setInterval($interval)
    {
        $this->checker_interval = (int) $interval;
    }

    /**
     * @param boolean $blocking
     */
    public function setBlocking($blocking)
    {
        $this->blocking = $blocking;
    }

    /**
     * @param boolean $stayAlive
     */
    public function setStayAlive($stayAlive)
    {
        $this->stayAlive = $stayAlive;
    }

    /**
     * Sets how many worker processes daemon() should start.
     *
     * The value is cast to int. Anything not greater than 1 results in a
     * single worker. When more than one is requested but pcntl_fork() is not
     * available, a notice is written to STDOUT and a single worker is used.
     *
     * @param mixed $count
     */
    public function forkInstances($count)
    {
        $count = (int) $count;

        if ($count <= 1) {
            $this->fork_count = 1;

            return;
        }

        if (!function_exists('pcntl_fork')) {
            fwrite(STDOUT, "*** Fork could not initialized. PHP function pcntl_fork() does NOT exists \n");
            $this->fork_count = 1;

            return;
        }

        $this->fork_count = $count;
    }

    /**
     * @return int Number of worker processes daemon() will start
     */
    public function getForkInstances()
    {
        return $this->fork_count;
    }

    /**
     * Builds a worker for the configured queues and runs it in the current
     * process.
     */
    private function work()
    {
        $worker = new \Resque_Worker(explode(',', $this->queue));
        $worker->setLogger($this->logger);
        $worker->setStayAlive($this->stayAlive);

        fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");

        $worker->work($this->checker_interval, $this->blocking);
    }

    /**
     * Configures Resque and starts the worker(s).
     *
     * With a fork count of 1 the worker runs in the calling process. With a
     * higher count the calling process forks that many children, each child
     * runs one worker, and the parent returns once all children are forked.
     *
     * @throws \RuntimeException When pcntl_fork() fails
     */
    public function daemon()
    {
        \Resque::setBackend($this->backend);

        // "namespace:queues" -> use the namespace as Redis prefix and keep
        // only the queue part for the workers.
        if (strpos($this->queue, ':') !== false) {
            list($namespace, $queue) = explode(':', $this->queue);
            \Resque_Redis::prefix($namespace);
            $this->queue = $queue;
        }

        $instances = $this->getForkInstances();

        if ($instances <= 1) {
            $this->work();

            return;
        }

        for ($i = 0; $i < $instances; ++$i) {
            $pid = pcntl_fork();

            if ($pid === -1) {
                throw new \RuntimeException("Could not fork worker {$i}");
            }

            if ($pid === 0) {
                // Child process: run exactly one worker, then leave the loop
                // so the child never forks further children itself.
                $this->work();
                break;
            }

            // Parent process ($pid > 0): keep looping to fork the remaining
            // workers instead of becoming a worker itself.
        }
    }
}