Skip to content

Commit 9ebad2d

Browse files
committed
Support job options
1 parent e8d9c78 commit 9ebad2d

File tree

4 files changed

+98
-19
lines changed

4 files changed

+98
-19
lines changed

src/Queue/QlessQueue.php

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
class QlessQueue extends Queue implements QueueContract
1818
{
19+
public const JOB_OPTIONS_KEY = '__QLESS_OPTIONS';
20+
1921
private const WORKER_PREFIX = 'laravel_';
2022

2123
/**
@@ -73,12 +75,19 @@ public function pushRaw($payload, $queueName = null, array $options = [])
7375

7476
$queue = $this->getConnection()->queues[$queueName];
7577

78+
$qlessOptions = $payloadData['data'][self::JOB_OPTIONS_KEY] ?? [];
79+
80+
$options = array_merge($qlessOptions, $options);
81+
7682
return $queue->put(
7783
$payloadData['job'],
7884
$payloadData['data'],
79-
null,
80-
$payloadData['timeout'],
81-
$payloadData['maxTries']
85+
$options['jid'] ?? null,
86+
$options['delay'] ?? null,
87+
$options['retries'] ?? null,
88+
$options['priority'] ?? null,
89+
$options['tags'] ?? null,
90+
$options['depends'] ?? null
8291
);
8392
}
8493

@@ -106,10 +115,13 @@ public function push($job, $data = '', $queueName = null)
106115
*/
107116
public function later($delay, $job, $data = '', $queueName = null)
108117
{
118+
$options = $data[self::JOB_OPTIONS_KEY] ?? [];
119+
$options = array_merge($options, ['timeout' => $delay]);
120+
109121
return $this->pushRaw(
110-
$this->makePayload($job, $data, ['timeout' => $delay]),
122+
$this->makePayload($job, $data, $options),
111123
$queueName,
112-
['timeout' => $delay]
124+
$options
113125
);
114126
}
115127

@@ -127,7 +139,20 @@ public function recur(int $interval, string $job, array $data, ?string $queueNam
127139
/** @var \Qless\Queues\Queue $queue */
128140
$queue = $this->getConnection()->queues[$queueName];
129141

130-
return $queue->recur($job, $data, $interval);
142+
$options = $data[self::JOB_OPTIONS_KEY] ?? [];
143+
$options = array_merge($options, ['interval' => $interval]);
144+
145+
return $queue->recur(
146+
$job,
147+
$data,
148+
$options['interval'],
149+
$options['offset'] ?? null,
150+
$options['jid'] ?? null,
151+
$options['retries'] ?? null,
152+
$options['priority'] ?? null,
153+
$options['backlog'] ?? null,
154+
$options['tags'] ?? null
155+
);
131156
}
132157

133158
/**
@@ -199,12 +224,18 @@ public function pushToTopic(string $topicName, string $job, array $data = [], ar
199224
{
200225
$topic = new Topic($topicName, $this->getConnection());
201226

227+
$qlessOptions = $payloadData['data'][self::JOB_OPTIONS_KEY] ?? [];
228+
$options = array_merge($qlessOptions, $options);
229+
202230
return $topic->put(
203231
$job,
204232
$data,
205-
null,
206-
$options['timeout'] ?? null,
207-
$options['maxTries'] ?? null
233+
$options['jid'] ?? null,
234+
$options['delay'] ?? null,
235+
$options['retries'] ?? null,
236+
$options['priority'] ?? null,
237+
$options['tags'] ?? null,
238+
$options['depends'] ?? null
208239
);
209240
}
210241

@@ -216,17 +247,18 @@ public function pushToTopic(string $topicName, string $job, array $data = [], ar
216247
*/
217248
protected function makePayload(string $job, $data, $options = []): string
218249
{
250+
$qlessOptions = $data[self::JOB_OPTIONS_KEY] ?? [];
251+
$data[self::JOB_OPTIONS_KEY] = array_merge($qlessOptions, $options);
252+
219253
$payload = json_encode([
220254
'displayName' => explode('@', $job)[0],
221255
'job' => $job,
222-
'maxTries' => array_get($options, 'maxTries'),
223-
'timeout' => array_get($options, 'timeout'),
224256
'data' => $data,
225257
]);
226258

227259
if (JSON_ERROR_NONE !== json_last_error()) {
228260
throw new InvalidPayloadException(
229-
'Unable to JSON encode payload. Error code: '.json_last_error()
261+
'Unable to JSON encode payload. Error code: ' . json_last_error()
230262
);
231263
}
232264

tests/Queue/HandlerTest.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public function testCustomHandler()
2424

2525
protected function getQueue()
2626
{
27-
return new QlessQueue(
27+
$queue = new QlessQueue(
2828
new Client([
2929
'host' => REDIS_HOST,
3030
'port' => REDIS_PORT,
@@ -33,6 +33,10 @@ protected function getQueue()
3333
'queue' => 'test_qless_queue'
3434
]
3535
);
36+
37+
$queue->setContainer($this->app);
38+
39+
return $queue;
3640
}
3741

3842
protected function getApplicationProviders($app)

tests/Queue/Job.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ public function perform(BaseJob $job)
1313
$job->getData()['classHandler'] = self::class;
1414
}
1515

16+
if ($job->getTags()) {
17+
$job->getData()['tags'] = $job->getTags();
18+
}
19+
1620
$job->complete();
1721
}
1822
}

tests/Queue/QlessQueueTest.php

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public function testPushPop()
3838
{
3939
$queue = $this->getQueue();
4040

41-
$queueName = 'test_push_pop_' . random_int(1000, 5000);
41+
$queueName = str_random(16);
4242

4343
$jobId = $queue->push(Job::class, ['firstKey' => 'firstValue'], $queueName);
4444

@@ -54,7 +54,10 @@ public function testPushPop()
5454

5555
$this->assertEquals($job->getName(), Job::class);
5656

57-
$this->assertEquals($job->getData(), ['firstKey' => 'firstValue']);
57+
$data = $job->getData();
58+
unset($data[QlessQueue::JOB_OPTIONS_KEY]);
59+
60+
$this->assertEquals($data, ['firstKey' => 'firstValue']);
5861
}
5962

6063
public function testSubscribe()
@@ -85,8 +88,12 @@ public function testSubscribe()
8588
$this->assertEquals($job2, null);
8689
$this->assertEquals($job3->getName(), Job::class);
8790

88-
$this->assertEquals($job1->getData(), ['key' => 'value']);
89-
$this->assertEquals($job3->getData(), ['key' => 'value']);
91+
$data1 = $job1->getData();
92+
$data3 = $job3->getData();
93+
unset($data1[QlessQueue::JOB_OPTIONS_KEY], $data3[QlessQueue::JOB_OPTIONS_KEY]);
94+
95+
$this->assertEquals($data1, ['key' => 'value']);
96+
$this->assertEquals($data3, ['key' => 'value']);
9097
}
9198

9299
public function testUnSubscribe()
@@ -121,7 +128,7 @@ public function testUnSubscribe()
121128
*/
122129
public function testSize()
123130
{
124-
$queueName = 'test_size' . random_int(1000, 5000);
131+
$queueName = str_random();
125132

126133
$queue = $this->getQueue();
127134

@@ -138,9 +145,37 @@ public function testSize()
138145
$this->assertEquals($queue->size($queueName), 0);
139146
}
140147

148+
/**
149+
* @throws \Exception
150+
*/
151+
public function testJobOptions()
152+
{
153+
$queueName = str_random();
154+
155+
$jid = str_random(16);
156+
157+
$queue = $this->getQueue();
158+
159+
$data = [
160+
QlessQueue::JOB_OPTIONS_KEY => [
161+
'jid' => $jid,
162+
'tags' => ['tag1', 'tag_second'],
163+
],
164+
'firstKey' => 'firstValue',
165+
];
166+
167+
$queue->push(Job::class, $data, $queueName);
168+
169+
$job = $queue->pop($queueName);
170+
$job->fire();
171+
172+
$this->assertEquals($jid, $job->getJobId());
173+
$this->assertEquals(['tag1', 'tag_second'], $job->getData()['tags']);
174+
}
175+
141176
protected function getQueue()
142177
{
143-
return new QlessQueue(
178+
$queue = new QlessQueue(
144179
new Client([
145180
'host' => REDIS_HOST,
146181
'port' => REDIS_PORT,
@@ -149,6 +184,10 @@ protected function getQueue()
149184
'queue' => 'test_qless_queue'
150185
]
151186
);
187+
188+
$queue->setContainer($this->app);
189+
190+
return $queue;
152191
}
153192

154193
protected function getApplicationProviders($app)

0 commit comments

Comments
 (0)