diff --git a/packages/PdoEventSourcing/src/Config/EventSourcingModule.php b/packages/PdoEventSourcing/src/Config/EventSourcingModule.php index 20d14743c..1949cef92 100644 --- a/packages/PdoEventSourcing/src/Config/EventSourcingModule.php +++ b/packages/PdoEventSourcing/src/Config/EventSourcingModule.php @@ -25,6 +25,7 @@ use Ecotone\EventSourcing\EventSourcingRepositoryBuilder; use Ecotone\EventSourcing\EventStore; use Ecotone\EventSourcing\EventStreamEmitter; +use Ecotone\EventSourcing\PdoStreamTableNameProvider; use Ecotone\EventSourcing\Mapping\EventMapper; use Ecotone\EventSourcing\ProjectionLifeCycleConfiguration; use Ecotone\EventSourcing\ProjectionManager; @@ -286,6 +287,12 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO Reference::to(LegacyProjectionsTableManager::class), ])); + // Register PdoStreamTableNameProvider as an alias to LazyProophEventStore + $messagingConfiguration->registerServiceDefinition( + PdoStreamTableNameProvider::class, + Reference::to(LazyProophEventStore::class) + ); + $messagingConfiguration->registerServiceDefinition( LazyProophProjectionManager::class, new Definition(LazyProophProjectionManager::class, [ diff --git a/packages/PdoEventSourcing/src/PdoStreamTableNameProvider.php b/packages/PdoEventSourcing/src/PdoStreamTableNameProvider.php new file mode 100644 index 000000000..c9ff6195b --- /dev/null +++ b/packages/PdoEventSourcing/src/PdoStreamTableNameProvider.php @@ -0,0 +1,26 @@ +streamTable = '_' . sha1($this->streamName); } public function partitions(): iterable @@ -35,19 +32,22 @@ public function partitions(): iterable $connection = $this->getConnection(); $platform = $connection->getDatabasePlatform(); + // Resolve table name at runtime using the provider + $streamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + // Build platform-specific query if ($platform instanceof PostgreSQLPlatform) { // PostgreSQL: Use JSONB operators $query = $connection->executeQuery(<<>'_aggregate_id' AS aggregate_id - FROM {$this->streamTable} + FROM {$streamTable} WHERE metadata->>'_aggregate_type' = ? SQL, [$this->aggregateType]); } elseif ($platform instanceof MySQLPlatform || $platform instanceof MariaDBPlatform) { // MySQL/MariaDB: Use generated indexed columns for better performance $query = $connection->executeQuery(<<streamTable} + FROM {$streamTable} WHERE aggregate_type = ? SQL, [$this->aggregateType]); } else { diff --git a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php index 6787acaaf..8223da7ed 100644 --- a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php +++ b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php @@ -7,6 +7,7 @@ namespace Ecotone\EventSourcing\Projecting; +use Ecotone\EventSourcing\PdoStreamTableNameProvider; use Ecotone\Messaging\Config\Container\Definition; use Ecotone\Messaging\Config\Container\MessagingContainerBuilder; use Ecotone\Messaging\Config\Container\Reference; @@ -31,6 +32,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc Reference::to(DbalConnectionFactory::class), $this->aggregateType, $this->streamName, + Reference::to(PdoStreamTableNameProvider::class), ]); } } diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php index d11561753..657391661 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php @@ -12,6 +12,7 @@ use Doctrine\DBAL\Connection; use Ecotone\Dbal\Compatibility\SchemaManagerCompatibility; use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory; +use Ecotone\EventSourcing\PdoStreamTableNameProvider; use Ecotone\Messaging\Scheduling\DatePoint; use Ecotone\Messaging\Scheduling\Duration; use Ecotone\Messaging\Scheduling\EcotoneClockInterface; @@ -29,7 +30,8 @@ class EventStoreGlobalStreamSource implements StreamSource public function __construct( private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory, private EcotoneClockInterface $clock, - private string $proophStreamTable, + private string $streamName, + private PdoStreamTableNameProvider $tableNameProvider, private int $maxGapOffset = 5_000, private ?Duration $gapTimeout = null, ) { @@ -50,7 +52,10 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = $connection = $this->getConnection(); - if (empty($lastPosition) && ! SchemaManagerCompatibility::tableExists($connection, $this->proophStreamTable)) { + // Resolve table name at runtime using the provider + $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + + if (empty($lastPosition) && ! SchemaManagerCompatibility::tableExists($connection, $proophStreamTable)) { return new StreamPage([], ''); } @@ -63,7 +68,7 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = $query = $connection->executeQuery(<<proophStreamTable} + FROM {$proophStreamTable} WHERE no > :position {$gapQueryPart} ORDER BY no LIMIT {$count} @@ -106,10 +111,13 @@ private function cleanGapsByTimeout(GapAwarePosition $tracking, Connection $conn $minGap = $gaps[0]; $maxGap = $gaps[count($gaps) - 1]; + // Resolve table name at runtime + $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + // Query interleaved events in the gap range $interleavedEvents = $connection->executeQuery(<<proophStreamTable} + FROM {$proophStreamTable} WHERE no >= :minPosition and no <= :maxPosition ORDER BY no LIMIT 100 diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php index ccf55702c..39975353d 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php @@ -7,6 +7,7 @@ namespace Ecotone\EventSourcing\Projecting\StreamSource; +use Ecotone\EventSourcing\PdoStreamTableNameProvider; use Ecotone\Messaging\Config\Container\Definition; use Ecotone\Messaging\Config\Container\MessagingContainerBuilder; use Ecotone\Messaging\Config\Container\Reference; @@ -16,8 +17,6 @@ use Ecotone\Projecting\StreamSource; use Enqueue\Dbal\DbalConnectionFactory; -use function sha1; - class EventStoreGlobalStreamSourceBuilder implements ProjectionComponentBuilder { public function __construct( @@ -38,15 +37,11 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc [ Reference::to(DbalConnectionFactory::class), Reference::to(EcotoneClockInterface::class), - self::getProophTableName($this->streamName), + $this->streamName, + Reference::to(PdoStreamTableNameProvider::class), 5_000, new Definition(Duration::class, [60], 'seconds'), ], ); } - - public static function getProophTableName($streamName): string - { - return '_' . sha1($streamName); - } } diff --git a/packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php b/packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php index f1ff36ce0..7b5c0a79b 100644 --- a/packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php +++ b/packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php @@ -10,6 +10,7 @@ use Ecotone\EventSourcing\Database\LegacyProjectionsTableManager; use Ecotone\EventSourcing\EventSourcingConfiguration; use Ecotone\EventSourcing\InMemory\StreamIteratorWithPosition; +use Ecotone\EventSourcing\PdoStreamTableNameProvider; use Ecotone\EventSourcing\Prooph\PersistenceStrategy\InterlopMariaDbSimpleStreamStrategy; use Ecotone\EventSourcing\Prooph\PersistenceStrategy\InterlopMysqlSimpleStreamStrategy; use Ecotone\EventSourcing\ProophEventMapper; @@ -35,6 +36,7 @@ use Prooph\EventStore\StreamName; use RuntimeException; +use function sha1; use function spl_object_id; use function str_contains; @@ -43,7 +45,7 @@ /** * licence Apache-2.0 */ -class LazyProophEventStore implements EventStore +class LazyProophEventStore implements EventStore, PdoStreamTableNameProvider { public const DEFAULT_ENABLE_WRITE_LOCK_STRATEGY = false; public const INITIALIZE_ON_STARTUP = true; @@ -305,6 +307,26 @@ private function getPostgresPersistenceStrategyFor(?StreamName $streamName = nul }; } + public function generateTableNameForStream(string $streamName): string + { + $streamNameObj = new StreamName($streamName); + $eventStoreType = $this->getEventStoreType(); + + if ($this->eventSourcingConfiguration->isInMemory()) { + // In-memory doesn't use table names, but return consistent format + return '_' . sha1($streamName); + } + + $persistenceStrategy = match ($eventStoreType) { + self::EVENT_STORE_TYPE_MYSQL => $this->getMysqlPersistenceStrategyFor($streamNameObj), + self::EVENT_STORE_TYPE_MARIADB => $this->getMariaDbPersistenceStrategyFor($streamNameObj), + self::EVENT_STORE_TYPE_POSTGRES => $this->getPostgresPersistenceStrategyFor($streamNameObj), + default => throw InvalidArgumentException::create('Unexpected match value ' . $eventStoreType) + }; + + return $persistenceStrategy->generateTableName($streamNameObj); + } + public function getEventStoreType(): string { if ($this->eventSourcingConfiguration->isInMemory()) { diff --git a/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php b/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php index 2b0f8c837..34b1861cc 100644 --- a/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php @@ -8,8 +8,8 @@ namespace Test\Ecotone\EventSourcing\Projecting; use Ecotone\EventSourcing\EventStore; +use Ecotone\EventSourcing\PdoStreamTableNameProvider; use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreGlobalStreamSource; -use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreGlobalStreamSourceBuilder; use Ecotone\EventSourcing\Projecting\StreamSource\GapAwarePosition; use Ecotone\Lite\EcotoneLite; use Ecotone\Lite\Test\FlowTestSupport; @@ -30,6 +30,8 @@ use Test\Ecotone\EventSourcing\Projecting\Fixture\Ticket\TicketCreated; use Test\Ecotone\EventSourcing\Projecting\Fixture\Ticket\TicketEventConverter; +use function sha1; + /** * @internal */ @@ -42,11 +44,21 @@ class GapAwarePositionIntegrationTest extends ProjectingTestCase private static EventStore $eventStore; private static ProjectingManager $projectionManager; private static string $proophTicketTable; + private static PdoStreamTableNameProvider $tableNameProvider; protected function setUp(): void { self::$connectionFactory = self::getConnectionFactory(); self::$clock = new StubUTCClock(); + + // Create a stub table name provider + self::$tableNameProvider = new class implements PdoStreamTableNameProvider { + public function generateTableNameForStream(string $streamName): string + { + return '_' . sha1($streamName); + } + }; + $projection = new #[ProjectionV2(DbalTicketProjection::NAME)] class (self::$connectionFactory->establishConnection()) extends DbalTicketProjection { }; self::$projection = $projection; @@ -69,7 +81,7 @@ classesToResolve: [$projection::class], runForProductionEventStore: true ); - self::$proophTicketTable = EventStoreGlobalStreamSourceBuilder::getProophTableName(Ticket::STREAM_NAME); + self::$proophTicketTable = self::$tableNameProvider->generateTableNameForStream(Ticket::STREAM_NAME); self::$eventStore = self::$ecotone->getGateway(EventStore::class); self::$projectionManager = self::$ecotone->getGateway(ProjectionRegistry::class)->get(DbalTicketProjection::NAME); if (self::$eventStore->hasStream(Ticket::STREAM_NAME)) { @@ -100,7 +112,8 @@ public function test_max_gap_offset_cleaning(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - self::$proophTicketTable, + Ticket::STREAM_NAME, + self::$tableNameProvider, maxGapOffset: 3, // Only keep gaps within 3 positions gapTimeout: null ); @@ -137,7 +150,8 @@ public function test_gap_timeout_cleaning(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - self::$proophTicketTable, + Ticket::STREAM_NAME, + self::$tableNameProvider, gapTimeout: Duration::seconds(5) ); @@ -170,7 +184,8 @@ public function test_gap_cleaning_noop_when_no_gaps(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - self::$proophTicketTable, + Ticket::STREAM_NAME, + self::$tableNameProvider, maxGapOffset: 1000, gapTimeout: Duration::seconds(5) ); @@ -191,7 +206,8 @@ public function test_gap_cleaning_noop_when_timeout_disabled(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - self::$proophTicketTable, + Ticket::STREAM_NAME, + self::$tableNameProvider, maxGapOffset: 1000, gapTimeout: null // No timeout );