Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/Ecotone/src/AnnotationFinder/AnnotatedDefinition.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Ecotone\AnnotationFinder;

use Ecotone\Messaging\Handler\TypeDescriptor;
use InvalidArgumentException;

/**
Expand Down Expand Up @@ -155,9 +156,9 @@ public function hasClassAnnotation(string $type): bool
return false;
}

public function hasAnnotation(string $type): bool
public function hasAnnotation(string|TypeDescriptor $type): bool
{
return $this->hasMethodAnnotation($type) || $this->hasClassAnnotation($type);
return $this->hasMethodAnnotation((string) $type) || $this->hasClassAnnotation((string) $type);
}


Expand Down
4 changes: 3 additions & 1 deletion packages/Ecotone/src/AnnotationFinder/AnnotatedFinding.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Ecotone\AnnotationFinder;

use Ecotone\Messaging\Handler\TypeDescriptor;

/**
* licence Apache-2.0
*/
Expand Down Expand Up @@ -46,5 +48,5 @@ public function hasMethodAnnotation(string $type): bool;

public function hasClassAnnotation(string $type): bool;

public function hasAnnotation(string $type): bool;
public function hasAnnotation(string|TypeDescriptor $type): bool;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Ecotone\Messaging\Attribute\EndpointAnnotation;
use Ecotone\Messaging\Attribute\InternalHandler;
use Ecotone\Messaging\Attribute\ModuleAnnotation;
use Ecotone\Messaging\Attribute\StreamBasedSource;
use Ecotone\Messaging\Channel\CombinedMessageChannel;
use Ecotone\Messaging\Channel\MessageChannelBuilder;
use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder;
Expand Down Expand Up @@ -65,6 +66,9 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
if ($annotationForMethod instanceof QueryHandler) {
continue;
}
if ($endpoint->hasClassAnnotation(StreamBasedSource::class)) {
continue;
}
if (in_array(get_class($annotationForMethod), [CommandHandler::class, EventHandler::class, InternalHandler::class])) {
if ($annotationForMethod->isEndpointIdGenerated()) {
throw ConfigurationException::create("{$endpoint} should have endpointId defined for handling asynchronously");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static function getFirstParameterClassIfAny(AnnotatedFinding $registratio
return null;
}

public static function getFirstParameterTypeFor(AnnotatedFinding $registration, InterfaceToCallRegistry $interfaceToCallRegistry): string
private static function getFirstParameterTypeFor(AnnotatedFinding $registration, InterfaceToCallRegistry $interfaceToCallRegistry): string
{
$interfaceToCall = $interfaceToCallRegistry->getFor($registration->getClassName(), $registration->getMethodName());

Expand All @@ -117,39 +117,6 @@ public static function getFirstParameterTypeFor(AnnotatedFinding $registration,
return TypeDescriptor::ARRAY;
}

/**
* This allows to decouple input channel for routing from specific channel that executes the Handler.
* This way each handler can be treated separately (e.g. for async processing)
*/
public static function getExecutionMessageHandlerChannel(AnnotatedFinding $registration): string
{
/** @var IdentifiedAnnotation $annotationForMethod */
$annotationForMethod = $registration->getAnnotationForMethod();

return $annotationForMethod->getEndpointId() . '.target';
}

public static function getRoutingInputMessageChannelForEventHandler(AnnotatedFinding $registration, InterfaceToCallRegistry $interfaceToCallRegistry): string
{
/** @var InputOutputEndpointAnnotation $annotationForMethod */
$annotationForMethod = $registration->getAnnotationForMethod();

$inputChannelName = null;
if ($annotationForMethod instanceof EventHandler) {
$inputChannelName = $annotationForMethod->getListenTo();
}

if (!$inputChannelName) {
$interfaceToCall = $interfaceToCallRegistry->getFor($registration->getClassName(), $registration->getMethodName());
if ($interfaceToCall->hasNoParameters()) {
throw ConfigurationException::create("Missing command class or listen routing for {$registration}.");
}
$inputChannelName = $interfaceToCall->getFirstParameterTypeHint();
}

return $inputChannelName;
}

/**
* @inheritDoc
*/
Expand Down
58 changes: 0 additions & 58 deletions packages/PdoEventSourcing/src/ChannelProjectionExecutor.php

This file was deleted.

91 changes: 49 additions & 42 deletions packages/PdoEventSourcing/src/Config/EventSourcingModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@
use Ecotone\EventSourcing\Prooph\LazyProophProjectionManager;
use Ecotone\EventSourcing\ProophEventMapper;
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Attribute\EndpointAnnotation;
use Ecotone\Messaging\Attribute\ModuleAnnotation;
use Ecotone\Messaging\Attribute\PropagateHeaders;
use Ecotone\Messaging\Config\Annotation\AnnotatedDefinitionReference;
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\AsynchronousModule;
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ExtensionObjectResolver;
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\NoExternalConfigurationModule;
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ParameterConverterAnnotationFactory;
Expand All @@ -49,7 +47,10 @@
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Endpoint\InboundChannelAdapter\InboundChannelAdapterBuilder;
use Ecotone\Messaging\Gateway\MessagingEntrypointWithHeadersPropagation;
use Ecotone\Messaging\Handler\ChannelResolver;
use Ecotone\Messaging\Handler\ClassDefinition;
use Ecotone\Messaging\Handler\Filter\MessageFilterBuilder;
use Ecotone\Messaging\Handler\Gateway\GatewayProxyBuilder;
Expand All @@ -60,16 +61,20 @@
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\HeaderBuilder;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\PayloadBuilder;
use Ecotone\Messaging\Handler\ReferenceSearchService;
use Ecotone\Messaging\Handler\Router\RouterProcessor;
use Ecotone\Messaging\Handler\Router\RouterProcessorBuilder;
use Ecotone\Messaging\Handler\Router\RouteToChannelResolver;
use Ecotone\Messaging\Handler\ServiceActivator\MessageProcessorActivatorBuilder;
use Ecotone\Messaging\Handler\ServiceActivator\ServiceActivatorBuilder;
use Ecotone\Messaging\Handler\Splitter\SplitterBuilder;
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\NamedEvent;
use Ecotone\Modelling\Config\MessageBusChannel;
use Ecotone\Modelling\Config\MessageHandlerRoutingModule;
use Ecotone\Modelling\Config\Routing\BusRouteSelector;
use Ecotone\Modelling\Config\Routing\BusRoutingKeyResolver;
use Ecotone\Modelling\Config\Routing\BusRoutingMapBuilder;
use Ramsey\Uuid\Uuid;

#[ModuleAnnotation]
Expand All @@ -83,27 +88,16 @@ class EventSourcingModule extends NoExternalConfigurationModule
public const ECOTONE_ES_DELETE_PROJECTION = 'ecotone:es:delete-projection';
public const ECOTONE_ES_INITIALIZE_PROJECTION = 'ecotone:es:initialize-projection';
public const ECOTONE_ES_TRIGGER_PROJECTION = 'ecotone:es:trigger-projection';
/**
* @var ProjectionSetupConfiguration[]
*/
private array $projectionSetupConfigurations;
/** @var ServiceActivatorBuilder[] */
private array $projectionLifeCycleServiceActivators = [];
private AggregateStreamMapping $aggregateToStreamMapping;
private AggregateTypeMapping $aggregateTypeMapping;

/**
* @var ProjectionSetupConfiguration[]
* @var AnnotatedDefinition[] $projectionEventHandlers
* @var ServiceActivatorBuilder[]
* @var GatewayProxyBuilder[]
* @param ProjectionSetupConfiguration[] $projectionSetupConfigurations
* @param AnnotatedDefinition[] $projectionEventHandlers
* @param array<string, string> $namedEvents key is class name, value is event name
* @param ServiceActivatorBuilder[] $projectionLifeCycleServiceActivators
* @param GatewayProxyBuilder[] $projectionStateGateways
*/
private function __construct(array $projectionConfigurations, private array $projectionEventHandlers, private AsynchronousModule $asynchronousModule, array $projectionLifeCycleServiceActivators, AggregateStreamMapping $aggregateToStreamMapping, AggregateTypeMapping $aggregateTypeMapping, private array $projectionStateGateways)
private function __construct(private array $projectionSetupConfigurations, private array $projectionEventHandlers, private array $namedEvents, private array $projectionLifeCycleServiceActivators, private AggregateStreamMapping $aggregateToStreamMapping, private AggregateTypeMapping $aggregateTypeMapping, private array $projectionStateGateways)
{
$this->projectionSetupConfigurations = $projectionConfigurations;
$this->projectionLifeCycleServiceActivators = $projectionLifeCycleServiceActivators;
$this->aggregateToStreamMapping = $aggregateToStreamMapping;
$this->aggregateTypeMapping = $aggregateTypeMapping;
}

public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static
Expand Down Expand Up @@ -246,8 +240,13 @@ public static function create(AnnotationFinder $annotationRegistrationService, I

$projectionSetupConfigurations[$projectionAttribute->getName()] = $projectionConfiguration;
}
$namedEvents = [];
foreach ($annotationRegistrationService->findAnnotatedClasses(NamedEvent::class) as $className) {
$attribute = $annotationRegistrationService->getAttributeForClass($className, NamedEvent::class);
$namedEvents[$className] = $attribute->getName();
}

return new self($projectionSetupConfigurations, $projectionEventHandlers, AsynchronousModule::create($annotationRegistrationService, $interfaceToCallRegistry), $projectionLifeCyclesServiceActivators, AggregateStreamMapping::createWith($aggregateToStreamMapping), AggregateTypeMapping::createWith($aggregateTypeMapping), $projectionStateGateways);
return new self($projectionSetupConfigurations, $projectionEventHandlers, $namedEvents, $projectionLifeCyclesServiceActivators, AggregateStreamMapping::createWith($aggregateToStreamMapping), AggregateTypeMapping::createWith($aggregateTypeMapping), $projectionStateGateways);
}

public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void
Expand All @@ -268,7 +267,8 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
new Definition(LazyProophProjectionManager::class, [
Reference::to(EventSourcingConfiguration::class),
$this->projectionSetupConfigurations,
Reference::to(ReferenceSearchService::class),
Reference::to(MessagingEntrypointWithHeadersPropagation::class),
Reference::to(ConversionService::class),
Reference::to(LazyProophEventStore::class),
]),
);
Expand Down Expand Up @@ -559,28 +559,12 @@ private function registerProjections(ServiceConfiguration $serviceConfiguration,
$projectionRunningConfigurations[$extensionObject->getProjectionName()] = $extensionObject;
}
}
/** @var array<string, AnnotatedDefinition[]> $eventHandlersByProjectionName */
$eventHandlersByProjectionName = [];
foreach ($this->projectionEventHandlers as $projectionEventHandler) {
/** @var Projection $projectionAttribute */
$projectionAttribute = $projectionEventHandler->getAnnotationForClass();
/** @var EndpointAnnotation $handlerAttribute */
$handlerAttribute = $projectionEventHandler->getAnnotationForMethod();
$projectionConfiguration = $this->projectionSetupConfigurations[$projectionAttribute->getName()];

/** @TODO in case of Projection Event Handlers we don't need to register them asynchronously, so this can be simplified to assume always synchronous */
$eventHandlerTriggeringInputChannel = MessageHandlerRoutingModule::getExecutionMessageHandlerChannel($projectionEventHandler);
$eventHandlerSynchronousInputChannel = $serviceConfiguration->isModulePackageEnabled(ModulePackageList::ASYNCHRONOUS_PACKAGE) ? $this->asynchronousModule->getSynchronousChannelFor($eventHandlerTriggeringInputChannel, $handlerAttribute->getEndpointId()) : $eventHandlerTriggeringInputChannel;

$this->projectionSetupConfigurations[$projectionAttribute->getName()] = $projectionConfiguration->withProjectionEventHandler(
MessageHandlerRoutingModule::getRoutingInputMessageChannelForEventHandler($projectionEventHandler, $interfaceToCallRegistry),
$projectionEventHandler->getClassName(),
$projectionEventHandler->getMethodName(),
$eventHandlerSynchronousInputChannel
);
if (array_key_exists($projectionAttribute->getName(), $projectionRunningConfigurations)) {
$projectionRunningConfiguration = $projectionRunningConfigurations[$projectionAttribute->getName()];
$this->projectionSetupConfigurations[$projectionAttribute->getName()] = $this->projectionSetupConfigurations[$projectionAttribute->getName()]
->withPolling($projectionRunningConfiguration->isPolling());
}
$eventHandlersByProjectionName[$projectionAttribute->getName()][] = $projectionEventHandler;
}

$messagingConfiguration->registerServiceDefinition(ProophEventMapper::class, Definition::createFor(ProophEventMapper::class, [Reference::to(EventMapper::class)]));
Expand All @@ -595,6 +579,7 @@ private function registerProjections(ServiceConfiguration $serviceConfiguration,
}

$projectionSetupConfiguration = $projectionSetupConfiguration
->withPolling($projectionRunningConfiguration->isPolling())
->withOptions(
array_merge(
$projectionSetupConfiguration->getProjectionOptions(),
Expand All @@ -609,6 +594,28 @@ private function registerProjections(ServiceConfiguration $serviceConfiguration,
->withInputChannelName($projectionSetupConfiguration->getProjectionInputChannel())
);

/** Router for executing events */
$eventHandlers = $eventHandlersByProjectionName[$projectionSetupConfiguration->getProjectionName()];
$routerMap = new BusRoutingMapBuilder();
foreach ($eventHandlers as $eventHandler) {
$routerMap->addRoutesFromAnnotatedFinding($eventHandler, $interfaceToCallRegistry);
}
foreach ($this->namedEvents as $className => $eventName) {
$routerMap->addObjectAlias($className, $eventName);
}
$messagingConfiguration->registerMessageHandler(
MessageProcessorActivatorBuilder::create()
->withInputChannelName($projectionSetupConfiguration->getActionRouterChannel())
->chain(new Definition(RouterProcessor::class, [
new Definition(BusRouteSelector::class, [
$routerMap->compile(),
new Definition(BusRoutingKeyResolver::class, [ProjectionEventHandler::PROJECTION_EVENT_NAME]),
]),
new Definition(RouteToChannelResolver::class, [new Reference(ChannelResolver::class)]),
false,
]))
);

if ($projectionRunningConfiguration->isPolling()) {
$messagingConfiguration->registerConsumer(
InboundChannelAdapterBuilder::createWithDirectObject(
Expand Down
Loading
Loading