Skip to content

Commit 403f007

Browse files
authored
Merge branch 'master' into main
2 parents cbba102 + cc8193d commit 403f007

File tree

7 files changed

+80
-13
lines changed

7 files changed

+80
-13
lines changed

config/Migrations/20240307154751_MigrationQueueInitV8.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ class MigrationQueueInitV8 extends BaseMigration {
99
/**
1010
* Up Method.
1111
*
12-
* More information on this method is available here:
13-
* https://book.cakephp.org/phinx/0/en/migrations.html#the-up-method
14-
*
1512
* @return void
1613
*/
1714
public function up(): void {

config/Migrations/20250508121432_MigrationQueueMemory.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ class MigrationQueueMemory extends BaseMigration {
88
/**
99
* Change Method.
1010
*
11-
* More information on this method is available here:
12-
* https://book.cakephp.org/migrations/4/en/migrations.html#the-change-method
13-
*
1411
* @return void
1512
*/
1613
public function change(): void {
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
use Migrations\BaseMigration;
5+
6+
class MigrationQueueIndexOptimization extends BaseMigration {
7+
8+
/**
9+
* Change Method.
10+
*
11+
* @return void
12+
*/
13+
public function change(): void {
14+
$this->table('queued_jobs')
15+
// Main queue fetch optimization - CRITICAL
16+
// Covers the primary requestJob() query which filters by completed IS NULL
17+
// and orders by priority, age (calculated from notbefore), and id
18+
->addIndex(
19+
['completed', 'priority', 'notbefore', 'id'],
20+
[
21+
'name' => 'queue_fetch_optimization',
22+
],
23+
)
24+
// Cleanup queries optimization
25+
// Used by flushFailedJobs() which filters by fetched < threshold
26+
->addIndex(
27+
['fetched'],
28+
[
29+
'name' => 'fetched',
30+
],
31+
)
32+
// Stats and job type queries optimization
33+
// Used by getStats() and various job_task lookups
34+
->addIndex(
35+
['job_task', 'completed'],
36+
[
37+
'name' => 'job_task_completed',
38+
],
39+
)
40+
// Worker tracking optimization
41+
// Used in requestJob() for cost and unique constraints
42+
->addIndex(
43+
['workerkey'],
44+
[
45+
'name' => 'workerkey',
46+
],
47+
)
48+
->update();
49+
}
50+
51+
}

phpstan.neon

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ parameters:
1414
- identifier: missingType.iterableValue
1515
- identifier: missingType.generics
1616
- identifier: trait.unused
17-
- '#Negated boolean expression is always false.#'
1817
- '#Parameter \#1 \$.+ of function call_user_func_array expects .+, array.+ given.#'
1918

2019
includes:

src/Controller/Admin/QueuedJobsController.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,27 @@ public function import() {
142142
/** @var \Laminas\Diactoros\UploadedFile|null $file */
143143
$file = $this->request->getData('file');
144144
if ($file && $file->getError() == UPLOAD_ERR_OK && $file->getSize() > 0) {
145+
$clientMediaType = $file->getClientMediaType();
146+
if ($clientMediaType !== 'application/json') {
147+
throw new RuntimeException('Only JSON files are allowed');
148+
}
149+
145150
$content = file_get_contents($file->getStream()->getMetadata('uri'));
146151
if ($content === false) {
147152
throw new RuntimeException('Cannot parse file');
148153
}
154+
149155
$json = json_decode($content, true);
156+
if (json_last_error() !== JSON_ERROR_NONE) {
157+
throw new RuntimeException('Invalid JSON: ' . json_last_error_msg());
158+
}
159+
150160
if (!$json || empty($json['queuedJob'])) {
151-
throw new RuntimeException('Invalid JSON content');
161+
throw new RuntimeException('Invalid JSON content: missing queuedJob data');
162+
}
163+
164+
if (!is_array($json['queuedJob'])) {
165+
throw new RuntimeException('Invalid JSON structure: queuedJob must be an array');
152166
}
153167

154168
$data = $json['queuedJob'];

src/Model/Table/QueuedJobsTable.php

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ public function requestJob(array $tasks, array $groups = [], array $types = []):
613613

614614
break;
615615
case static::DRIVER_SQLITE:
616-
//TODO
616+
$tmp['strftime("%s", "now") >='] = $this->rateHistory[$tmp['job_task']] + $task['rate'];
617617

618618
break;
619619
}
@@ -632,8 +632,10 @@ public function requestJob(array $tasks, array $groups = [], array $types = []):
632632

633633
break;
634634
case static::DRIVER_SQLSERVER:
635-
// the ORM does not support ROW locking at the moment
636-
// TODO
635+
// Row-level locking (ROWLOCK, UPDLOCK hints) not supported by CakePHP ORM.
636+
// SQLServer uses default isolation level (READ COMMITTED) with automatic
637+
// row locking on UPDATE. This may allow race conditions in high-concurrency
638+
// scenarios. Consider using application-level locking or advisory locks if needed.
637639

638640
break;
639641
case static::DRIVER_SQLITE:
@@ -925,8 +927,10 @@ public function cleanOldJobs(): int {
925927
return 0;
926928
}
927929

930+
$threshold = (new DateTime())->subSeconds((int)Configure::read('Queue.cleanuptimeout'));
931+
928932
return $this->deleteAll([
929-
'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout'),
933+
'completed <' => $threshold,
930934
]);
931935
}
932936

@@ -979,7 +983,7 @@ public function key(): string {
979983
if ($this->_key !== null) {
980984
return $this->_key;
981985
}
982-
$this->_key = sha1(microtime() . mt_rand(100, 999));
986+
$this->_key = bin2hex(random_bytes(20));
983987
if (!$this->_key) {
984988
throw new RuntimeException('Invalid key generated');
985989
}

src/Queue/Task/ExecuteTask.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
use Cake\Console\CommandInterface;
1212
use Cake\Console\ConsoleIo;
13+
use Cake\Core\Configure;
1314
use Cake\Log\LogTrait;
1415
use Queue\Model\QueueException;
1516
use Queue\Queue\AddInterface;
@@ -85,6 +86,10 @@ public function run(array $data, int $jobId): void {
8586
'accepted' => [CommandInterface::CODE_SUCCESS],
8687
];
8788

89+
if (!$data['escape'] && !Configure::read('debug')) {
90+
throw new QueueException('Command escaping must be enabled when debug mode is off for security reasons');
91+
}
92+
8893
$command = $data['command'];
8994
if ($data['escape']) {
9095
$command = escapeshellcmd($data['command']);

0 commit comments

Comments
 (0)