Skip to content

Commit 323c4cc

Browse files
committed
feat: add support for db adapter
1 parent 294be10 commit 323c4cc

File tree

3 files changed

+402
-0
lines changed

3 files changed

+402
-0
lines changed

src/Batch.php

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
<?php
2+
3+
namespace Leaf;
4+
5+
use Leaf\Queue\Dispatchable;
6+
7+
class Batch implements Dispatchable
8+
{
9+
/**
10+
* Job config
11+
*/
12+
protected $config = [];
13+
14+
/**
15+
* Current job
16+
*/
17+
protected $job = [];
18+
19+
/**
20+
* Data to pass to the job
21+
*/
22+
protected $data = [];
23+
24+
/**
25+
* Queue instance
26+
* @var \Leaf\Queue
27+
*/
28+
protected $queue = null;
29+
30+
/**
31+
* Load a job for running
32+
*/
33+
public function __construct($job, $config, $queue)
34+
{
35+
$this->job = $job;
36+
$this->config = $config;
37+
$this->queue = $queue;
38+
}
39+
40+
/**
41+
* Get the job identifier.
42+
*
43+
* @return string
44+
*/
45+
public function getJobId()
46+
{
47+
return $this->job['id'];
48+
}
49+
50+
/**
51+
* Handle delay for job
52+
*/
53+
public function handleDelay()
54+
{
55+
sleep($this->config['delay']);
56+
}
57+
58+
/**
59+
* Check if job has expired
60+
*/
61+
public function hasExpired()
62+
{
63+
return $this->job['created_at'] < time() - $this->config['expire'];
64+
}
65+
66+
/**
67+
* Handle job expiry
68+
*/
69+
public function handleExpiry()
70+
{
71+
echo "Job {$this->job['id']} has expired\n";
72+
$this->queue->pop($this->job['id']);
73+
}
74+
75+
/**
76+
* Check if job has reached retry limit
77+
*/
78+
public function hasReachedRetryLimit()
79+
{
80+
return $this->job['retry_count'] >= $this->config['tries'];
81+
}
82+
83+
/**
84+
* Handle job retry limit
85+
*/
86+
public function handleRetryLimit()
87+
{
88+
echo "Job {$this->job['id']} has reached retry limit\n";
89+
$this->queue->pop($this->job['id']);
90+
}
91+
92+
/**
93+
* Set job status
94+
*/
95+
public function setStatus($status)
96+
{
97+
$this->queue->setJobStatus($this->job['id'], $status);
98+
}
99+
100+
/**
101+
* Retry job
102+
*/
103+
public function retry()
104+
{
105+
sleep($this->config['delayBeforeRetry'] ?? 0);
106+
$this->queue->retryFailedJob($this->job['id'], $this->config['retry_count']);
107+
}
108+
109+
/**
110+
* Release the job back into the queue after (n) seconds.
111+
*
112+
* @param int $delay
113+
* @return void
114+
*/
115+
public function release($delay = 0)
116+
{
117+
$this->queue->push([
118+
'class' => $this->job['class'],
119+
'config' => json_encode($this->config),
120+
'status' => 'pending',
121+
'retry_count' => $this->job['retry_count'] + 1,
122+
]);
123+
}
124+
125+
/**
126+
* Pass data to the job
127+
*/
128+
public function with($data)
129+
{
130+
$this->data = $data;
131+
return $this;
132+
}
133+
134+
/**
135+
* Create a new job
136+
*/
137+
public static function create(callable $job)
138+
{
139+
//
140+
}
141+
142+
public function handle()
143+
{
144+
//
145+
}
146+
147+
public function stack()
148+
{
149+
return;
150+
}
151+
152+
public function getConfig()
153+
{
154+
return [];
155+
}
156+
157+
public function trigger()
158+
{
159+
$this->queue->setJobStatus($this->job['id'], 'processing');
160+
$this->handle();
161+
}
162+
163+
public function removeFromQueue()
164+
{
165+
$this->queue->pop($this->job['id']);
166+
}
167+
}

src/Queue/Adapters/Database.php

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
<?php
2+
3+
namespace Leaf\Queue\Adapters;
4+
5+
/**
6+
* Database adapter
7+
* -----
8+
* Db adapter for the worker
9+
*/
10+
class Database implements Adapter
11+
{
12+
/** @var \Leaf\Db */
13+
protected $db;
14+
15+
protected $errors;
16+
17+
protected array $config = [];
18+
19+
public function __construct()
20+
{
21+
$this->db = new \Leaf\Db();
22+
}
23+
24+
/**
25+
* @inheritDoc
26+
*/
27+
public function connect($connection)
28+
{
29+
$appDbConfig = MvcConfig('database');
30+
$dbConnection = $appDbConfig['connections'][$connection['connection']] ?? $appDbConfig['connections'][$appDbConfig['default']];
31+
32+
$this->db->connect([
33+
'dbtype' => $dbConnection['driver'] ?? 'mysql',
34+
'charset' => $dbConnection['charset'] ?? null,
35+
'port' => $dbConnection['port'] ?? null,
36+
'unixSocket' => $dbConnection['unixSocket'] ?? null,
37+
'host' => $dbConnection['host'] ?? '127.0.0.1',
38+
'username' => $dbConnection['username'] ?? 'root',
39+
'password' => $dbConnection['password'] ?? '',
40+
'dbname' => $dbConnection['database'] ?? '',
41+
]);
42+
43+
$this->config['table'] = $connection['table'] ?? 'leaf_php_jobs';
44+
45+
return $this;
46+
}
47+
48+
/**
49+
* @inheritDoc
50+
*/
51+
public function pushJobToQueue($job)
52+
{
53+
if (!$this->db->tableExists($this->config['table'])) {
54+
$this->setupAdapterStorage();
55+
}
56+
57+
$this->db
58+
->insert($this->config['table'])
59+
->params($job)
60+
->execute();
61+
62+
if ($this->db->errors()) {
63+
$this->errors = $this->db->errors();
64+
return false;
65+
}
66+
67+
return true;
68+
}
69+
70+
/**
71+
* @inheritDoc
72+
*/
73+
public function popJobFromQueue($id)
74+
{
75+
$this->db
76+
->delete($this->config['table'])
77+
->where([
78+
"id" => $id,
79+
])
80+
->execute();
81+
82+
if ($this->db->errors()) {
83+
$this->errors = $this->db->errors();
84+
85+
return false;
86+
}
87+
88+
return true;
89+
}
90+
91+
/**
92+
* @inheritDoc
93+
*/
94+
public function setJobStatus($id, $status)
95+
{
96+
$this->db
97+
->update($this->config['table'])
98+
->params([
99+
"status" => $status,
100+
])
101+
->where([
102+
"id" => $id,
103+
])
104+
->execute();
105+
106+
if ($this->db->errors()) {
107+
$this->errors = $this->db->errors();
108+
109+
return false;
110+
}
111+
112+
return true;
113+
}
114+
115+
/**
116+
* @inheritDoc
117+
*/
118+
public function markJobAsFailed($id)
119+
{
120+
$this->db
121+
->update($this->config['table'])
122+
->params([
123+
"status" => "failed",
124+
])
125+
->where([
126+
"id" => $id,
127+
])
128+
->execute();
129+
130+
if ($this->db->errors()) {
131+
$this->errors = $this->db->errors();
132+
133+
return false;
134+
}
135+
136+
return true;
137+
}
138+
139+
/**
140+
* Setup storage for the adapter
141+
*/
142+
protected function setupAdapterStorage()
143+
{
144+
$this->db
145+
->createTable($this->config['table'], [
146+
'id' => 'INT NOT NULL AUTO_INCREMENT',
147+
'class' => 'VARCHAR(255)',
148+
'config' => 'TEXT',
149+
'status' => 'VARCHAR(50)',
150+
'retry_count' => 'INT',
151+
'created_at' => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
152+
'updated_at' => 'DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP',
153+
'PRIMARY KEY' => '(ID)',
154+
])
155+
->execute();
156+
}
157+
158+
/**
159+
* @inheritDoc
160+
*/
161+
public function getJobs()
162+
{
163+
return [];
164+
}
165+
166+
/**
167+
* @inheritDoc
168+
*/
169+
public function getNextJob()
170+
{
171+
return $this->db
172+
->select($this->config['table'])
173+
->where([
174+
'status' => 'pending',
175+
])
176+
->orderBy('id', 'asc')
177+
->limit(1)
178+
->get()[0] ?? null;
179+
}
180+
181+
/**
182+
* @inheritDoc
183+
*/
184+
public function retryFailedJob($id, $retryCount)
185+
{
186+
$this->db
187+
->update($this->config['table'])
188+
->params([
189+
"status" => "pending",
190+
"retry_count" => (int) $retryCount + 1,
191+
])
192+
->where([
193+
"id" => $id,
194+
])
195+
->execute();
196+
197+
if ($this->db->errors()) {
198+
$this->errors = $this->db->errors();
199+
200+
return false;
201+
}
202+
203+
return true;
204+
}
205+
}

0 commit comments

Comments
 (0)