Skip to content

Commit 856ec01

Browse files
committed
feat(async): AsyncProcess
Signed-off-by: Maxence Lange <[email protected]>
1 parent 34949e4 commit 856ec01

37 files changed

+3008
-1
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
7+
* SPDX-License-Identifier: AGPL-3.0-or-later
8+
*/
9+
namespace OC\Core\BackgroundJobs;
10+
11+
use OC\Async\Db\ProcessMapper;
12+
use OC\Async\ForkManager;
13+
use OC\Async\Wrappers\LoggerProcessWrapper;
14+
use OC\Config\Lexicon\CoreConfigLexicon;
15+
use OCP\AppFramework\Utility\ITimeFactory;
16+
use OCP\Async\Enum\ProcessExecutionTime;
17+
use OCP\BackgroundJob\TimedJob;
18+
use OCP\IAppConfig;
19+
20+
class AsyncProcessJob extends TimedJob {
21+
22+
public function __construct(
23+
ITimeFactory $time,
24+
private IAppConfig $appConfig,
25+
private ForkManager $forkManager,
26+
private ProcessMapper $processMapper,
27+
private LoggerProcessWrapper $loggerProcessWrapper,
28+
) {
29+
parent::__construct($time);
30+
31+
$this->setTimeSensitivity(self::TIME_SENSITIVE);
32+
$this->setInterval(60 * 5);
33+
}
34+
35+
protected function run(mixed $argument): void {
36+
$this->discoverLoopAddress();
37+
38+
$this->forkManager->setWrapper($this->loggerProcessWrapper);
39+
40+
$this->processMapper->resetFailedProcess();
41+
42+
$metadata = ['executionTime' => ProcessExecutionTime::LATER];
43+
foreach ($this->processMapper->getSessionOnStandBy() as $session) {
44+
$this->forkManager->forkSession($session, $metadata);
45+
}
46+
47+
$this->processMapper->removeSuccessfulProcess();
48+
49+
$this->forkManager->waitChildProcess();
50+
}
51+
52+
private function discoverLoopAddress(): void {
53+
if ($this->appConfig->hasKey('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS)) {
54+
return;
55+
}
56+
57+
$found = $this->forkManager->discoverLoopbackEndpoint();
58+
if ($found !== null) {
59+
$this->appConfig->setValueString('core', CoreConfigLexicon::ASYNC_LOOPBACK_ADDRESS, $found);
60+
}
61+
}
62+
}

core/Command/Async/Live.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
7+
* SPDX-License-Identifier: AGPL-3.0-or-later
8+
*/
9+
namespace OC\Core\Command\Async;
10+
11+
use OC\Async\AsyncManager;
12+
use OC\Async\AsyncProcess;
13+
use OC\Async\Db\ProcessMapper;
14+
use OC\Async\ForkManager;
15+
use OC\Async\Wrappers\CliProcessWrapper;
16+
use OCP\Async\Enum\ProcessExecutionTime;
17+
use Symfony\Component\Console\Command\Command;
18+
use Symfony\Component\Console\Input\InputInterface;
19+
use Symfony\Component\Console\Output\OutputInterface;
20+
21+
class Live extends Command {
22+
public function __construct(
23+
private readonly ProcessMapper $processMapper,
24+
private readonly ForkManager $forkManager,
25+
) {
26+
parent::__construct();
27+
}
28+
29+
protected function configure() {
30+
parent::configure();
31+
$this->setName('async:live')
32+
->setDescription('test');
33+
}
34+
35+
protected function execute(InputInterface $input, OutputInterface $output): int {
36+
CliProcessWrapper::initStyle($output);
37+
$this->forkManager->setWrapper(new CliProcessWrapper($output));
38+
39+
$metadata = ['_processExecutionTime' => ProcessExecutionTime::ASAP];
40+
while(true) {
41+
$this->processMapper->resetFailedProcess();
42+
43+
foreach ($this->processMapper->getSessionOnStandBy() as $session) {
44+
$this->forkManager->forkSession($session, $metadata);
45+
}
46+
47+
sleep(3);
48+
}
49+
50+
$this->forkManager->waitChildProcess();
51+
52+
return 0;
53+
}
54+
}

core/Command/Async/Manage.php

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
7+
* SPDX-License-Identifier: AGPL-3.0-or-later
8+
*/
9+
10+
namespace OC\Core\Command\Async;
11+
12+
use OC\Async\AsyncManager;
13+
use OC\Async\Db\ProcessMapper;
14+
use OC\Async\ForkManager;
15+
use OC\Async\Model\Process;
16+
use OC\Async\Model\ProcessInterface;
17+
use OC\Async\Model\SessionInterface;
18+
use OCP\Async\Enum\ProcessExecutionTime;
19+
use OCP\Async\Enum\ProcessStatus;
20+
use Symfony\Component\Console\Command\Command;
21+
use Symfony\Component\Console\Formatter\OutputFormatterStyle;
22+
use Symfony\Component\Console\Helper\QuestionHelper;
23+
use Symfony\Component\Console\Input\InputInterface;
24+
use Symfony\Component\Console\Input\InputOption;
25+
use Symfony\Component\Console\Output\OutputInterface;
26+
use Symfony\Component\Console\Question\ConfirmationQuestion;
27+
28+
class Manage extends Command {
29+
private bool $noCrop = false;
30+
31+
public function __construct(
32+
private AsyncManager $asyncManager,
33+
private ForkManager $forkManager,
34+
private ProcessMapper $processMapper,
35+
) {
36+
parent::__construct();
37+
}
38+
39+
protected function configure() {
40+
parent::configure();
41+
$this->setName('async:manage')
42+
->addOption('clean', '', InputOption::VALUE_NONE, 'remove successful session')
43+
->addOption('session', '', InputOption::VALUE_REQUIRED, 'list all processes from a session', '')
44+
->addOption( 'details', '', InputOption::VALUE_REQUIRED, 'get details about a specific process', '')
45+
->addOption( 'full-details', '', InputOption::VALUE_NONE, 'get full details')
46+
->addOption( 'replay', '', InputOption::VALUE_REQUIRED, 'replay a specific process', '')
47+
->setDescription('manage');
48+
}
49+
50+
protected function execute(InputInterface $input, OutputInterface $output): int {
51+
if ($input->getOption('clean')) {
52+
$count = $this->processMapper->removeSuccessfulProcess();
53+
$output->writeln('deleted ' . $count . ' process');
54+
return 0;
55+
}
56+
57+
$replay = $input->getOption('replay');
58+
if ($replay !== '') {
59+
$this->replayProcess($input, $output, $replay);
60+
return 0;
61+
}
62+
63+
$this->noCrop = $input->getOption('full-details');
64+
$output->getFormatter()->setStyle('data', new OutputFormatterStyle('#666', '', []));
65+
$output->getFormatter()->setStyle('prep', new OutputFormatterStyle('#ccc', '', []));
66+
$output->getFormatter()->setStyle('standby', new OutputFormatterStyle('#aaa', '', ['bold']));
67+
$output->getFormatter()->setStyle('running', new OutputFormatterStyle('#0a0', '', []));
68+
$output->getFormatter()->setStyle('blocker', new OutputFormatterStyle('#c00', '', ['bold']));
69+
$output->getFormatter()->setStyle('error', new OutputFormatterStyle('#d00', '', []));
70+
$output->getFormatter()->setStyle('success', new OutputFormatterStyle('#0c0', '', ['bold']));
71+
72+
$details = $input->getOption('details');
73+
if ($details !== '') {
74+
$this->displayProcess($output, $details);
75+
return 0;
76+
}
77+
78+
$session = $input->getOption('session');
79+
if ($session !== '') {
80+
$this->displaySession($output, $session);
81+
return 0;
82+
}
83+
84+
$this->summary($output);
85+
return 0;
86+
}
87+
88+
private function summary(OutputInterface $output): void {
89+
$statuses = [];
90+
foreach ($this->processMapper->getSessions() as $token) {
91+
$sessionProcesses = $this->processMapper->getBySession($token);
92+
$sessionIface = new SessionInterface(ProcessInterface::asProcessInterfaces($sessionProcesses));
93+
94+
$status = $sessionIface->getGlobalStatus()->value;
95+
if (!array_key_exists($status, $statuses)) {
96+
$statuses[$status] = [];
97+
}
98+
$statuses[$status][] = $token;
99+
}
100+
101+
if (!empty($success = $statuses[(string)ProcessStatus::SUCCESS->value] ?? [])) {
102+
$output->writeln('<comment>Successful</comment> session to be removed next cron (' . count($success) . '): <info>' . implode('</info>,<info> ', $success) . '</info>');
103+
}
104+
105+
if (!empty($prep = $statuses[(string)ProcessStatus::PREP->value] ?? [])) {
106+
$output->writeln('Session with <comment>PREP</comment> status (' . count($prep) . '): <info>' . implode('</info>,<info> ', $prep) . '</info>');
107+
}
108+
109+
if (!empty($stand = $statuses[ProcessStatus::STANDBY->value] ?? [])) {
110+
$output->writeln('Session in <comment>stand-by</comment> (' . count($stand) . '): <info>' . implode('</info>,<info> ', $stand) . '</info>');
111+
}
112+
113+
if (!empty($running = $statuses[ProcessStatus::RUNNING->value] ?? [])) {
114+
$output->writeln('Currently <comment>running</comment> session (' . count($running) . '): <info>' . implode('</info>,<info> ', $running) . '</info>');
115+
}
116+
117+
if (!empty($err = $statuses[ProcessStatus::ERROR->value] ?? [])) {
118+
$output->writeln('<comment>Erroneous</comment> session (' . count($err) . '): <info>' . implode('</info>,<info> ', $err) . '</info>');
119+
}
120+
121+
if (!empty($blocker = $statuses[ProcessStatus::BLOCKER->value] ?? [])) {
122+
$output->writeln('<comment>Blocked</comment> session (' . count($blocker) . '): <info>' . implode('</info>,<info> ', $blocker) . '</info>');
123+
}
124+
}
125+
126+
127+
private function displaySession(OutputInterface $output, string $token): void {
128+
foreach ($this->processMapper->getBySession($token) as $process) {
129+
$output->writeln('ProcessToken: <data>' . $process->getToken() . '</data>');
130+
$output->writeln('ProcessType: <data>' . $process->getProcessType()->name . '</data>');
131+
$output->writeln('Interface: <data>' . $this->displayInterface($process) . '</data>');
132+
$output->writeln('ProcessStatus: ' . $this->displayStatus($process));
133+
$output->writeln('Replay: <data>' . $process->getReplayCount() . '</data>');
134+
135+
$output->writeln('');
136+
}
137+
138+
}
139+
140+
141+
private function displayProcess(OutputInterface $output, string $token): void {
142+
$process = $this->processMapper->getByToken($token);
143+
144+
$output->writeln('SessionToken: <data>' . $process->getSessionToken() . '</data>');
145+
$output->writeln('ProcessToken: <data>' . $process->getToken() . '</data>');
146+
$output->writeln('ProcessType: <data>' . $process->getProcessType()->name . '</data>');
147+
$output->writeln('Interface: <data>' . $this->displayInterface($process) . '</data>');
148+
149+
$output->writeln('Code: <data>' . $this->cropContent($process->getCode(), 3000, 15) . '</data>');
150+
$output->writeln('Params: <data>' . $this->cropContent(json_encode($process->getParams()), 3000, 15) . '</data>');
151+
$output->writeln('Metadata: <data>' . $this->cropContent(json_encode($process->getMetadata()), 3000, 15) . '</data>');
152+
$output->writeln('Result: <data>' . $this->cropContent(json_encode($process->getResult()), 3000, 15) . '</data>');
153+
154+
$output->writeln('Dataset: <data>' . $this->cropContent(json_encode($process->getDataset()), 3000, 15) . '</data>');
155+
$output->writeln('Links: <data>' . $this->cropContent(json_encode($process->getLinks()), 3000, 15) . '</data>');
156+
$output->writeln('Orig: <data>' . $this->cropContent(json_encode($process->getOrig()), 3000, 15) . '</data>');
157+
158+
$output->writeln('ExecutionTime: <data>' . ProcessExecutionTime::tryFrom($process->getExecutionTime())?->name . '</data>');
159+
$output->writeln('Creation: <data>' . $process->getCreation() . '</data>');
160+
$output->writeln('LastRun: <data>' . $process->getLastRun() . '</data>');
161+
$output->writeln('NextRun: <data>' . $process->getNextRun() . '</data>');
162+
$output->writeln('ProcessStatus: ' . $this->displayStatus($process));
163+
$output->writeln('Replay: <data>' . $process->getReplayCount() . '</data>');
164+
}
165+
166+
167+
private function displayStatus(Process $process): string {
168+
$name = $process->getProcessStatus()->name;
169+
$color = strtolower($name);
170+
return '<' . $color . '>' . $name . '</' . $color . '>';
171+
}
172+
173+
private function displayInterface(Process $process): string {
174+
$iface = new ProcessInterface(null, $process);
175+
176+
$data = [];
177+
$data[] = ($iface->getId() === '') ? '' : 'id=' . $iface->getId();
178+
$data[] = ($iface->getName() === '') ? '' : 'name=' . $iface->getName();
179+
$data[] = (!$iface->isReplayable()) ? '' : 'replayable=true';
180+
$data[] = (!$iface->isBlocker()) ? '' : 'blocker=true';
181+
$data[] = (empty($iface->getRequire())) ? '' : 'require=' . implode('.', $iface->getRequire());
182+
183+
return implode(', ', array_filter($data));
184+
}
185+
186+
187+
188+
189+
private function replayProcess(InputInterface $input, OutputInterface $output, string $token): void {
190+
$process = $this->processMapper->getByToken($token);
191+
if (!in_array($process->getProcessStatus(), [ProcessStatus::ERROR, ProcessStatus::BLOCKER], true)) {
192+
$output->writeln('only Process set as ERROR or BLOCKER can be replayed');
193+
return;
194+
}
195+
196+
$iface = new ProcessInterface(null, $process);
197+
if (!$iface->isReplayable()) {
198+
$output->writeln('');
199+
$output->writeln('Process is not set as <comment>replayable</comment>.');
200+
$output->writeln('Replaying this Process can create issues.');
201+
$output->writeln('');
202+
$question = new ConfirmationQuestion(
203+
'<comment>Do you really want to replay the Process ' . $token . ' ?</comment> (y/N) ',
204+
false,
205+
'/^(y|Y)/i'
206+
);
207+
208+
/** @var QuestionHelper $helper */
209+
$helper = $this->getHelper('question');
210+
if (!$helper->ask($input, $output, $question)) {
211+
$output->writeln('aborted.');
212+
return;
213+
}
214+
}
215+
216+
$process->replay(true);
217+
$this->processMapper->update($process);
218+
}
219+
220+
221+
/**
222+
* crop content after n lines or n chars
223+
*/
224+
private function cropContent(string $content, int $maxChars, int $maxLines): string {
225+
if ($this->noCrop) {
226+
return $content;
227+
}
228+
preg_match_all("/\\n/", utf8_decode($content), $matches, PREG_OFFSET_CAPTURE);
229+
return substr($content, 0, min($matches[0][$maxLines-1][1] ?? 99999, $maxChars));
230+
}
231+
}
232+

0 commit comments

Comments
 (0)