Skip to content

Commit 362d806

Browse files
committed
Improve performance of queries.
1 parent 1bfd32c commit 362d806

File tree

4 files changed

+59
-10
lines changed

4 files changed

+59
-10
lines changed

config/Migrations/20240307154751_MigrationQueueInitV8.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ class MigrationQueueInitV8 extends BaseMigration {
99
/**
1010
* Up Method.
1111
*
12-
* More information on this method is available here:
13-
* https://book.cakephp.org/phinx/0/en/migrations.html#the-up-method
14-
*
1512
* @return void
1613
*/
1714
public function up(): void {

config/Migrations/20250508121432_MigrationQueueMemory.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ class MigrationQueueMemory extends BaseMigration {
88
/**
99
* Change Method.
1010
*
11-
* More information on this method is available here:
12-
* https://book.cakephp.org/migrations/4/en/migrations.html#the-change-method
13-
*
1411
* @return void
1512
*/
1613
public function change(): void {
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
use Migrations\BaseMigration;
5+
6+
class MigrationQueueIndexOptimization extends BaseMigration {
7+
8+
/**
9+
* Change Method.
10+
*
11+
* @return void
12+
*/
13+
public function change(): void {
14+
$this->table('queued_jobs')
15+
// Main queue fetch optimization - CRITICAL
16+
// Covers the primary requestJob() query which filters by completed IS NULL
17+
// and orders by priority, age (calculated from notbefore), and id
18+
->addIndex(
19+
['completed', 'priority', 'notbefore', 'id'],
20+
[
21+
'name' => 'queue_fetch_optimization',
22+
],
23+
)
24+
// Cleanup queries optimization
25+
// Used by flushFailedJobs() which filters by fetched < threshold
26+
->addIndex(
27+
['fetched'],
28+
[
29+
'name' => 'fetched',
30+
],
31+
)
32+
// Stats and job type queries optimization
33+
// Used by getStats() and various job_task lookups
34+
->addIndex(
35+
['job_task', 'completed'],
36+
[
37+
'name' => 'job_task_completed',
38+
],
39+
)
40+
// Worker tracking optimization
41+
// Used in requestJob() for cost and unique constraints
42+
->addIndex(
43+
['workerkey'],
44+
[
45+
'name' => 'workerkey',
46+
],
47+
)
48+
->update();
49+
}
50+
51+
}

src/Model/Table/QueuedJobsTable.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ public function requestJob(array $tasks, array $groups = [], array $types = []):
613613

614614
break;
615615
case static::DRIVER_SQLITE:
616-
//TODO
616+
$tmp['strftime("%s", "now") >='] = $this->rateHistory[$tmp['job_task']] + $task['rate'];
617617

618618
break;
619619
}
@@ -632,8 +632,10 @@ public function requestJob(array $tasks, array $groups = [], array $types = []):
632632

633633
break;
634634
case static::DRIVER_SQLSERVER:
635-
// the ORM does not support ROW locking at the moment
636-
// TODO
635+
// Row-level locking (ROWLOCK, UPDLOCK hints) not supported by CakePHP ORM.
636+
// SQLServer uses default isolation level (READ COMMITTED) with automatic
637+
// row locking on UPDATE. This may allow race conditions in high-concurrency
638+
// scenarios. Consider using application-level locking or advisory locks if needed.
637639

638640
break;
639641
case static::DRIVER_SQLITE:
@@ -925,8 +927,10 @@ public function cleanOldJobs(): int {
925927
return 0;
926928
}
927929

930+
$threshold = (new DateTime())->subSeconds((int)Configure::read('Queue.cleanuptimeout'));
931+
928932
return $this->deleteAll([
929-
'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout'),
933+
'completed <' => $threshold,
930934
]);
931935
}
932936

0 commit comments

Comments
 (0)