diff --git a/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php b/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php new file mode 100644 index 000000000..19e6284b3 --- /dev/null +++ b/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php @@ -0,0 +1,40 @@ +findAnnotatedClasses(FromAggregateStream::class) as $classname) { + $projectionAttribute = $annotationRegistrationService->findAttributeForClass($classname, ProjectionV2::class); + $aggregateStreamAttribute = $annotationRegistrationService->findAttributeForClass($classname, FromAggregateStream::class); + $customScopeStrategyAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); + + if (! $projectionAttribute || ! $aggregateStreamAttribute) { + continue; + } + + $aggregateClass = $aggregateStreamAttribute->aggregateClass; + $projectionName = $projectionAttribute->name; + + $eventSourcingAggregateAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, EventSourcingAggregate::class); + if ($eventSourcingAggregateAttribute === null) { + throw ConfigurationException::create("Class {$aggregateClass} referenced in #[AggregateStream] for projection {$projectionName} must be an EventSourcingAggregate. Add #[EventSourcingAggregate] attribute to the class."); + } + + $streamAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, Stream::class); + $streamName = $streamAttribute?->getName() ?? $aggregateClass; + + $aggregateTypeAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, AggregateType::class); + $aggregateType = $aggregateTypeAttribute?->getName() ?? $aggregateClass; + + $handledProjections[] = $projectionName; + + if ($customScopeStrategyAttribute !== null) { + $extensions[] = new EventStoreAggregateStreamSourceBuilder( + $projectionName, + $aggregateType, + $streamName, + ); + $extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $streamName); + } else { + $extensions[] = new EventStoreGlobalStreamSourceBuilder( + $streamName, + [$projectionName], + ); + } + } + if (! empty($handledProjections)) { $extensions[] = new DbalProjectionStateStorageBuilder($handledProjections); } diff --git a/packages/PdoEventSourcing/tests/Projecting/Global/SynchronousEventDrivenProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Global/SynchronousEventDrivenProjectionTest.php index 854c59e1e..d6b998b1e 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Global/SynchronousEventDrivenProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Global/SynchronousEventDrivenProjectionTest.php @@ -5,6 +5,7 @@ namespace Test\Ecotone\EventSourcing\Projecting\Global; use Doctrine\DBAL\Connection; +use Ecotone\EventSourcing\Attribute\FromAggregateStream; use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\Attribute\ProjectionDelete; use Ecotone\EventSourcing\Attribute\ProjectionInitialization; @@ -12,6 +13,7 @@ use Ecotone\EventSourcing\EventSourcingConfiguration; use Ecotone\Lite\EcotoneLite; use Ecotone\Lite\Test\FlowTestSupport; +use Ecotone\Messaging\Config\ConfigurationException; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Modelling\Attribute\EventHandler; @@ -32,6 +34,10 @@ use Test\Ecotone\EventSourcing\Fixture\Ticket\Event\TicketWasRegistered; use Test\Ecotone\EventSourcing\Fixture\Ticket\Ticket; use Test\Ecotone\EventSourcing\Fixture\Ticket\TicketEventConverter; +use Test\Ecotone\EventSourcing\Projecting\App\Ordering\Command\PlaceOrder; +use Test\Ecotone\EventSourcing\Projecting\App\Ordering\Event\OrderWasPlaced; +use Test\Ecotone\EventSourcing\Projecting\App\Ordering\EventsConverter; +use Test\Ecotone\EventSourcing\Projecting\App\Ordering\Order; use Test\Ecotone\EventSourcing\Projecting\ProjectingTestCase; /** @@ -188,6 +194,132 @@ classesToResolve: [$projection::class, Ticket::class, TicketEventConverter::clas self::assertEquals([['ticket_id' => '124', 'ticket_type' => 'info']], $ecotone->sendQueryWithRouting('getInProgressTickets')); } + public function test_building_global_projection_with_aggregate_stream_attribute(): void + { + $projection = $this->createOrderListProjectionWithAggregateStream(); + + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [$projection::class, Order::class, EventsConverter::class], + containerOrAvailableServices: [$projection, new EventsConverter(), self::getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::DBAL_PACKAGE, + ModulePackageList::EVENT_SOURCING_PACKAGE, + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])), + runForProductionEventStore: true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->deleteProjection($projection::NAME) + ->initializeProjection($projection::NAME); + self::assertEquals([], $ecotone->sendQueryWithRouting('getOrders')); + + $ecotone->sendCommand(new PlaceOrder('order-1', 'laptop', 2)); + self::assertEquals([ + ['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2'], + ], $ecotone->sendQueryWithRouting('getOrders')); + + $ecotone->sendCommand(new PlaceOrder('order-2', 'phone', 1)); + self::assertEquals([ + ['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2'], + ['order_id' => 'order-2', 'product' => 'phone', 'quantity' => '1'], + ], $ecotone->sendQueryWithRouting('getOrders')); + + // Test reset and catchup + $ecotone->resetProjection($projection::NAME) + ->triggerProjection($projection::NAME); + + self::assertEquals([ + ['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2'], + ['order_id' => 'order-2', 'product' => 'phone', 'quantity' => '1'], + ], $ecotone->sendQueryWithRouting('getOrders')); + } + + public function test_aggregate_stream_throws_exception_for_non_event_sourcing_aggregate(): void + { + // Create a projection that references a non-EventSourcingAggregate class + $projection = new #[ProjectionV2('invalid_projection'), FromAggregateStream(\stdClass::class)] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + + $this->expectException(ConfigurationException::class); + $this->expectExceptionMessage('must be an EventSourcingAggregate'); + + EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [$projection::class], + containerOrAvailableServices: [$projection, self::getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::DBAL_PACKAGE, + ModulePackageList::EVENT_SOURCING_PACKAGE, + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])), + runForProductionEventStore: true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } + + private function createOrderListProjectionWithAggregateStream(): object + { + $connection = $this->getConnection(); + + return new #[ProjectionV2(self::NAME), FromAggregateStream(Order::class)] class ($connection) { + public const NAME = 'order_list_aggregate_stream'; + + public function __construct(private Connection $connection) + { + } + + #[QueryHandler('getOrders')] + public function getOrders(): array + { + return $this->connection->executeQuery(<<fetchAllAssociative(); + } + + #[EventHandler] + public function addOrder(OrderWasPlaced $event): void + { + $this->connection->executeStatement(<<orderId, $event->product, $event->quantity]); + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement(<<connection->executeStatement(<<connection->executeStatement(<<sendQueryWithRouting('getInProgressTickets')); } + public function test_building_partitioned_projection_with_aggregate_stream_attribute(): void + { + $projection = $this->createOrderListProjectionWithAggregateStream(); + + $ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [$projection::class, Order::class, EventsConverter::class], + containerOrAvailableServices: [$projection, new EventsConverter(), self::getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::DBAL_PACKAGE, + ModulePackageList::EVENT_SOURCING_PACKAGE, + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])), + runForProductionEventStore: true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + $ecotone->deleteProjection($projection::NAME) + ->initializeProjection($projection::NAME); + self::assertEquals([], $ecotone->sendQueryWithRouting('getOrdersPartitioned')); + + $ecotone->sendCommand(new PlaceOrder('order-1', 'laptop', 2)); + self::assertEquals([ + ['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2', 'shipped' => '0'], + ], $ecotone->sendQueryWithRouting('getOrdersPartitioned')); + + $ecotone->sendCommand(new PlaceOrder('order-2', 'phone', 1)); + self::assertEquals([ + ['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2', 'shipped' => '0'], + ['order_id' => 'order-2', 'product' => 'phone', 'quantity' => '1', 'shipped' => '0'], + ], $ecotone->sendQueryWithRouting('getOrdersPartitioned')); + + $ecotone->sendCommand(new ShipOrder('order-1')); + self::assertEquals([ + ['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2', 'shipped' => '1'], + ['order_id' => 'order-2', 'product' => 'phone', 'quantity' => '1', 'shipped' => '0'], + ], $ecotone->sendQueryWithRouting('getOrdersPartitioned')); + + // Test reset and catchup for partitioned projection + $ecotone->resetProjection($projection::NAME) + ->triggerProjection($projection::NAME); + + self::assertEquals([ + ['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2', 'shipped' => '1'], + ['order_id' => 'order-2', 'product' => 'phone', 'quantity' => '1', 'shipped' => '0'], + ], $ecotone->sendQueryWithRouting('getOrdersPartitioned')); + } + + private function createOrderListProjectionWithAggregateStream(): object + { + $connection = $this->getConnection(); + + return new #[ProjectionV2(self::NAME), Partitioned, FromAggregateStream(Order::class)] class ($connection) { + public const NAME = 'order_list_partitioned_aggregate_stream'; + + public function __construct(private Connection $connection) + { + } + + #[QueryHandler('getOrdersPartitioned')] + public function getOrders(): array + { + return $this->connection->executeQuery(<<fetchAllAssociative(); + } + + #[EventHandler] + public function addOrder(OrderWasPlaced $event): void + { + $this->connection->executeStatement(<<orderId, $event->product, $event->quantity, 0]); + } + + #[EventHandler] + public function shipOrder(OrderWasShipped $event): void + { + $this->connection->executeStatement(<<orderId]); + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement(<<connection->executeStatement(<<connection->executeStatement(<<getConnection();