Skip to content

Commit 3ccfd9a

Browse files
committed
Implement copilot suggestions if valid
1 parent 3ce6055 commit 3ccfd9a

File tree

3 files changed

+108
-48
lines changed

3 files changed

+108
-48
lines changed

src/Command/SubprocessJobRunnerCommand.php

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
use Cake\Queue\Queue\Processor;
2525
use Enqueue\Null\NullConnectionFactory;
2626
use Enqueue\Null\NullMessage;
27+
use Interop\Queue\Message as QueueMessage;
2728
use Interop\Queue\Processor as InteropProcessor;
2829
use Psr\Log\NullLogger;
30+
use RuntimeException;
2931
use Throwable;
3032

3133
/**
@@ -124,9 +126,6 @@ protected function readInput(ConsoleIo $io): string
124126
}
125127

126128
$input .= $chunk;
127-
if ($input !== '' && strlen($chunk) < 8192) {
128-
break;
129-
}
130129
}
131130

132131
return $input;
@@ -144,6 +143,12 @@ protected function executeJob(array $data): string
144143
$context = $connectionFactory->createContext();
145144

146145
$messageClass = $data['messageClass'] ?? NullMessage::class;
146+
147+
// Validate message class for security
148+
if (!class_exists($messageClass) || !is_subclass_of($messageClass, QueueMessage::class)) {
149+
throw new RuntimeException(sprintf('Invalid message class: %s', $messageClass));
150+
}
151+
147152
$messageBody = json_encode($data['body']);
148153

149154
/** @var \Interop\Queue\Message $queueMessage */

src/Command/WorkerCommand.php

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,20 +197,21 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface
197197
$this->abort();
198198
}
199199

200-
$processor = new $processorClass($logger, $this->container);
201-
200+
// Check subprocess mode before instantiating processor
202201
if ($args->getOption('subprocess') || ($config['subprocess']['enabled'] ?? false)) {
203202
$subprocessConfig = array_merge(
204203
$config['subprocess'] ?? [],
205204
['enabled' => true],
206205
);
207206

208-
if (!($processor instanceof Processor)) {
207+
if ($processorClass !== Processor::class && !is_subclass_of($processorClass, Processor::class)) {
209208
$io->error('Subprocess mode is only supported with the default Processor class');
210209
$this->abort();
211210
}
212211

213212
$processor = new SubprocessProcessor($logger, $subprocessConfig, $this->container);
213+
} else {
214+
$processor = new $processorClass($logger, $this->container);
214215
}
215216

216217
return $processor;

src/Queue/SubprocessProcessor.php

Lines changed: 96 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,36 @@
2929

3030
/**
3131
* Subprocess processor that executes jobs in isolated PHP processes.
32+
*
33+
* This processor spawns a new PHP process for each job, providing complete isolation
34+
* between jobs. This is useful for development environments where code changes need
35+
* to be reloaded without restarting the worker.
36+
*
37+
* Configuration options:
38+
* - `command`: Full command to execute (default: 'php bin/cake.php queue subprocess-runner')
39+
* - `timeout`: Maximum execution time in seconds (default: 300)
40+
* - `maxOutputSize`: Maximum output size in bytes (default: 1048576 = 1MB)
41+
*
42+
* Example configuration:
43+
* ```
44+
* 'Queue' => [
45+
* 'default' => [
46+
* 'subprocess' => [
47+
* 'command' => 'php bin/cake.php queue subprocess-runner',
48+
* 'timeout' => 60,
49+
* 'maxOutputSize' => 2097152, // 2MB
50+
* ],
51+
* ],
52+
* ],
53+
* ```
54+
*
3255
* Extends Processor to reuse event handling and processing logic (DRY principle).
3356
*/
3457
class SubprocessProcessor extends Processor
3558
{
3659
/**
3760
* @param \Psr\Log\LoggerInterface $logger Logger instance
38-
* @param array<string, mixed> $config Subprocess configuration
61+
* @param array<string, mixed> $config Subprocess configuration options
3962
* @param \Cake\Core\ContainerInterface|null $container DI container instance
4063
*/
4164
public function __construct(
@@ -188,63 +211,94 @@ protected function executeInSubprocess(array $jobData): array
188211
throw new RuntimeException('Failed to create subprocess');
189212
}
190213

191-
$jobDataJson = json_encode($jobData);
192-
if ($jobDataJson !== false) {
193-
fwrite($pipes[0], $jobDataJson);
194-
}
214+
try {
215+
$jobDataJson = json_encode($jobData);
216+
if ($jobDataJson !== false) {
217+
fwrite($pipes[0], $jobDataJson);
218+
}
195219

196-
fclose($pipes[0]);
220+
fclose($pipes[0]);
197221

198-
$output = '';
199-
$errorOutput = '';
200-
$startTime = time();
222+
$output = '';
223+
$errorOutput = '';
224+
$startTime = time();
225+
$maxOutputSize = $this->config['maxOutputSize'] ?? 1048576; // 1MB default
201226

202-
stream_set_blocking($pipes[1], false);
203-
stream_set_blocking($pipes[2], false);
227+
stream_set_blocking($pipes[1], false);
228+
stream_set_blocking($pipes[2], false);
204229

205-
while (true) {
206-
if ($timeout > 0 && (time() - $startTime) > $timeout) {
207-
proc_terminate($process, 9);
208-
fclose($pipes[1]);
209-
fclose($pipes[2]);
210-
proc_close($process);
230+
while (true) {
231+
if ($timeout > 0 && (time() - $startTime) > $timeout) {
232+
proc_terminate($process, 9);
211233

212-
return [
213-
'success' => false,
214-
'error' => sprintf('Subprocess execution timeout after %d seconds', $timeout),
215-
];
216-
}
234+
return [
235+
'success' => false,
236+
'error' => sprintf('Subprocess execution timeout after %d seconds', $timeout),
237+
];
238+
}
217239

218-
$read = [$pipes[1], $pipes[2]];
219-
$write = null;
220-
$except = null;
221-
$selectResult = stream_select($read, $write, $except, 1);
240+
$read = [$pipes[1], $pipes[2]];
241+
$write = null;
242+
$except = null;
243+
$selectResult = stream_select($read, $write, $except, 1);
222244

223-
if ($selectResult === false) {
224-
break;
225-
}
245+
if ($selectResult === false) {
246+
return [
247+
'success' => false,
248+
'error' => 'Stream select failed',
249+
];
250+
}
251+
252+
if (in_array($pipes[1], $read)) {
253+
$chunk = fread($pipes[1], 8192);
254+
if ($chunk !== false) {
255+
if (strlen($output) + strlen($chunk) > $maxOutputSize) {
256+
proc_terminate($process, 9);
226257

227-
if (in_array($pipes[1], $read)) {
228-
$chunk = fread($pipes[1], 8192);
229-
if ($chunk !== false) {
230-
$output .= $chunk;
258+
return [
259+
'success' => false,
260+
'error' => sprintf('Subprocess output exceeded maximum size of %d bytes', $maxOutputSize),
261+
];
262+
}
263+
264+
$output .= $chunk;
265+
}
231266
}
232-
}
233267

234-
if (in_array($pipes[2], $read)) {
235-
$chunk = fread($pipes[2], 8192);
236-
if ($chunk !== false) {
237-
$errorOutput .= $chunk;
268+
if (in_array($pipes[2], $read)) {
269+
$chunk = fread($pipes[2], 8192);
270+
if ($chunk !== false) {
271+
if (strlen($errorOutput) + strlen($chunk) > $maxOutputSize) {
272+
proc_terminate($process, 9);
273+
274+
return [
275+
'success' => false,
276+
'error' => sprintf(
277+
'Subprocess error output exceeded maximum size of %d bytes',
278+
$maxOutputSize,
279+
),
280+
];
281+
}
282+
283+
$errorOutput .= $chunk;
284+
}
285+
}
286+
287+
if (feof($pipes[1]) && feof($pipes[2])) {
288+
break;
238289
}
239290
}
291+
} finally {
292+
// Always cleanup resources
293+
if (is_resource($pipes[1])) {
294+
fclose($pipes[1]);
295+
}
240296

241-
if (feof($pipes[1]) && feof($pipes[2])) {
242-
break;
297+
if (is_resource($pipes[2])) {
298+
fclose($pipes[2]);
243299
}
244300
}
245301

246-
fclose($pipes[1]);
247-
fclose($pipes[2]);
248302
$exitCode = proc_close($process);
249303

250304
if ($exitCode !== 0 && empty($output)) {

0 commit comments

Comments
 (0)