Skip to content

Commit 9491898

Browse files
committed
Merge branch 'clear_queues' into 8.x
2 parents 159bf1a + 06b378c commit 9491898

File tree

8 files changed

+200
-2
lines changed

8 files changed

+200
-2
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
namespace Illuminate\Contracts\Queue;
4+
5+
interface ClearableQueue
6+
{
7+
/**
8+
* Delete all of the jobs from the queue.
9+
*
10+
* @param string $queue
11+
* @return int
12+
*/
13+
public function clear($queue);
14+
}

src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
use Illuminate\Foundation\Console\ViewClearCommand;
5858
use Illuminate\Notifications\Console\NotificationTableCommand;
5959
use Illuminate\Queue\Console\BatchesTableCommand;
60+
use Illuminate\Queue\Console\ClearCommand as QueueClearCommand;
6061
use Illuminate\Queue\Console\FailedTableCommand;
6162
use Illuminate\Queue\Console\FlushFailedCommand as FlushFailedQueueCommand;
6263
use Illuminate\Queue\Console\ForgetFailedCommand as ForgetFailedQueueCommand;
@@ -96,6 +97,7 @@ class ArtisanServiceProvider extends ServiceProvider implements DeferrableProvid
9697
'Optimize' => 'command.optimize',
9798
'OptimizeClear' => 'command.optimize.clear',
9899
'PackageDiscover' => 'command.package.discover',
100+
'QueueClear' => 'command.queue.clear',
99101
'QueueFailed' => 'command.queue.failed',
100102
'QueueFlush' => 'command.queue.flush',
101103
'QueueForget' => 'command.queue.forget',
@@ -712,6 +714,18 @@ protected function registerQueueWorkCommand()
712714
});
713715
}
714716

717+
/**
718+
* Register the command.
719+
*
720+
* @return void
721+
*/
722+
protected function registerQueueClearCommand()
723+
{
724+
$this->app->singleton('command.queue.clear', function () {
725+
return new QueueClearCommand;
726+
});
727+
}
728+
715729
/**
716730
* Register the command.
717731
*
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
<?php
2+
3+
namespace Illuminate\Queue\Console;
4+
5+
use Illuminate\Console\Command;
6+
use Illuminate\Console\ConfirmableTrait;
7+
use Illuminate\Contracts\Queue\ClearableQueue;
8+
use ReflectionClass;
9+
10+
class ClearCommand extends Command
11+
{
12+
use ConfirmableTrait;
13+
14+
/**
15+
* The console command name.
16+
*
17+
* @var string
18+
*/
19+
protected $signature = 'queue:clear
20+
{connection? : The name of the queue connection to clear}
21+
{--queue= : The name of the queue to clear}';
22+
23+
/**
24+
* The console command description.
25+
*
26+
* @var string
27+
*/
28+
protected $description = 'Delete all of the jobs from the specified queue';
29+
30+
/**
31+
* Execute the console command.
32+
*
33+
* @return int|null
34+
*/
35+
public function handle()
36+
{
37+
if (! $this->confirmToProceed()) {
38+
return 1;
39+
}
40+
41+
$connection = $this->argument('connection')
42+
?: $this->laravel['config']['queue.default'];
43+
44+
// We need to get the right queue for the connection which is set in the queue
45+
// configuration file for the application. We will pull it based on the set
46+
// connection being run for the queue operation currently being executed.
47+
$queueName = $this->getQueue($connection);
48+
49+
$queue = ($this->laravel['queue'])->connection($connection);
50+
51+
if ($queue instanceof ClearableQueue) {
52+
$count = $queue->clear($queueName);
53+
54+
$this->line('<info>Cleared '.$count.' jobs from the ['.$queueName.'] queue</info> ');
55+
} else {
56+
$this->line('<error>Clearing queues is not supported on ['.(new ReflectionClass($queue))->getShortName().']</error> ');
57+
}
58+
59+
return 0;
60+
}
61+
62+
/**
63+
* Get the queue name to clear.
64+
*
65+
* @param string $connection
66+
* @return string
67+
*/
68+
protected function getQueue($connection)
69+
{
70+
return $this->option('queue') ?: $this->laravel['config']->get(
71+
"queue.connections.{$connection}.queue", 'default'
72+
);
73+
}
74+
}

src/Illuminate/Queue/DatabaseQueue.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22

33
namespace Illuminate\Queue;
44

5+
use Illuminate\Contracts\Queue\ClearableQueue;
56
use Illuminate\Contracts\Queue\Queue as QueueContract;
67
use Illuminate\Database\Connection;
78
use Illuminate\Queue\Jobs\DatabaseJob;
89
use Illuminate\Queue\Jobs\DatabaseJobRecord;
910
use Illuminate\Support\Carbon;
1011
use PDO;
1112

12-
class DatabaseQueue extends Queue implements QueueContract
13+
class DatabaseQueue extends Queue implements QueueContract, ClearableQueue
1314
{
1415
/**
1516
* The database connection instance.
@@ -340,6 +341,19 @@ public function deleteAndRelease($queue, $job, $delay)
340341
});
341342
}
342343

344+
/**
345+
* Delete all of the jobs from the queue.
346+
*
347+
* @param string $queue
348+
* @return int
349+
*/
350+
public function clear($queue)
351+
{
352+
return $this->database->table($this->table)
353+
->where('queue', $this->getQueue($queue))
354+
->delete();
355+
}
356+
343357
/**
344358
* Get the queue or return the default.
345359
*

src/Illuminate/Queue/LuaScripts.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,24 @@ public static function migrateExpiredJobs()
124124
end
125125
126126
return val
127+
LUA;
128+
}
129+
130+
/**
131+
* Get the Lua script for removing all jobs from the queue.
132+
*
133+
* KEYS[1] - The name of the primary queue
134+
* KEYS[2] - The name of the "delayed" queue
135+
* KEYS[3] - The name of the "reserved" queue
136+
*
137+
* @return string
138+
*/
139+
public static function clear()
140+
{
141+
return <<<'LUA'
142+
local size = redis.call('llen', KEYS[1]) + redis.call('zcard', KEYS[2]) + redis.call('zcard', KEYS[3])
143+
redis.call('del', KEYS[1], KEYS[2], KEYS[3])
144+
return size
127145
LUA;
128146
}
129147
}

src/Illuminate/Queue/RedisQueue.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22

33
namespace Illuminate\Queue;
44

5+
use Illuminate\Contracts\Queue\ClearableQueue;
56
use Illuminate\Contracts\Queue\Queue as QueueContract;
67
use Illuminate\Contracts\Redis\Factory as Redis;
78
use Illuminate\Queue\Jobs\RedisJob;
89
use Illuminate\Support\Str;
910

10-
class RedisQueue extends Queue implements QueueContract
11+
class RedisQueue extends Queue implements QueueContract, ClearableQueue
1112
{
1213
/**
1314
* The Redis factory implementation.
@@ -286,6 +287,21 @@ public function deleteAndRelease($queue, $job, $delay)
286287
);
287288
}
288289

290+
/**
291+
* Delete all of the jobs from the queue.
292+
*
293+
* @param string $queue
294+
* @return int
295+
*/
296+
public function clear($queue)
297+
{
298+
$queue = $this->getQueue($queue);
299+
300+
return $this->getConnection()->eval(
301+
LuaScripts::clear(), 3, $queue, $queue.':delayed', $queue.':reserved'
302+
);
303+
}
304+
289305
/**
290306
* Get a random ID string.
291307
*

tests/Queue/QueueDatabaseQueueIntegrationTest.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,35 @@ public function testPoppedJobsIncrementAttempts()
147147
$this->assertEquals(1, $popped_job->attempts(), 'The "attempts" attribute of the Job object was not updated by pop!');
148148
}
149149

150+
/**
151+
* Test that the queue can be cleared.
152+
*/
153+
public function testThatQueueCanBeCleared()
154+
{
155+
$this->connection()
156+
->table('jobs')
157+
->insert([[
158+
'id' => 1,
159+
'queue' => $mock_queue_name = 'mock_queue_name',
160+
'payload' => 'mock_payload',
161+
'attempts' => 0,
162+
'reserved_at' => Carbon::now()->addDay()->getTimestamp(),
163+
'available_at' => Carbon::now()->subDay()->getTimestamp(),
164+
'created_at' => Carbon::now()->getTimestamp(),
165+
], [
166+
'id' => 2,
167+
'queue' => $mock_queue_name,
168+
'payload' => 'mock_payload 2',
169+
'attempts' => 0,
170+
'reserved_at' => null,
171+
'available_at' => Carbon::now()->subSeconds(1)->getTimestamp(),
172+
'created_at' => Carbon::now()->getTimestamp(),
173+
]]);
174+
175+
$this->assertEquals(2, $this->queue->clear($mock_queue_name));
176+
$this->assertEquals(0, $this->queue->size());
177+
}
178+
150179
/**
151180
* Test that jobs that are not reserved and have an available_at value in the future, are not popped.
152181
*/

tests/Queue/RedisQueueIntegrationTest.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,25 @@ public function testDelete($driver)
414414
$this->assertNull($this->queue->pop());
415415
}
416416

417+
/**
418+
* @dataProvider redisDriverProvider
419+
*
420+
* @param string $driver
421+
*/
422+
public function testClear($driver)
423+
{
424+
$this->setQueue($driver);
425+
426+
$job1 = new RedisQueueIntegrationTestJob(30);
427+
$job2 = new RedisQueueIntegrationTestJob(40);
428+
429+
$this->queue->push($job1);
430+
$this->queue->push($job2);
431+
432+
$this->assertEquals(2, $this->queue->clear(null));
433+
$this->assertEquals(0, $this->queue->size());
434+
}
435+
417436
/**
418437
* @dataProvider redisDriverProvider
419438
*

0 commit comments

Comments
 (0)