Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/PdoEventSourcing/src/Config/EventSourcingModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use Ecotone\EventSourcing\EventSourcingRepositoryBuilder;
use Ecotone\EventSourcing\EventStore;
use Ecotone\EventSourcing\EventStreamEmitter;
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
use Ecotone\EventSourcing\Mapping\EventMapper;
use Ecotone\EventSourcing\ProjectionLifeCycleConfiguration;
use Ecotone\EventSourcing\ProjectionManager;
Expand Down Expand Up @@ -286,6 +287,12 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
Reference::to(LegacyProjectionsTableManager::class),
]));

// Register PdoStreamTableNameProvider as an alias to LazyProophEventStore
$messagingConfiguration->registerServiceDefinition(
PdoStreamTableNameProvider::class,
Reference::to(LazyProophEventStore::class)
);

$messagingConfiguration->registerServiceDefinition(
LazyProophProjectionManager::class,
new Definition(LazyProophProjectionManager::class, [
Expand Down
26 changes: 26 additions & 0 deletions packages/PdoEventSourcing/src/PdoStreamTableNameProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

namespace Ecotone\EventSourcing;

/**
* Provides table name generation for event streams based on configured persistence strategy.
* This allows runtime resolution of table names instead of hardcoding sha1 hashing.
*
* licence Apache-2.0
*/
interface PdoStreamTableNameProvider
{
/**
* Generate the table name for a given stream based on the configured persistence strategy.
*
* The table name generation depends on:
* - The database type (MySQL, MariaDB, PostgreSQL)
* - The persistence strategy (simple, single, aggregate, partition)
* - The stream name
*
* @param string $streamName The stream name
* @return string The generated table name
*/
public function generateTableNameForStream(string $streamName): string;
}

Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,42 @@
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory;
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
use Ecotone\Projecting\PartitionProvider;
use Enqueue\Dbal\DbalConnectionFactory;
use RuntimeException;

use function sha1;

class AggregateIdPartitionProvider implements PartitionProvider
{
private string $streamTable;
public function __construct(
private DbalConnectionFactory|MultiTenantConnectionFactory $connectionFactory,
private string $aggregateType,
private string $streamName
private string $streamName,
private PdoStreamTableNameProvider $tableNameProvider
) {
// This is the name Prooph uses to store events in the database
$this->streamTable = '_' . sha1($this->streamName);
}

public function partitions(): iterable
{
$connection = $this->getConnection();
$platform = $connection->getDatabasePlatform();

// Resolve table name at runtime using the provider
$streamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName);

// Build platform-specific query
if ($platform instanceof PostgreSQLPlatform) {
// PostgreSQL: Use JSONB operators
$query = $connection->executeQuery(<<<SQL
SELECT DISTINCT metadata->>'_aggregate_id' AS aggregate_id
FROM {$this->streamTable}
FROM {$streamTable}
WHERE metadata->>'_aggregate_type' = ?
SQL, [$this->aggregateType]);
} elseif ($platform instanceof MySQLPlatform || $platform instanceof MariaDBPlatform) {
// MySQL/MariaDB: Use generated indexed columns for better performance
$query = $connection->executeQuery(<<<SQL
SELECT DISTINCT aggregate_id
FROM {$this->streamTable}
FROM {$streamTable}
WHERE aggregate_type = ?
SQL, [$this->aggregateType]);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace Ecotone\EventSourcing\Projecting;

use Ecotone\EventSourcing\PdoStreamTableNameProvider;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\Container\Reference;
Expand All @@ -31,6 +32,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
Reference::to(DbalConnectionFactory::class),
$this->aggregateType,
$this->streamName,
Reference::to(PdoStreamTableNameProvider::class),
]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Doctrine\DBAL\Connection;
use Ecotone\Dbal\Compatibility\SchemaManagerCompatibility;
use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory;
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
use Ecotone\Messaging\Scheduling\DatePoint;
use Ecotone\Messaging\Scheduling\Duration;
use Ecotone\Messaging\Scheduling\EcotoneClockInterface;
Expand All @@ -29,7 +30,8 @@ class EventStoreGlobalStreamSource implements StreamSource
public function __construct(
private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory,
private EcotoneClockInterface $clock,
private string $proophStreamTable,
private string $streamName,
private PdoStreamTableNameProvider $tableNameProvider,
private int $maxGapOffset = 5_000,
private ?Duration $gapTimeout = null,
) {
Expand All @@ -50,7 +52,10 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey =

$connection = $this->getConnection();

if (empty($lastPosition) && ! SchemaManagerCompatibility::tableExists($connection, $this->proophStreamTable)) {
// Resolve table name at runtime using the provider
$proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName);

if (empty($lastPosition) && ! SchemaManagerCompatibility::tableExists($connection, $proophStreamTable)) {
return new StreamPage([], '');
}

Expand All @@ -63,7 +68,7 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey =

$query = $connection->executeQuery(<<<SQL
SELECT no, event_name, payload, metadata, created_at
FROM {$this->proophStreamTable}
FROM {$proophStreamTable}
WHERE no > :position {$gapQueryPart}
ORDER BY no
LIMIT {$count}
Expand Down Expand Up @@ -106,10 +111,13 @@ private function cleanGapsByTimeout(GapAwarePosition $tracking, Connection $conn
$minGap = $gaps[0];
$maxGap = $gaps[count($gaps) - 1];

// Resolve table name at runtime
$proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName);

// Query interleaved events in the gap range
$interleavedEvents = $connection->executeQuery(<<<SQL
SELECT no, created_at
FROM {$this->proophStreamTable}
FROM {$proophStreamTable}
WHERE no >= :minPosition and no <= :maxPosition
ORDER BY no
LIMIT 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace Ecotone\EventSourcing\Projecting\StreamSource;

use Ecotone\EventSourcing\PdoStreamTableNameProvider;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\Container\Reference;
Expand All @@ -16,8 +17,6 @@
use Ecotone\Projecting\StreamSource;
use Enqueue\Dbal\DbalConnectionFactory;

use function sha1;

class EventStoreGlobalStreamSourceBuilder implements ProjectionComponentBuilder
{
public function __construct(
Expand All @@ -38,15 +37,11 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
[
Reference::to(DbalConnectionFactory::class),
Reference::to(EcotoneClockInterface::class),
self::getProophTableName($this->streamName),
$this->streamName,
Reference::to(PdoStreamTableNameProvider::class),
5_000,
new Definition(Duration::class, [60], 'seconds'),
],
);
}

public static function getProophTableName($streamName): string
{
return '_' . sha1($streamName);
}
}
24 changes: 23 additions & 1 deletion packages/PdoEventSourcing/src/Prooph/LazyProophEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Ecotone\EventSourcing\Database\LegacyProjectionsTableManager;
use Ecotone\EventSourcing\EventSourcingConfiguration;
use Ecotone\EventSourcing\InMemory\StreamIteratorWithPosition;
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
use Ecotone\EventSourcing\Prooph\PersistenceStrategy\InterlopMariaDbSimpleStreamStrategy;
use Ecotone\EventSourcing\Prooph\PersistenceStrategy\InterlopMysqlSimpleStreamStrategy;
use Ecotone\EventSourcing\ProophEventMapper;
Expand All @@ -35,6 +36,7 @@
use Prooph\EventStore\StreamName;
use RuntimeException;

use function sha1;
use function spl_object_id;
use function str_contains;

Expand All @@ -43,7 +45,7 @@
/**
* licence Apache-2.0
*/
class LazyProophEventStore implements EventStore
class LazyProophEventStore implements EventStore, PdoStreamTableNameProvider
{
public const DEFAULT_ENABLE_WRITE_LOCK_STRATEGY = false;
public const INITIALIZE_ON_STARTUP = true;
Expand Down Expand Up @@ -305,6 +307,26 @@ private function getPostgresPersistenceStrategyFor(?StreamName $streamName = nul
};
}

public function generateTableNameForStream(string $streamName): string
{
$streamNameObj = new StreamName($streamName);
$eventStoreType = $this->getEventStoreType();

if ($this->eventSourcingConfiguration->isInMemory()) {
// In-memory doesn't use table names, but return consistent format
return '_' . sha1($streamName);
}

$persistenceStrategy = match ($eventStoreType) {
self::EVENT_STORE_TYPE_MYSQL => $this->getMysqlPersistenceStrategyFor($streamNameObj),
self::EVENT_STORE_TYPE_MARIADB => $this->getMariaDbPersistenceStrategyFor($streamNameObj),
self::EVENT_STORE_TYPE_POSTGRES => $this->getPostgresPersistenceStrategyFor($streamNameObj),
default => throw InvalidArgumentException::create('Unexpected match value ' . $eventStoreType)
};

return $persistenceStrategy->generateTableName($streamNameObj);
}

public function getEventStoreType(): string
{
if ($this->eventSourcingConfiguration->isInMemory()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
namespace Test\Ecotone\EventSourcing\Projecting;

use Ecotone\EventSourcing\EventStore;
use Ecotone\EventSourcing\PdoStreamTableNameProvider;
use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreGlobalStreamSource;
use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreGlobalStreamSourceBuilder;
use Ecotone\EventSourcing\Projecting\StreamSource\GapAwarePosition;
use Ecotone\Lite\EcotoneLite;
use Ecotone\Lite\Test\FlowTestSupport;
Expand All @@ -30,6 +30,8 @@
use Test\Ecotone\EventSourcing\Projecting\Fixture\Ticket\TicketCreated;
use Test\Ecotone\EventSourcing\Projecting\Fixture\Ticket\TicketEventConverter;

use function sha1;

/**
* @internal
*/
Expand All @@ -42,11 +44,21 @@ class GapAwarePositionIntegrationTest extends ProjectingTestCase
private static EventStore $eventStore;
private static ProjectingManager $projectionManager;
private static string $proophTicketTable;
private static PdoStreamTableNameProvider $tableNameProvider;

protected function setUp(): void
{
self::$connectionFactory = self::getConnectionFactory();
self::$clock = new StubUTCClock();

// Create a stub table name provider
self::$tableNameProvider = new class implements PdoStreamTableNameProvider {
public function generateTableNameForStream(string $streamName): string
{
return '_' . sha1($streamName);
}
};

$projection = new #[ProjectionV2(DbalTicketProjection::NAME)] class (self::$connectionFactory->establishConnection()) extends DbalTicketProjection {
};
self::$projection = $projection;
Expand All @@ -69,7 +81,7 @@ classesToResolve: [$projection::class],
runForProductionEventStore: true
);

self::$proophTicketTable = EventStoreGlobalStreamSourceBuilder::getProophTableName(Ticket::STREAM_NAME);
self::$proophTicketTable = self::$tableNameProvider->generateTableNameForStream(Ticket::STREAM_NAME);
self::$eventStore = self::$ecotone->getGateway(EventStore::class);
self::$projectionManager = self::$ecotone->getGateway(ProjectionRegistry::class)->get(DbalTicketProjection::NAME);
if (self::$eventStore->hasStream(Ticket::STREAM_NAME)) {
Expand Down Expand Up @@ -100,7 +112,8 @@ public function test_max_gap_offset_cleaning(): void
$streamSource = new EventStoreGlobalStreamSource(
self::$connectionFactory,
self::$clock,
self::$proophTicketTable,
Ticket::STREAM_NAME,
self::$tableNameProvider,
maxGapOffset: 3, // Only keep gaps within 3 positions
gapTimeout: null
);
Expand Down Expand Up @@ -137,7 +150,8 @@ public function test_gap_timeout_cleaning(): void
$streamSource = new EventStoreGlobalStreamSource(
self::$connectionFactory,
self::$clock,
self::$proophTicketTable,
Ticket::STREAM_NAME,
self::$tableNameProvider,
gapTimeout: Duration::seconds(5)
);

Expand Down Expand Up @@ -170,7 +184,8 @@ public function test_gap_cleaning_noop_when_no_gaps(): void
$streamSource = new EventStoreGlobalStreamSource(
self::$connectionFactory,
self::$clock,
self::$proophTicketTable,
Ticket::STREAM_NAME,
self::$tableNameProvider,
maxGapOffset: 1000,
gapTimeout: Duration::seconds(5)
);
Expand All @@ -191,7 +206,8 @@ public function test_gap_cleaning_noop_when_timeout_disabled(): void
$streamSource = new EventStoreGlobalStreamSource(
self::$connectionFactory,
self::$clock,
self::$proophTicketTable,
Ticket::STREAM_NAME,
self::$tableNameProvider,
maxGapOffset: 1000,
gapTimeout: null // No timeout
);
Expand Down