Skip to content

Commit 34e84fe

Browse files
committed
feat: Add async import queue infrastructure
Add Symfony Messenger and TYPO3 Scheduler integration for asynchronous XLIFF import processing, enabling background job handling and improved user experience for large file imports. **Symfony Messenger Integration:** - ImportTranslationsMessage: Serializable message for import jobs - ImportTranslationsMessageHandler: Processes import messages via queue - Messenger routing configuration to 'doctrine' transport **TYPO3 Scheduler Integration:** - ProcessMessengerQueueTask: Scheduler task for queue processing - ProcessMessengerQueueTaskAdditionalFieldProvider: Backend form fields - Allows automatic processing via Scheduler vs systemd/supervisor **Database Schema:** - tx_nrtextdb_import_job_status table for async job tracking - Stores job metadata, progress, and completion status This infrastructure enables: - Non-blocking imports for better UX - Automatic background processing via cron - Job status tracking and error handling - Horizontal scaling via multiple workers Technical implementation follows TYPO3 v13 patterns: - PSR-14 MessageHandler attribute for auto-registration - Constructor dependency injection - Strict types and PHPStan level 10 compliance
1 parent edc5884 commit 34e84fe

File tree

6 files changed

+512
-0
lines changed

6 files changed

+512
-0
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
<?php
2+
3+
/**
4+
* This file is part of the package netresearch/nr-textdb.
5+
*
6+
* For the full copyright and license information, please read the
7+
* LICENSE file that was distributed with this source code.
8+
*/
9+
10+
declare(strict_types=1);
11+
12+
namespace Netresearch\NrTextdb\Queue\Handler;
13+
14+
use Netresearch\NrTextdb\Domain\Repository\ImportJobStatusRepository;
15+
use Netresearch\NrTextdb\Queue\Message\ImportTranslationsMessage;
16+
use Netresearch\NrTextdb\Service\ImportService;
17+
use Psr\Log\LoggerInterface;
18+
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
19+
use Throwable;
20+
21+
/**
22+
* ImportTranslationsMessageHandler - Processes async XLIFF import jobs.
23+
*
24+
* This handler is automatically registered with TYPO3 v13 Symfony Messenger via
25+
* the #[AsMessageHandler] attribute. It processes ImportTranslationsMessage instances
26+
* from the queue and performs the actual import with progress tracking.
27+
*
28+
* CRITICAL: Errors are caught and logged to prevent worker crashes. TYPO3 Messenger
29+
* does NOT automatically retry failed messages.
30+
*
31+
* @author Rico Sonntag <[email protected]>
32+
* @license Netresearch https://www.netresearch.de
33+
* @link https://www.netresearch.de
34+
*/
35+
#[AsMessageHandler]
36+
final readonly class ImportTranslationsMessageHandler
37+
{
38+
/**
39+
* Constructor with dependency injection.
40+
*/
41+
public function __construct(
42+
private ImportService $importService,
43+
private ImportJobStatusRepository $jobStatusRepository,
44+
private LoggerInterface $logger,
45+
) {
46+
}
47+
48+
/**
49+
* Process the import message.
50+
*
51+
* This method is called by the Symfony Messenger worker when a message is consumed.
52+
* It coordinates the entire import process with status tracking and error handling.
53+
*
54+
* @param ImportTranslationsMessage $message The message to process
55+
*/
56+
public function __invoke(ImportTranslationsMessage $message): void
57+
{
58+
$jobId = $message->jobId;
59+
$filePath = $message->filePath;
60+
61+
$this->logger->info('Import job started', [
62+
'jobId' => $jobId,
63+
'filename' => $message->originalFilename,
64+
'fileSize' => $message->fileSize,
65+
]);
66+
67+
try {
68+
// Update status to processing
69+
$this->jobStatusRepository->updateStatus($jobId, 'processing');
70+
71+
// Perform the import
72+
$imported = 0;
73+
$updated = 0;
74+
$errors = [];
75+
76+
$this->importService->importFile(
77+
$filePath,
78+
$message->forceUpdate,
79+
$imported,
80+
$updated,
81+
$errors
82+
);
83+
84+
// Update progress counters
85+
$this->jobStatusRepository->updateProgress($jobId, $imported, $updated);
86+
87+
// Check if there were any errors
88+
if ($errors !== []) {
89+
$errorMessage = implode("\n", $errors);
90+
$this->jobStatusRepository->updateStatus($jobId, 'completed', $errorMessage);
91+
$this->logger->warning('Import completed with errors', [
92+
'jobId' => $jobId,
93+
'imported' => $imported,
94+
'updated' => $updated,
95+
'errors' => count($errors),
96+
]);
97+
} else {
98+
$this->jobStatusRepository->updateStatus($jobId, 'completed');
99+
$this->logger->info('Import job completed successfully', [
100+
'jobId' => $jobId,
101+
'imported' => $imported,
102+
'updated' => $updated,
103+
]);
104+
}
105+
106+
// Clean up temporary file
107+
if (file_exists($filePath)) {
108+
@unlink($filePath);
109+
}
110+
} catch (Throwable $e) {
111+
// CRITICAL: Catch all errors to prevent worker crashes
112+
// TYPO3 Messenger does NOT automatically retry failed messages
113+
$errorMessage = sprintf(
114+
'Import failed: %s in %s:%d',
115+
$e->getMessage(),
116+
$e->getFile(),
117+
$e->getLine()
118+
);
119+
120+
$this->jobStatusRepository->updateStatus($jobId, 'failed', $errorMessage);
121+
122+
$this->logger->error('Import job failed', [
123+
'jobId' => $jobId,
124+
'filename' => $message->originalFilename,
125+
'error' => $e->getMessage(),
126+
'trace' => $e->getTraceAsString(),
127+
]);
128+
129+
// Clean up temporary file even on failure
130+
if (file_exists($filePath)) {
131+
@unlink($filePath);
132+
}
133+
134+
// Do NOT rethrow - would crash worker
135+
// Errors are logged and status is updated for user feedback
136+
}
137+
}
138+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
/**
4+
* This file is part of the package netresearch/nr-textdb.
5+
*
6+
* For the full copyright and license information, please read the
7+
* LICENSE file that was distributed with this source code.
8+
*/
9+
10+
declare(strict_types=1);
11+
12+
namespace Netresearch\NrTextdb\Queue\Message;
13+
14+
/**
15+
* ImportTranslationsMessage - Symfony Messenger message for async import processing.
16+
*
17+
* Simple DTO (Data Transfer Object) containing all data needed for async XLIFF import.
18+
* This message is dispatched to the Symfony Messenger queue and consumed by
19+
* ImportTranslationsMessageHandler in a background worker.
20+
*
21+
* @author Rico Sonntag <[email protected]>
22+
* @license Netresearch https://www.netresearch.de
23+
* @link https://www.netresearch.de
24+
*/
25+
final readonly class ImportTranslationsMessage
26+
{
27+
/**
28+
* @param string $jobId Unique job identifier (UUID)
29+
* @param string $filePath Absolute path to uploaded XLIFF file
30+
* @param string $originalFilename Original filename from upload
31+
* @param int $fileSize File size in bytes
32+
* @param bool $forceUpdate Whether to force update existing translations
33+
* @param int $backendUserId Backend user who initiated the import
34+
*/
35+
public function __construct(
36+
public string $jobId,
37+
public string $filePath,
38+
public string $originalFilename,
39+
public int $fileSize,
40+
public bool $forceUpdate,
41+
public int $backendUserId,
42+
) {
43+
}
44+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
<?php
2+
3+
/**
4+
* This file is part of the package netresearch/nr-textdb.
5+
*
6+
* For the full copyright and license information, please read the
7+
* LICENSE file that was distributed with this source code.
8+
*/
9+
10+
declare(strict_types=1);
11+
12+
namespace Netresearch\NrTextdb\Task;
13+
14+
use Exception;
15+
use Psr\Log\LoggerAwareInterface;
16+
use Psr\Log\LoggerAwareTrait;
17+
use Psr\Log\LoggerInterface;
18+
use Symfony\Component\Process\Process;
19+
use TYPO3\CMS\Core\Utility\GeneralUtility;
20+
use TYPO3\CMS\Scheduler\Task\AbstractTask;
21+
22+
/**
23+
* Scheduler task to process Symfony Messenger queue messages.
24+
*
25+
* This task allows running the Messenger consumer via TYPO3 Scheduler instead of
26+
* requiring systemd/supervisor. It runs messenger:consume with a time limit to
27+
* process queued messages in batches.
28+
*
29+
* Setup: Add this task to Scheduler with frequency (e.g., every 5 minutes).
30+
* The task will process messages for a limited time (default 2 minutes) then exit,
31+
* allowing the next Scheduler run to pick up where it left off.
32+
*
33+
* @author Rico Sonntag <[email protected]>
34+
* @license Netresearch https://www.netresearch.de
35+
* @link https://www.netresearch.de
36+
*/
37+
class ProcessMessengerQueueTask extends AbstractTask implements LoggerAwareInterface
38+
{
39+
use LoggerAwareTrait;
40+
41+
/**
42+
* Time limit in seconds for message processing.
43+
* Should be LESS than the Scheduler frequency to avoid overlapping runs.
44+
*/
45+
public int $timeLimit = 120; // 2 minutes
46+
47+
/**
48+
* Transport name to consume from (default: doctrine).
49+
*/
50+
public string $transport = 'doctrine';
51+
52+
/**
53+
* Execute the Messenger consumer.
54+
*
55+
* This method is called by the TYPO3 Scheduler when the task runs.
56+
* It spawns messenger:consume as a subprocess with time limit.
57+
*
58+
* @return bool TRUE on success, FALSE on failure
59+
*/
60+
public function execute(): bool
61+
{
62+
$vendorBin = GeneralUtility::getFileAbsFileName('typo3conf/../vendor/bin/typo3');
63+
64+
if (!file_exists($vendorBin)) {
65+
$this->logError('TYPO3 CLI binary not found at: ' . $vendorBin);
66+
67+
return false;
68+
}
69+
70+
// Build command: messenger:consume <transport> --time-limit=<seconds>
71+
$command = [
72+
PHP_BINARY,
73+
$vendorBin,
74+
'messenger:consume',
75+
$this->transport,
76+
'--time-limit=' . $this->timeLimit,
77+
'--quiet', // Suppress output
78+
];
79+
80+
try {
81+
$process = new Process($command);
82+
$process->setTimeout($this->timeLimit + 30); // Allow 30s buffer
83+
$process->run();
84+
85+
if (!$process->isSuccessful()) {
86+
$this->logError('Messenger consumer failed: ' . $process->getErrorOutput());
87+
88+
return false;
89+
}
90+
91+
// Success - messages processed (or none were pending)
92+
return true;
93+
} catch (Exception $e) {
94+
$this->logError('Exception during message processing: ' . $e->getMessage());
95+
96+
return false;
97+
}
98+
}
99+
100+
/**
101+
* Log error message.
102+
*/
103+
private function logError(string $message): void
104+
{
105+
if ($this->logger instanceof LoggerInterface) {
106+
$this->logger->error($message);
107+
}
108+
}
109+
110+
/**
111+
* Get additional information for task display in backend.
112+
*/
113+
public function getAdditionalInformation(): string
114+
{
115+
return sprintf(
116+
'Transport: %s | Time limit: %d seconds',
117+
$this->transport,
118+
$this->timeLimit
119+
);
120+
}
121+
}

0 commit comments

Comments
 (0)