Skip to content

Commit 9e7e32f

Browse files
authored
Merge pull request #59015 from nextcloud/copilot/add-taskprocessing-worker-command
feat(taskprocessing): add worker command for synchronous task processing
2 parents 9e65c59 + a51d744 commit 9e7e32f

File tree

5 files changed

+546
-0
lines changed

5 files changed

+546
-0
lines changed
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors
7+
* SPDX-License-Identifier: AGPL-3.0-or-later
8+
*/
9+
namespace OC\Core\Command\TaskProcessing;
10+
11+
use OC\Core\Command\Base;
12+
use OC\Core\Command\InterruptedException;
13+
use OCP\TaskProcessing\Exception\Exception;
14+
use OCP\TaskProcessing\Exception\NotFoundException;
15+
use OCP\TaskProcessing\IManager;
16+
use OCP\TaskProcessing\ISynchronousProvider;
17+
use Psr\Log\LoggerInterface;
18+
use Symfony\Component\Console\Input\InputInterface;
19+
use Symfony\Component\Console\Input\InputOption;
20+
use Symfony\Component\Console\Output\OutputInterface;
21+
22+
class WorkerCommand extends Base {
23+
public function __construct(
24+
private readonly IManager $taskProcessingManager,
25+
private readonly LoggerInterface $logger,
26+
) {
27+
parent::__construct();
28+
}
29+
30+
protected function configure(): void {
31+
$this
32+
->setName('taskprocessing:worker')
33+
->setDescription('Run a dedicated worker for synchronous TaskProcessing providers')
34+
->addOption(
35+
'timeout',
36+
't',
37+
InputOption::VALUE_OPTIONAL,
38+
'Duration in seconds after which the worker exits (0 = run indefinitely). You should regularly (e.g. every 5 minutes) restart this worker by using this option to make sure it picks up configuration changes.',
39+
0
40+
)
41+
->addOption(
42+
'interval',
43+
'i',
44+
InputOption::VALUE_OPTIONAL,
45+
'Sleep duration in seconds between polling iterations when no task was processed',
46+
1
47+
)
48+
->addOption(
49+
'once',
50+
null,
51+
InputOption::VALUE_NONE,
52+
'Process at most one task then exit'
53+
)
54+
->addOption(
55+
'taskTypes',
56+
null,
57+
InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY,
58+
'Only process tasks of the given task type IDs (can be specified multiple times)'
59+
);
60+
parent::configure();
61+
}
62+
63+
protected function execute(InputInterface $input, OutputInterface $output): int {
64+
$startTime = time();
65+
$timeout = (int)$input->getOption('timeout');
66+
$interval = (int)$input->getOption('interval');
67+
$once = $input->getOption('once') === true;
68+
/** @var list<string> $taskTypes */
69+
$taskTypes = $input->getOption('taskTypes');
70+
71+
if ($timeout > 0) {
72+
$output->writeln('<info>Task processing worker will stop after ' . $timeout . ' seconds</info>');
73+
}
74+
75+
while (true) {
76+
// Stop if timeout exceeded
77+
if ($timeout > 0 && ($startTime + $timeout) < time()) {
78+
$output->writeln('Timeout reached, exiting...', OutputInterface::VERBOSITY_VERBOSE);
79+
break;
80+
}
81+
82+
// Handle SIGTERM/SIGINT gracefully
83+
try {
84+
$this->abortIfInterrupted();
85+
} catch (InterruptedException $e) {
86+
$output->writeln('<info>Task processing worker stopped</info>');
87+
break;
88+
}
89+
90+
$processedTask = $this->processNextTask($output, $taskTypes);
91+
92+
if ($once) {
93+
break;
94+
}
95+
96+
if (!$processedTask) {
97+
$output->writeln('No task processed, waiting ' . $interval . ' second(s)...', OutputInterface::VERBOSITY_VERBOSE);
98+
sleep($interval);
99+
}
100+
}
101+
102+
return 0;
103+
}
104+
105+
/**
106+
* Attempt to process one task across all preferred synchronous providers.
107+
*
108+
* To avoid starvation, all eligible task types are first collected and then
109+
* the oldest scheduled task across all of them is fetched in a single query.
110+
* This ensures that tasks are processed in the order they were scheduled,
111+
* regardless of which provider handles them.
112+
*
113+
* @param list<string> $taskTypes When non-empty, only providers for these task type IDs are considered.
114+
* @return bool True if a task was processed, false if no task was found
115+
*/
116+
private function processNextTask(OutputInterface $output, array $taskTypes = []): bool {
117+
$providers = $this->taskProcessingManager->getProviders();
118+
119+
// Build a map of eligible taskTypeId => provider for all preferred synchronous providers
120+
/** @var array<string, ISynchronousProvider> $eligibleProviders */
121+
$eligibleProviders = [];
122+
foreach ($providers as $provider) {
123+
if (!$provider instanceof ISynchronousProvider) {
124+
continue;
125+
}
126+
127+
$taskTypeId = $provider->getTaskTypeId();
128+
129+
// If a task type whitelist was provided, skip providers not in the list
130+
if (!empty($taskTypes) && !in_array($taskTypeId, $taskTypes, true)) {
131+
continue;
132+
}
133+
134+
// Only use this provider if it is the preferred one for the task type
135+
try {
136+
$preferredProvider = $this->taskProcessingManager->getPreferredProvider($taskTypeId);
137+
} catch (Exception $e) {
138+
$this->logger->error('Failed to get preferred provider for task type ' . $taskTypeId, ['exception' => $e]);
139+
continue;
140+
}
141+
142+
if ($provider->getId() !== $preferredProvider->getId()) {
143+
continue;
144+
}
145+
146+
$eligibleProviders[$taskTypeId] = $provider;
147+
}
148+
149+
if (empty($eligibleProviders)) {
150+
return false;
151+
}
152+
153+
// Fetch the oldest scheduled task across all eligible task types in one query.
154+
// This naturally prevents starvation: regardless of how many tasks one provider
155+
// has queued, another provider's older tasks will be picked up first.
156+
try {
157+
$task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders));
158+
} catch (NotFoundException) {
159+
return false;
160+
} catch (Exception $e) {
161+
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
162+
return false;
163+
}
164+
165+
$taskTypeId = $task->getTaskTypeId();
166+
$provider = $eligibleProviders[$taskTypeId];
167+
168+
$output->writeln(
169+
'Processing task ' . $task->getId() . ' of type ' . $taskTypeId . ' with provider ' . $provider->getId(),
170+
OutputInterface::VERBOSITY_VERBOSE
171+
);
172+
173+
$this->taskProcessingManager->processTask($task, $provider);
174+
175+
$output->writeln(
176+
'Finished processing task ' . $task->getId(),
177+
OutputInterface::VERBOSITY_VERBOSE
178+
);
179+
180+
return true;
181+
}
182+
}

core/register_command.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
use OC\Core\Command\TaskProcessing\EnabledCommand;
9292
use OC\Core\Command\TaskProcessing\GetCommand;
9393
use OC\Core\Command\TaskProcessing\Statistics;
94+
use OC\Core\Command\TaskProcessing\WorkerCommand;
9495
use OC\Core\Command\TwoFactorAuth\Cleanup;
9596
use OC\Core\Command\TwoFactorAuth\Enforce;
9697
use OC\Core\Command\TwoFactorAuth\State;
@@ -255,6 +256,7 @@
255256
$application->add(Server::get(Command\TaskProcessing\ListCommand::class));
256257
$application->add(Server::get(Statistics::class));
257258
$application->add(Server::get(Command\TaskProcessing\Cleanup::class));
259+
$application->add(Server::get(WorkerCommand::class));
258260

259261
$application->add(Server::get(RedisCommand::class));
260262
$application->add(Server::get(DistributedClear::class));

lib/composer/composer/autoload_classmap.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,6 +1423,7 @@
14231423
'OC\\Core\\Command\\TaskProcessing\\GetCommand' => $baseDir . '/core/Command/TaskProcessing/GetCommand.php',
14241424
'OC\\Core\\Command\\TaskProcessing\\ListCommand' => $baseDir . '/core/Command/TaskProcessing/ListCommand.php',
14251425
'OC\\Core\\Command\\TaskProcessing\\Statistics' => $baseDir . '/core/Command/TaskProcessing/Statistics.php',
1426+
'OC\\Core\\Command\\TaskProcessing\\WorkerCommand' => $baseDir . '/core/Command/TaskProcessing/WorkerCommand.php',
14261427
'OC\\Core\\Command\\TwoFactorAuth\\Base' => $baseDir . '/core/Command/TwoFactorAuth/Base.php',
14271428
'OC\\Core\\Command\\TwoFactorAuth\\Cleanup' => $baseDir . '/core/Command/TwoFactorAuth/Cleanup.php',
14281429
'OC\\Core\\Command\\TwoFactorAuth\\Disable' => $baseDir . '/core/Command/TwoFactorAuth/Disable.php',

lib/composer/composer/autoload_static.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1464,6 +1464,7 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2
14641464
'OC\\Core\\Command\\TaskProcessing\\GetCommand' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/GetCommand.php',
14651465
'OC\\Core\\Command\\TaskProcessing\\ListCommand' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/ListCommand.php',
14661466
'OC\\Core\\Command\\TaskProcessing\\Statistics' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/Statistics.php',
1467+
'OC\\Core\\Command\\TaskProcessing\\WorkerCommand' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/WorkerCommand.php',
14671468
'OC\\Core\\Command\\TwoFactorAuth\\Base' => __DIR__ . '/../../..' . '/core/Command/TwoFactorAuth/Base.php',
14681469
'OC\\Core\\Command\\TwoFactorAuth\\Cleanup' => __DIR__ . '/../../..' . '/core/Command/TwoFactorAuth/Cleanup.php',
14691470
'OC\\Core\\Command\\TwoFactorAuth\\Disable' => __DIR__ . '/../../..' . '/core/Command/TwoFactorAuth/Disable.php',

0 commit comments

Comments
 (0)