Skip to content

Commit 1ae13ff

Browse files
committed
feat(taskprocessing): add cleanup flag to tasks to decide if they should be cleaned up automatically
Signed-off-by: Julien Veyssier <[email protected]>
1 parent 84323ff commit 1ae13ff

File tree

12 files changed

+190
-50
lines changed

12 files changed

+190
-50
lines changed

core/Controller/TaskProcessingApiController.php

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ public function setFileContentsExApp(int $taskId): DataResponse {
391391
* @return StreamResponse<Http::STATUS_OK, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR|Http::STATUS_NOT_FOUND, array{message: string}, array{}>
392392
*/
393393
private function getFileContentsInternal(Task $task, int $fileId): StreamResponse|DataResponse {
394-
$ids = $this->extractFileIdsFromTask($task);
394+
$ids = $this->taskProcessingManager->extractFileIdsFromTask($task);
395395
if (!in_array($fileId, $ids)) {
396396
return new DataResponse(['message' => $this->l->t('Not found')], Http::STATUS_NOT_FOUND);
397397
}
@@ -428,45 +428,6 @@ private function getFileContentsInternal(Task $task, int $fileId): StreamRespons
428428
return $response;
429429
}
430430

431-
/**
432-
* @param Task $task
433-
* @return list<int>
434-
* @throws NotFoundException
435-
*/
436-
private function extractFileIdsFromTask(Task $task): array {
437-
$ids = [];
438-
$taskTypes = $this->taskProcessingManager->getAvailableTaskTypes();
439-
if (!isset($taskTypes[$task->getTaskTypeId()])) {
440-
throw new NotFoundException('Could not find task type');
441-
}
442-
$taskType = $taskTypes[$task->getTaskTypeId()];
443-
foreach ($taskType['inputShape'] + $taskType['optionalInputShape'] as $key => $descriptor) {
444-
if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
445-
/** @var int|list<int> $inputSlot */
446-
$inputSlot = $task->getInput()[$key];
447-
if (is_array($inputSlot)) {
448-
$ids = array_merge($inputSlot, $ids);
449-
} else {
450-
$ids[] = $inputSlot;
451-
}
452-
}
453-
}
454-
if ($task->getOutput() !== null) {
455-
foreach ($taskType['outputShape'] + $taskType['optionalOutputShape'] as $key => $descriptor) {
456-
if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
457-
/** @var int|list<int> $outputSlot */
458-
$outputSlot = $task->getOutput()[$key];
459-
if (is_array($outputSlot)) {
460-
$ids = array_merge($outputSlot, $ids);
461-
} else {
462-
$ids[] = $outputSlot;
463-
}
464-
}
465-
}
466-
}
467-
return $ids;
468-
}
469-
470431
/**
471432
* Sets the task progress
472433
*
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
7+
* SPDX-License-Identifier: AGPL-3.0-or-later
8+
*/
9+
namespace OC\Core\Migrations;
10+
11+
use Closure;
12+
use OCP\DB\ISchemaWrapper;
13+
use OCP\DB\Types;
14+
use OCP\Migration\Attributes\AddColumn;
15+
use OCP\Migration\Attributes\ColumnType;
16+
use OCP\Migration\IOutput;
17+
use OCP\Migration\SimpleMigrationStep;
18+
19+
/**
20+
*
21+
*/
22+
#[AddColumn(table: 'taskprocessing_tasks', name: 'cleanup', type: ColumnType::SMALLINT)]
23+
class Version32000Date20250806110519 extends SimpleMigrationStep {
24+
25+
/**
26+
* @param IOutput $output
27+
* @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper`
28+
* @param array $options
29+
* @return null|ISchemaWrapper
30+
*/
31+
public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper {
32+
/** @var ISchemaWrapper $schema */
33+
$schema = $schemaClosure();
34+
35+
if ($schema->hasTable('taskprocessing_tasks')) {
36+
$table = $schema->getTable('taskprocessing_tasks');
37+
if (!$table->hasColumn('cleanup')) {
38+
$table->addColumn('cleanup', Types::SMALLINT, [
39+
'notnull' => true,
40+
'default' => 1,
41+
'unsigned' => true,
42+
]);
43+
return $schema;
44+
}
45+
}
46+
47+
return null;
48+
}
49+
}

core/ResponseDefinitions.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@
200200
* scheduledAt: ?int,
201201
* startedAt: ?int,
202202
* endedAt: ?int,
203+
* cleanup: bool,
203204
* }
204205
*
205206
* @psalm-type CoreProfileAction = array{

lib/composer/composer/autoload_classmap.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1512,6 +1512,7 @@
15121512
'OC\\Core\\Migrations\\Version31000Date20250213102442' => $baseDir . '/core/Migrations/Version31000Date20250213102442.php',
15131513
'OC\\Core\\Migrations\\Version32000Date20250620081925' => $baseDir . '/core/Migrations/Version32000Date20250620081925.php',
15141514
'OC\\Core\\Migrations\\Version32000Date20250731062008' => $baseDir . '/core/Migrations/Version32000Date20250731062008.php',
1515+
'OC\\Core\\Migrations\\Version32000Date20250806110519' => $baseDir . '/core/Migrations/Version32000Date20250806110519.php',
15151516
'OC\\Core\\Notification\\CoreNotifier' => $baseDir . '/core/Notification/CoreNotifier.php',
15161517
'OC\\Core\\ResponseDefinitions' => $baseDir . '/core/ResponseDefinitions.php',
15171518
'OC\\Core\\Service\\LoginFlowV2Service' => $baseDir . '/core/Service/LoginFlowV2Service.php',

lib/composer/composer/autoload_static.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1553,6 +1553,7 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2
15531553
'OC\\Core\\Migrations\\Version31000Date20250213102442' => __DIR__ . '/../../..' . '/core/Migrations/Version31000Date20250213102442.php',
15541554
'OC\\Core\\Migrations\\Version32000Date20250620081925' => __DIR__ . '/../../..' . '/core/Migrations/Version32000Date20250620081925.php',
15551555
'OC\\Core\\Migrations\\Version32000Date20250731062008' => __DIR__ . '/../../..' . '/core/Migrations/Version32000Date20250731062008.php',
1556+
'OC\\Core\\Migrations\\Version32000Date20250806110519' => __DIR__ . '/../../..' . '/core/Migrations/Version32000Date20250806110519.php',
15561557
'OC\\Core\\Notification\\CoreNotifier' => __DIR__ . '/../../..' . '/core/Notification/CoreNotifier.php',
15571558
'OC\\Core\\ResponseDefinitions' => __DIR__ . '/../../..' . '/core/ResponseDefinitions.php',
15581559
'OC\\Core\\Service\\LoginFlowV2Service' => __DIR__ . '/../../..' . '/core/Service/LoginFlowV2Service.php',

lib/private/TaskProcessing/Db/Task.php

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
* @method int getStartedAt()
4646
* @method setEndedAt(int $endedAt)
4747
* @method int getEndedAt()
48+
* @method setCleanup(int $cleanup)
49+
* @method int getCleanup()
4850
*/
4951
class Task extends Entity {
5052
protected $lastUpdated;
@@ -63,16 +65,17 @@ class Task extends Entity {
6365
protected $scheduledAt;
6466
protected $startedAt;
6567
protected $endedAt;
68+
protected $cleanup;
6669

6770
/**
6871
* @var string[]
6972
*/
70-
public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at'];
73+
public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'cleanup'];
7174

7275
/**
7376
* @var string[]
7477
*/
75-
public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt'];
78+
public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'cleanup'];
7679

7780

7881
public function __construct() {
@@ -94,6 +97,7 @@ public function __construct() {
9497
$this->addType('scheduledAt', 'integer');
9598
$this->addType('startedAt', 'integer');
9699
$this->addType('endedAt', 'integer');
100+
$this->addType('cleanup', 'integer');
97101
}
98102

99103
public function toRow(): array {
@@ -122,6 +126,7 @@ public static function fromPublicTask(OCPTask $task): self {
122126
'scheduledAt' => $task->getScheduledAt(),
123127
'startedAt' => $task->getStartedAt(),
124128
'endedAt' => $task->getEndedAt(),
129+
'cleanup' => $task->getCleanup(),
125130
]);
126131
return $taskEntity;
127132
}
@@ -144,6 +149,7 @@ public function toPublicTask(): OCPTask {
144149
$task->setScheduledAt($this->getScheduledAt());
145150
$task->setStartedAt($this->getStartedAt());
146151
$task->setEndedAt($this->getEndedAt());
152+
$task->setCleanup($this->getCleanup() !== 0);
147153
return $task;
148154
}
149155
}

lib/private/TaskProcessing/Db/TaskMapper.php

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,16 +183,38 @@ public function findTasks(
183183

184184
/**
185185
* @param int $timeout
186+
* @param bool $force If true, ignore the cleanup flag
186187
* @return int the number of deleted tasks
187188
* @throws Exception
188189
*/
189-
public function deleteOlderThan(int $timeout): int {
190+
public function deleteOlderThan(int $timeout, bool $force = false): int {
190191
$qb = $this->db->getQueryBuilder();
191192
$qb->delete($this->tableName)
192193
->where($qb->expr()->lt('last_updated', $qb->createPositionalParameter($this->timeFactory->getDateTime()->getTimestamp() - $timeout)));
194+
if (!$force) {
195+
$qb->andWhere($qb->expr()->eq('cleanup', $qb->createPositionalParameter(1, IQueryBuilder::PARAM_INT)));
196+
}
193197
return $qb->executeStatement();
194198
}
195199

200+
/**
201+
* @param int $timeout
202+
* @param bool $force If true, ignore the cleanup flag
203+
* @return \Generator<Task>
204+
* @throws Exception
205+
*/
206+
public function getTasksToCleanup(int $timeout, bool $force = false): \Generator {
207+
$qb = $this->db->getQueryBuilder();
208+
$qb->select($this->tableName)
209+
->where($qb->expr()->lt('last_updated', $qb->createPositionalParameter($this->timeFactory->getDateTime()->getTimestamp() - $timeout)));
210+
if (!$force) {
211+
$qb->andWhere($qb->expr()->eq('cleanup', $qb->createPositionalParameter(1, IQueryBuilder::PARAM_INT)));
212+
}
213+
foreach ($this->yieldEntities($qb) as $entity) {
214+
yield $entity;
215+
};
216+
}
217+
196218
public function update(Entity $entity): Entity {
197219
$entity->setLastUpdated($this->timeFactory->now()->getTimestamp());
198220
return parent::update($entity);

lib/private/TaskProcessing/Manager.php

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1448,6 +1448,45 @@ private function validateUserAccessToFile(mixed $fileId, ?string $userId): void
14481448
}
14491449
}
14501450

1451+
/**
1452+
* @param Task $task
1453+
* @return list<int>
1454+
* @throws NotFoundException
1455+
*/
1456+
public function extractFileIdsFromTask(Task $task): array {
1457+
$ids = [];
1458+
$taskTypes = $this->getAvailableTaskTypes();
1459+
if (!isset($taskTypes[$task->getTaskTypeId()])) {
1460+
throw new NotFoundException('Could not find task type');
1461+
}
1462+
$taskType = $taskTypes[$task->getTaskTypeId()];
1463+
foreach ($taskType['inputShape'] + $taskType['optionalInputShape'] as $key => $descriptor) {
1464+
if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
1465+
/** @var int|list<int> $inputSlot */
1466+
$inputSlot = $task->getInput()[$key];
1467+
if (is_array($inputSlot)) {
1468+
$ids = array_merge($inputSlot, $ids);
1469+
} else {
1470+
$ids[] = $inputSlot;
1471+
}
1472+
}
1473+
}
1474+
if ($task->getOutput() !== null) {
1475+
foreach ($taskType['outputShape'] + $taskType['optionalOutputShape'] as $key => $descriptor) {
1476+
if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
1477+
/** @var int|list<int> $outputSlot */
1478+
$outputSlot = $task->getOutput()[$key];
1479+
if (is_array($outputSlot)) {
1480+
$ids = array_merge($outputSlot, $ids);
1481+
} else {
1482+
$ids[] = $outputSlot;
1483+
}
1484+
}
1485+
}
1486+
}
1487+
return $ids;
1488+
}
1489+
14511490
/**
14521491
* Make a request to the task's webhookUri if necessary
14531492
*

lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,15 @@
99
use OC\TaskProcessing\Db\TaskMapper;
1010
use OCP\AppFramework\Utility\ITimeFactory;
1111
use OCP\BackgroundJob\TimedJob;
12+
use OCP\DB\Exception;
1213
use OCP\Files\AppData\IAppDataFactory;
14+
use OCP\Files\File;
15+
use OCP\Files\InvalidPathException;
16+
use OCP\Files\IRootFolder;
1317
use OCP\Files\NotFoundException;
1418
use OCP\Files\NotPermittedException;
1519
use OCP\Files\SimpleFS\ISimpleFolder;
20+
use OCP\TaskProcessing\IManager;
1621
use Psr\Log\LoggerInterface;
1722

1823
class RemoveOldTasksBackgroundJob extends TimedJob {
@@ -22,6 +27,8 @@ class RemoveOldTasksBackgroundJob extends TimedJob {
2227
public function __construct(
2328
ITimeFactory $timeFactory,
2429
private TaskMapper $taskMapper,
30+
private IManager $taskProcessingManager,
31+
private IRootFolder $rootFolder,
2532
private LoggerInterface $logger,
2633
IAppDataFactory $appDataFactory,
2734
) {
@@ -37,6 +44,11 @@ public function __construct(
3744
* @inheritDoc
3845
*/
3946
protected function run($argument): void {
47+
try {
48+
$this->cleanupTaskProcessingTaskFiles();
49+
} catch (\Exception $e) {
50+
$this->logger->warning('Failed to delete stale task processing tasks files', ['exception' => $e]);
51+
}
4052
try {
4153
$this->taskMapper->deleteOlderThan(self::MAX_TASK_AGE_SECONDS);
4254
} catch (\OCP\DB\Exception $e) {
@@ -52,11 +64,6 @@ protected function run($argument): void {
5264
} catch (NotFoundException $e) {
5365
// noop
5466
}
55-
try {
56-
$this->clearFilesOlderThan($this->appData->getFolder('TaskProcessing'), self::MAX_TASK_AGE_SECONDS);
57-
} catch (NotFoundException $e) {
58-
// noop
59-
}
6067
}
6168

6269
/**
@@ -76,4 +83,29 @@ private function clearFilesOlderThan(ISimpleFolder $folder, int $ageInSeconds):
7683
}
7784
}
7885

86+
/**
87+
* @return void
88+
* @throws InvalidPathException
89+
* @throws NotFoundException
90+
* @throws \JsonException
91+
* @throws Exception
92+
* @throws \OCP\TaskProcessing\Exception\NotFoundException
93+
*/
94+
private function cleanupTaskProcessingTaskFiles(): void {
95+
foreach ($this->taskMapper->getTasksToCleanup(self::MAX_TASK_AGE_SECONDS) as $task) {
96+
$ocpTask = $task->toPublicTask();
97+
$fileIds = $this->taskProcessingManager->extractFileIdsFromTask($ocpTask);
98+
foreach ($fileIds as $fileId) {
99+
// only look for output files stored in appData/TaskProcessing/
100+
$file = $this->rootFolder->getFirstNodeByIdInPath($fileId, '/' . $this->rootFolder->getAppDataDirectoryName() . '/TaskProcessing/');
101+
if ($file instanceof File) {
102+
try {
103+
$file->delete();
104+
} catch (NotPermittedException $e) {
105+
$this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]);
106+
}
107+
}
108+
}
109+
}
110+
}
79111
}

lib/public/TaskProcessing/IManager.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,4 +234,14 @@ public function lockTask(Task $task): bool;
234234
* @since 30.0.0
235235
*/
236236
public function setTaskStatus(Task $task, int $status): void;
237+
238+
/**
239+
* Extract all input and output file IDs from a task
240+
*
241+
* @param Task $task
242+
* @return list<int>
243+
* @throws NotFoundException
244+
* @since 32.0.0
245+
*/
246+
public function extractFileIdsFromTask(Task $task): array;
237247
}

0 commit comments

Comments
 (0)