Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"psr/log": "^3.0.0",
"ramsey/uuid": "^4.7",
"symfony/cache": "^7.2",
"symfony/process": "^7.1",
"symfony/var-dumper": "^7.1",
"symfony/var-exporter": "^7.1",
"tempest/highlight": "^2.0",
Expand Down
12 changes: 12 additions & 0 deletions src/Tempest/CommandBus/src/AsyncCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

use Attribute;

#[Attribute]
final readonly class AsyncCommand
{
}
38 changes: 38 additions & 0 deletions src/Tempest/CommandBus/src/AsyncCommandMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

use Ramsey\Uuid\Uuid;
use Tempest\Core\KernelEvent;
use Tempest\EventBus\EventHandler;
use Tempest\Reflection\ClassReflector;

final readonly class AsyncCommandMiddleware implements CommandBusMiddleware
{
public function __construct(
private CommandBusConfig $commandBusConfig,
private CommandRepository $repository,
) {
}

#[EventHandler(KernelEvent::BOOTED)]
public function onBooted(): void
{
$this->commandBusConfig->addMiddleware(self::class);
}

public function __invoke(object $command, CommandBusMiddlewareCallable $next): void
{
$reflector = new ClassReflector($command);

if ($reflector->hasAttribute(AsyncCommand::class)) {
$this->repository->store(Uuid::uuid7()->toString(), $command);

return;
}

$next($command);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus\AsyncCommandRepositories;

use Tempest\CommandBus\CommandRepository;
use Tempest\CommandBus\Exceptions\CouldNotResolveCommand;
use function Tempest\Support\arr;

final readonly class FileCommandRepository implements CommandRepository
{
public function store(string $uuid, object $command): void
{
$payload = serialize($command);

file_put_contents(__DIR__ . "/../stored-commands/{$uuid}.pending.txt", $payload);
}

public function find(string $uuid): object
{
$path = __DIR__ . "/../stored-commands/{$uuid}.pending.txt";

if (! file_exists($path)) {
throw new CouldNotResolveCommand($uuid);
}

$payload = file_get_contents($path);

return unserialize($payload);
}

public function markAsDone(string $uuid): void
{
$path = __DIR__ . "/../stored-commands/{$uuid}.pending.txt";

unlink($path);
}

public function markAsFailed(string $uuid): void
{
rename(
from: __DIR__ . "/../stored-commands/{$uuid}.pending.txt",
to: __DIR__ . "/../stored-commands/{$uuid}.failed.txt",
);
}

public function getPendingUuids(): array
{
return arr(glob(__DIR__ . "/../stored-commands/*.pending.txt"))
->map(function (string $path) {
return str_replace('.pending.txt', '', pathinfo($path, PATHINFO_BASENAME));
})
->toArray();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus\AsyncCommandRepositories;

use Tempest\CommandBus\CommandRepository;

final class MemoryRepository implements CommandRepository
{
private array $commands = [];

public function store(string $uuid, object $command): void
{
$this->commands[$uuid] = $command;
}

public function find(string $uuid): object
{
return $this->commands[$uuid];
}

public function markAsDone(string $uuid): void
{
unset($this->commands[$uuid]);
}

public function markAsFailed(string $uuid): void
{
unset($this->commands[$uuid]);
}

public function getPendingUuids(): array
{
return array_keys($this->commands);
}
}
4 changes: 4 additions & 0 deletions src/Tempest/CommandBus/src/CommandBusConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Tempest\CommandBus;

use Tempest\CommandBus\AsyncCommandRepositories\FileCommandRepository;
use Tempest\Reflection\MethodReflector;

final class CommandBusConfig
Expand All @@ -14,6 +15,9 @@ public function __construct(

/** @var array<array-key, class-string<\Tempest\CommandBus\CommandBusMiddleware>> */
public array $middleware = [],

/** @var class-string<\Tempest\CommandBus\CommandRepository> $commandRepositoryClass */
public string $commandRepositoryClass = FileCommandRepository::class,
) {
}

Expand Down
19 changes: 19 additions & 0 deletions src/Tempest/CommandBus/src/CommandRepository.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

interface CommandRepository
{
public function store(string $uuid, object $command): void;

public function find(string $uuid): object;

public function markAsDone(string $uuid): void;

public function markAsFailed(string $uuid): void;

/** @return string[] */
public function getPendingUuids(): array;
}
20 changes: 20 additions & 0 deletions src/Tempest/CommandBus/src/CommandRepositoryInitializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

use Tempest\Container\Container;
use Tempest\Container\Initializer;
use Tempest\Container\Singleton;

final readonly class CommandRepositoryInitializer implements Initializer
{
#[Singleton]
public function initialize(Container $container): CommandRepository
{
$commandRepositoryClass = $container->get(CommandBusConfig::class)->commandRepositoryClass;

return $container->get($commandRepositoryClass);
}
}
11 changes: 11 additions & 0 deletions src/Tempest/CommandBus/src/Exceptions/CouldNotResolveCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus\Exceptions;

use Exception;

final class CouldNotResolveCommand extends Exception
{
}
68 changes: 68 additions & 0 deletions src/Tempest/CommandBus/src/HandleAsyncCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

use Tempest\Console\Console;
use Tempest\Console\ConsoleCommand;
use Tempest\Console\ExitCode;
use Tempest\Console\HasConsole;
use Tempest\Container\Container;
use Throwable;

final readonly class HandleAsyncCommand
{
use HasConsole;

public function __construct(
private CommandBusConfig $commandBusConfig,
private Container $container,
private Console $console,
private CommandRepository $repository,
) {
}

#[ConsoleCommand(name: 'command:handle')]
public function __invoke(?string $uuid = null): ExitCode
{
$uuid ??= $this->repository->getPendingUuids()[0] ?? null;

if (! $uuid) {
$this->error('No pending command found');

return ExitCode::ERROR;
}

try {
$command = $this->repository->find($uuid);

$commandHandler = $this->commandBusConfig->handlers[$command::class] ?? null;

if (! $commandHandler) {
$commandClass = $command::class;

$this->error("No handler found for command {$commandClass}");

return ExitCode::ERROR;
}

$commandHandler->handler->invokeArgs(
$this->container->get($commandHandler->handler->getDeclaringClass()->getName()),
[$command],
);

$this->repository->markAsDone($uuid);

$this->success('Done');

return ExitCode::SUCCESS;
} catch (Throwable $throwable) {
$this->repository->markAsFailed($uuid);

$this->error($throwable->getMessage());

return ExitCode::ERROR;
}
}
}
95 changes: 95 additions & 0 deletions src/Tempest/CommandBus/src/MonitorAsyncCommands.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<?php

declare(strict_types=1);

namespace Tempest\CommandBus;

use DateTimeImmutable;
use Symfony\Component\Process\Process;
use Tempest\Console\Console;
use Tempest\Console\ConsoleCommand;
use Tempest\Console\HasConsole;
use Tempest\Console\Input\ConsoleArgumentBag;
use function Tempest\Support\arr;

final readonly class MonitorAsyncCommands
{
use HasConsole;

public function __construct(
private CommandRepository $repository,
private ConsoleArgumentBag $argumentBag,
private Console $console,
) {
}

#[ConsoleCommand(name: 'command:monitor')]
public function __invoke(): void
{
$this->success("Monitoring for new commands. Press ctrl+c to stop.");

/** @var \Symfony\Component\Process\Process[] $processes */
$processes = [];

while (true) { // @phpstan-ignore-line
foreach ($processes as $uuid => $process) {
$time = new DateTimeImmutable();

if ($process->isTerminated()) {
if ($process->isSuccessful()) {
$this->writeln("<success>{$uuid}</success> finished at {$time->format('Y-m-d H:i:s')}");
} else {
$this->writeln("<error>{$uuid}</error> failed at {$time->format('Y-m-d H:i:s')}");
}

if ($output = trim($process->getOutput())) {
$this->writeln($output);
}

if ($errorOutput = trim($process->getErrorOutput())) {
$this->writeln($errorOutput);
}

unset($processes[$uuid]);
}
}

$availableUuids = arr($this->repository->getPendingUuids())
->filter(fn (string $uuid) => ! in_array($uuid, array_keys($processes)));

if (count($processes) === 5) {
$this->sleep(0.5);

continue;
}

if ($availableUuids->isEmpty()) {
$this->sleep(0.5);

continue;
}

// Start a task
$uuid = $availableUuids->first();

$time = new DateTimeImmutable();
$this->writeln("<h2>{$uuid}</h2> started at {$time->format('Y-m-d H:i:s')}");

$process = new Process([
$this->argumentBag->getBinaryPath(),
$this->argumentBag->getCliName(),
'command:handle',
$uuid,
], getcwd());

$process->start();

$processes[$uuid] = $process;
}
}

private function sleep(float $seconds): void
{
usleep((int) ($seconds * 1_000_000));
}
}
1 change: 1 addition & 0 deletions src/Tempest/CommandBus/src/stored-commands/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.txt
Loading