Skip to content

Commit f34af24

Browse files
Add ability and command to clear queues
1 parent 257b95c commit f34af24

File tree

8 files changed

+197
-1
lines changed

8 files changed

+197
-1
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
namespace Illuminate\Contracts\Queue;
4+
5+
interface ClearableQueue
6+
{
7+
/**
8+
* Clear all jobs from the queue.
9+
*
10+
* @param string $queue
11+
* @return int
12+
*/
13+
public function clear($queue);
14+
}

src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
use Illuminate\Queue\Console\RestartCommand as QueueRestartCommand;
6666
use Illuminate\Queue\Console\RetryBatchCommand as QueueRetryBatchCommand;
6767
use Illuminate\Queue\Console\RetryCommand as QueueRetryCommand;
68+
use Illuminate\Queue\Console\ClearCommand as QueueClearCommand;
6869
use Illuminate\Queue\Console\TableCommand;
6970
use Illuminate\Queue\Console\WorkCommand as QueueWorkCommand;
7071
use Illuminate\Routing\Console\ControllerMakeCommand;
@@ -96,6 +97,7 @@ class ArtisanServiceProvider extends ServiceProvider implements DeferrableProvid
9697
'Optimize' => 'command.optimize',
9798
'OptimizeClear' => 'command.optimize.clear',
9899
'PackageDiscover' => 'command.package.discover',
100+
'QueueClear' => 'command.queue.clear',
99101
'QueueFailed' => 'command.queue.failed',
100102
'QueueFlush' => 'command.queue.flush',
101103
'QueueForget' => 'command.queue.forget',
@@ -712,6 +714,18 @@ protected function registerQueueWorkCommand()
712714
});
713715
}
714716

717+
/**
718+
* Register the command.
719+
*
720+
* @return void
721+
*/
722+
protected function registerQueueClearCommand()
723+
{
724+
$this->app->singleton('command.queue.clear', function () {
725+
return new QueueClearCommand;
726+
});
727+
}
728+
715729
/**
716730
* Register the command.
717731
*
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
3+
namespace Illuminate\Queue\Console;
4+
5+
use Illuminate\Console\Command;
6+
use Illuminate\Console\ConfirmableTrait;
7+
use Illuminate\Contracts\Queue\ClearableQueue;
8+
use ReflectionClass;
9+
10+
class ClearCommand extends Command
11+
{
12+
use ConfirmableTrait;
13+
14+
/**
15+
* The console command name.
16+
*
17+
* @var string
18+
*/
19+
protected $signature = 'queue:clear
20+
{connection? : The name of the queue connection to clear}
21+
{--queue= : The name of the queue to clear}';
22+
23+
/**
24+
* The console command description.
25+
*
26+
* @var string
27+
*/
28+
protected $description = 'Clear the queue';
29+
30+
/**
31+
* Execute the console command.
32+
*
33+
* @return int|null
34+
*/
35+
public function handle()
36+
{
37+
if (! $this->confirmToProceed()) {
38+
return 1;
39+
}
40+
41+
$connection = $this->argument('connection')
42+
?: $this->laravel['config']['queue.default'];
43+
44+
// We need to get the right queue for the connection which is set in the queue
45+
// configuration file for the application. We will pull it based on the set
46+
// connection being run for the queue operation currently being executed.
47+
$queueName = $this->getQueue($connection);
48+
$queue = ($this->laravel['queue'])->connection($connection);
49+
50+
if($queue instanceof ClearableQueue) {
51+
$count = $queue->clear($queueName);
52+
53+
$this->line('<info>Cleared '.$count.' jobs from the '.$queueName.' queue</info> ');
54+
} else {
55+
$this->line('<error>Clearing queues is not supported on '.(new ReflectionClass($queue))->getShortName().'</error> ');
56+
}
57+
58+
return 0;
59+
}
60+
61+
/**
62+
* Get the queue name to clear.
63+
*
64+
* @param string $connection
65+
* @return string
66+
*/
67+
protected function getQueue($connection)
68+
{
69+
return $this->option('queue') ?: $this->laravel['config']->get(
70+
"queue.connections.{$connection}.queue", 'default'
71+
);
72+
}
73+
}

src/Illuminate/Queue/DatabaseQueue.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,19 @@ protected function markJobAsReserved($job)
303303
return $job;
304304
}
305305

306+
/**
307+
* Clear all jobs from the queue.
308+
*
309+
* @param string $queue
310+
* @return int
311+
*/
312+
public function clear($queue)
313+
{
314+
return $this->database->table($this->table)
315+
->where('queue', $this->getQueue($queue))
316+
->delete();
317+
}
318+
306319
/**
307320
* Delete a reserved job from the queue.
308321
*

src/Illuminate/Queue/LuaScripts.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,24 @@ public static function size()
2020
LUA;
2121
}
2222

23+
/**
24+
* Get the Lua script for clearing the queue.
25+
*
26+
* KEYS[1] - The name of the primary queue
27+
* KEYS[2] - The name of the "delayed" queue
28+
* KEYS[3] - The name of the "reserved" queue
29+
*
30+
* @return string
31+
*/
32+
public static function clear()
33+
{
34+
return <<<'LUA'
35+
local size = redis.call('llen', KEYS[1]) + redis.call('zcard', KEYS[2]) + redis.call('zcard', KEYS[3])
36+
redis.call('del', KEYS[1], KEYS[2], KEYS[3])
37+
return size
38+
LUA;
39+
}
40+
2341
/**
2442
* Get the Lua script for pushing jobs onto the queue.
2543
*

src/Illuminate/Queue/RedisQueue.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
namespace Illuminate\Queue;
44

55
use Illuminate\Contracts\Queue\Queue as QueueContract;
6+
use Illuminate\Contracts\Queue\ClearableQueue;
67
use Illuminate\Contracts\Redis\Factory as Redis;
78
use Illuminate\Queue\Jobs\RedisJob;
89
use Illuminate\Support\Str;
910

10-
class RedisQueue extends Queue implements QueueContract
11+
class RedisQueue extends Queue implements QueueContract, ClearableQueue
1112
{
1213
/**
1314
* The Redis factory implementation.
@@ -256,6 +257,21 @@ protected function retrieveNextJob($queue, $block = true)
256257
return [$job, $reserved];
257258
}
258259

260+
/**
261+
* Clear all jobs from the queue.
262+
*
263+
* @param string $queue
264+
* @return int
265+
*/
266+
public function clear($queue)
267+
{
268+
$queue = $this->getQueue($queue);
269+
270+
return $this->getConnection()->eval(
271+
LuaScripts::clear(), 3, $queue, $queue.':delayed', $queue.':reserved'
272+
);
273+
}
274+
259275
/**
260276
* Delete a reserved job from the queue.
261277
*

tests/Queue/QueueDatabaseQueueIntegrationTest.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,35 @@ public function testPoppedJobsIncrementAttempts()
147147
$this->assertEquals(1, $popped_job->attempts(), 'The "attempts" attribute of the Job object was not updated by pop!');
148148
}
149149

150+
/**
151+
* Test that the queue can be cleared.
152+
*/
153+
public function testThatQueueCanBeCleared()
154+
{
155+
$this->connection()
156+
->table('jobs')
157+
->insert([[
158+
'id' => 1,
159+
'queue' => $mock_queue_name = 'mock_queue_name',
160+
'payload' => 'mock_payload',
161+
'attempts' => 0,
162+
'reserved_at' => Carbon::now()->addDay()->getTimestamp(),
163+
'available_at' => Carbon::now()->subDay()->getTimestamp(),
164+
'created_at' => Carbon::now()->getTimestamp(),
165+
], [
166+
'id' => 2,
167+
'queue' => $mock_queue_name,
168+
'payload' => 'mock_payload 2',
169+
'attempts' => 0,
170+
'reserved_at' => null,
171+
'available_at' => Carbon::now()->subSeconds(1)->getTimestamp(),
172+
'created_at' => Carbon::now()->getTimestamp(),
173+
]]);
174+
175+
$this->assertEquals(2, $this->queue->clear($mock_queue_name));
176+
$this->assertEquals(0, $this->queue->size());
177+
}
178+
150179
/**
151180
* Test that jobs that are not reserved and have an available_at value in the future, are not popped.
152181
*/

tests/Queue/RedisQueueIntegrationTest.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,25 @@ public function testDelete($driver)
414414
$this->assertNull($this->queue->pop());
415415
}
416416

417+
/**
418+
* @dataProvider redisDriverProvider
419+
*
420+
* @param string $driver
421+
*/
422+
public function testClear($driver)
423+
{
424+
$this->setQueue($driver);
425+
426+
$job1 = new RedisQueueIntegrationTestJob(30);
427+
$job2 = new RedisQueueIntegrationTestJob(40);
428+
429+
$this->queue->push($job1);
430+
$this->queue->push($job2);
431+
432+
$this->assertEquals(2, $this->queue->clear(null));
433+
$this->assertEquals(0, $this->queue->size());
434+
}
435+
417436
/**
418437
* @dataProvider redisDriverProvider
419438
*

0 commit comments

Comments
 (0)