From a60ce7bcb156be6db35036b316631e8583279db4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Tue, 23 Dec 2025 09:46:16 +0100 Subject: [PATCH 01/17] first implementation --- .../src/Attribute/FromStream.php | 47 +++- .../PdoEventSourcing/src/Attribute/Stream.php | 1 - .../src/Config/ProophProjectingModule.php | 31 ++- .../EventStoreMultiStreamSource.php | 260 ++++++++++++++++++ .../EventStoreMultiStreamSourceBuilder.php | 51 ++++ .../Projecting/FromStreamAttributeTest.php | 29 ++ 6 files changed, 407 insertions(+), 12 deletions(-) create mode 100644 packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php create mode 100644 packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSourceBuilder.php create mode 100644 packages/PdoEventSourcing/tests/Projecting/FromStreamAttributeTest.php diff --git a/packages/PdoEventSourcing/src/Attribute/FromStream.php b/packages/PdoEventSourcing/src/Attribute/FromStream.php index 670fa6df0..0ca23f5a1 100644 --- a/packages/PdoEventSourcing/src/Attribute/FromStream.php +++ b/packages/PdoEventSourcing/src/Attribute/FromStream.php @@ -9,14 +9,51 @@ use Attribute; use Ecotone\EventSourcing\EventStore; +use Ecotone\Messaging\Support\Assert; #[Attribute(Attribute::TARGET_CLASS)] class FromStream { - public function __construct( - public readonly string $stream, - public readonly ?string $aggregateType = null, - public readonly string $eventStoreReferenceName = EventStore::class - ) { + /** @var string[] */ + public readonly array $streams; + public readonly ?string $aggregateType; + public readonly string $eventStoreReferenceName; + + /** + * Accepts a single stream name or a list of stream names. + */ + public function __construct(string|array $stream, ?string $aggregateType = null, string $eventStoreReferenceName = EventStore::class) + { + // Keep original parameter name `$stream` for backward compatibility with existing tests/usages + $streams = is_array($stream) ? $stream : [$stream]; + Assert::isTrue(!empty($streams), 'At least one stream name must be provided'); + foreach ($streams as $s) { + Assert::notNullAndEmpty($s, "Stream name can't be empty"); + } + + $this->streams = array_values($streams); + $this->aggregateType = $aggregateType; + $this->eventStoreReferenceName = $eventStoreReferenceName; + } + + /** + * Backward compatibility accessor for single-stream cases. + */ + public function getStream(): string + { + return $this->streams[0]; + } + + /** + * @return string[] + */ + public function getStreams(): array + { + return $this->streams; + } + + public function isMultiStream(): bool + { + return count($this->streams) > 1; } } diff --git a/packages/PdoEventSourcing/src/Attribute/Stream.php b/packages/PdoEventSourcing/src/Attribute/Stream.php index c1d59063d..4b76762fe 100644 --- a/packages/PdoEventSourcing/src/Attribute/Stream.php +++ b/packages/PdoEventSourcing/src/Attribute/Stream.php @@ -16,7 +16,6 @@ class Stream public function __construct(string $name) { Assert::notNullAndEmpty($name, "Stream name can't be empty"); - $this->name = $name; } diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index 0c60b9928..91496153a 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -13,6 +13,7 @@ use Ecotone\EventSourcing\Projecting\PartitionState\DbalProjectionStateStorageBuilder; use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreAggregateStreamSourceBuilder; use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreGlobalStreamSourceBuilder; +use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreMultiStreamSourceBuilder; use Ecotone\Messaging\Attribute\ModuleAnnotation; use Ecotone\Messaging\Config\Annotation\AnnotationModule; use Ecotone\Messaging\Config\Configuration; @@ -54,18 +55,36 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $partitionHeaderName = $customScopeStrategyAttribute?->partitionHeaderName; if ($partitionHeaderName !== null) { + // Partitioned projections must target a single stream (aggregate stream) + if ($streamAttribute->isMultiStream()) { + throw ConfigurationException::create("Projection {$projectionName} cannot be partitioned by aggregate id when multiple streams are configured"); + } $aggregateType = $streamAttribute->aggregateType ?: throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); $extensions[] = new EventStoreAggregateStreamSourceBuilder( $projectionName, $aggregateType, - $streamAttribute->stream, + $streamAttribute->getStream(), ); - $extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $streamAttribute->stream); + $extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $streamAttribute->getStream()); } else { - $extensions[] = new EventStoreGlobalStreamSourceBuilder( - $streamAttribute->stream, - [$projectionName], - ); + $streams = $streamAttribute->getStreams(); + if (count($streams) > 1) { + // Multi-stream: build stream->table map using the same hashing as global builder + $map = []; + foreach ($streams as $s) { + $map[$s] = EventStoreGlobalStreamSourceBuilder::getProophTableName($s); + } + $extensions[] = new EventStoreMultiStreamSourceBuilder( + $map, + [$projectionName], + ); + } else { + // Single stream: keep global stream source + $extensions[] = new EventStoreGlobalStreamSourceBuilder( + $streams[0], + [$projectionName], + ); + } } } diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php new file mode 100644 index 000000000..9f67fe79b --- /dev/null +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php @@ -0,0 +1,260 @@ + $streamToTable map of logical stream name => prooph table name + */ + public function __construct( + private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory, + private EcotoneClockInterface $clock, + private array $streamToTable, + private int $maxGapOffset = 5_000, + private ?Duration $gapTimeout = null, + ) { + Assert::isTrue(!empty($streamToTable), 'At least one stream must be provided'); + } + + private function getConnection(): Connection + { + if ($this->connectionFactory instanceof MultiTenantConnectionFactory) { + return $this->connectionFactory->getConnection(); + } + + return $this->connectionFactory->createContext()->getDbalConnection(); + } + + public function load(?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + { + Assert::null($partitionKey, 'Partition key is not supported for EventStoreMultiStreamSource'); + + $connection = $this->getConnection(); + + if (empty($lastPosition)) { + // if none of the tables exist yet, return empty + $anyExists = false; + foreach ($this->streamToTable as $table) { + if (SchemaManagerCompatibility::tableExists($connection, $table)) { + $anyExists = true; + break; + } + } + if (! $anyExists) { + return new StreamPage([], ''); + } + } + + $positions = $this->decodePositions($lastPosition); + + $now = $this->clock->now(); + $cutoffTimestamp = $this->gapTimeout ? $now->sub($this->gapTimeout)->getTimestamp() : 0; + + $perStreamRows = []; + $orderIndex = []; + $i = 0; + foreach ($this->streamToTable as $stream => $table) { + $orderIndex[$stream] = $i++; + $tracking = GapAwarePosition::fromString($positions[$stream] ?? null); + + [$gapQueryPart, $gapQueryParams, $gapQueryTypes] = match (($gaps = $tracking->getGaps()) > 0) { + true => ['OR no IN (:gaps)', ['gaps' => $gaps], ['gaps' => \Doctrine\DBAL\ArrayParameterType::INTEGER]], + false => ['', [], []], + }; + + $limit = max((int)ceil($count / max(1, count($this->streamToTable))) + 5, 10); + + $query = $connection->executeQuery(<< :position {$gapQueryPart} + ORDER BY no + LIMIT {$limit} + SQL, [ + 'position' => $tracking->getPosition(), + ...$gapQueryParams, + ], $gapQueryTypes); + + $rows = []; + foreach ($query->iterateAssociative() as $event) { + $rows[] = [ + 'stream' => $stream, + 'no' => (int)$event['no'], + 'event_name' => $event['event_name'], + 'payload' => $event['payload'], + 'metadata' => $event['metadata'], + 'created_at' => $event['created_at'], + 'ts' => $this->getTimestamp($event['created_at']), + ]; + } + + $perStreamRows[$stream] = [ + 'rows' => $rows, + 'tracking' => $tracking, + ]; + } + + // Merge all rows by created_at ASC, tie-break by stream order, then no + $all = []; + foreach ($perStreamRows as $stream => $pack) { + foreach ($pack['rows'] as $row) { + $all[] = $row; + } + } + + usort($all, function (array $a, array $b) use ($orderIndex): int { + if ($a['ts'] === $b['ts']) { + $ai = $orderIndex[$a['stream']] <=> $orderIndex[$b['stream']]; + if ($ai !== 0) { + return $ai; + } + return $a['no'] <=> $b['no']; + } + return $a['ts'] <=> $b['ts']; + }); + + // Take first $count and advance per-stream positions accordingly + $selected = array_slice($all, 0, $count); + + $events = []; + foreach ($selected as $row) { + $events[] = Event::createWithType( + $row['event_name'], + json_decode($row['payload'], true), + json_decode($row['metadata'], true), + ); + + $tracking = $perStreamRows[$row['stream']]['tracking']; + $insertGaps = $row['ts'] > $cutoffTimestamp; + $tracking->advanceTo((int)$row['no'], $insertGaps); + } + + // Cleanup per-stream trackers and encode position + foreach ($perStreamRows as $stream => $pack) { + /** @var GapAwarePosition $tracking */ + $tracking = $pack['tracking']; + $tracking->cleanByMaxOffset($this->maxGapOffset); + $this->cleanGapsByTimeout($tracking, $connection, $this->streamToTable[$stream]); + $positions[$stream] = (string)$tracking; + } + + return new StreamPage($events, $this->encodePositions($positions)); + } + + private function cleanGapsByTimeout(GapAwarePosition $tracking, Connection $connection, string $table): void + { + if ($this->gapTimeout === null) { + return; + } + $gaps = $tracking->getGaps(); + if (empty($gaps)) { + return; + } + + $minGap = $gaps[0]; + $maxGap = $gaps[count($gaps) - 1]; + + $interleavedEvents = $connection->executeQuery(<<= :minPosition and no <= :maxPosition + ORDER BY no + LIMIT 100 + SQL, [ + 'minPosition' => $minGap, + 'maxPosition' => $maxGap + 1, + ])->iterateAssociative(); + + $timestampThreshold = $this->clock->now()->sub($this->gapTimeout)->unixTime()->inSeconds(); + + $cutoffPosition = $minGap; + foreach ($interleavedEvents as $event) { + $interleavedEventPosition = $event['no']; + $timestamp = $this->getTimestamp($event['created_at']); + + if ($timestamp > $timestampThreshold) { + break; + } + if (in_array($interleavedEventPosition, $gaps, true)) { + break; + } + if ($timestamp < $timestampThreshold && $interleavedEventPosition > $cutoffPosition) { + $cutoffPosition = $interleavedEventPosition + 1; + } + } + + $tracking->cutoffGapsBelow($cutoffPosition); + } + + private function getTimestamp(string $dateString): int + { + if (strlen($dateString) === 19) { + $dateString = $dateString . '.000'; + } + return DatePoint::createFromFormat( + 'Y-m-d H:i:s.u', + $dateString, + new DateTimeZone('UTC') + )->getTimestamp(); + } + + /** + * Encodes map as: stream=position:g1,g2;stream2=position:...; + */ + private function encodePositions(array $positions): string + { + ksort($positions); + $parts = []; + foreach ($positions as $stream => $pos) { + $parts[] = $stream . '=' . (string)$pos . ';'; + } + return implode('', $parts); + } + + /** + * Decodes the map encoded by encodePositions. + * Returns array + */ + private function decodePositions(?string $position): array + { + $result = []; + if ($position === null || $position === '') { + return $result; + } + $pairs = explode(';', rtrim($position, ';')); + foreach ($pairs as $pair) { + if ($pair === '') { + continue; + } + [$stream, $pos] = explode('=', $pair, 2); + $result[$stream] = $pos; + } + return $result; + } +} diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSourceBuilder.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSourceBuilder.php new file mode 100644 index 000000000..cb3f39d83 --- /dev/null +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSourceBuilder.php @@ -0,0 +1,51 @@ + $streamToTable + * @param string[] $handledProjectionNames + */ + public function __construct( + private array $streamToTable, + private array $handledProjectionNames, + private int $maxGapOffset = 5_000, + private ?int $gapTimeoutSeconds = 60, + ) { + } + + public function canHandle(string $projectionName, string $component): bool + { + return $component === StreamSource::class && in_array($projectionName, $this->handledProjectionNames, true); + } + + public function compile(MessagingContainerBuilder $builder): Definition|Reference + { + return new Definition( + EventStoreMultiStreamSource::class, + [ + Reference::to(DbalConnectionFactory::class), + Reference::to(EcotoneClockInterface::class), + $this->streamToTable, + $this->maxGapOffset, + $this->gapTimeoutSeconds !== null ? new Definition(Duration::class, [$this->gapTimeoutSeconds], 'seconds') : null, + ], + ); + } +} diff --git a/packages/PdoEventSourcing/tests/Projecting/FromStreamAttributeTest.php b/packages/PdoEventSourcing/tests/Projecting/FromStreamAttributeTest.php new file mode 100644 index 000000000..4b2698e00 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Projecting/FromStreamAttributeTest.php @@ -0,0 +1,29 @@ +getStreams()); + self::assertSame('orders', $attr->getStream()); + self::assertFalse($attr->isMultiStream()); + } + + public function test_it_accepts_multiple_streams(): void + { + $attr = new FromStream(['orders', 'invoices']); + self::assertSame(['orders', 'invoices'], $attr->getStreams()); + self::assertTrue($attr->isMultiStream()); + self::assertSame('orders', $attr->getStream(), 'First stream should be returned by getStream for BC'); + } +} From 5c61bff1ff97d19b62f5b903c8dc1b84efac7d29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Tue, 23 Dec 2025 14:27:24 +0100 Subject: [PATCH 02/17] add tests --- .../EventStoreMultiStreamSource.php | 12 ++ .../MultiStreamSynchronousProjectionTest.php | 170 ++++++++++++++++++ 2 files changed, 182 insertions(+) create mode 100644 packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php index 9f67fe79b..e078f8659 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php @@ -83,6 +83,15 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = $orderIndex[$stream] = $i++; $tracking = GapAwarePosition::fromString($positions[$stream] ?? null); + // skip querying non-existing tables for this stream + if (!SchemaManagerCompatibility::tableExists($connection, $table)) { + $perStreamRows[$stream] = [ + 'rows' => [], + 'tracking' => $tracking, + ]; + continue; + } + [$gapQueryPart, $gapQueryParams, $gapQueryTypes] = match (($gaps = $tracking->getGaps()) > 0) { true => ['OR no IN (:gaps)', ['gaps' => $gaps], ['gaps' => \Doctrine\DBAL\ArrayParameterType::INTEGER]], false => ['', [], []], @@ -172,6 +181,9 @@ private function cleanGapsByTimeout(GapAwarePosition $tracking, Connection $conn if ($this->gapTimeout === null) { return; } + if (!SchemaManagerCompatibility::tableExists($connection, $table)) { + return; + } $gaps = $tracking->getGaps(); if (empty($gaps)) { return; diff --git a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php new file mode 100644 index 000000000..23b890572 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php @@ -0,0 +1,170 @@ +createMultiStreamProjection(); + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection]); + + $ecotone->deleteProjection($projection::NAME) + ->initializeProjection($projection::NAME); + + self::assertEquals([], $ecotone->sendQueryWithRouting('getInProgressTickets')); + + // write two events into Ticket stream + $ecotone->sendCommand(new RegisterTicket('101', 'Alice', 'alert')); + $ecotone->sendCommand(new RegisterTicket('102', 'Bob', 'info')); + + // also write a close to verify handler works + $ecotone->sendCommand(new CloseTicket('101')); + + $tickets = $ecotone->sendQueryWithRouting('getInProgressTickets'); + self::assertEquals([ + ['ticket_id' => '102', 'ticket_type' => 'info'], + ], $tickets); + } + + public function test_interleaving_two_streams(): void + { + $projection = $this->createMultiStreamProjection(); + $ecotone = $this->bootstrapEcotone([$projection::class, Basket::class], [$projection]); + + $ecotone->deleteProjection($projection::NAME) + ->initializeProjection($projection::NAME); + + // Interleave events across two streams: Ticket and Basket + // 1) Ticket: register ticket 201 + $ecotone->sendCommand(new RegisterTicket('201', 'A', 'alert')); + // 2) Basket: create basket and add product + $ecotone->sendCommand(new CreateBasket('b-1')); + $ecotone->sendCommand(new AddProduct('b-1', 'Book')); + // 3) Ticket: register ticket 202 and then close 201 + $ecotone->sendCommand(new RegisterTicket('202', 'B', 'info')); + $ecotone->sendCommand(new CloseTicket('201')); + + // We only project Ticket events; ensure result reflects Ticket stream while Basket events coexist in another stream + self::assertEquals([ + ['ticket_id' => '202', 'ticket_type' => 'info'], + ], $ecotone->sendQueryWithRouting('getInProgressTickets')); + } + + private function createMultiStreamProjection(): object + { + $connection = $this->getConnection(); + + // Configure FromStream with multiple streams: Ticket and Basket + return new #[ProjectionV2(self::NAME), FromStream([Ticket::class, Basket::class])] class ($connection) { + public const NAME = 'in_progress_ticket_list_multi_stream'; + + public function __construct(private Connection $connection) + { + } + + #[QueryHandler('getInProgressTickets')] + public function getTickets(): array + { + return $this->connection->executeQuery(<<fetchAllAssociative(); + } + + #[EventHandler] + public function addTicket(TicketWasRegistered $event): void + { + $this->connection->executeStatement(<<getTicketId(), $event->getTicketType()]); + } + + #[EventHandler] + public function closeTicket(TicketWasClosed $event): void + { + $this->connection->executeStatement(<<getTicketId()]); + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement(<<connection->executeStatement(<<connection->executeStatement(<<withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::DBAL_PACKAGE, + ModulePackageList::EVENT_SOURCING_PACKAGE, + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])), + runForProductionEventStore: true, + licenceKey: LicenceTesting::VALID_LICENCE, + ); + } +} From 38f56ea96a7f78923086cabca4adc8eecc1a6b29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Tue, 23 Dec 2025 14:32:58 +0100 Subject: [PATCH 03/17] add tests --- .../MultiStreamSynchronousProjectionTest.php | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php index 23b890572..c2478f79f 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php @@ -63,6 +63,44 @@ public function test_building_multi_stream_synchronous_projection(): void ], $tickets); } + public function test_reset_and_delete_on_multi_stream_projection(): void + { + $projection = $this->createMultiStreamProjection(); + $ecotone = $this->bootstrapEcotone([$projection::class, Basket::class], [$projection]); + + // init + $ecotone->deleteProjection($projection::NAME) + ->initializeProjection($projection::NAME); + + // seed some events across both streams + $ecotone->sendCommand(new RegisterTicket('301', 'C', 'alert')); + $ecotone->sendCommand(new CreateBasket('b-2')); + $ecotone->sendCommand(new AddProduct('b-2', 'Pen')); + $ecotone->sendCommand(new RegisterTicket('302', 'D', 'warning')); + + // verify current state + self::assertEquals([ + ['ticket_id' => '301', 'ticket_type' => 'alert'], + ['ticket_id' => '302', 'ticket_type' => 'warning'], + ], $ecotone->sendQueryWithRouting('getInProgressTickets')); + + // reset and trigger catch up + $ecotone->resetProjection($projection::NAME) + ->triggerProjection($projection::NAME); + + // after reset and catch-up, state should be re-built + self::assertEquals([ + ['ticket_id' => '301', 'ticket_type' => 'alert'], + ['ticket_id' => '302', 'ticket_type' => 'warning'], + ], $ecotone->sendQueryWithRouting('getInProgressTickets')); + + // delete projection table + $ecotone->deleteProjection($projection::NAME); + + // table should be gone + self::assertFalse(self::tableExists($this->getConnection(), 'in_progress_tickets_multi')); + } + public function test_interleaving_two_streams(): void { $projection = $this->createMultiStreamProjection(); @@ -87,6 +125,37 @@ public function test_interleaving_two_streams(): void ], $ecotone->sendQueryWithRouting('getInProgressTickets')); } + public function test_exhaustive_interleaving_and_mutations_across_streams(): void + { + $projection = $this->createMultiStreamProjection(); + $ecotone = $this->bootstrapEcotone([$projection::class, Basket::class], [$projection]); + + $ecotone->deleteProjection($projection::NAME) + ->initializeProjection($projection::NAME); + + // Interleave multiple operations across streams + $ecotone->sendCommand(new RegisterTicket('401', 'Z', 'alert')); // T1 add + $ecotone->sendCommand(new CreateBasket('b-3')); // B create + $ecotone->sendCommand(new RegisterTicket('402', 'Y', 'info')); // T2 add + $ecotone->sendCommand(new AddProduct('b-3', 'Notebook')); // B add + $ecotone->sendCommand(new CloseTicket('401')); // T1 close + $ecotone->sendCommand(new RegisterTicket('403', 'X', 'warning')); // T3 add + + // Expect tickets 402 and 403 in ascending order by id (projection sorts by id ASC on read) + self::assertEquals([ + ['ticket_id' => '402', 'ticket_type' => 'info'], + ['ticket_id' => '403', 'ticket_type' => 'warning'], + ], $ecotone->sendQueryWithRouting('getInProgressTickets')); + + // Reset and rebuild should yield the same + $ecotone->resetProjection($projection::NAME) + ->triggerProjection($projection::NAME); + self::assertEquals([ + ['ticket_id' => '402', 'ticket_type' => 'info'], + ['ticket_id' => '403', 'ticket_type' => 'warning'], + ], $ecotone->sendQueryWithRouting('getInProgressTickets')); + } + private function createMultiStreamProjection(): object { $connection = $this->getConnection(); From 1f2d51629eaf39e04bac44c74ebe3141ad8e94df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Tue, 23 Dec 2025 15:00:41 +0100 Subject: [PATCH 04/17] add tests --- .../MultiStreamSynchronousProjectionTest.php | 184 ++++++++---------- 1 file changed, 80 insertions(+), 104 deletions(-) diff --git a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php index c2478f79f..3e21253c2 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php @@ -6,10 +6,8 @@ namespace Test\Ecotone\EventSourcing\Projecting\Global; -use Doctrine\DBAL\Connection; use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\Attribute\ProjectionDelete; -use Ecotone\EventSourcing\Attribute\ProjectionInitialization; use Ecotone\EventSourcing\Attribute\ProjectionReset; use Ecotone\Lite\EcotoneLite; use Ecotone\Lite\Test\FlowTestSupport; @@ -19,18 +17,14 @@ use Ecotone\Modelling\Attribute\QueryHandler; use Ecotone\Projecting\Attribute\ProjectionV2; use Ecotone\Test\LicenceTesting; -use PHPUnit\Framework\TestCase; -use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\CloseTicket; -use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\RegisterTicket; -use Test\Ecotone\EventSourcing\Fixture\Ticket\Event\TicketWasClosed; -use Test\Ecotone\EventSourcing\Fixture\Ticket\Event\TicketWasRegistered; -use Test\Ecotone\EventSourcing\Fixture\Ticket\Ticket; -use Test\Ecotone\EventSourcing\Fixture\Basket\Basket; -use Test\Ecotone\EventSourcing\Fixture\Basket\Command\CreateBasket; -use Test\Ecotone\EventSourcing\Fixture\Basket\Command\AddProduct; -use Test\Ecotone\EventSourcing\Fixture\Basket\BasketEventConverter; -use Test\Ecotone\EventSourcing\Fixture\Snapshots\BasketMediaTypeConverter; -use Test\Ecotone\EventSourcing\Fixture\Ticket\TicketEventConverter; +use Test\Ecotone\EventSourcing\Fixture\Calendar\CalendarCreated; +use Test\Ecotone\EventSourcing\Fixture\Calendar\CreateCalendar; +use Test\Ecotone\EventSourcing\Fixture\Calendar\EventsConverter; +use Test\Ecotone\EventSourcing\Fixture\Calendar\MeetingCreated; +use Test\Ecotone\EventSourcing\Fixture\Calendar\MeetingScheduled; +use Test\Ecotone\EventSourcing\Fixture\Calendar\MeetingWithEventSourcing; +use Test\Ecotone\EventSourcing\Fixture\Calendar\ScheduleMeetingWithEventSourcing; +use Test\Ecotone\EventSourcing\Fixture\EventSourcingCalendarWithInternalRecorder\CalendarWithInternalRecorder; use Test\Ecotone\EventSourcing\Projecting\ProjectingTestCase; /** @@ -48,41 +42,39 @@ public function test_building_multi_stream_synchronous_projection(): void $ecotone->deleteProjection($projection::NAME) ->initializeProjection($projection::NAME); - self::assertEquals([], $ecotone->sendQueryWithRouting('getInProgressTickets')); + $this->expectException(\Ecotone\Messaging\MessagingException::class); + $ecotone->sendQueryWithRouting('getCalendar', 'cal-build-1'); - // write two events into Ticket stream - $ecotone->sendCommand(new RegisterTicket('101', 'Alice', 'alert')); - $ecotone->sendCommand(new RegisterTicket('102', 'Bob', 'info')); + // create calendar and schedule meeting to drive projection entries + $calendarId = 'cal-build-1'; + $meetingId = 'm-build-1'; + $ecotone->sendCommand(new CreateCalendar($calendarId)); + $ecotone->sendCommand(new ScheduleMeetingWithEventSourcing($calendarId, $meetingId)); - // also write a close to verify handler works - $ecotone->sendCommand(new CloseTicket('101')); - - $tickets = $ecotone->sendQueryWithRouting('getInProgressTickets'); self::assertEquals([ - ['ticket_id' => '102', 'ticket_type' => 'info'], - ], $tickets); + $meetingId => 'created', + ], $ecotone->sendQueryWithRouting('getCalendar', $calendarId)); } public function test_reset_and_delete_on_multi_stream_projection(): void { $projection = $this->createMultiStreamProjection(); - $ecotone = $this->bootstrapEcotone([$projection::class, Basket::class], [$projection]); + $ecotone = $this->bootstrapEcotone([$projection::class, CalendarWithInternalRecorder::class, MeetingWithEventSourcing::class, EventsConverter::class], [$projection, new EventsConverter()]); // init $ecotone->deleteProjection($projection::NAME) ->initializeProjection($projection::NAME); - // seed some events across both streams - $ecotone->sendCommand(new RegisterTicket('301', 'C', 'alert')); - $ecotone->sendCommand(new CreateBasket('b-2')); - $ecotone->sendCommand(new AddProduct('b-2', 'Pen')); - $ecotone->sendCommand(new RegisterTicket('302', 'D', 'warning')); + // seed some events across multiple streams (Calendar/Meeting) + $calendarId = 'cal-reset-1'; + $meetingId = 'm-reset-1'; + $ecotone->sendCommand(new CreateCalendar($calendarId)); + $ecotone->sendCommand(new ScheduleMeetingWithEventSourcing($calendarId, $meetingId)); // verify current state self::assertEquals([ - ['ticket_id' => '301', 'ticket_type' => 'alert'], - ['ticket_id' => '302', 'ticket_type' => 'warning'], - ], $ecotone->sendQueryWithRouting('getInProgressTickets')); + $meetingId => 'created', + ], $ecotone->sendQueryWithRouting('getCalendar', $calendarId)); // reset and trigger catch up $ecotone->resetProjection($projection::NAME) @@ -90,133 +82,114 @@ public function test_reset_and_delete_on_multi_stream_projection(): void // after reset and catch-up, state should be re-built self::assertEquals([ - ['ticket_id' => '301', 'ticket_type' => 'alert'], - ['ticket_id' => '302', 'ticket_type' => 'warning'], - ], $ecotone->sendQueryWithRouting('getInProgressTickets')); + $meetingId => 'created', + ], $ecotone->sendQueryWithRouting('getCalendar', $calendarId)); - // delete projection table + // delete projection (in-memory) $ecotone->deleteProjection($projection::NAME); - - // table should be gone - self::assertFalse(self::tableExists($this->getConnection(), 'in_progress_tickets_multi')); + $this->expectException(\Ecotone\Messaging\MessagingException::class); + $ecotone->sendQueryWithRouting('getCalendar', $calendarId); } public function test_interleaving_two_streams(): void { $projection = $this->createMultiStreamProjection(); - $ecotone = $this->bootstrapEcotone([$projection::class, Basket::class], [$projection]); + $ecotone = $this->bootstrapEcotone([$projection::class, CalendarWithInternalRecorder::class, MeetingWithEventSourcing::class, EventsConverter::class], [$projection, new EventsConverter()]); $ecotone->deleteProjection($projection::NAME) ->initializeProjection($projection::NAME); - // Interleave events across two streams: Ticket and Basket - // 1) Ticket: register ticket 201 - $ecotone->sendCommand(new RegisterTicket('201', 'A', 'alert')); - // 2) Basket: create basket and add product - $ecotone->sendCommand(new CreateBasket('b-1')); - $ecotone->sendCommand(new AddProduct('b-1', 'Book')); - // 3) Ticket: register ticket 202 and then close 201 - $ecotone->sendCommand(new RegisterTicket('202', 'B', 'info')); - $ecotone->sendCommand(new CloseTicket('201')); - - // We only project Ticket events; ensure result reflects Ticket stream while Basket events coexist in another stream + // Interleave events across two streams: Calendar and Meeting aggregates + // 1) Calendar: create calendar and schedule meeting which creates Meeting aggregate (second stream) + $calendarId = 'cal-1'; + $meetingId = 'm-1'; + $ecotone->sendCommand(new CreateCalendar($calendarId)); + $ecotone->sendCommand(new ScheduleMeetingWithEventSourcing($calendarId, $meetingId)); + + // We only react to Calendar/Meeting events self::assertEquals([ - ['ticket_id' => '202', 'ticket_type' => 'info'], - ], $ecotone->sendQueryWithRouting('getInProgressTickets')); + $meetingId => 'created', + ], $ecotone->sendQueryWithRouting('getCalendar', $calendarId)); } public function test_exhaustive_interleaving_and_mutations_across_streams(): void { $projection = $this->createMultiStreamProjection(); - $ecotone = $this->bootstrapEcotone([$projection::class, Basket::class], [$projection]); + $ecotone = $this->bootstrapEcotone([$projection::class, CalendarWithInternalRecorder::class, MeetingWithEventSourcing::class, EventsConverter::class], [$projection, new EventsConverter()]); $ecotone->deleteProjection($projection::NAME) ->initializeProjection($projection::NAME); // Interleave multiple operations across streams - $ecotone->sendCommand(new RegisterTicket('401', 'Z', 'alert')); // T1 add - $ecotone->sendCommand(new CreateBasket('b-3')); // B create - $ecotone->sendCommand(new RegisterTicket('402', 'Y', 'info')); // T2 add - $ecotone->sendCommand(new AddProduct('b-3', 'Notebook')); // B add - $ecotone->sendCommand(new CloseTicket('401')); // T1 close - $ecotone->sendCommand(new RegisterTicket('403', 'X', 'warning')); // T3 add - - // Expect tickets 402 and 403 in ascending order by id (projection sorts by id ASC on read) + $calendarId = 'cal-2'; + $meetingId = 'm-2'; + $ecotone->sendCommand(new CreateCalendar($calendarId)); // Calendar create + $ecotone->sendCommand(new ScheduleMeetingWithEventSourcing($calendarId, $meetingId)); // Meeting scheduled/created + + // Expect one meeting entry marked as created self::assertEquals([ - ['ticket_id' => '402', 'ticket_type' => 'info'], - ['ticket_id' => '403', 'ticket_type' => 'warning'], - ], $ecotone->sendQueryWithRouting('getInProgressTickets')); + $meetingId => 'created', + ], $ecotone->sendQueryWithRouting('getCalendar', $calendarId)); // Reset and rebuild should yield the same $ecotone->resetProjection($projection::NAME) ->triggerProjection($projection::NAME); self::assertEquals([ - ['ticket_id' => '402', 'ticket_type' => 'info'], - ['ticket_id' => '403', 'ticket_type' => 'warning'], - ], $ecotone->sendQueryWithRouting('getInProgressTickets')); + $meetingId => 'created', + ], $ecotone->sendQueryWithRouting('getCalendar', $calendarId)); } private function createMultiStreamProjection(): object { $connection = $this->getConnection(); - // Configure FromStream with multiple streams: Ticket and Basket - return new #[ProjectionV2(self::NAME), FromStream([Ticket::class, Basket::class])] class ($connection) { - public const NAME = 'in_progress_ticket_list_multi_stream'; + // Configure FromStream with multiple streams: Calendar/Meeting aggregates + // Real-world usage: projection reacts to Calendar/Meeting events to generate a read model + return new #[ProjectionV2(self::NAME), FromStream([CalendarWithInternalRecorder::class, MeetingWithEventSourcing::class])] class () { + public const NAME = 'calendar_multi_stream_projection'; - public function __construct(private Connection $connection) - { - } + private array $calendars = []; - #[QueryHandler('getInProgressTickets')] - public function getTickets(): array + #[QueryHandler('getCalendar')] + public function getCalendar(string $calendarId): array { - return $this->connection->executeQuery(<<fetchAllAssociative(); + return $this->calendars[$calendarId] ?? throw new \RuntimeException("Calendar with id {$calendarId} not found"); } #[EventHandler] - public function addTicket(TicketWasRegistered $event): void + public function whenCalendarCreated(CalendarCreated $event): void { - $this->connection->executeStatement(<<getTicketId(), $event->getTicketType()]); + $this->calendars[$event->calendarId] = []; } #[EventHandler] - public function closeTicket(TicketWasClosed $event): void + public function whenMeetingScheduled(MeetingScheduled $event): void { - $this->connection->executeStatement(<<getTicketId()]); + if (! array_key_exists($event->calendarId, $this->calendars)) { + throw new \RuntimeException('Meeting scheduled before calendar was created'); + } + $this->calendars[$event->calendarId][$event->meetingId] = 'scheduled'; } - #[ProjectionInitialization] - public function initialization(): void + #[EventHandler] + public function whenMeetingCreated(MeetingCreated $event): void { - $this->connection->executeStatement(<<calendarId, $this->calendars)) { + throw new \RuntimeException('Meeting created before calendar was created'); + } + $this->calendars[$event->calendarId][$event->meetingId] = 'created'; } #[ProjectionDelete] public function delete(): void { - $this->connection->executeStatement(<<calendars = []; } #[ProjectionReset] public function reset(): void { - $this->connection->executeStatement(<<calendars = []; } }; } @@ -224,8 +197,11 @@ public function reset(): void private function bootstrapEcotone(array $classesToResolve, array $services): FlowTestSupport { return EcotoneLite::bootstrapFlowTestingWithEventStore( - classesToResolve: array_merge($classesToResolve, [Ticket::class, TicketEventConverter::class, Basket::class, BasketEventConverter::class, BasketMediaTypeConverter::class]), - containerOrAvailableServices: array_merge($services, [new TicketEventConverter(), new BasketEventConverter(), new BasketMediaTypeConverter(), self::getConnectionFactory()]), + classesToResolve: array_merge($classesToResolve, [ + CalendarWithInternalRecorder::class, + MeetingWithEventSourcing::class, EventsConverter::class + ]), + containerOrAvailableServices: array_merge($services, [new EventsConverter(), self::getConnectionFactory()]), configuration: ServiceConfiguration::createWithDefaults() ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ ModulePackageList::DBAL_PACKAGE, From 9fdca033245d6538c66018cb6799518525513543 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Tue, 23 Dec 2025 15:00:53 +0100 Subject: [PATCH 05/17] simplify --- .../StreamSource/EventStoreMultiStreamSource.php | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php index e078f8659..37d007cb1 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php @@ -241,12 +241,11 @@ private function getTimestamp(string $dateString): int */ private function encodePositions(array $positions): string { - ksort($positions); - $parts = []; + $encoded = ''; foreach ($positions as $stream => $pos) { - $parts[] = $stream . '=' . (string)$pos . ';'; + $encoded .= "$stream=$pos;"; } - return implode('', $parts); + return $encoded; } /** @@ -259,7 +258,7 @@ private function decodePositions(?string $position): array if ($position === null || $position === '') { return $result; } - $pairs = explode(';', rtrim($position, ';')); + $pairs = explode(';', $position); foreach ($pairs as $pair) { if ($pair === '') { continue; From ccb39be59237230a9aa5e37d6898937a6531707b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Tue, 23 Dec 2025 17:08:31 +0100 Subject: [PATCH 06/17] refactor --- packages/Ecotone/src/Modelling/Event.php | 2 +- .../src/Projecting/PdoEvent.php | 23 ++ .../EventStoreGlobalStreamSource.php | 10 +- .../EventStoreMultiStreamSource.php | 202 +++--------------- .../MultiStreamSynchronousProjectionTest.php | 4 +- 5 files changed, 59 insertions(+), 182 deletions(-) create mode 100644 packages/PdoEventSourcing/src/Projecting/PdoEvent.php diff --git a/packages/Ecotone/src/Modelling/Event.php b/packages/Ecotone/src/Modelling/Event.php index 19658c528..5b7b0527d 100644 --- a/packages/Ecotone/src/Modelling/Event.php +++ b/packages/Ecotone/src/Modelling/Event.php @@ -9,7 +9,7 @@ */ class Event { - private function __construct( + protected function __construct( private string $eventName, private array|object $payload, private array $metadata diff --git a/packages/PdoEventSourcing/src/Projecting/PdoEvent.php b/packages/PdoEventSourcing/src/Projecting/PdoEvent.php new file mode 100644 index 000000000..e657a4f31 --- /dev/null +++ b/packages/PdoEventSourcing/src/Projecting/PdoEvent.php @@ -0,0 +1,23 @@ +clock->now(); $cutoffTimestamp = $this->gapTimeout ? $now->sub($this->gapTimeout)->getTimestamp() : 0; foreach ($query->iterateAssociative() as $event) { - $events[] = Event::createWithType( + $events[] = $event = new PdoEvent( $event['event_name'], json_decode($event['payload'], true), json_decode($event['metadata'], true), + (int) $event['no'], + $this->getTimestamp($event['created_at']) ); - $timestamp = $this->getTimestamp($event['created_at']); - $insertGaps = $timestamp > $cutoffTimestamp; - $tracking->advanceTo((int) $event['no'], $insertGaps); + $insertGaps = $event->timestamp > $cutoffTimestamp; + $tracking->advanceTo($event->no, $insertGaps); } $tracking->cleanByMaxOffset($this->maxGapOffset); diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php index 37d007cb1..e9b52d24b 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php @@ -11,6 +11,7 @@ use Doctrine\DBAL\Connection; use Ecotone\Dbal\Compatibility\SchemaManagerCompatibility; use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory; +use Ecotone\EventSourcing\Projecting\PdoEvent; use Ecotone\Messaging\Scheduling\DatePoint; use Ecotone\Messaging\Scheduling\Duration; use Ecotone\Messaging\Scheduling\EcotoneClockInterface; @@ -29,211 +30,62 @@ */ class EventStoreMultiStreamSource implements StreamSource { + /** @var array */ + private array $sources; /** * @param array $streamToTable map of logical stream name => prooph table name */ public function __construct( - private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory, - private EcotoneClockInterface $clock, + DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory, + EcotoneClockInterface $clock, private array $streamToTable, - private int $maxGapOffset = 5_000, - private ?Duration $gapTimeout = null, + int $maxGapOffset = 5_000, + ?Duration $gapTimeout = null, ) { - Assert::isTrue(!empty($streamToTable), 'At least one stream must be provided'); - } - - private function getConnection(): Connection - { - if ($this->connectionFactory instanceof MultiTenantConnectionFactory) { - return $this->connectionFactory->getConnection(); + $this->sources = []; + foreach ($this->streamToTable as $stream => $table) { + $this->sources[$stream] = new EventStoreGlobalStreamSource($connectionFactory, $clock, $table, $maxGapOffset, $gapTimeout); } - - return $this->connectionFactory->createContext()->getDbalConnection(); } public function load(?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage { Assert::null($partitionKey, 'Partition key is not supported for EventStoreMultiStreamSource'); - $connection = $this->getConnection(); - - if (empty($lastPosition)) { - // if none of the tables exist yet, return empty - $anyExists = false; - foreach ($this->streamToTable as $table) { - if (SchemaManagerCompatibility::tableExists($connection, $table)) { - $anyExists = true; - break; - } - } - if (! $anyExists) { - return new StreamPage([], ''); - } - } - $positions = $this->decodePositions($lastPosition); - $now = $this->clock->now(); - $cutoffTimestamp = $this->gapTimeout ? $now->sub($this->gapTimeout)->getTimestamp() : 0; - - $perStreamRows = []; $orderIndex = []; $i = 0; - foreach ($this->streamToTable as $stream => $table) { + $newPositions = []; + $all = []; + foreach ($this->sources as $stream => $source) { $orderIndex[$stream] = $i++; - $tracking = GapAwarePosition::fromString($positions[$stream] ?? null); - - // skip querying non-existing tables for this stream - if (!SchemaManagerCompatibility::tableExists($connection, $table)) { - $perStreamRows[$stream] = [ - 'rows' => [], - 'tracking' => $tracking, - ]; - continue; - } - - [$gapQueryPart, $gapQueryParams, $gapQueryTypes] = match (($gaps = $tracking->getGaps()) > 0) { - true => ['OR no IN (:gaps)', ['gaps' => $gaps], ['gaps' => \Doctrine\DBAL\ArrayParameterType::INTEGER]], - false => ['', [], []], - }; $limit = max((int)ceil($count / max(1, count($this->streamToTable))) + 5, 10); - $query = $connection->executeQuery(<< :position {$gapQueryPart} - ORDER BY no - LIMIT {$limit} - SQL, [ - 'position' => $tracking->getPosition(), - ...$gapQueryParams, - ], $gapQueryTypes); + $page = $source->load($positions[$stream] ?? null, $limit, $partitionKey); - $rows = []; - foreach ($query->iterateAssociative() as $event) { - $rows[] = [ - 'stream' => $stream, - 'no' => (int)$event['no'], - 'event_name' => $event['event_name'], - 'payload' => $event['payload'], - 'metadata' => $event['metadata'], - 'created_at' => $event['created_at'], - 'ts' => $this->getTimestamp($event['created_at']), - ]; + $newPositions[$stream] = $page->lastPosition; + foreach ($page->events as $event) { + $all[] = [$stream, $event]; } - - $perStreamRows[$stream] = [ - 'rows' => $rows, - 'tracking' => $tracking, - ]; } - // Merge all rows by created_at ASC, tie-break by stream order, then no - $all = []; - foreach ($perStreamRows as $stream => $pack) { - foreach ($pack['rows'] as $row) { - $all[] = $row; + usort($all, function (array $aTuple, array $bTuple) use ($orderIndex): int { + [$aStream, $a] = $aTuple; + [$bStream, $b] = $bTuple; + if ($aStream === $bStream) { + return $a->no <=> $b->no; } - } - - usort($all, function (array $a, array $b) use ($orderIndex): int { - if ($a['ts'] === $b['ts']) { - $ai = $orderIndex[$a['stream']] <=> $orderIndex[$b['stream']]; - if ($ai !== 0) { - return $ai; - } - return $a['no'] <=> $b['no']; + if ($a->timestamp === $b->timestamp) { + return $orderIndex[$aStream] <=> $orderIndex[$bStream]; } - return $a['ts'] <=> $b['ts']; + return $a->timestamp <=> $b->timestamp; }); - // Take first $count and advance per-stream positions accordingly - $selected = array_slice($all, 0, $count); - - $events = []; - foreach ($selected as $row) { - $events[] = Event::createWithType( - $row['event_name'], - json_decode($row['payload'], true), - json_decode($row['metadata'], true), - ); - - $tracking = $perStreamRows[$row['stream']]['tracking']; - $insertGaps = $row['ts'] > $cutoffTimestamp; - $tracking->advanceTo((int)$row['no'], $insertGaps); - } - - // Cleanup per-stream trackers and encode position - foreach ($perStreamRows as $stream => $pack) { - /** @var GapAwarePosition $tracking */ - $tracking = $pack['tracking']; - $tracking->cleanByMaxOffset($this->maxGapOffset); - $this->cleanGapsByTimeout($tracking, $connection, $this->streamToTable[$stream]); - $positions[$stream] = (string)$tracking; - } - - return new StreamPage($events, $this->encodePositions($positions)); - } - - private function cleanGapsByTimeout(GapAwarePosition $tracking, Connection $connection, string $table): void - { - if ($this->gapTimeout === null) { - return; - } - if (!SchemaManagerCompatibility::tableExists($connection, $table)) { - return; - } - $gaps = $tracking->getGaps(); - if (empty($gaps)) { - return; - } - - $minGap = $gaps[0]; - $maxGap = $gaps[count($gaps) - 1]; - - $interleavedEvents = $connection->executeQuery(<<= :minPosition and no <= :maxPosition - ORDER BY no - LIMIT 100 - SQL, [ - 'minPosition' => $minGap, - 'maxPosition' => $maxGap + 1, - ])->iterateAssociative(); + $events = array_map(fn(array $tuple) => $tuple[1], $all); - $timestampThreshold = $this->clock->now()->sub($this->gapTimeout)->unixTime()->inSeconds(); - - $cutoffPosition = $minGap; - foreach ($interleavedEvents as $event) { - $interleavedEventPosition = $event['no']; - $timestamp = $this->getTimestamp($event['created_at']); - - if ($timestamp > $timestampThreshold) { - break; - } - if (in_array($interleavedEventPosition, $gaps, true)) { - break; - } - if ($timestamp < $timestampThreshold && $interleavedEventPosition > $cutoffPosition) { - $cutoffPosition = $interleavedEventPosition + 1; - } - } - - $tracking->cutoffGapsBelow($cutoffPosition); - } - - private function getTimestamp(string $dateString): int - { - if (strlen($dateString) === 19) { - $dateString = $dateString . '.000'; - } - return DatePoint::createFromFormat( - 'Y-m-d H:i:s.u', - $dateString, - new DateTimeZone('UTC') - )->getTimestamp(); + return new StreamPage($events, $this->encodePositions($newPositions)); } /** diff --git a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php index 3e21253c2..f4cde9cac 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php @@ -42,7 +42,7 @@ public function test_building_multi_stream_synchronous_projection(): void $ecotone->deleteProjection($projection::NAME) ->initializeProjection($projection::NAME); - $this->expectException(\Ecotone\Messaging\MessagingException::class); + $this->expectException(\RuntimeException::class); $ecotone->sendQueryWithRouting('getCalendar', 'cal-build-1'); // create calendar and schedule meeting to drive projection entries @@ -87,7 +87,7 @@ public function test_reset_and_delete_on_multi_stream_projection(): void // delete projection (in-memory) $ecotone->deleteProjection($projection::NAME); - $this->expectException(\Ecotone\Messaging\MessagingException::class); + $this->expectException(\RuntimeException::class); $ecotone->sendQueryWithRouting('getCalendar', $calendarId); } From 41c511787a5be5201a8db66f14ac788483e22c99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Tue, 23 Dec 2025 17:17:04 +0100 Subject: [PATCH 07/17] repeatable FromStream attribute handling --- .../src/Attribute/FromStream.php | 38 +++---------------- .../src/Config/ProophProjectingModule.php | 35 ++++++++++------- .../MultiStreamSynchronousProjectionTest.php | 2 +- 3 files changed, 28 insertions(+), 47 deletions(-) diff --git a/packages/PdoEventSourcing/src/Attribute/FromStream.php b/packages/PdoEventSourcing/src/Attribute/FromStream.php index 0ca23f5a1..c38566aa2 100644 --- a/packages/PdoEventSourcing/src/Attribute/FromStream.php +++ b/packages/PdoEventSourcing/src/Attribute/FromStream.php @@ -11,49 +11,23 @@ use Ecotone\EventSourcing\EventStore; use Ecotone\Messaging\Support\Assert; -#[Attribute(Attribute::TARGET_CLASS)] +#[Attribute(Attribute::TARGET_CLASS | Attribute::IS_REPEATABLE)] class FromStream { - /** @var string[] */ - public readonly array $streams; + public readonly string $stream; public readonly ?string $aggregateType; public readonly string $eventStoreReferenceName; - /** - * Accepts a single stream name or a list of stream names. - */ - public function __construct(string|array $stream, ?string $aggregateType = null, string $eventStoreReferenceName = EventStore::class) + public function __construct(string $stream, ?string $aggregateType = null, string $eventStoreReferenceName = EventStore::class) { - // Keep original parameter name `$stream` for backward compatibility with existing tests/usages - $streams = is_array($stream) ? $stream : [$stream]; - Assert::isTrue(!empty($streams), 'At least one stream name must be provided'); - foreach ($streams as $s) { - Assert::notNullAndEmpty($s, "Stream name can't be empty"); - } - - $this->streams = array_values($streams); + Assert::notNullAndEmpty($stream, "Stream name can't be empty"); + $this->stream = $stream; $this->aggregateType = $aggregateType; $this->eventStoreReferenceName = $eventStoreReferenceName; } - /** - * Backward compatibility accessor for single-stream cases. - */ public function getStream(): string { - return $this->streams[0]; - } - - /** - * @return string[] - */ - public function getStreams(): array - { - return $this->streams; - } - - public function isMultiStream(): bool - { - return count($this->streams) > 1; + return $this->stream; } } diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index 91496153a..f4f1d13c6 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -39,12 +39,16 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $handledProjections = []; $extensions = []; - foreach ($annotationRegistrationService->findAnnotatedClasses(FromStream::class) as $classname) { + // Iterate over all projections and gather FromStream attributes per class (supports repeatable usage) + foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $classname) { $projectionAttribute = $annotationRegistrationService->findAttributeForClass($classname, ProjectionV2::class); - $streamAttribute = $annotationRegistrationService->findAttributeForClass($classname, FromStream::class); $customScopeStrategyAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); - if (! $projectionAttribute || ! $streamAttribute) { + // collect all FromStream attributes for this class (repeatable) + $classAnnotations = $annotationRegistrationService->getAnnotationsForClass($classname); + $streamAttributes = array_values(array_filter($classAnnotations, static fn($a) => $a instanceof FromStream)); + + if (! $projectionAttribute || empty($streamAttributes)) { continue; } @@ -56,22 +60,26 @@ public static function create(AnnotationFinder $annotationRegistrationService, I if ($partitionHeaderName !== null) { // Partitioned projections must target a single stream (aggregate stream) - if ($streamAttribute->isMultiStream()) { + if (count($streamAttributes) > 1) { throw ConfigurationException::create("Projection {$projectionName} cannot be partitioned by aggregate id when multiple streams are configured"); } - $aggregateType = $streamAttribute->aggregateType ?: throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); + /** @var FromStream $single */ + $single = $streamAttributes[0]; + $aggregateType = $single->aggregateType ?: throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); $extensions[] = new EventStoreAggregateStreamSourceBuilder( $projectionName, $aggregateType, - $streamAttribute->getStream(), + $single->getStream(), ); - $extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $streamAttribute->getStream()); + $extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $single->getStream()); } else { - $streams = $streamAttribute->getStreams(); - if (count($streams) > 1) { + // Single or multiple FromStream attributes + if (count($streamAttributes) > 1) { // Multi-stream: build stream->table map using the same hashing as global builder $map = []; - foreach ($streams as $s) { + foreach ($streamAttributes as $attr) { + /** @var FromStream $attr */ + $s = $attr->getStream(); $map[$s] = EventStoreGlobalStreamSourceBuilder::getProophTableName($s); } $extensions[] = new EventStoreMultiStreamSourceBuilder( @@ -80,10 +88,9 @@ public static function create(AnnotationFinder $annotationRegistrationService, I ); } else { // Single stream: keep global stream source - $extensions[] = new EventStoreGlobalStreamSourceBuilder( - $streams[0], - [$projectionName], - ); + /** @var FromStream $single */ + $single = $streamAttributes[0]; + $extensions[] = new EventStoreGlobalStreamSourceBuilder($single->getStream(), [$projectionName]); } } } diff --git a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php index f4cde9cac..db6f659d7 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php @@ -145,7 +145,7 @@ private function createMultiStreamProjection(): object // Configure FromStream with multiple streams: Calendar/Meeting aggregates // Real-world usage: projection reacts to Calendar/Meeting events to generate a read model - return new #[ProjectionV2(self::NAME), FromStream([CalendarWithInternalRecorder::class, MeetingWithEventSourcing::class])] class () { + return new #[ProjectionV2(self::NAME), FromStream(CalendarWithInternalRecorder::class), FromStream(MeetingWithEventSourcing::class)] class () { public const NAME = 'calendar_multi_stream_projection'; private array $calendars = []; From e6b5c490196612dfef609efa8841f0a2789c6863 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Tue, 23 Dec 2025 17:27:16 +0100 Subject: [PATCH 08/17] refactor --- .../src/Config/ProophProjectingModule.php | 15 ++++------ .../EventStoreMultiStreamSource.php | 29 ++----------------- .../EventStoreMultiStreamSourceBuilder.php | 16 +++++----- .../Projecting/FromStreamAttributeTest.php | 29 ------------------- 4 files changed, 15 insertions(+), 74 deletions(-) delete mode 100644 packages/PdoEventSourcing/tests/Projecting/FromStreamAttributeTest.php diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index f4f1d13c6..20d2b5c72 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -73,24 +73,19 @@ public static function create(AnnotationFinder $annotationRegistrationService, I ); $extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $single->getStream()); } else { - // Single or multiple FromStream attributes if (count($streamAttributes) > 1) { - // Multi-stream: build stream->table map using the same hashing as global builder + // Multi-stream: build stream name -> stream source map $map = []; - foreach ($streamAttributes as $attr) { - /** @var FromStream $attr */ - $s = $attr->getStream(); - $map[$s] = EventStoreGlobalStreamSourceBuilder::getProophTableName($s); + foreach ($streamAttributes as $attribute) { + $map[$attribute->getStream()] = new EventStoreGlobalStreamSourceBuilder($attribute->getStream(), []); } $extensions[] = new EventStoreMultiStreamSourceBuilder( $map, [$projectionName], ); } else { - // Single stream: keep global stream source - /** @var FromStream $single */ - $single = $streamAttributes[0]; - $extensions[] = new EventStoreGlobalStreamSourceBuilder($single->getStream(), [$projectionName]); + $attribute = $streamAttributes[0]; + $extensions[] = new EventStoreGlobalStreamSourceBuilder($attribute->getStream(), [$projectionName]); } } } diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php index e9b52d24b..a8e3e7913 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php @@ -7,22 +7,9 @@ namespace Ecotone\EventSourcing\Projecting\StreamSource; -use DateTimeZone; -use Doctrine\DBAL\Connection; -use Ecotone\Dbal\Compatibility\SchemaManagerCompatibility; -use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory; -use Ecotone\EventSourcing\Projecting\PdoEvent; -use Ecotone\Messaging\Scheduling\DatePoint; -use Ecotone\Messaging\Scheduling\Duration; -use Ecotone\Messaging\Scheduling\EcotoneClockInterface; use Ecotone\Messaging\Support\Assert; -use Ecotone\Modelling\Event; use Ecotone\Projecting\StreamPage; use Ecotone\Projecting\StreamSource; -use Enqueue\Dbal\DbalConnectionFactory; -use Enqueue\Dbal\ManagerRegistryConnectionFactory; - -use function strlen; /** * Multi-stream source for Prooph, where each stream has its own table and sequence. @@ -30,22 +17,12 @@ */ class EventStoreMultiStreamSource implements StreamSource { - /** @var array */ - private array $sources; /** - * @param array $streamToTable map of logical stream name => prooph table name + * @param array $sources map of logical stream name => stream source */ public function __construct( - DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory, - EcotoneClockInterface $clock, - private array $streamToTable, - int $maxGapOffset = 5_000, - ?Duration $gapTimeout = null, + private array $sources, ) { - $this->sources = []; - foreach ($this->streamToTable as $stream => $table) { - $this->sources[$stream] = new EventStoreGlobalStreamSource($connectionFactory, $clock, $table, $maxGapOffset, $gapTimeout); - } } public function load(?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage @@ -61,7 +38,7 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = foreach ($this->sources as $stream => $source) { $orderIndex[$stream] = $i++; - $limit = max((int)ceil($count / max(1, count($this->streamToTable))) + 5, 10); + $limit = (int)ceil($count / max(1, count($this->sources))) + 5; $page = $source->load($positions[$stream] ?? null, $limit, $partitionKey); diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSourceBuilder.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSourceBuilder.php index cb3f39d83..57e038e44 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSourceBuilder.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSourceBuilder.php @@ -19,14 +19,12 @@ class EventStoreMultiStreamSourceBuilder implements ProjectionComponentBuilder { /** - * @param array $streamToTable + * @param array $streamToSourceBuilder * @param string[] $handledProjectionNames */ public function __construct( - private array $streamToTable, + private array $streamToSourceBuilder, private array $handledProjectionNames, - private int $maxGapOffset = 5_000, - private ?int $gapTimeoutSeconds = 60, ) { } @@ -37,14 +35,14 @@ public function canHandle(string $projectionName, string $component): bool public function compile(MessagingContainerBuilder $builder): Definition|Reference { + $sourcesDefinitions = array_map(function ($sourceBuilder) use ($builder) { + return $sourceBuilder->compile($builder); + }, $this->streamToSourceBuilder); + return new Definition( EventStoreMultiStreamSource::class, [ - Reference::to(DbalConnectionFactory::class), - Reference::to(EcotoneClockInterface::class), - $this->streamToTable, - $this->maxGapOffset, - $this->gapTimeoutSeconds !== null ? new Definition(Duration::class, [$this->gapTimeoutSeconds], 'seconds') : null, + $sourcesDefinitions ], ); } diff --git a/packages/PdoEventSourcing/tests/Projecting/FromStreamAttributeTest.php b/packages/PdoEventSourcing/tests/Projecting/FromStreamAttributeTest.php deleted file mode 100644 index 4b2698e00..000000000 --- a/packages/PdoEventSourcing/tests/Projecting/FromStreamAttributeTest.php +++ /dev/null @@ -1,29 +0,0 @@ -getStreams()); - self::assertSame('orders', $attr->getStream()); - self::assertFalse($attr->isMultiStream()); - } - - public function test_it_accepts_multiple_streams(): void - { - $attr = new FromStream(['orders', 'invoices']); - self::assertSame(['orders', 'invoices'], $attr->getStreams()); - self::assertTrue($attr->isMultiStream()); - self::assertSame('orders', $attr->getStream(), 'First stream should be returned by getStream for BC'); - } -} From b422705143b24852e6f044ac3675f74adb802653 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Tue, 23 Dec 2025 17:38:30 +0100 Subject: [PATCH 09/17] review --- .../src/Attribute/FromStream.php | 23 +++------ .../PdoEventSourcing/src/Attribute/Stream.php | 1 + .../src/Config/ProophProjectingModule.php | 24 ++++----- .../EventStoreMultiStreamSource.php | 6 +-- ...Test.php => MultiStreamProjectionTest.php} | 50 +------------------ 5 files changed, 20 insertions(+), 84 deletions(-) rename packages/PdoEventSourcing/tests/Projecting/Global/{MultiStreamSynchronousProjectionTest.php => MultiStreamProjectionTest.php} (74%) diff --git a/packages/PdoEventSourcing/src/Attribute/FromStream.php b/packages/PdoEventSourcing/src/Attribute/FromStream.php index c38566aa2..9e84235b7 100644 --- a/packages/PdoEventSourcing/src/Attribute/FromStream.php +++ b/packages/PdoEventSourcing/src/Attribute/FromStream.php @@ -9,25 +9,14 @@ use Attribute; use Ecotone\EventSourcing\EventStore; -use Ecotone\Messaging\Support\Assert; #[Attribute(Attribute::TARGET_CLASS | Attribute::IS_REPEATABLE)] -class FromStream +readonly class FromStream { - public readonly string $stream; - public readonly ?string $aggregateType; - public readonly string $eventStoreReferenceName; - - public function __construct(string $stream, ?string $aggregateType = null, string $eventStoreReferenceName = EventStore::class) - { - Assert::notNullAndEmpty($stream, "Stream name can't be empty"); - $this->stream = $stream; - $this->aggregateType = $aggregateType; - $this->eventStoreReferenceName = $eventStoreReferenceName; - } - - public function getStream(): string - { - return $this->stream; + public function __construct( + public string $stream, + public ?string $aggregateType = null, + public string $eventStoreReferenceName = EventStore::class + ) { } } diff --git a/packages/PdoEventSourcing/src/Attribute/Stream.php b/packages/PdoEventSourcing/src/Attribute/Stream.php index 4b76762fe..c1d59063d 100644 --- a/packages/PdoEventSourcing/src/Attribute/Stream.php +++ b/packages/PdoEventSourcing/src/Attribute/Stream.php @@ -16,6 +16,7 @@ class Stream public function __construct(string $name) { Assert::notNullAndEmpty($name, "Stream name can't be empty"); + $this->name = $name; } diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index 20d2b5c72..5f9960993 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -39,12 +39,10 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $handledProjections = []; $extensions = []; - // Iterate over all projections and gather FromStream attributes per class (supports repeatable usage) foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $classname) { $projectionAttribute = $annotationRegistrationService->findAttributeForClass($classname, ProjectionV2::class); $customScopeStrategyAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); - // collect all FromStream attributes for this class (repeatable) $classAnnotations = $annotationRegistrationService->getAnnotationsForClass($classname); $streamAttributes = array_values(array_filter($classAnnotations, static fn($a) => $a instanceof FromStream)); @@ -55,37 +53,37 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $projectionName = $projectionAttribute->name; $handledProjections[] = $projectionName; - // Determine partitionHeaderName from CustomScopeStrategy attribute $partitionHeaderName = $customScopeStrategyAttribute?->partitionHeaderName; if ($partitionHeaderName !== null) { - // Partitioned projections must target a single stream (aggregate stream) if (count($streamAttributes) > 1) { throw ConfigurationException::create("Projection {$projectionName} cannot be partitioned by aggregate id when multiple streams are configured"); } - /** @var FromStream $single */ - $single = $streamAttributes[0]; - $aggregateType = $single->aggregateType ?: throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); + /** @var FromStream $streamAttribute */ + $streamAttribute = $streamAttributes[0]; + $aggregateType = $streamAttribute->aggregateType ?: throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); $extensions[] = new EventStoreAggregateStreamSourceBuilder( $projectionName, $aggregateType, - $single->getStream(), + $streamAttribute->stream, ); - $extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $single->getStream()); + $extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $streamAttribute->stream); } else { if (count($streamAttributes) > 1) { // Multi-stream: build stream name -> stream source map $map = []; - foreach ($streamAttributes as $attribute) { - $map[$attribute->getStream()] = new EventStoreGlobalStreamSourceBuilder($attribute->getStream(), []); + /** @var FromStream $streamAttribute */ + foreach ($streamAttributes as $streamAttribute) { + $map[$streamAttribute->stream] = new EventStoreGlobalStreamSourceBuilder($streamAttribute->stream, []); } $extensions[] = new EventStoreMultiStreamSourceBuilder( $map, [$projectionName], ); } else { - $attribute = $streamAttributes[0]; - $extensions[] = new EventStoreGlobalStreamSourceBuilder($attribute->getStream(), [$projectionName]); + /** @var FromStream $streamAttribute */ + $streamAttribute = $streamAttributes[0]; + $extensions[] = new EventStoreGlobalStreamSourceBuilder($streamAttribute->stream, [$projectionName]); } } } diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php index a8e3e7913..ac85f13ee 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php @@ -11,10 +11,6 @@ use Ecotone\Projecting\StreamPage; use Ecotone\Projecting\StreamSource; -/** - * Multi-stream source for Prooph, where each stream has its own table and sequence. - * We maintain a GapAwarePosition per stream and interleave by created_at. - */ class EventStoreMultiStreamSource implements StreamSource { /** @@ -79,7 +75,7 @@ private function encodePositions(array $positions): string /** * Decodes the map encoded by encodePositions. - * Returns array + * Returns array key is stream name, value is position (opaque string) */ private function decodePositions(?string $position): array { diff --git a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php similarity index 74% rename from packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php rename to packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php index db6f659d7..40d9bcfdf 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamSynchronousProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php @@ -31,7 +31,7 @@ * licence Enterprise * @internal */ -final class MultiStreamSynchronousProjectionTest extends ProjectingTestCase +final class MultiStreamProjectionTest extends ProjectingTestCase { public function test_building_multi_stream_synchronous_projection(): void { @@ -91,54 +91,6 @@ public function test_reset_and_delete_on_multi_stream_projection(): void $ecotone->sendQueryWithRouting('getCalendar', $calendarId); } - public function test_interleaving_two_streams(): void - { - $projection = $this->createMultiStreamProjection(); - $ecotone = $this->bootstrapEcotone([$projection::class, CalendarWithInternalRecorder::class, MeetingWithEventSourcing::class, EventsConverter::class], [$projection, new EventsConverter()]); - - $ecotone->deleteProjection($projection::NAME) - ->initializeProjection($projection::NAME); - - // Interleave events across two streams: Calendar and Meeting aggregates - // 1) Calendar: create calendar and schedule meeting which creates Meeting aggregate (second stream) - $calendarId = 'cal-1'; - $meetingId = 'm-1'; - $ecotone->sendCommand(new CreateCalendar($calendarId)); - $ecotone->sendCommand(new ScheduleMeetingWithEventSourcing($calendarId, $meetingId)); - - // We only react to Calendar/Meeting events - self::assertEquals([ - $meetingId => 'created', - ], $ecotone->sendQueryWithRouting('getCalendar', $calendarId)); - } - - public function test_exhaustive_interleaving_and_mutations_across_streams(): void - { - $projection = $this->createMultiStreamProjection(); - $ecotone = $this->bootstrapEcotone([$projection::class, CalendarWithInternalRecorder::class, MeetingWithEventSourcing::class, EventsConverter::class], [$projection, new EventsConverter()]); - - $ecotone->deleteProjection($projection::NAME) - ->initializeProjection($projection::NAME); - - // Interleave multiple operations across streams - $calendarId = 'cal-2'; - $meetingId = 'm-2'; - $ecotone->sendCommand(new CreateCalendar($calendarId)); // Calendar create - $ecotone->sendCommand(new ScheduleMeetingWithEventSourcing($calendarId, $meetingId)); // Meeting scheduled/created - - // Expect one meeting entry marked as created - self::assertEquals([ - $meetingId => 'created', - ], $ecotone->sendQueryWithRouting('getCalendar', $calendarId)); - - // Reset and rebuild should yield the same - $ecotone->resetProjection($projection::NAME) - ->triggerProjection($projection::NAME); - self::assertEquals([ - $meetingId => 'created', - ], $ecotone->sendQueryWithRouting('getCalendar', $calendarId)); - } - private function createMultiStreamProjection(): object { $connection = $this->getConnection(); From 6ad3c79705c20b00a3c29d33392065b39a5e2e21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Tue, 23 Dec 2025 17:48:14 +0100 Subject: [PATCH 10/17] rename --- .../src/Projecting/{PdoEvent.php => PersistedProophEvent.php} | 2 +- .../Projecting/StreamSource/EventStoreGlobalStreamSource.php | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) rename packages/PdoEventSourcing/src/Projecting/{PdoEvent.php => PersistedProophEvent.php} (90%) diff --git a/packages/PdoEventSourcing/src/Projecting/PdoEvent.php b/packages/PdoEventSourcing/src/Projecting/PersistedProophEvent.php similarity index 90% rename from packages/PdoEventSourcing/src/Projecting/PdoEvent.php rename to packages/PdoEventSourcing/src/Projecting/PersistedProophEvent.php index e657a4f31..e13a3b209 100644 --- a/packages/PdoEventSourcing/src/Projecting/PdoEvent.php +++ b/packages/PdoEventSourcing/src/Projecting/PersistedProophEvent.php @@ -8,7 +8,7 @@ use Ecotone\Modelling\Event; -class PdoEvent extends Event +class PersistedProophEvent extends Event { public function __construct( string $eventName, diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php index b8ad92a0c..1176df4a6 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php @@ -12,7 +12,7 @@ use Doctrine\DBAL\Connection; use Ecotone\Dbal\Compatibility\SchemaManagerCompatibility; use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory; -use Ecotone\EventSourcing\Projecting\PdoEvent; +use Ecotone\EventSourcing\Projecting\PersistedProophEvent; use Ecotone\Messaging\Scheduling\DatePoint; use Ecotone\Messaging\Scheduling\Duration; use Ecotone\Messaging\Scheduling\EcotoneClockInterface; @@ -77,7 +77,7 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = $now = $this->clock->now(); $cutoffTimestamp = $this->gapTimeout ? $now->sub($this->gapTimeout)->getTimestamp() : 0; foreach ($query->iterateAssociative() as $event) { - $events[] = $event = new PdoEvent( + $events[] = $event = new PersistedProophEvent( $event['event_name'], json_decode($event['payload'], true), json_decode($event['metadata'], true), From a6163845cd64d213b0c36962578874727268f3a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Mon, 5 Jan 2026 15:18:03 +0100 Subject: [PATCH 11/17] wip --- .../src/AnnotationFinder/AnnotationFinder.php | 4 +- .../AnnotationFinder/AnnotationResolver.php | 6 +- .../FileSystem/FileSystemAnnotationFinder.php | 17 +++- .../InMemory/InMemoryAnnotationFinder.php | 5 ++ .../src/Attribute/FromAggregateStream.php | 3 + .../src/Config/ProophProjectingModule.php | 79 ++++++++----------- .../EventStoreMultiStreamSource.php | 3 - 7 files changed, 59 insertions(+), 58 deletions(-) diff --git a/packages/Ecotone/src/AnnotationFinder/AnnotationFinder.php b/packages/Ecotone/src/AnnotationFinder/AnnotationFinder.php index 655ee4a42..125f3cdcb 100644 --- a/packages/Ecotone/src/AnnotationFinder/AnnotationFinder.php +++ b/packages/Ecotone/src/AnnotationFinder/AnnotationFinder.php @@ -15,7 +15,7 @@ interface AnnotationFinder extends AnnotationResolver public function findCombined(string $classAnnotationName, string $methodAnnotationClassName): array; /** - * @return string[] + * @return class-string[] */ public function findAnnotatedClasses(string $annotationClassName): array; @@ -25,7 +25,7 @@ public function findAnnotatedClasses(string $annotationClassName): array; public function findAnnotatedMethods(string $methodAnnotationClassName): array; /** - * @template T + * @template T of object * @param class-string $attributeClassName * @return T * @throws InvalidArgumentException diff --git a/packages/Ecotone/src/AnnotationFinder/AnnotationResolver.php b/packages/Ecotone/src/AnnotationFinder/AnnotationResolver.php index dc8683bf8..ef2407428 100644 --- a/packages/Ecotone/src/AnnotationFinder/AnnotationResolver.php +++ b/packages/Ecotone/src/AnnotationFinder/AnnotationResolver.php @@ -13,9 +13,11 @@ interface AnnotationResolver public function getAnnotationsForMethod(string $className, string $methodName): array; /** - * @return object[] + * @param class-string $className + * @param class-string|null $attributeClassName + * @return list */ - public function getAnnotationsForClass(string $className): array; + public function getAnnotationsForClass(string $className, ?string $attributeClassName): array; /** * @return object[] diff --git a/packages/Ecotone/src/AnnotationFinder/FileSystem/FileSystemAnnotationFinder.php b/packages/Ecotone/src/AnnotationFinder/FileSystem/FileSystemAnnotationFinder.php index 735eee10d..82306a2d0 100644 --- a/packages/Ecotone/src/AnnotationFinder/FileSystem/FileSystemAnnotationFinder.php +++ b/packages/Ecotone/src/AnnotationFinder/FileSystem/FileSystemAnnotationFinder.php @@ -208,9 +208,7 @@ public function findAnnotatedClasses(string $annotationClassName): array $classesWithAnnotations = []; foreach ($this->registeredClasses as $class) { - $classAnnotation = $this->getAnnotationForClass($class, $annotationClassName); - - if ($classAnnotation) { + if ($this->hasAnnotation($class, $annotationClassName)) { $classesWithAnnotations[] = $class; } } @@ -218,6 +216,17 @@ public function findAnnotatedClasses(string $annotationClassName): array return $classesWithAnnotations; } + private function hasAnnotation(string $className, string $annotationClassNameToFind): bool + { + $annotationsForClass = $this->getAnnotationsForClass($className); + foreach ($annotationsForClass as $annotationForClass) { + if (is_a($annotationForClass, $annotationClassNameToFind)) { + return true; + } + } + return false; + } + private function getAnnotationForClass(string $className, string $annotationClassNameToFind): ?object { $annotationsForClass = $this->getAnnotationsForClass($className); @@ -345,7 +354,7 @@ public function findAnnotatedMethods(string $methodAnnotationClassName): array return $registrations; } - private function isMethodBannedFromCurrentEnvironment(string $className, string $methodName) + private function isMethodBannedFromCurrentEnvironment(string $className, string $methodName): bool { return isset($this->bannedEnvironmentClassMethods[$className][$methodName]); } diff --git a/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php b/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php index 2b97710b7..c25a7a056 100644 --- a/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php +++ b/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php @@ -313,4 +313,9 @@ private function hasRegisteredAnnotationForProperty(string $className, string $p return false; } + + public function getAttributesForClass(string $className, string $attributeClassName): array + { + // TODO: Implement getAttributesForClass() method. + } } diff --git a/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php b/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php index 28f98a889..1725b3e71 100644 --- a/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php +++ b/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php @@ -5,7 +5,10 @@ namespace Ecotone\EventSourcing\Attribute; use Attribute; +use Ecotone\AnnotationFinder\AnnotationFinder; use Ecotone\EventSourcing\EventStore; +use Ecotone\Messaging\Config\ConfigurationException; +use Ecotone\Modelling\Attribute\EventSourcingAggregate; /* * Configures a projection to read from an aggregate's event stream. diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index 70006c62d..3d604dd47 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -40,6 +40,9 @@ use Ecotone\Projecting\EventStoreAdapter\EventStreamingChannelAdapter; #[ModuleAnnotation] +/** + * @phpstan-type ProjectionConfiguration array{projectionName: string, streamName: string, aggregateType: ?string, isPartitioned: bool, eventNames: array} + */ class ProophProjectingModule implements AnnotationModule { /** @@ -66,7 +69,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $resolvedConfigs = [ ...self::resolveFromStreamConfigs($annotationRegistrationService, $projectionEventNames), - ...self::resolveFromAggregateStreamConfigs($annotationRegistrationService, $projectionEventNames), + ...self::resolveFromAggregateStream($annotationRegistrationService, $projectionEventNames), ]; foreach ($resolvedConfigs as $config) { @@ -88,7 +91,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I /** * Resolve stream configurations from FromStream attributes. * - * @return array + * @return list */ private static function resolveFromStreamConfigs( AnnotationFinder $annotationRegistrationService, @@ -96,26 +99,30 @@ private static function resolveFromStreamConfigs( ): array { $configs = []; - foreach ($annotationRegistrationService->findAnnotatedClasses(FromStream::class) as $classname) { - $projectionAttribute = $annotationRegistrationService->findAttributeForClass($classname, ProjectionV2::class); - $streamAttribute = $annotationRegistrationService->findAttributeForClass($classname, FromStream::class); + foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $classname) { + $projectionAttribute = $annotationRegistrationService->getAttributeForClass($classname, ProjectionV2::class); + $aggregateStreamAttributes = $annotationRegistrationService->getAnnotationsForClass($classname, FromAggregateStream::class); + $streamAttributes = [ + ...$annotationRegistrationService->getAnnotationsForClass($classname, FromStream::class), + ...array_map(fn (FromAggregateStream $attribute) => self::resolveFromAggregateStream($annotationRegistrationService, $attribute, $projectionAttribute->name), $aggregateStreamAttributes) + ]; $partitionedAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); - if (! $projectionAttribute || ! $streamAttribute) { + if (empty($streamAttributes)) { continue; } $projectionName = $projectionAttribute->name; $isPartitioned = $partitionedAttribute !== null; - if ($isPartitioned && ! $streamAttribute->aggregateType) { + if ($isPartitioned && ! $streamAttributes->aggregateType) { throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); } $configs[] = [ 'projectionName' => $projectionName, - 'streamName' => $streamAttribute->stream, - 'aggregateType' => $streamAttribute->aggregateType, + 'streamName' => $streamAttributes->stream, + 'aggregateType' => $streamAttributes->aggregateType, 'isPartitioned' => $isPartitioned, 'eventNames' => $projectionEventNames[$projectionName] ?? [], ]; @@ -126,54 +133,32 @@ private static function resolveFromStreamConfigs( /** * Resolve stream configurations from FromAggregateStream attributes. - * - * @return array */ - private static function resolveFromAggregateStreamConfigs( + private static function resolveFromAggregateStream( AnnotationFinder $annotationRegistrationService, - array $projectionEventNames - ): array { - $configs = []; - - foreach ($annotationRegistrationService->findAnnotatedClasses(FromAggregateStream::class) as $classname) { - $projectionAttribute = $annotationRegistrationService->findAttributeForClass($classname, ProjectionV2::class); - $aggregateStreamAttribute = $annotationRegistrationService->findAttributeForClass($classname, FromAggregateStream::class); - $partitionedAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); - - if (! $projectionAttribute || ! $aggregateStreamAttribute) { - continue; - } - - $aggregateClass = $aggregateStreamAttribute->aggregateClass; - $projectionName = $projectionAttribute->name; - - $eventSourcingAggregateAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, EventSourcingAggregate::class); - if ($eventSourcingAggregateAttribute === null) { - throw ConfigurationException::create("Class {$aggregateClass} referenced in #[AggregateStream] for projection {$projectionName} must be an EventSourcingAggregate. Add #[EventSourcingAggregate] attribute to the class."); - } + FromAggregateStream $aggregateStreamAttribute, + string $projectionName + ): FromStream { + $aggregateClass = $aggregateStreamAttribute->aggregateClass; + + $eventSourcingAggregateAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, EventSourcingAggregate::class); + if ($eventSourcingAggregateAttribute === null) { + throw ConfigurationException::create("Class {$aggregateClass} referenced in #[AggregateStream] for projection {$projectionName} must be an EventSourcingAggregate. Add #[EventSourcingAggregate] attribute to the class."); + } - $streamAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, Stream::class); - $streamName = $streamAttribute?->getName() ?? $aggregateClass; + $streamAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, Stream::class); + $streamName = $streamAttribute?->getName() ?? $aggregateClass; - $aggregateTypeAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, AggregateType::class); - $aggregateType = $aggregateTypeAttribute?->getName() ?? $aggregateClass; + $aggregateTypeAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, AggregateType::class); + $aggregateType = $aggregateTypeAttribute?->getName() ?? $aggregateClass; - $configs[] = [ - 'projectionName' => $projectionName, - 'streamName' => $streamName, - 'aggregateType' => $aggregateType, - 'isPartitioned' => $partitionedAttribute !== null, - 'eventNames' => $projectionEventNames[$projectionName] ?? [], - ]; - } - - return $configs; + return new FromStream($streamName, $aggregateType, $aggregateStreamAttribute->eventStoreReferenceName); } /** * Create stream source extensions based on resolved configuration. * - * @param array{projectionName: string, streamName: string, aggregateType: ?string, isPartitioned: bool, eventNames: array} $config + * @param ProjectionConfiguration $config * @return ProjectionComponentBuilder[] */ private static function createStreamSourceExtensions(array $config): array diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php index ac85f13ee..8dba4edf9 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php @@ -7,7 +7,6 @@ namespace Ecotone\EventSourcing\Projecting\StreamSource; -use Ecotone\Messaging\Support\Assert; use Ecotone\Projecting\StreamPage; use Ecotone\Projecting\StreamSource; @@ -23,8 +22,6 @@ public function __construct( public function load(?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage { - Assert::null($partitionKey, 'Partition key is not supported for EventStoreMultiStreamSource'); - $positions = $this->decodePositions($lastPosition); $orderIndex = []; From 77f384dd5c82dc8602af33387ea5bfb51538b621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Mon, 5 Jan 2026 16:38:51 +0100 Subject: [PATCH 12/17] refactor after merge --- .../AnnotationFinder/AnnotatedDefinition.php | 30 +++++ .../src/AnnotationFinder/AnnotatedFinding.php | 8 ++ .../src/AnnotationFinder/AnnotationFinder.php | 6 +- .../AnnotationFinder/AnnotationResolver.php | 7 +- .../AnnotationResolver/AttributeResolver.php | 5 +- .../FileSystem/FileSystemAnnotationFinder.php | 11 +- .../InMemory/InMemoryAnnotationFinder.php | 10 +- .../src/Config/ProophProjectingModule.php | 107 +++++++----------- .../Global/MultiStreamProjectionTest.php | 2 - 9 files changed, 111 insertions(+), 75 deletions(-) diff --git a/packages/Ecotone/src/AnnotationFinder/AnnotatedDefinition.php b/packages/Ecotone/src/AnnotationFinder/AnnotatedDefinition.php index 4e7d7840f..e77aa151e 100644 --- a/packages/Ecotone/src/AnnotationFinder/AnnotatedDefinition.php +++ b/packages/Ecotone/src/AnnotationFinder/AnnotatedDefinition.php @@ -9,12 +9,23 @@ /** * licence Apache-2.0 + * + * @template TClassAttribute of object + * @template TMethodAttribute of object + * @implements AnnotatedFinding */ class AnnotatedDefinition implements AnnotatedFinding { private string $className; private string $methodName; + /** + * @var TClassAttribute + */ private object $annotationForClass; + + /** + * @var TMethodAttribute + */ private object $annotationForMethod; /** * @var object[] @@ -25,6 +36,13 @@ class AnnotatedDefinition implements AnnotatedFinding */ private array $classAnnotations; + /** + * @param TClassAttribute $annotationForClass + * @param TMethodAttribute $annotationForMethod + * @param class-string $className + * @param object[] $classAnnotations + * @param object[] $methodAnnotations + */ private function __construct(object $annotationForClass, object $annotationForMethod, string $className, string $methodName, array $classAnnotations, array $methodAnnotations) { $this->annotationForClass = $annotationForClass; @@ -36,6 +54,9 @@ private function __construct(object $annotationForClass, object $annotationForMe } /** + * @param TClassAttribute $annotationForClass + * @param TMethodAttribute $annotationForMethod + * @param class-string $className * @param object[] $classAnnotations * @param object[] $methodAnnotations */ @@ -44,16 +65,25 @@ public static function create(object $annotationForClass, object $annotationForM return new self($annotationForClass, $annotationForMethod, $className, $methodName, $classAnnotations, $methodAnnotations); } + /** + * @return TClassAttribute + */ public function getAnnotationForClass(): object { return $this->annotationForClass; } + /** + * @return TMethodAttribute + */ public function getAnnotationForMethod(): object { return $this->annotationForMethod; } + /** + * @return class-string + */ public function getClassName(): string { return $this->className; diff --git a/packages/Ecotone/src/AnnotationFinder/AnnotatedFinding.php b/packages/Ecotone/src/AnnotationFinder/AnnotatedFinding.php index 9a9adea0e..fb73467a2 100644 --- a/packages/Ecotone/src/AnnotationFinder/AnnotatedFinding.php +++ b/packages/Ecotone/src/AnnotationFinder/AnnotatedFinding.php @@ -6,11 +6,19 @@ /** * licence Apache-2.0 + * + * @template TMethodAttribute of object */ interface AnnotatedFinding { + /** + * @return TMethodAttribute + */ public function getAnnotationForMethod(): object; + /** + * @return class-string + */ public function getClassName(): string; public function getMethodName(): string; diff --git a/packages/Ecotone/src/AnnotationFinder/AnnotationFinder.php b/packages/Ecotone/src/AnnotationFinder/AnnotationFinder.php index 125f3cdcb..ddfa87bc4 100644 --- a/packages/Ecotone/src/AnnotationFinder/AnnotationFinder.php +++ b/packages/Ecotone/src/AnnotationFinder/AnnotationFinder.php @@ -10,7 +10,11 @@ interface AnnotationFinder extends AnnotationResolver { /** - * @return AnnotatedDefinition[] + * @template TClassAttribute of object + * @template TMethodAttribute of object + * @param class-string $classAnnotationName + * @param class-string $methodAnnotationClassName + * @return list> */ public function findCombined(string $classAnnotationName, string $methodAnnotationClassName): array; diff --git a/packages/Ecotone/src/AnnotationFinder/AnnotationResolver.php b/packages/Ecotone/src/AnnotationFinder/AnnotationResolver.php index ef2407428..b6bf2b18c 100644 --- a/packages/Ecotone/src/AnnotationFinder/AnnotationResolver.php +++ b/packages/Ecotone/src/AnnotationFinder/AnnotationResolver.php @@ -13,11 +13,12 @@ interface AnnotationResolver public function getAnnotationsForMethod(string $className, string $methodName): array; /** + * @template T of object * @param class-string $className - * @param class-string|null $attributeClassName - * @return list + * @param class-string|null $attributeClassName + * @return ($attributeClassName is null ? list : list) */ - public function getAnnotationsForClass(string $className, ?string $attributeClassName): array; + public function getAnnotationsForClass(string $className, ?string $attributeClassName = null): array; /** * @return object[] diff --git a/packages/Ecotone/src/AnnotationFinder/AnnotationResolver/AttributeResolver.php b/packages/Ecotone/src/AnnotationFinder/AnnotationResolver/AttributeResolver.php index 759aee436..ab3e41500 100644 --- a/packages/Ecotone/src/AnnotationFinder/AnnotationResolver/AttributeResolver.php +++ b/packages/Ecotone/src/AnnotationFinder/AnnotationResolver/AttributeResolver.php @@ -46,7 +46,7 @@ public function getAnnotationsForMethod(string $className, string $methodName): /** * @inheritDoc */ - public function getAnnotationsForClass(string $className): array + public function getAnnotationsForClass(string $className, ?string $attributeClassName = null): array { $attributes = []; $currentClass = new ReflectionClass($className); @@ -58,6 +58,9 @@ public function getAnnotationsForClass(string $className): array if (! class_exists($attribute->getName())) { continue; } + if ($attributeClassName && ! ($attribute instanceof $attributeClassName)) { + continue; + } if (in_array($attribute->getName(), array_map(fn ($attr) => $attr::class, $attributes))) { continue; // Avoid duplicate attributes from parent classes } diff --git a/packages/Ecotone/src/AnnotationFinder/FileSystem/FileSystemAnnotationFinder.php b/packages/Ecotone/src/AnnotationFinder/FileSystem/FileSystemAnnotationFinder.php index 82306a2d0..fb3721937 100644 --- a/packages/Ecotone/src/AnnotationFinder/FileSystem/FileSystemAnnotationFinder.php +++ b/packages/Ecotone/src/AnnotationFinder/FileSystem/FileSystemAnnotationFinder.php @@ -245,11 +245,18 @@ private function getAnnotationForClass(string $className, string $annotationClas } /** + * @param string $className + * @param string|null $attributeClassName * @inheritDoc */ - public function getAnnotationsForClass(string $className): array + public function getAnnotationsForClass(string $className, ?string $attributeClassName = null): array { - return $this->getCachedAnnotationsForClass($className); + $attributes = $this->getCachedAnnotationsForClass($className); + if ($attributeClassName) { + return array_values(array_filter($attributes, fn (object $attribute) => $attribute instanceof $attributeClassName)); + } else { + return $attributes; + } } /** diff --git a/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php b/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php index c25a7a056..81dc8718a 100644 --- a/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php +++ b/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php @@ -105,13 +105,17 @@ public function getAnnotationsForMethod(string $className, string $methodName): /** * @inheritDoc */ - public function getAnnotationsForClass(string $classNameToFind): array + public function getAnnotationsForClass(string $className, ?string $attributeClassName = null): array { - if (! isset($this->annotationsForClass[self::CLASS_ANNOTATIONS][$classNameToFind])) { + if (! isset($this->annotationsForClass[self::CLASS_ANNOTATIONS][$className])) { return []; } + $attributes = $this->annotationsForClass[self::CLASS_ANNOTATIONS][$className]; + if ($attributeClassName) { + $attributes = array_filter($attributes, fn ($attribute) => $attribute instanceof $attributeClassName); + } - return array_values($this->annotationsForClass[self::CLASS_ANNOTATIONS][$classNameToFind]); + return array_values($attributes); } /** diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index 3d604dd47..ce5d059c7 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -20,6 +20,7 @@ use Ecotone\EventSourcing\Projecting\PartitionState\DbalProjectionStateStorageBuilder; use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreAggregateStreamSourceBuilder; use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreGlobalStreamSourceBuilder; +use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreMultiStreamSourceBuilder; use Ecotone\Messaging\Attribute\ModuleAnnotation; use Ecotone\Messaging\Config\Annotation\AnnotationModule; use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ExtensionObjectResolver; @@ -40,9 +41,6 @@ use Ecotone\Projecting\EventStoreAdapter\EventStreamingChannelAdapter; #[ModuleAnnotation] -/** - * @phpstan-type ProjectionConfiguration array{projectionName: string, streamName: string, aggregateType: ?string, isPartitioned: bool, eventNames: array} - */ class ProophProjectingModule implements AnnotationModule { /** @@ -57,8 +55,6 @@ public function __construct( public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static { - $extensions = []; - $namedEvents = []; foreach ($annotationRegistrationService->findAnnotatedClasses(NamedEvent::class) as $className) { $attribute = $annotationRegistrationService->getAttributeForClass($className, NamedEvent::class); @@ -67,14 +63,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $projectionEventNames = self::collectProjectionEventNames($annotationRegistrationService, $interfaceToCallRegistry, $namedEvents); - $resolvedConfigs = [ - ...self::resolveFromStreamConfigs($annotationRegistrationService, $projectionEventNames), - ...self::resolveFromAggregateStream($annotationRegistrationService, $projectionEventNames), - ]; - - foreach ($resolvedConfigs as $config) { - $extensions = [...$extensions, ...self::createStreamSourceExtensions($config)]; - } + $extensions = self::resolveConfigs($annotationRegistrationService, $projectionEventNames); $projectionNames = []; foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) { @@ -91,20 +80,23 @@ public static function create(AnnotationFinder $annotationRegistrationService, I /** * Resolve stream configurations from FromStream attributes. * - * @return list + * @return list */ - private static function resolveFromStreamConfigs( + private static function resolveConfigs( AnnotationFinder $annotationRegistrationService, - array $projectionEventNames - ): array { - $configs = []; + array $projectionEventNames + ): array + { + $extensions = []; foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $classname) { $projectionAttribute = $annotationRegistrationService->getAttributeForClass($classname, ProjectionV2::class); - $aggregateStreamAttributes = $annotationRegistrationService->getAnnotationsForClass($classname, FromAggregateStream::class); $streamAttributes = [ ...$annotationRegistrationService->getAnnotationsForClass($classname, FromStream::class), - ...array_map(fn (FromAggregateStream $attribute) => self::resolveFromAggregateStream($annotationRegistrationService, $attribute, $projectionAttribute->name), $aggregateStreamAttributes) + ...\array_map( + fn(FromAggregateStream $aggregateStreamAttribute) => self::resolveFromAggregateStream($annotationRegistrationService, $aggregateStreamAttribute, $projectionAttribute->name), + $annotationRegistrationService->getAnnotationsForClass($classname, FromAggregateStream::class) + ) ]; $partitionedAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); @@ -115,20 +107,41 @@ private static function resolveFromStreamConfigs( $projectionName = $projectionAttribute->name; $isPartitioned = $partitionedAttribute !== null; - if ($isPartitioned && ! $streamAttributes->aggregateType) { - throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); + $sources = []; + foreach ($streamAttributes as $streamAttribute) { + if ($isPartitioned && ! $streamAttribute->aggregateType) { + throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); + } + if ($isPartitioned) { + $sources[$streamAttribute->stream.'.'.$streamAttribute->aggregateType] = new EventStoreAggregateStreamSourceBuilder( + $projectionName, + $streamAttribute->aggregateType, + $streamAttribute->stream, + $projectionEventNames[$projectionName] ?? [], + ); + $extensions[] = new AggregateIdPartitionProviderBuilder( + $projectionName, + $streamAttribute->aggregateType, + $streamAttribute->stream, + ); + } else { + $sources[$streamAttribute->stream] = new EventStoreGlobalStreamSourceBuilder( + $streamAttribute->stream, + [$projectionName] + ); + } + } + if (count($sources) > 1) { + $extensions[] = new EventStoreMultiStreamSourceBuilder( + $sources, + [$projectionName], + ); + } else { + $extensions[] = current($sources); } - - $configs[] = [ - 'projectionName' => $projectionName, - 'streamName' => $streamAttributes->stream, - 'aggregateType' => $streamAttributes->aggregateType, - 'isPartitioned' => $isPartitioned, - 'eventNames' => $projectionEventNames[$projectionName] ?? [], - ]; } - return $configs; + return $extensions; } /** @@ -155,38 +168,6 @@ private static function resolveFromAggregateStream( return new FromStream($streamName, $aggregateType, $aggregateStreamAttribute->eventStoreReferenceName); } - /** - * Create stream source extensions based on resolved configuration. - * - * @param ProjectionConfiguration $config - * @return ProjectionComponentBuilder[] - */ - private static function createStreamSourceExtensions(array $config): array - { - if ($config['isPartitioned']) { - return [ - new EventStoreAggregateStreamSourceBuilder( - $config['projectionName'], - $config['aggregateType'], - $config['streamName'], - $config['eventNames'], - ), - new AggregateIdPartitionProviderBuilder( - $config['projectionName'], - $config['aggregateType'], - $config['streamName'] - ), - ]; - } - - return [ - new EventStoreGlobalStreamSourceBuilder( - $config['streamName'], - [$config['projectionName']], - ), - ]; - } - public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void { $dbalConfiguration = ExtensionObjectResolver::resolveUnique(DbalConfiguration::class, $extensionObjects, DbalConfiguration::createWithDefaults()); diff --git a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php index 40d9bcfdf..34aa1c8f3 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php @@ -93,8 +93,6 @@ public function test_reset_and_delete_on_multi_stream_projection(): void private function createMultiStreamProjection(): object { - $connection = $this->getConnection(); - // Configure FromStream with multiple streams: Calendar/Meeting aggregates // Real-world usage: projection reacts to Calendar/Meeting events to generate a read model return new #[ProjectionV2(self::NAME), FromStream(CalendarWithInternalRecorder::class), FromStream(MeetingWithEventSourcing::class)] class () { From fdbea4854d551cb40311e206de6d1245e9353cf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Mon, 5 Jan 2026 16:41:37 +0100 Subject: [PATCH 13/17] rename PersistedProophEvent -> StreamEvent --- .../Projecting/{PersistedProophEvent.php => StreamEvent.php} | 2 +- .../Projecting/StreamSource/EventStoreGlobalStreamSource.php | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) rename packages/PdoEventSourcing/src/Projecting/{PersistedProophEvent.php => StreamEvent.php} (90%) diff --git a/packages/PdoEventSourcing/src/Projecting/PersistedProophEvent.php b/packages/PdoEventSourcing/src/Projecting/StreamEvent.php similarity index 90% rename from packages/PdoEventSourcing/src/Projecting/PersistedProophEvent.php rename to packages/PdoEventSourcing/src/Projecting/StreamEvent.php index e13a3b209..6ea1024c5 100644 --- a/packages/PdoEventSourcing/src/Projecting/PersistedProophEvent.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamEvent.php @@ -8,7 +8,7 @@ use Ecotone\Modelling\Event; -class PersistedProophEvent extends Event +class StreamEvent extends Event { public function __construct( string $eventName, diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php index b7d15f025..8fed841e1 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php @@ -13,7 +13,7 @@ use Ecotone\Dbal\Compatibility\SchemaManagerCompatibility; use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory; use Ecotone\EventSourcing\PdoStreamTableNameProvider; -use Ecotone\EventSourcing\Projecting\PersistedProophEvent; +use Ecotone\EventSourcing\Projecting\StreamEvent; use Ecotone\Messaging\Scheduling\DatePoint; use Ecotone\Messaging\Scheduling\Duration; use Ecotone\Messaging\Scheduling\EcotoneClockInterface; @@ -82,7 +82,7 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = $now = $this->clock->now(); $cutoffTimestamp = $this->gapTimeout ? $now->sub($this->gapTimeout)->getTimestamp() : 0; foreach ($query->iterateAssociative() as $event) { - $events[] = $event = new PersistedProophEvent( + $events[] = $event = new StreamEvent( $event['event_name'], json_decode($event['payload'], true), json_decode($event['metadata'], true), From e441b5258a22b73418bb8884227da0429ff977b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Mon, 5 Jan 2026 16:44:26 +0100 Subject: [PATCH 14/17] tests exception message --- .../tests/Projecting/Global/MultiStreamProjectionTest.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php index 34aa1c8f3..de09f1ba1 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php @@ -43,6 +43,7 @@ public function test_building_multi_stream_synchronous_projection(): void ->initializeProjection($projection::NAME); $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Calendar with id cal-build-1 not found'); $ecotone->sendQueryWithRouting('getCalendar', 'cal-build-1'); // create calendar and schedule meeting to drive projection entries @@ -88,6 +89,7 @@ public function test_reset_and_delete_on_multi_stream_projection(): void // delete projection (in-memory) $ecotone->deleteProjection($projection::NAME); $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Calendar with id cal-reset-1 not found'); $ecotone->sendQueryWithRouting('getCalendar', $calendarId); } From eaece46cd200547f503cbebc86cd8a9d8e934dd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Mon, 5 Jan 2026 17:30:38 +0100 Subject: [PATCH 15/17] fix phpstan --- .../AnnotationFinder/InMemory/InMemoryAnnotationFinder.php | 5 ----- 1 file changed, 5 deletions(-) diff --git a/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php b/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php index 81dc8718a..6a723ae42 100644 --- a/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php +++ b/packages/Ecotone/src/AnnotationFinder/InMemory/InMemoryAnnotationFinder.php @@ -317,9 +317,4 @@ private function hasRegisteredAnnotationForProperty(string $className, string $p return false; } - - public function getAttributesForClass(string $className, string $attributeClassName): array - { - // TODO: Implement getAttributesForClass() method. - } } From c966b480bf555e2ce32d75b30a2e99636e67e2e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Mon, 5 Jan 2026 17:57:32 +0100 Subject: [PATCH 16/17] add polling test --- .../src/Attribute/FromAggregateStream.php | 2 +- .../src/Config/ProophProjectingModule.php | 7 +- .../EventStoreAggregateStreamSource.php | 8 +- .../Global/MultiStreamProjectionTest.php | 160 ++++++++++++++++++ 4 files changed, 172 insertions(+), 5 deletions(-) diff --git a/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php b/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php index 1725b3e71..bfc67f101 100644 --- a/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php +++ b/packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php @@ -26,7 +26,7 @@ * * licence Enterprise */ -#[Attribute(Attribute::TARGET_CLASS)] +#[Attribute(Attribute::TARGET_CLASS | Attribute::IS_REPEATABLE)] class FromAggregateStream { /** diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index ce5d059c7..c3c319b7d 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -88,6 +88,7 @@ private static function resolveConfigs( ): array { $extensions = []; + $partitionProviders = []; foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $classname) { $projectionAttribute = $annotationRegistrationService->getAttributeForClass($classname, ProjectionV2::class); @@ -113,13 +114,14 @@ private static function resolveConfigs( throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); } if ($isPartitioned) { - $sources[$streamAttribute->stream.'.'.$streamAttribute->aggregateType] = new EventStoreAggregateStreamSourceBuilder( + $sourceIdentifier = $streamAttribute->stream.'.'.$streamAttribute->aggregateType; + $sources[$sourceIdentifier] = new EventStoreAggregateStreamSourceBuilder( $projectionName, $streamAttribute->aggregateType, $streamAttribute->stream, $projectionEventNames[$projectionName] ?? [], ); - $extensions[] = new AggregateIdPartitionProviderBuilder( + $partitionProviders[$streamAttribute->stream] ??= new AggregateIdPartitionProviderBuilder( $projectionName, $streamAttribute->aggregateType, $streamAttribute->stream, @@ -139,6 +141,7 @@ private static function resolveConfigs( } else { $extensions[] = current($sources); } + $extensions = [...$extensions, ...array_values($partitionProviders)]; } return $extensions; diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php index 76a1e47d6..35f53d069 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php @@ -34,6 +34,10 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = { Assert::notNull($partitionKey, 'Partition key cannot be null for aggregate stream source'); + if (! $this->eventStore->hasStream($this->streamName)) { + return new StreamPage([], $lastPosition ?? ''); + } + $metadataMatcher = new MetadataMatcher(); if ($this->aggregateType !== null) { // @todo: watch out ! Prooph's event store has an index on (aggregate_type, aggregate_id). Not adding aggregate type here will result in a full table scan @@ -73,11 +77,11 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = return new StreamPage($events, $this->createPositionFrom($lastPosition, $events)); } - private function createPositionFrom(?string $lastPosition, array $events): ?string + private function createPositionFrom(?string $lastPosition, array $events): string { $lastEvent = end($events); if ($lastEvent === false) { - return $lastPosition; + return $lastPosition ?? ''; } return (string) $lastEvent->getMetadata()[MessageHeaders::EVENT_AGGREGATE_VERSION] ?? throw new RuntimeException('Last event does not have aggregate version'); } diff --git a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php index de09f1ba1..27576561d 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php @@ -6,6 +6,7 @@ namespace Test\Ecotone\EventSourcing\Projecting\Global; +use Ecotone\EventSourcing\Attribute\FromAggregateStream; use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\Attribute\ProjectionDelete; use Ecotone\EventSourcing\Attribute\ProjectionReset; @@ -13,8 +14,12 @@ use Ecotone\Lite\Test\FlowTestSupport; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; +use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; +use Ecotone\Messaging\MessageHeaders; use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Modelling\Attribute\QueryHandler; +use Ecotone\Projecting\Attribute\Partitioned; +use Ecotone\Projecting\Attribute\Polling; use Ecotone\Projecting\Attribute\ProjectionV2; use Ecotone\Test\LicenceTesting; use Test\Ecotone\EventSourcing\Fixture\Calendar\CalendarCreated; @@ -93,6 +98,58 @@ public function test_reset_and_delete_on_multi_stream_projection(): void $ecotone->sendQueryWithRouting('getCalendar', $calendarId); } + public function test_building_polling_multi_stream_projection(): void + { + $projection = $this->createPollingMultiStreamProjection(); + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection]); + + $ecotone->deleteProjection($projection::NAME) + ->initializeProjection($projection::NAME); + + // before running polling consumer nothing is projected + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Calendar with id cal-poll-1 not found'); + $ecotone->sendQueryWithRouting('getCalendar', 'cal-poll-1'); + + // seed events + $ecotone->sendCommand(new CreateCalendar('cal-poll-1')); + $ecotone->sendCommand(new ScheduleMeetingWithEventSourcing('cal-poll-1', 'm-poll-1')); + + // run polling endpoint + $ecotone->run($projection::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup()); + + self::assertEquals(['m-poll-1' => 'created'], $ecotone->sendQueryWithRouting('getCalendar', 'cal-poll-1')); + } + + public function test_reset_and_delete_on_polling_multi_stream_projection(): void + { + $projection = $this->createPollingMultiStreamProjection(); + $ecotone = $this->bootstrapEcotone([$projection::class, CalendarWithInternalRecorder::class, MeetingWithEventSourcing::class, EventsConverter::class], [$projection, new EventsConverter()]); + + $ecotone->deleteProjection($projection::NAME) + ->initializeProjection($projection::NAME); + + $ecotone->sendCommand(new CreateCalendar('cal-poll-reset')); + $ecotone->sendCommand(new ScheduleMeetingWithEventSourcing('cal-poll-reset', 'm-poll-reset')); + + // run polling once to build state + $ecotone->run($projection::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup()); + self::assertEquals(['m-poll-reset' => 'created'], $ecotone->sendQueryWithRouting('getCalendar', 'cal-poll-reset')); + + // reset and then run polling again to catch up + $ecotone->resetProjection($projection::NAME); + $ecotone->run($projection::ENDPOINT_ID, ExecutionPollingMetadata::createWithTestingSetup()); + + self::assertEquals(['m-poll-reset' => 'created'], $ecotone->sendQueryWithRouting('getCalendar', 'cal-poll-reset')); + + // delete projection wipes state + $ecotone->deleteProjection($projection::NAME); + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Calendar with id cal-poll-reset not found'); + $ecotone->sendQueryWithRouting('getCalendar', 'cal-poll-reset'); + } + private function createMultiStreamProjection(): object { // Configure FromStream with multiple streams: Calendar/Meeting aggregates @@ -146,6 +203,109 @@ public function reset(): void }; } + private function createPartitionedMultiStreamProjection(): object + { + return new #[ProjectionV2(self::NAME), Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), FromAggregateStream(CalendarWithInternalRecorder::class), FromAggregateStream(MeetingWithEventSourcing::class)] class { + public const NAME = 'calendar_multi_stream_projection_partitioned'; + + private array $calendars = []; + + #[QueryHandler('getCalendar')] + public function getCalendar(string $calendarId): array + { + return $this->calendars[$calendarId] ?? throw new \RuntimeException("Calendar with id {$calendarId} not found"); + } + + #[EventHandler] + public function whenCalendarCreated(CalendarCreated $event): void + { + $this->calendars[$event->calendarId] = []; + } + + #[EventHandler] + public function whenMeetingScheduled(MeetingScheduled $event): void + { + if (! array_key_exists($event->calendarId, $this->calendars)) { + throw new \RuntimeException('Meeting scheduled before calendar was created'); + } + $this->calendars[$event->calendarId][$event->meetingId] = 'scheduled'; + } + + #[EventHandler] + public function whenMeetingCreated(MeetingCreated $event): void + { + if (! array_key_exists($event->calendarId, $this->calendars)) { + throw new \RuntimeException('Meeting created before calendar was created'); + } + $this->calendars[$event->calendarId][$event->meetingId] = 'created'; + } + + #[ProjectionDelete] + public function delete(): void + { + $this->calendars = []; + } + + #[ProjectionReset] + public function reset(): void + { + $this->calendars = []; + } + }; + } + + private function createPollingMultiStreamProjection(): object + { + return new #[ProjectionV2(self::NAME), Polling(self::ENDPOINT_ID), FromStream(CalendarWithInternalRecorder::class), FromStream(MeetingWithEventSourcing::class)] class () { + public const NAME = 'calendar_multi_stream_projection_polling'; + public const ENDPOINT_ID = 'calendar_multi_stream_projection_polling_runner'; + + private array $calendars = []; + + #[QueryHandler('getCalendar')] + public function getCalendar(string $calendarId): array + { + return $this->calendars[$calendarId] ?? throw new \RuntimeException("Calendar with id {$calendarId} not found"); + } + + #[EventHandler(endpointId: 'pollingMultiStream.whenCalendarCreated')] + public function whenCalendarCreated(CalendarCreated $event): void + { + $this->calendars[$event->calendarId] = []; + } + + #[EventHandler(endpointId: 'pollingMultiStream.whenMeetingScheduled')] + public function whenMeetingScheduled(MeetingScheduled $event): void + { + if (! array_key_exists($event->calendarId, $this->calendars)) { + throw new \RuntimeException('Meeting scheduled before calendar was created'); + } + $this->calendars[$event->calendarId][$event->meetingId] = 'scheduled'; + } + + #[EventHandler(endpointId: 'pollingMultiStream.whenMeetingCreated')] + public function whenMeetingCreated(MeetingCreated $event): void + { + if (! array_key_exists($event->calendarId, $this->calendars)) { + throw new \RuntimeException('Meeting created before calendar was created'); + } + $this->calendars[$event->calendarId][$event->meetingId] = 'created'; + } + + #[ProjectionDelete] + public function delete(): void + { + $this->calendars = []; + } + + #[ProjectionReset] + public function reset(): void + { + $this->calendars = []; + } + }; + } + private function bootstrapEcotone(array $classesToResolve, array $services): FlowTestSupport { return EcotoneLite::bootstrapFlowTestingWithEventStore( From c90ce2b7dc0111142afb7fa96243c24b3969a7dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20de=20La=20B=C3=A9doy=C3=A8re?= <1191198+jlabedo@users.noreply.github.com> Date: Mon, 5 Jan 2026 18:03:07 +0100 Subject: [PATCH 17/17] disallow multi-stream partitioned projections --- .../src/Config/ProophProjectingModule.php | 7 +++++++ .../Global/MultiStreamProjectionTest.php | 17 ++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index c3c319b7d..e2c69dfcf 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -108,6 +108,13 @@ private static function resolveConfigs( $projectionName = $projectionAttribute->name; $isPartitioned = $partitionedAttribute !== null; + // @todo: Partitioned projections cannot be declared with multiple streams because the current partition provider cannot merge partitions from multiple streams. + if ($isPartitioned && count($streamAttributes) > 1) { + throw ConfigurationException::create( + "Partitioned projection {$projectionName} cannot declare multiple streams. Use a single aggregate stream or remove #[Partitioned]." + ); + } + $sources = []; foreach ($streamAttributes as $streamAttribute) { if ($isPartitioned && ! $streamAttribute->aggregateType) { diff --git a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php index 27576561d..87a51b55a 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Global/MultiStreamProjectionTest.php @@ -14,6 +14,7 @@ use Ecotone\Lite\Test\FlowTestSupport; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; +use Ecotone\Messaging\Config\ConfigurationException; use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; use Ecotone\Messaging\MessageHeaders; use Ecotone\Modelling\Attribute\EventHandler; @@ -51,7 +52,6 @@ public function test_building_multi_stream_synchronous_projection(): void $this->expectExceptionMessage('Calendar with id cal-build-1 not found'); $ecotone->sendQueryWithRouting('getCalendar', 'cal-build-1'); - // create calendar and schedule meeting to drive projection entries $calendarId = 'cal-build-1'; $meetingId = 'm-build-1'; $ecotone->sendCommand(new CreateCalendar($calendarId)); @@ -150,10 +150,21 @@ public function test_reset_and_delete_on_polling_multi_stream_projection(): void $ecotone->sendQueryWithRouting('getCalendar', 'cal-poll-reset'); } + public function test_declaring_partitioned_multi_stream_projection_throws_exception(): void + { + $projection = new #[ProjectionV2(self::NAME), Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), FromStream(CalendarWithInternalRecorder::class), FromStream(MeetingWithEventSourcing::class)] class () { + public const NAME = 'calendar_multi_stream_projection'; + }; + + $this->expectException(ConfigurationException::class); + $this->expectExceptionMessage('Partitioned projection calendar_multi_stream_projection cannot declare multiple streams'); + + // Bootstrapping should fail due to invalid configuration + $this->bootstrapEcotone([$projection::class], [$projection]); + } + private function createMultiStreamProjection(): object { - // Configure FromStream with multiple streams: Calendar/Meeting aggregates - // Real-world usage: projection reacts to Calendar/Meeting events to generate a read model return new #[ProjectionV2(self::NAME), FromStream(CalendarWithInternalRecorder::class), FromStream(MeetingWithEventSourcing::class)] class () { public const NAME = 'calendar_multi_stream_projection';