diff --git a/packages/Amqp/tests/AmqpMessagingTestCase.php b/packages/Amqp/tests/AmqpMessagingTestCase.php index 0c6a19958..c23791c5c 100644 --- a/packages/Amqp/tests/AmqpMessagingTestCase.php +++ b/packages/Amqp/tests/AmqpMessagingTestCase.php @@ -4,6 +4,7 @@ use AMQPQueueException; use Ecotone\Amqp\Distribution\AmqpDistributionModule; +use Ecotone\Enqueue\CachedConnectionFactory; use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnection; use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnection; use Interop\Amqp\AmqpConnectionFactory; @@ -75,6 +76,9 @@ public static function getRabbitConnectionFactory(array $config = []): AmqpConne public function setUp(): void { + // Clear cached connection factories to prevent channel mode conflicts between tests + // (e.g., confirm mode vs transaction mode on the same channel) + CachedConnectionFactory::clearInstances(); // Ensure cache directory is writable for tests $this->queueCleanUp(); } diff --git a/packages/Amqp/tests/Integration/SuccessTransactionTest.php b/packages/Amqp/tests/Integration/SuccessTransactionTest.php index 068a02f8e..fbec5beb0 100644 --- a/packages/Amqp/tests/Integration/SuccessTransactionTest.php +++ b/packages/Amqp/tests/Integration/SuccessTransactionTest.php @@ -20,10 +20,6 @@ final class SuccessTransactionTest extends AmqpMessagingTestCase { public function test_order_is_placed_when_transaction_is_successful(): void { - if (getenv('AMQP_IMPLEMENTATION') === 'lib') { - $this->markTestSkipped('Transaction tests require Ext'); - } - $ecotone = $this->bootstrapFlowTesting( containerOrAvailableServices: [new OrderService(), ...$this->getConnectionFactoryReferences()], configuration: ServiceConfiguration::createWithDefaults() diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index a82329335..4d05788bb 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -67,8 +67,6 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO /** @var array> $components [projection name][component name][reference] */ $components = []; foreach ($componentBuilders as $componentBuilder) { - $reference = Uuid::uuid4()->toString(); - $moduleReferenceSearchService->store($reference, $componentBuilder); foreach ($projectionBuilders as $projectionBuilder) { $projectionName = $projectionBuilder->projectionName(); foreach ([StreamSource::class, PartitionProvider::class, ProjectionStateStorage::class] as $component) { @@ -79,6 +77,9 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO . ' You can only register one component of each type per projection. Please check your configuration.' ); } + + $reference = Uuid::uuid4()->toString(); + $moduleReferenceSearchService->store($reference, $componentBuilder); $components[$projectionName][$component] = new Reference($reference); } } diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreAdapterModule.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreAdapterModule.php index 73bb228f2..6396e0bd6 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreAdapterModule.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreAdapterModule.php @@ -72,7 +72,7 @@ public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, continue; } - $extensions[] = new EventStoreChannelAdapterProjectionBuilder($extensionObject); + $extensions[] = new EventStoreStreamingChannelAdapterBuilder($extensionObject); } return $extensions; diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjectionBuilder.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php similarity index 94% rename from packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjectionBuilder.php rename to packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php index e534926ad..97453d961 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjectionBuilder.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php @@ -18,7 +18,7 @@ * * @internal */ -class EventStoreChannelAdapterProjectionBuilder implements ProjectionExecutorBuilder +class EventStoreStreamingChannelAdapterBuilder implements ProjectionExecutorBuilder { public function __construct( private EventStoreChannelAdapter $channelAdapter diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php index f56f233c8..07275e527 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php @@ -7,6 +7,7 @@ namespace Ecotone\Projecting\InMemory; +use Ecotone\EventSourcing\EventStore\FieldType; use Ecotone\EventSourcing\EventStore\InMemoryEventStore; use Ecotone\EventSourcing\EventStore\MetadataMatcher; use Ecotone\EventSourcing\EventStore\Operator; @@ -16,10 +17,14 @@ class InMemoryEventStoreStreamSource implements StreamSource { + /** + * @param array $eventNames Event names to filter by, empty array means no filtering + */ public function __construct( private InMemoryEventStore $eventStore, private ?string $streamName = null, - private ?string $partitionHeader = null + private ?string $partitionHeader = null, + private array $eventNames = [], ) { } @@ -38,20 +43,26 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = continue; } - $metadataMatcher = null; + $metadataMatcher = new MetadataMatcher(); if ($partitionKey !== null && $this->partitionHeader !== null) { - $metadataMatcher = (new MetadataMatcher()) + $metadataMatcher = $metadataMatcher ->withMetadataMatch($this->partitionHeader, Operator::EQUALS, $partitionKey); } + // Filter by event names if specified (optimization for partitioned projections) + if ($this->eventNames !== []) { + $metadataMatcher = $metadataMatcher + ->withMetadataMatch('event_name', Operator::IN, $this->eventNames, FieldType::MESSAGE_PROPERTY); + } + // Load all events from this stream (starting from position 1) $events = $this->eventStore->load($stream, 1, null, $metadataMatcher); - $allEvents = array_merge($allEvents, is_array($events) ? $events : iterator_to_array($events)); + $allEvents = array_merge($allEvents, \is_array($events) ? $events : iterator_to_array($events)); } // Slice based on global position $events = array_slice($allEvents, $from, $count); - $to = $from + count($events); + $to = $from + \count($events); return new StreamPage($events, (string) $to); } diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSourceBuilder.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSourceBuilder.php index faee8356c..6241715f5 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSourceBuilder.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSourceBuilder.php @@ -16,17 +16,21 @@ class InMemoryEventStoreStreamSourceBuilder implements ProjectionComponentBuilder { + /** + * @param array $eventNames Event names to filter by, empty array means no filtering + */ public function __construct( private ?array $projectionNames = null, private ?string $streamName = null, - private ?string $partitionHeader = null + private ?string $partitionHeader = null, + private array $eventNames = [], ) { } public function canHandle(string $projectionName, string $component): bool { return $component === StreamSource::class - && ($this->projectionNames === null || in_array($projectionName, $this->projectionNames, true)); + && ($this->projectionNames === null || \in_array($projectionName, $this->projectionNames, true)); } public function compile(MessagingContainerBuilder $builder): Definition|Reference @@ -37,6 +41,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc Reference::to(InMemoryEventStore::class), $this->streamName, $this->partitionHeader, + $this->eventNames, ] ); } diff --git a/packages/Enqueue/src/CachedConnectionFactory.php b/packages/Enqueue/src/CachedConnectionFactory.php index 50c901e06..9368256d2 100644 --- a/packages/Enqueue/src/CachedConnectionFactory.php +++ b/packages/Enqueue/src/CachedConnectionFactory.php @@ -51,6 +51,16 @@ public function reconnect(): void $this->cachedContext = []; } + /** + * Clear all cached connection factory instances. + * This is useful in tests to ensure clean state between test runs, + * especially when switching between different channel modes (confirm vs tx). + */ + public static function clearInstances(): void + { + self::$instances = []; + } + public function getConsumer(Destination $destination): Consumer { return $this->createContext()->createConsumer($destination); diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index d02f263bf..844d73655 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -24,9 +24,13 @@ use Ecotone\Messaging\Config\ModuleReferenceSearchService; use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Handler\InterfaceToCallRegistry; +use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Modelling\Attribute\EventSourcingAggregate; +use Ecotone\Modelling\Attribute\NamedEvent; +use Ecotone\Modelling\Config\Routing\BusRoutingMapBuilder; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\ProjectionV2; +use Ecotone\Projecting\Config\ProjectionComponentBuilder; use Ecotone\Projecting\EventStoreAdapter\EventStoreChannelAdapter; #[ModuleAnnotation] @@ -42,42 +46,85 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $handledProjections = []; $extensions = []; + $namedEvents = []; + foreach ($annotationRegistrationService->findAnnotatedClasses(NamedEvent::class) as $className) { + $attribute = $annotationRegistrationService->getAttributeForClass($className, NamedEvent::class); + $namedEvents[$className] = $attribute->getName(); + } + + $projectionEventNames = self::collectProjectionEventNames($annotationRegistrationService, $interfaceToCallRegistry, $namedEvents); + + $resolvedConfigs = [ + ...self::resolveFromStreamConfigs($annotationRegistrationService, $projectionEventNames), + ...self::resolveFromAggregateStreamConfigs($annotationRegistrationService, $projectionEventNames), + ]; + + foreach ($resolvedConfigs as $config) { + $handledProjections[] = $config['projectionName']; + $extensions = [...$extensions, ...self::createStreamSourceExtensions($config)]; + } + + if ($handledProjections !== []) { + $extensions[] = new DbalProjectionStateStorageBuilder($handledProjections); + } + + return new self($extensions); + } + + /** + * Resolve stream configurations from FromStream attributes. + * + * @return array + */ + private static function resolveFromStreamConfigs( + AnnotationFinder $annotationRegistrationService, + array $projectionEventNames + ): array { + $configs = []; + foreach ($annotationRegistrationService->findAnnotatedClasses(FromStream::class) as $classname) { $projectionAttribute = $annotationRegistrationService->findAttributeForClass($classname, ProjectionV2::class); $streamAttribute = $annotationRegistrationService->findAttributeForClass($classname, FromStream::class); - $customScopeStrategyAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); + $partitionedAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); if (! $projectionAttribute || ! $streamAttribute) { continue; } $projectionName = $projectionAttribute->name; - $handledProjections[] = $projectionName; - - // Determine partitionHeaderName from CustomScopeStrategy attribute - $partitionHeaderName = $customScopeStrategyAttribute?->partitionHeaderName; - - if ($partitionHeaderName !== null) { - $aggregateType = $streamAttribute->aggregateType ?: throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); - $extensions[] = new EventStoreAggregateStreamSourceBuilder( - $projectionName, - $aggregateType, - $streamAttribute->stream, - ); - $extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $streamAttribute->stream); - } else { - $extensions[] = new EventStoreGlobalStreamSourceBuilder( - $streamAttribute->stream, - [$projectionName], - ); + $isPartitioned = $partitionedAttribute !== null; + + if ($isPartitioned && ! $streamAttribute->aggregateType) { + throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); } + + $configs[] = [ + 'projectionName' => $projectionName, + 'streamName' => $streamAttribute->stream, + 'aggregateType' => $streamAttribute->aggregateType, + 'isPartitioned' => $isPartitioned, + 'eventNames' => $projectionEventNames[$projectionName] ?? [], + ]; } - // Handle AggregateStream attribute + return $configs; + } + + /** + * Resolve stream configurations from FromAggregateStream attributes. + * + * @return array + */ + private static function resolveFromAggregateStreamConfigs( + AnnotationFinder $annotationRegistrationService, + array $projectionEventNames + ): array { + $configs = []; + foreach ($annotationRegistrationService->findAnnotatedClasses(FromAggregateStream::class) as $classname) { $projectionAttribute = $annotationRegistrationService->findAttributeForClass($classname, ProjectionV2::class); $aggregateStreamAttribute = $annotationRegistrationService->findAttributeForClass($classname, FromAggregateStream::class); - $customScopeStrategyAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); + $partitionedAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); if (! $projectionAttribute || ! $aggregateStreamAttribute) { continue; @@ -97,28 +144,48 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $aggregateTypeAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, AggregateType::class); $aggregateType = $aggregateTypeAttribute?->getName() ?? $aggregateClass; - $handledProjections[] = $projectionName; - - if ($customScopeStrategyAttribute !== null) { - $extensions[] = new EventStoreAggregateStreamSourceBuilder( - $projectionName, - $aggregateType, - $streamName, - ); - $extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $streamName); - } else { - $extensions[] = new EventStoreGlobalStreamSourceBuilder( - $streamName, - [$projectionName], - ); - } + $configs[] = [ + 'projectionName' => $projectionName, + 'streamName' => $streamName, + 'aggregateType' => $aggregateType, + 'isPartitioned' => $partitionedAttribute !== null, + 'eventNames' => $projectionEventNames[$projectionName] ?? [], + ]; } - if (! empty($handledProjections)) { - $extensions[] = new DbalProjectionStateStorageBuilder($handledProjections); + return $configs; + } + + /** + * Create stream source extensions based on resolved configuration. + * + * @param array{projectionName: string, streamName: string, aggregateType: ?string, isPartitioned: bool, eventNames: array} $config + * @return ProjectionComponentBuilder[] + */ + private static function createStreamSourceExtensions(array $config): array + { + if ($config['isPartitioned']) { + return [ + new EventStoreAggregateStreamSourceBuilder( + $config['projectionName'], + $config['aggregateType'], + $config['streamName'], + $config['eventNames'], + ), + new AggregateIdPartitionProviderBuilder( + $config['projectionName'], + $config['aggregateType'], + $config['streamName'] + ), + ]; } - return new self($extensions); + return [ + new EventStoreGlobalStreamSourceBuilder( + $config['streamName'], + [$config['projectionName']], + ), + ]; } public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void @@ -155,4 +222,69 @@ public function getModulePackageName(): string { return ModulePackageList::EVENT_SOURCING_PACKAGE; } + + /** + * Collect event names for each partitioned projection. + * Returns empty array for projections that use catch-all patterns or object types. + * + * @param array $namedEvents Map of class name to named event name + * @return array> Map of projection name to event names (empty array means no filtering) + */ + private static function collectProjectionEventNames( + AnnotationFinder $annotationRegistrationService, + InterfaceToCallRegistry $interfaceToCallRegistry, + array $namedEvents + ): array { + $projectionEventNames = []; + $disabledFiltering = []; + $routingMapBuilder = new BusRoutingMapBuilder(); + + foreach ($annotationRegistrationService->findCombined(ProjectionV2::class, EventHandler::class) as $projectionEventHandler) { + /** @var ProjectionV2 $projectionAttribute */ + $projectionAttribute = $projectionEventHandler->getAnnotationForClass(); + $projectionName = $projectionAttribute->name; + + if (! isset($projectionEventNames[$projectionName])) { + $projectionEventNames[$projectionName] = []; + } + + if (isset($disabledFiltering[$projectionName])) { + continue; + } + + $routes = $routingMapBuilder->getRoutesFromAnnotatedFinding($projectionEventHandler, $interfaceToCallRegistry); + foreach ($routes as $route) { + // Check for catch-all pattern - disable filtering by keeping empty array + if ($route === '*' || $route === 'object') { + $projectionEventNames[$projectionName] = []; + $disabledFiltering[$projectionName] = true; + break; + } + + // Check for glob patterns (containing * but not exactly *) + if (str_contains($route, '*')) { + throw ConfigurationException::create( + "Projection {$projectionName} uses glob pattern '{$route}' which is not allowed. " . + "For query optimization, event handlers must use explicit event names. Use union type parameters instead." + ); + } + + // Check if route is a class with NamedEvent annotation + if (class_exists($route) && isset($namedEvents[$route])) { + $projectionEventNames[$projectionName][] = $namedEvents[$route]; + } else { + $projectionEventNames[$projectionName][] = $route; + } + } + } + + // Deduplicate event names (skip disabled ones which are empty arrays) + foreach ($projectionEventNames as $projectionName => $eventNames) { + if (! isset($disabledFiltering[$projectionName]) && $eventNames !== []) { + $projectionEventNames[$projectionName] = array_values(array_unique($eventNames)); + } + } + + return $projectionEventNames; + } } diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php index d29fcff35..76a1e47d6 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php @@ -8,6 +8,7 @@ namespace Ecotone\EventSourcing\Projecting\StreamSource; use Ecotone\EventSourcing\EventStore; +use Ecotone\EventSourcing\EventStore\FieldType; use Ecotone\EventSourcing\EventStore\MetadataMatcher; use Ecotone\EventSourcing\EventStore\Operator; use Ecotone\Messaging\MessageHeaders; @@ -18,10 +19,14 @@ class EventStoreAggregateStreamSource implements StreamSource { + /** + * @param array $eventNames Event names to filter by, empty array means no filtering + */ public function __construct( private EventStore $eventStore, private string $streamName, private ?string $aggregateType, + private array $eventNames = [], ) { } @@ -49,6 +54,15 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = (int)$lastPosition + 1 ); + if ($this->eventNames !== []) { + $metadataMatcher = $metadataMatcher->withMetadataMatch( + 'event_name', + Operator::IN, + $this->eventNames, + FieldType::MESSAGE_PROPERTY + ); + } + $events = $this->eventStore->load( $this->streamName, 1, diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php index 69ff5d48c..7b6909bcf 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php @@ -16,8 +16,15 @@ class EventStoreAggregateStreamSourceBuilder implements ProjectionComponentBuilder { - public function __construct(public readonly string $handledProjectionName, public ?string $aggregateType, private string $streamName) - { + /** + * @param array $eventNames Event names to filter, empty array means no filtering + */ + public function __construct( + public readonly string $handledProjectionName, + public ?string $aggregateType, + private string $streamName, + private array $eventNames = [], + ) { } public function canHandle(string $projectionName, string $component): bool @@ -33,6 +40,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc new Reference(EventStore::class), $this->streamName, $this->aggregateType, + $this->eventNames, ], ); } diff --git a/packages/PdoEventSourcing/tests/Fixture/EventNameFiltering/Converters.php b/packages/PdoEventSourcing/tests/Fixture/EventNameFiltering/Converters.php new file mode 100644 index 000000000..ce28212aa --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/EventNameFiltering/Converters.php @@ -0,0 +1,38 @@ + $event->id, 'type' => 'first']; + } + + #[Converter] + public function convertArrayToFirstEvent(array $data): FirstEvent + { + return new FirstEvent($data['id']); + } + + #[Converter] + public function convertSecondEventToArray(SecondEvent $event): array + { + return ['id' => $event->id, 'type' => 'second']; + } + + #[Converter] + public function convertArrayToSecondEvent(array $data): SecondEvent + { + return new SecondEvent($data['id']); + } +} + diff --git a/packages/PdoEventSourcing/tests/Fixture/EventNameFiltering/FirstEvent.php b/packages/PdoEventSourcing/tests/Fixture/EventNameFiltering/FirstEvent.php new file mode 100644 index 000000000..ae56b322d --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/EventNameFiltering/FirstEvent.php @@ -0,0 +1,21 @@ +id = $event->id; + } + + #[EventSourcingHandler] + public function onSecondEvent(SecondEvent $event): void + { + // No-op, just for event sourcing + } +} + diff --git a/packages/PdoEventSourcing/tests/Fixture/EventNameFiltering/SecondEvent.php b/packages/PdoEventSourcing/tests/Fixture/EventNameFiltering/SecondEvent.php new file mode 100644 index 000000000..cfd691a3a --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/EventNameFiltering/SecondEvent.php @@ -0,0 +1,21 @@ +expectException(ConfigurationException::class); + $this->expectExceptionMessage("glob pattern 'test.*' which is not allowed"); + $projection = $this->getProjectionWithRegexRouting(); - $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + EcotoneLite::bootstrapFlowTestingWithEventStore( classesToResolve: [get_class($projection), AnAggregate::class, AnEvent::class, Converters::class], containerOrAvailableServices: [$projection, new Converters(), DbalConnectionFactory::class => $this->getConnectionFactory()], configuration: ServiceConfiguration::createWithDefaults() @@ -63,16 +67,6 @@ classesToResolve: [get_class($projection), AnAggregate::class, AnEvent::class, C runForProductionEventStore: true, licenceKey: LicenceTesting::VALID_LICENCE, ); - - $ecotone->sendCommandWithRoutingKey('create', '123'); - - self::assertEquals( - [ - ['id' => '123'], - ], - $projection->events, - 'Projection should receive named event with regex routing' - ); } public function test_projection_with_multiple_handlers_for_different_events(): void diff --git a/packages/PdoEventSourcing/tests/Projecting/Partitioned/EventNameFilteringTest.php b/packages/PdoEventSourcing/tests/Projecting/Partitioned/EventNameFilteringTest.php new file mode 100644 index 000000000..9fc43bdb1 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Projecting/Partitioned/EventNameFilteringTest.php @@ -0,0 +1,246 @@ +getProjectionHandlingAllEvents(); + + $ecotone = $this->bootstrapEcotone([$projection]); + + $ecotone->sendCommandWithRoutingKey('createMultiEvent', '123'); + + self::assertEquals( + ['first_event:123', 'second_event:123'], + $projection->events, + 'Projection with explicit handlers for all events should receive all events' + ); + } + + public function test_partitioned_projection_filters_to_only_handled_events(): void + { + $projection = $this->getProjectionHandlingOnlyOneEvent(); + + $ecotone = $this->bootstrapEcotone([$projection]); + + $ecotone->sendCommandWithRoutingKey('createMultiEvent', '123'); + + self::assertEquals( + ['first_event:123'], + $projection->events, + 'Projection handling only one event type should only receive that event type' + ); + } + + public function test_partitioned_projection_with_catch_all_pattern(): void + { + $projection = $this->getProjectionWithCatchAllPattern(); + + $ecotone = $this->bootstrapEcotone([$projection]); + + $ecotone->sendCommandWithRoutingKey('createMultiEvent', '123'); + + self::assertEquals( + [['id' => '123', 'type' => 'first'], ['id' => '123', 'type' => 'second']], + $projection->events, + 'Projection with "*" pattern should receive all events' + ); + } + + public function test_partitioned_projection_with_object_type_receives_all_events(): void + { + $projection = $this->getProjectionWithObjectType(); + + $ecotone = $this->bootstrapEcotone([$projection]); + + $ecotone->sendCommandWithRoutingKey('createMultiEvent', '123'); + + self::assertCount( + 2, + $projection->events, + 'Projection with object type parameter should receive all events' + ); + } + + public function test_partitioned_projection_resolves_event_name_from_named_event_attribute(): void + { + $projection = $this->getProjectionWithClassTypeResolvingToNamedEvent(); + + $ecotone = $this->bootstrapEcotone([$projection]); + + $ecotone->sendCommandWithRoutingKey('createMultiEvent', '123'); + + self::assertEquals( + ['first_event:123'], + $projection->events, + 'Projection with class type should resolve event name from #[NamedEvent] attribute' + ); + } + + public function test_partitioned_projection_with_union_type_for_multiple_events(): void + { + $projection = $this->getProjectionWithUnionType(); + + $ecotone = $this->bootstrapEcotone([$projection]); + + $ecotone->sendCommandWithRoutingKey('createMultiEvent', '123'); + + self::assertEquals( + ['union:123', 'union:123'], + $projection->events, + 'Projection with union type should receive all specified event types' + ); + } + + public function test_partitioned_projection_with_glob_pattern_throws_exception(): void + { + $this->expectException(ConfigurationException::class); + $this->expectExceptionMessage("glob pattern 'order.*' which is not allowed"); + + $projection = $this->getProjectionWithGlobPattern(); + + $this->bootstrapEcotone([$projection]); + } + + private function bootstrapEcotone(array $projections) + { + $classes = array_map(fn ($p) => get_class($p), $projections); + + return EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: array_merge($classes, [MultiEventAggregate::class, FirstEvent::class, SecondEvent::class, Converters::class]), + containerOrAvailableServices: array_merge($projections, [new Converters(), DbalConnectionFactory::class => $this->getConnectionFactory()]), + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])), + runForProductionEventStore: true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } + + private function getProjectionHandlingAllEvents(): object + { + return new #[ProjectionV2('projection_all_events'), Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), FromStream(stream: MultiEventAggregate::STREAM_NAME, aggregateType: MultiEventAggregate::class)] class { + public array $events = []; + + #[EventHandler] + public function onFirstEvent(FirstEvent $event): void + { + $this->events[] = 'first_event:' . $event->id; + } + + #[EventHandler] + public function onSecondEvent(SecondEvent $event): void + { + $this->events[] = 'second_event:' . $event->id; + } + }; + } + + private function getProjectionHandlingOnlyOneEvent(): object + { + return new #[ProjectionV2('projection_one_event'), Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), FromStream(stream: MultiEventAggregate::STREAM_NAME, aggregateType: MultiEventAggregate::class)] class { + public array $events = []; + + #[EventHandler] + public function onFirstEvent(FirstEvent $event): void + { + $this->events[] = 'first_event:' . $event->id; + } + }; + } + + private function getProjectionWithCatchAllPattern(): object + { + return new #[ProjectionV2('projection_catch_all'), Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), FromStream(stream: MultiEventAggregate::STREAM_NAME, aggregateType: MultiEventAggregate::class)] class { + public array $events = []; + + #[EventHandler('*')] + public function onAnyEvent(array $event): void + { + $this->events[] = $event; + } + }; + } + + private function getProjectionWithObjectType(): object + { + return new #[ProjectionV2('projection_object_type'), Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), FromStream(stream: MultiEventAggregate::STREAM_NAME, aggregateType: MultiEventAggregate::class)] class { + public array $events = []; + + #[EventHandler] + public function onAnyEvent(object $event): void + { + $this->events[] = $event; + } + }; + } + + private function getProjectionWithClassTypeResolvingToNamedEvent(): object + { + return new #[ProjectionV2('projection_class_type'), Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), FromStream(stream: MultiEventAggregate::STREAM_NAME, aggregateType: MultiEventAggregate::class)] class { + public array $events = []; + + #[EventHandler] + public function onFirstEvent(FirstEvent $event): void + { + $this->events[] = 'first_event:' . $event->id; + } + }; + } + + private function getProjectionWithUnionType(): object + { + return new #[ProjectionV2('projection_union_type'), Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), FromStream(stream: MultiEventAggregate::STREAM_NAME, aggregateType: MultiEventAggregate::class)] class { + public array $events = []; + + #[EventHandler] + public function onEvent(FirstEvent|SecondEvent $event): void + { + $this->events[] = 'union:' . $event->id; + } + }; + } + + private function getProjectionWithGlobPattern(): object + { + return new #[ProjectionV2('projection_glob_pattern'), Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), FromStream(stream: MultiEventAggregate::STREAM_NAME, aggregateType: MultiEventAggregate::class)] class { + public array $events = []; + + #[EventHandler('order.*')] + public function onOrderEvent(array $event): void + { + $this->events[] = $event; + } + }; + } +} diff --git a/packages/PdoEventSourcing/tests/Projecting/Partitioned/ProjectionHandlersExecutionRoutingTest.php b/packages/PdoEventSourcing/tests/Projecting/Partitioned/ProjectionHandlersExecutionRoutingTest.php index a722612e0..d4cef5556 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Partitioned/ProjectionHandlersExecutionRoutingTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Partitioned/ProjectionHandlersExecutionRoutingTest.php @@ -13,6 +13,7 @@ use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\MessageHeaders; use Ecotone\Modelling\Attribute\EventHandler; +use Ecotone\Messaging\Config\ConfigurationException; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\ProjectionV2; use Ecotone\Test\LicenceTesting; @@ -53,11 +54,14 @@ classesToResolve: [get_class($projection), AnAggregate::class, AnEvent::class, C ); } - public function test_partitioned_projection_with_regex_routing(): void + public function test_partitioned_projection_with_glob_pattern_throws_exception(): void { + $this->expectException(ConfigurationException::class); + $this->expectExceptionMessage("glob pattern 'test.*' which is not allowed"); + $projection = $this->getProjectionWithRegexRouting(); - $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + EcotoneLite::bootstrapFlowTestingWithEventStore( classesToResolve: [get_class($projection), AnAggregate::class, AnEvent::class, Converters::class], containerOrAvailableServices: [$projection, new Converters(), DbalConnectionFactory::class => $this->getConnectionFactory()], configuration: ServiceConfiguration::createWithDefaults() @@ -65,16 +69,6 @@ classesToResolve: [get_class($projection), AnAggregate::class, AnEvent::class, C runForProductionEventStore: true, licenceKey: LicenceTesting::VALID_LICENCE, ); - - $ecotone->sendCommandWithRoutingKey('create', '123'); - - self::assertEquals( - [ - ['id' => '123'], - ], - $projection->events, - 'Partitioned projection should receive named event with regex routing' - ); } public function test_partitioned_projection_with_multiple_handlers_for_different_events(): void