Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
06e0ad1
add priority test
jlabedo May 22, 2025
500483c
Bus routing selector
jlabedo May 22, 2025
7cca7e4
introduce better separation with BusRoutingConfigBuilder
jlabedo May 22, 2025
9b5b8aa
wip
jlabedo May 23, 2025
3f34db3
wip
jlabedo May 23, 2025
153b3b2
save - missing aggregate factory methods handling
jlabedo May 23, 2025
a4cad61
cleaning
jlabedo May 23, 2025
9c946c9
cleaning
jlabedo May 23, 2025
b315714
wip
jlabedo May 26, 2025
d2c8953
wip
jlabedo May 26, 2025
417171d
main tests passing
jlabedo May 26, 2025
0f35426
Scheduling the same command multiple times for multiple handlers
unixslayer Mar 27, 2025
2b8260e
es tests passing + same logging behaviour
jlabedo Jun 2, 2025
640806e
fix es test with infinite loop
jlabedo Jun 2, 2025
7ac20af
fix dbal auto incremented id test
jlabedo Jun 2, 2025
7eb3b18
distribution module
jlabedo Jun 3, 2025
f5653ba
cleaning
jlabedo Jun 3, 2025
26bf02c
fix test
jlabedo Jun 3, 2025
b5233b6
cleaning
jlabedo Jun 3, 2025
12dcc24
fix amqp distributed bus
jlabedo Jun 3, 2025
4359089
refactor
jlabedo Jun 3, 2025
43d8f1f
cleaning
jlabedo Jun 3, 2025
8af30a0
fix phpstan
jlabedo Jun 3, 2025
6204396
cleaning
jlabedo Jun 3, 2025
8c2a782
fix interceptors for aggregates
jlabedo Jun 3, 2025
e1e14ac
fix multitenant test
jlabedo Jun 3, 2025
bec94c0
Merge branch 'main' into event-routing
jlabedo Jun 3, 2025
7495e99
fix aggregate output channel
jlabedo Jun 4, 2025
ed5640c
rename test
jlabedo Jun 4, 2025
5786a0b
cleaning
jlabedo Jun 4, 2025
4f50d2b
Merge branch 'main' into event-routing
jlabedo Jun 4, 2025
fe93fcf
use same routing logic for aggregate query handlers
jlabedo Jun 4, 2025
1a867ca
Merge remote-tracking branch 'unixslayer/multiple-async-handlers-for-…
jlabedo Jun 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/Amqp/src/Configuration/AmqpModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private function __construct(AmqpDistributionModule $amqpDistributionModule)
*/
public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static
{
return new self(AmqpDistributionModule::create($annotationRegistrationService, $interfaceToCallRegistry));
return new self(AmqpDistributionModule::create());
}

/**
Expand Down
23 changes: 10 additions & 13 deletions packages/Amqp/src/Distribution/AmqpDistributionModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,22 @@ class AmqpDistributionModule
public const AMQP_ROUTING_KEY = 'ecotone.amqp.distributed.service_target';
public const CHANNEL_PREFIX = 'distributed_';

private array $distributedEventHandlers;
private array $distributedCommandHandlers;

public function __construct(array $distributedEventHandlers, array $distributedCommandHandlers)
public function __construct()
{
$this->distributedEventHandlers = $distributedEventHandlers;
$this->distributedCommandHandlers = $distributedCommandHandlers;
}

public static function create(AnnotationFinder $annotationFinder, InterfaceToCallRegistry $interfaceToCallRegistry): self
public static function create(): self
{
return new self(
DistributedHandlerModule::getDistributedEventHandlerRoutingKeys($annotationFinder, $interfaceToCallRegistry),
DistributedHandlerModule::getDistributedCommandHandlerRoutingKeys($annotationFinder, $interfaceToCallRegistry)
);
return new self();
}

public function getAmqpConfiguration(array $extensionObjects): array
{
$applicationConfiguration = ExtensionObjectResolver::resolveUnique(ServiceConfiguration::class, $extensionObjects, ServiceConfiguration::createWithDefaults());
$distributedModule = ExtensionObjectResolver::resolve(DistributedHandlerModule::class, $extensionObjects)[0] ?? null;

$distributedEventHandlerRoutingKeys = $distributedModule ? $distributedModule->getDistributedEventHandlerRoutes() : [];

$amqpConfiguration = [];
/** @var AmqpDistributedBusConfiguration $distributedBusConfiguration */
foreach ($extensionObjects as $distributedBusConfiguration) {
Expand All @@ -73,7 +69,7 @@ public function getAmqpConfiguration(array $extensionObjects): array
$amqpConfiguration[] = AmqpQueue::createWith($queueName);
$amqpConfiguration[] = AmqpBinding::createFromNames(self::AMQP_DISTRIBUTED_EXCHANGE, $queueName, $applicationConfiguration->getServiceName());

foreach ($this->distributedEventHandlers as $distributedEventHandler) {
foreach ($distributedEventHandlerRoutingKeys as $distributedEventHandler) {
/** Adjust star to RabbitMQ so it can substitute for zero or more words. */
$distributedEventHandler = str_replace('*', '#', $distributedEventHandler);
$amqpConfiguration[] = AmqpBinding::createFromNames(self::AMQP_DISTRIBUTED_EXCHANGE, $queueName, $distributedEventHandler);
Expand Down Expand Up @@ -126,7 +122,8 @@ public function canHandle($extensionObject): bool
{
return
$extensionObject instanceof AmqpDistributedBusConfiguration
|| $extensionObject instanceof ServiceConfiguration;
|| $extensionObject instanceof ServiceConfiguration
|| $extensionObject instanceof DistributedHandlerModule;
}

private function registerPublisher(AmqpDistributedBusConfiguration|AmqpMessagePublisherConfiguration $amqpPublisher, ServiceConfiguration $applicationConfiguration, Configuration $configuration): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class Person
use WithEvents;

public const RENAME_COMMAND = 'person.rename';
public const REGISTER_COMMAND = 'person.register';

#[ORM\Id()]
#[ORM\Column(name: 'person_id', type: 'integer')]
Expand All @@ -43,20 +44,15 @@ class Person
#[ORM\Column(name: 'roles', type: 'json')]
public array $roles = [];

private function __construct(int $personId, string $name)
private function __construct(string $name)
{
$this->name = $name;

$this->recordThat(new PersonRegistered($personId, $name));
}

#[CommandHandler]
public static function register(RegisterPerson $command): static
#[CommandHandler(self::REGISTER_COMMAND)]
public static function register(string $name): static
{
$person = new self($command->getPersonId(), $command->getName());
if ($command->isException()) {
throw new RuntimeException('Exception');
}
$person = new self($name);

return $person;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/Dbal/tests/Integration/ORMTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public function test_support_for_orm_with_auto_incremented_id(): void
'Test\Ecotone\Dbal\Fixture\ORM\AutogeneratedIdentifier',
]);

$generatedId = $ecotone->getGateway(CommandBus::class)->send(new RegisterPerson(100, 'Johny'));
$generatedId = $ecotone->getGateway(CommandBus::class)->sendWithRouting(\Test\Ecotone\Dbal\Fixture\ORM\AutogeneratedIdentifier\Person::REGISTER_COMMAND ,'Johny');

self::assertEquals(
'Johny',
Expand Down
4 changes: 3 additions & 1 deletion packages/Ecotone/src/AnnotationFinder/AnnotationFinder.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ public function findAnnotatedClasses(string $annotationClassName): array;
public function findAnnotatedMethods(string $methodAnnotationClassName): array;

/**
* @return object
* @template T
* @param class-string<T> $attributeClassName
* @return T
* @throws InvalidArgumentException
*/
public function getAttributeForClass(string $className, string $attributeClassName): object;
Expand Down
2 changes: 1 addition & 1 deletion packages/Ecotone/src/Lite/LazyInMemoryContainer.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public function set(string $id, mixed $object): void
private function resolveArgument(mixed $argument): mixed
{
if (is_array($argument)) {
return array_map(fn ($argument) => $this->resolveArgument($argument), $argument);
return array_map(fn ($a) => $this->resolveArgument($a), $argument);
} elseif ($argument instanceof Definition) {
$object = $this->instantiateDefinition($argument);
foreach ($argument->getMethodCalls() as $methodCall) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Endpoint\PollingMetadata;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Ecotone\Modelling\Config\Routing\RoutingEvent;
use Ecotone\Modelling\Config\Routing\RoutingEventHandler;

#[ModuleAnnotation]
/**
* licence Apache-2.0
*/
class AsynchronousModule extends NoExternalConfigurationModule implements AnnotationModule
class AsynchronousModule implements AnnotationModule, RoutingEventHandler
{
/**
* @param array<string, array<string>> $asyncEndpoints
Expand Down Expand Up @@ -162,7 +165,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO

public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions): array
{
$defaultChannels = [];
$extensions = [$this];

if ($serviceConfiguration->isModulePackageEnabled(ModulePackageList::TEST_PACKAGE)) {
$polingChannelBuilders = array_map(
Expand All @@ -180,14 +183,14 @@ public function getModuleExtensions(ServiceConfiguration $serviceConfiguration,
continue;
}

$defaultChannels[] = SimpleMessageChannelBuilder::createQueueChannel(
$extensions[] = SimpleMessageChannelBuilder::createQueueChannel(
$endpointChannel,
true,
);
}
}

return $defaultChannels;
return $extensions;
}

public function getModulePackageName(): string
Expand Down Expand Up @@ -247,4 +250,22 @@ public function resolveChannels(array $extensionObjects): array

return $endpointChannels;
}

public function handleRoutingEvent(RoutingEvent $event, ?Configuration $messagingConfiguration = null): void
{
$registration = $event->getRegistration();
$isAsynchronous = $registration->hasMethodAnnotation(Asynchronous::class);
if (! $isAsynchronous) {
return;
}

$annotationForMethod = $registration->getAnnotationForMethod();
$asynchronous = $registration->getMethodAnnotationsWithType(Asynchronous::class)[0];

if ($annotationForMethod instanceof CommandHandler) {
Assert::isTrue(! in_array($annotationForMethod->getInputChannelName(), $asynchronous->getChannelName()), "Command Handler routing key can't be equal to asynchronous channel name in {$registration}");
} elseif ($annotationForMethod instanceof EventHandler) {
Assert::isTrue(! in_array($annotationForMethod->getListenTo(), $asynchronous->getChannelName()), "Event Handler listen to routing can't be equal to asynchronous channel name in {$registration}");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,6 @@ private function configureAsynchronousEndpoints(): void
UninterruptibleServiceActivator::create(
HeaderEnricher::create([
MessageBusChannel::COMMAND_CHANNEL_NAME_BY_NAME => null,
MessageBusChannel::COMMAND_CHANNEL_NAME_BY_OBJECT => null,
MessageBusChannel::EVENT_CHANNEL_NAME_BY_OBJECT => null,
MessageBusChannel::EVENT_CHANNEL_NAME_BY_NAME => null,
MessageHeaders::ROUTING_SLIP => implode(',', $consequentialChannels),
]),
Expand Down
14 changes: 14 additions & 0 deletions packages/Ecotone/src/Messaging/Config/PriorityBasedOnType.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,18 @@ public function getType(): string
{
return $this->type;
}

public function getPriorityArray(): array
{
return [$this->number, $this->getTypePriority()];
}

private function getTypePriority(): int
{
return match ($this->type) {
self::AGGREGATE_TYPE => 3,
self::PROJECTION_TYPE => 2,
default => 1
};
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php
declare(strict_types = 1);

/*
* licence Apache-2.0
Expand All @@ -15,7 +16,7 @@
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Handler\MessageProcessor;

class ChainedMessageProcessorBuilder
class ChainedMessageProcessorBuilder implements CompilableBuilder
{
private ?InterceptedMessageProcessorBuilder $interceptedProcessor = null;

Expand Down Expand Up @@ -81,4 +82,36 @@ public function compileProcessor(MessagingContainerBuilder $builder, MethodInter
default => new Definition(ChainedMessageProcessor::class, [$compiledProcessors])
};
}

private array $annotations = [];
public function withEndpointAnnotations(array $annotations): self
{
$self = clone $this;
$self->annotations = $annotations;

return $self;
}

private iterable $requiredInterceptorNames = [];
public function withRequiredInterceptorNames(array $requiredInterceptorNames): self
{
$self = clone $this;
$self->requiredInterceptorNames = $requiredInterceptorNames;

return $self;
}

public function compile(MessagingContainerBuilder $builder): Definition|Reference
{
$interceptedInterface = $this->getInterceptedInterface();
$interceptorsConfiguration = $interceptedInterface
? $builder->getRelatedInterceptors(
$interceptedInterface,
$this->annotations,
$this->requiredInterceptorNames,
)
: MethodInterceptorsConfiguration::createEmpty();

return $this->compileProcessor($builder, $interceptorsConfiguration);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php
/*
* licence Apache-2.0
*/
declare(strict_types=1);

namespace Ecotone\Messaging\Handler\Router;

use Ecotone\Messaging\Handler\ChannelResolver;
use Ecotone\Messaging\Handler\MessageProcessor;
use Ecotone\Messaging\Handler\Processor\SendToChannelProcessor;

class RouteToChannelResolver implements RouteResolver
{
public function __construct(
public ChannelResolver $channelResolver
)
{
}

public function resolve(string $routeName): MessageProcessor
{
$channel = $this->channelResolver->resolve($routeName);
return new SendToChannelProcessor($channel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use Ecotone\Messaging\Handler\ParameterConverterBuilder;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvokerBuilder;
use Ecotone\Messaging\Support\Assert;

use function get_class;
use function is_string;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,13 @@ public static function createRecipientListRouter(array $recipientList): self
);
}

public static function createHeaderExistsRouter(string $headerName, string $routeToChannel, string $fallbackRoute): self
public static function createHeaderExistsRouter(string $headerName, CompilableBuilder $existsProcessor, CompilableBuilder $fallbackProcessor): self
{
return new self(
HeaderExistsRouter::create($headerName, $routeToChannel, $fallbackRoute)->getDefinition(),
HeaderExistsRouter::create($headerName, 'exists', 'fallback')->getDefinition(),
[
$routeToChannel => new Definition(SendToChannelProcessor::class, [
new ChannelReference($routeToChannel),
]),
$fallbackRoute => new Definition(SendToChannelProcessor::class, [
new ChannelReference($fallbackRoute),
]),
'exists' => $existsProcessor,
'fallback' => $fallbackProcessor,
]
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,18 @@ public function chainInterceptedProcessor(InterceptedMessageProcessorBuilder $pr

public function compile(MessagingContainerBuilder $builder): Definition|Reference
{
$interceptedInterface = $this->chainedMessageProcessorBuilder->getInterceptedInterface();
$interceptorsConfiguration = $interceptedInterface
? $builder->getRelatedInterceptors(
$interceptedInterface,
$this->getEndpointAnnotations(),
$this->getRequiredInterceptorNames()
)
: MethodInterceptorsConfiguration::createEmpty();

$name = $this->getInterceptedInterface($builder->getInterfaceToCallRegistry())->toString();
$processor = $this->chainedMessageProcessorBuilder->compileProcessor($builder, $interceptorsConfiguration);
$chainedMessageProcessorBuilder = $this->chainedMessageProcessorBuilder
->withRequiredInterceptorNames($this->getRequiredInterceptorNames())
->withEndpointAnnotations($this->getEndpointAnnotations());

return new Definition(
RequestReplyProducer::class,
[
$this->outputMessageChannelName ? new ChannelReference($this->outputMessageChannelName) : null,
$processor,
$chainedMessageProcessorBuilder->compile($builder),
new Reference(ChannelResolver::class),
$this->isReplyRequired,
$name,
$chainedMessageProcessorBuilder->getInterceptedInterface()?->getName() ?? '',
]
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@

namespace Ecotone\Modelling\AggregateFlow\SaveAggregate;

use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Handler\ClassDefinition;
use Ecotone\Messaging\Handler\Enricher\PropertyReaderAccessor;
use Ecotone\Messaging\Handler\MessageProcessor;
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\Support\MessageBuilder;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateResolver;
use Ecotone\Modelling\Attribute\NamedEvent;
use Ecotone\Modelling\AggregateMessage;
use Ecotone\Modelling\Event;
use Ecotone\Modelling\EventBus;
use Ecotone\Modelling\Repository\AggregateRepository;
Expand Down Expand Up @@ -78,15 +75,6 @@ private function publishEvents(array $events): void
{
foreach ($events as $event) {
$this->eventBus->publish($event->getPayload(), $event->getMetadata());

$eventDefinition = ClassDefinition::createFor(TypeDescriptor::createFromVariable($event->getPayload()));
$namedEvent = TypeDescriptor::create(NamedEvent::class);
if ($eventDefinition->hasClassAnnotation($namedEvent)) {
/** @var NamedEvent $namedEvent */
$namedEvent = $eventDefinition->getSingleClassAnnotation($namedEvent);

$this->eventBus->publishWithRouting($namedEvent->getName(), $event->getPayload(), MediaType::APPLICATION_X_PHP, $event->getMetadata());
}
}
}

Expand Down
Loading
Loading