diff --git a/src/Illuminate/Concurrency/ProcessDriver.php b/src/Illuminate/Concurrency/ProcessDriver.php index bab43e61f309..832dc5231835 100644 --- a/src/Illuminate/Concurrency/ProcessDriver.php +++ b/src/Illuminate/Concurrency/ProcessDriver.php @@ -27,30 +27,52 @@ public function __construct(protected ProcessFactory $processFactory) /** * Run the given tasks concurrently and return an array containing the results. */ - public function run(Closure|array $tasks): array + public function run(Closure|array $tasks, int $timeout = null): array { $command = Application::formatCommandString('invoke-serialized-closure'); - $results = $this->processFactory->pool(function (Pool $pool) use ($tasks, $command) { + $results = $this->processFactory->pool(function (Pool $pool) use ($tasks, $command, $timeout) { foreach (Arr::wrap($tasks) as $key => $task) { - $pool->as($key)->path(base_path())->env([ + $process = $pool->as($key)->path(base_path())->env([ 'LARAVEL_INVOKABLE_CLOSURE' => serialize(new SerializableClosure($task)), ])->command($command); + + if ($timeout !== null) { + $process->timeout($timeout); + } } })->start()->wait(); return $results->collect()->mapWithKeys(function ($result, $key) { if ($result->failed()) { - throw new Exception('Concurrent process failed with exit code ['.$result->exitCode().']. Message: '.$result->errorOutput()); + $errorMessage = $result->errorOutput() ?: 'Process failed with no error output'; + throw new Exception("Concurrent process [{$key}] failed with exit code [{$result->exitCode()}]. Error: {$errorMessage}"); } - $result = json_decode($result->output(), true); + $output = $result->output(); + if (empty($output)) { + throw new Exception("Concurrent process [{$key}] produced no output"); + } + + $result = json_decode($output, true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new Exception("Concurrent process [{$key}] produced invalid JSON output: " . json_last_error_msg()); + } if (! $result['successful']) { - throw new $result['exception']( - ...(! empty(array_filter($result['parameters'])) - ? $result['parameters'] - : [$result['message']]) + $exceptionClass = $result['exception'] ?? Exception::class; + $message = $result['message'] ?? 'Unknown error occurred'; + $parameters = $result['parameters'] ?? []; + + // Ensure exception class exists + if (! class_exists($exceptionClass)) { + throw new Exception("Process [{$key}] failed with unknown exception class: {$exceptionClass}. Message: {$message}"); + } + + throw new $exceptionClass( + ...(! empty(array_filter($parameters)) + ? $parameters + : [$message]) ); } diff --git a/src/Illuminate/Concurrency/SyncDriver.php b/src/Illuminate/Concurrency/SyncDriver.php index a2f27bae54a8..ca6424f64a39 100644 --- a/src/Illuminate/Concurrency/SyncDriver.php +++ b/src/Illuminate/Concurrency/SyncDriver.php @@ -16,9 +16,13 @@ class SyncDriver implements Driver */ public function run(Closure|array $tasks): array { - return Collection::wrap($tasks)->map( - fn ($task) => $task() - )->all(); + return Collection::wrap($tasks)->map(function ($task, $key) { + try { + return $task(); + } catch (\Throwable $e) { + throw new \Exception("Synchronous task [{$key}] failed: " . $e->getMessage(), 0, $e); + } + })->all(); } /** diff --git a/src/Illuminate/Database/Connection.php b/src/Illuminate/Database/Connection.php index 9910fe911b66..3923587c4819 100755 --- a/src/Illuminate/Database/Connection.php +++ b/src/Illuminate/Database/Connection.php @@ -154,6 +154,13 @@ class Connection implements ConnectionInterface */ protected $queryLog = []; + /** + * The maximum number of queries to keep in the query log. + * + * @var int + */ + protected $maxQueryLogSize = 1000; + /** * Indicates whether queries are being logged. * @@ -858,6 +865,10 @@ public function logQuery($query, $bindings, $time = null) if ($this->loggingQueries) { $this->queryLog[] = compact('query', 'bindings', 'time'); + + if (count($this->queryLog) > $this->maxQueryLogSize) { + $this->queryLog = array_slice($this->queryLog, -$this->maxQueryLogSize, $this->maxQueryLogSize, false); + } } } @@ -1569,6 +1580,29 @@ public function logging() return $this->loggingQueries; } + /** + * Set the maximum query log size. + * + * @param int $size + * @return $this + */ + public function setMaxQueryLogSize(int $size) + { + $this->maxQueryLogSize = max(1, $size); + + return $this; + } + + /** + * Get the maximum query log size. + * + * @return int + */ + public function getMaxQueryLogSize() + { + return $this->maxQueryLogSize; + } + /** * Get the name of the connected database. * diff --git a/tests/Database/DatabaseConnectionTest.php b/tests/Database/DatabaseConnectionTest.php index 164da72f6a58..31d5c5dfb09b 100755 --- a/tests/Database/DatabaseConnectionTest.php +++ b/tests/Database/DatabaseConnectionTest.php @@ -549,6 +549,37 @@ protected function getMockConnection($methods = [], $pdo = null) return $connection; } + + public function testQueryLogSizeLimit() + { + $connection = $this->getMockConnection(); + $connection->enableQueryLog(); + $connection->setMaxQueryLogSize(3); + + $this->assertEquals(3, $connection->getMaxQueryLogSize()); + + for ($i = 0; $i < 5; $i++) { + $connection->logQuery("SELECT * FROM test WHERE id = ?", [$i], 10.0); + } + + $queryLog = $connection->getQueryLog(); + + $this->assertCount(3, $queryLog); + $this->assertEquals([2], $queryLog[0]['bindings']); + $this->assertEquals([3], $queryLog[1]['bindings']); + $this->assertEquals([4], $queryLog[2]['bindings']); + } + + public function testQueryLogSizeLimitMinimumValue() + { + $connection = $this->getMockConnection(); + + $connection->setMaxQueryLogSize(0); + $this->assertEquals(1, $connection->getMaxQueryLogSize()); + + $connection->setMaxQueryLogSize(-5); + $this->assertEquals(1, $connection->getMaxQueryLogSize()); + } } class DatabaseConnectionTestMockPDO extends PDO diff --git a/tests/Integration/Concurrency/ConcurrencyTest.php b/tests/Integration/Concurrency/ConcurrencyTest.php index 6de1ac4bbb6f..4086e5ac491c 100644 --- a/tests/Integration/Concurrency/ConcurrencyTest.php +++ b/tests/Integration/Concurrency/ConcurrencyTest.php @@ -156,6 +156,79 @@ function () { $this->assertEquals('second', $second); $this->assertEquals('third', $third); } + + public function testTimeoutHandling() + { + $this->expectException(Exception::class); + + $app = new Application(__DIR__); + $processDriver = new ProcessDriver($app->make(ProcessFactory::class)); + + // This should timeout after 1 second + $processDriver->run([ + fn () => sleep(5), // Task that takes 5 seconds + ], 1); + } + + public function testLargeDataHandling() + { + $largeString = str_repeat('x', 1000000); // 1MB string + + $results = Concurrency::driver('sync')->run([ + 'large_data' => fn () => $largeString, + 'small_data' => fn () => 'small', + ]); + + $this->assertEquals($largeString, $results['large_data']); + $this->assertEquals('small', $results['small_data']); + } + + public function testErrorHandlingWithKeys() + { + $this->expectException(Exception::class); + $this->expectExceptionMessageMatches('/task_1.*failed/'); + + Concurrency::driver('sync')->run([ + 'task_1' => fn () => throw new Exception('Test error'), + 'task_2' => fn () => 'success', + ]); + } + + public function testEmptyTaskArray() + { + $results = Concurrency::run([]); + $this->assertEquals([], $results); + } + + public function testSingleTaskExecution() + { + $result = Concurrency::run([fn () => 'single']); + $this->assertEquals(['single'], $result); + } + + public function testNestedArrayResults() + { + $results = Concurrency::run([ + fn () => ['nested' => ['data' => 'value']], + fn () => (object) ['property' => 'object_value'], + ]); + + $this->assertEquals(['nested' => ['data' => 'value']], $results[0]); + $this->assertEquals('object_value', $results[1]->property); + } + + public function testProcessDriverErrorMessages() + { + $this->expectException(Exception::class); + $this->expectExceptionMessageMatches('/Concurrent process.*failed with exit code/'); + + $app = new Application(__DIR__); + $processDriver = new ProcessDriver($app->make(ProcessFactory::class)); + + $processDriver->run([ + 'failing_task' => fn () => exit(1), + ]); + } } class ExceptionWithoutParam extends Exception