diff --git a/packages/Ecotone/src/Lite/Test/Configuration/EcotoneTestSupportModule.php b/packages/Ecotone/src/Lite/Test/Configuration/EcotoneTestSupportModule.php index 2ddbd80be..a239c3a43 100644 --- a/packages/Ecotone/src/Lite/Test/Configuration/EcotoneTestSupportModule.php +++ b/packages/Ecotone/src/Lite/Test/Configuration/EcotoneTestSupportModule.php @@ -42,9 +42,11 @@ use Ecotone\Modelling\CommandBus; use Ecotone\Modelling\EventBus; use Ecotone\Modelling\QueryBus; +use Ecotone\Projecting\InMemory\InMemoryEventStoreStreamSource; use Ecotone\Projecting\InMemory\InMemoryEventStoreStreamSourceBuilder; -use Ecotone\Projecting\InMemory\InMemoryProjectionStateStorageBuilder; -use Ecotone\Projecting\InMemory\InMemoryStreamSourceBuilder; +use Ecotone\Projecting\InMemory\InMemoryProjectionStateStorage; +use Ecotone\Projecting\ProjectionStateStorageReference; +use Ecotone\Projecting\StreamSourceReference; #[ModuleAnnotation] /** @@ -197,6 +199,13 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO GatewayHeaderBuilder::create('channelName', 'ecotone.test_support_gateway.channel_name'), ])); } + + $messagingConfiguration->registerServiceDefinition( + InMemoryEventStoreStreamSource::class, + new Definition(InMemoryEventStoreStreamSource::class, [ + new Reference(InMemoryEventStore::class), + ]) + ); } public function canHandle($extensionObject): bool @@ -229,8 +238,6 @@ public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, } } - // If EVENT_SOURCING_PACKAGE is enabled but no EventSourcingConfiguration is provided, - // it means DBAL mode is being used, so don't register InMemoryEventStoreStreamSource if (! $hasEventSourcingConfiguration) { $shouldRegisterInMemoryStreamSource = false; } @@ -240,15 +247,7 @@ public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, return []; } - // Check if user has registered a custom InMemoryStreamSourceBuilder - // If so, don't register InMemoryEventStoreStreamSourceBuilder to avoid conflicts - foreach ($serviceExtensions as $extensionObject) { - if ($extensionObject instanceof InMemoryStreamSourceBuilder) { - return []; - } - } - - return [new InMemoryEventStoreStreamSourceBuilder(), new InMemoryProjectionStateStorageBuilder()]; + return [new StreamSourceReference(InMemoryEventStoreStreamSource::class), new ProjectionStateStorageReference(InMemoryProjectionStateStorage::class)]; } public function getModulePackageName(): string diff --git a/packages/Ecotone/src/Messaging/Config/ModuleClassList.php b/packages/Ecotone/src/Messaging/Config/ModuleClassList.php index aaa3787d1..846295781 100644 --- a/packages/Ecotone/src/Messaging/Config/ModuleClassList.php +++ b/packages/Ecotone/src/Messaging/Config/ModuleClassList.php @@ -65,10 +65,13 @@ use Ecotone\Modelling\MessageHandling\MetadataPropagator\MessageHeadersPropagatorInterceptor; use Ecotone\Modelling\QueryBus; use Ecotone\OpenTelemetry\Configuration\OpenTelemetryModule; +use Ecotone\Projecting\Config\PartitionProviderRegistryModule; use Ecotone\Projecting\Config\ProjectingAttributeModule; use Ecotone\Projecting\Config\ProjectingConsoleCommands; use Ecotone\Projecting\Config\ProjectingModule; +use Ecotone\Projecting\Config\ProjectionStateStorageRegistryModule; use Ecotone\Projecting\Config\StreamFilterRegistryModule; +use Ecotone\Projecting\Config\StreamSourceRegistryModule; use Ecotone\Projecting\EventStoreAdapter\EventStoreAdapterModule; use Ecotone\Redis\Configuration\RedisMessageConsumerModule; use Ecotone\Redis\Configuration\RedisMessagePublisherModule; @@ -114,9 +117,12 @@ class ModuleClassList InstantRetryAttributeModule::class, DynamicMessageChannelModule::class, EventSourcedRepositoryModule::class, + PartitionProviderRegistryModule::class, ProjectingModule::class, ProjectingAttributeModule::class, + ProjectionStateStorageRegistryModule::class, StreamFilterRegistryModule::class, + StreamSourceRegistryModule::class, EventStoreAdapterModule::class, /** Attribute based configurations */ diff --git a/packages/Ecotone/src/Projecting/Attribute/PartitionProvider.php b/packages/Ecotone/src/Projecting/Attribute/PartitionProvider.php new file mode 100644 index 000000000..b4978c81f --- /dev/null +++ b/packages/Ecotone/src/Projecting/Attribute/PartitionProvider.php @@ -0,0 +1,21 @@ +findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) { + $projectionAttribute = $annotationFinder->getAttributeForClass($projectionClassName, ProjectionV2::class); + $allProjectionNames[] = $projectionAttribute->name; + } + + $userlandPartitionProviderReferences = []; + foreach ($annotationFinder->findAnnotatedClasses(PartitionProviderAttribute::class) as $providerClassName) { + $userlandPartitionProviderReferences[] = AnnotatedDefinitionReference::getReferenceForClassName($annotationFinder, $providerClassName); + } + + return new self($allProjectionNames, $userlandPartitionProviderReferences); + } + + public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void + { + $partitionProviderReferences = ExtensionObjectResolver::resolve( + PartitionProviderReference::class, + $extensionObjects + ); + + $partitionedProjectionNames = []; + foreach ($partitionProviderReferences as $ref) { + $partitionedProjectionNames = array_merge($partitionedProjectionNames, $ref->getPartitionedProjectionNames()); + } + $partitionedProjectionNames = array_unique($partitionedProjectionNames); + + $nonPartitionedProjectionNames = array_values(array_diff($this->allProjectionNames, $partitionedProjectionNames)); + + $userlandProviders = array_map( + fn (string $reference) => new Reference($reference), + $this->userlandPartitionProviderReferences + ); + + $builtinProviders = array_map( + fn (PartitionProviderReference $ref) => new Reference($ref->getReferenceName()), + $partitionProviderReferences + ); + + $messagingConfiguration->registerServiceDefinition( + SinglePartitionProvider::class, + new Definition(SinglePartitionProvider::class, [$nonPartitionedProjectionNames]) + ); + + $builtinProviders[] = new Reference(SinglePartitionProvider::class); + + $allProviders = array_merge($userlandProviders, $builtinProviders); + + $messagingConfiguration->registerServiceDefinition( + PartitionProviderRegistry::class, + new Definition(PartitionProviderRegistry::class, [$allProviders]) + ); + } + + public function canHandle($extensionObject): bool + { + return $extensionObject instanceof PartitionProviderReference; + } + + public function getModulePackageName(): string + { + return ModulePackageList::CORE_PACKAGE; + } +} + diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 45471665b..74131856f 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -29,15 +29,14 @@ use Ecotone\Messaging\Handler\ServiceActivator\MessageProcessorActivatorBuilder; use Ecotone\Projecting\BackfillExecutorHandler; use Ecotone\Projecting\InMemory\InMemoryProjectionRegistry; -use Ecotone\Projecting\PartitionProvider; +use Ecotone\Projecting\PartitionProviderRegistry; use Ecotone\Projecting\ProjectingHeaders; use Ecotone\Projecting\ProjectingManager; use Ecotone\Projecting\ProjectionRegistry; -use Ecotone\Projecting\ProjectionStateStorage; +use Ecotone\Projecting\ProjectionStateStorageRegistry; use Ecotone\Projecting\SinglePartitionProvider; use Ecotone\Projecting\StreamFilterRegistry; -use Ecotone\Projecting\StreamSource; -use Ramsey\Uuid\Uuid; +use Ecotone\Projecting\StreamSourceRegistry; /** * This module allows to configure projections in a standard way @@ -61,33 +60,15 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO { $serviceConfiguration = ExtensionObjectResolver::resolveUnique(ServiceConfiguration::class, $extensionObjects, ServiceConfiguration::createWithDefaults()); $projectionBuilders = ExtensionObjectResolver::resolve(ProjectionExecutorBuilder::class, $extensionObjects); - $componentBuilders = ExtensionObjectResolver::resolve(ProjectionComponentBuilder::class, $extensionObjects); if (! empty($projectionBuilders) && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { throw ConfigurationException::create('Projections are part of Ecotone Enterprise. To use projections, please acquire an enterprise licence.'); } - /** @var array> $components [projection name][component name][reference] */ - $components = []; - foreach ($componentBuilders as $componentBuilder) { - foreach ($projectionBuilders as $projectionBuilder) { - $projectionName = $projectionBuilder->projectionName(); - foreach ([StreamSource::class, PartitionProvider::class, ProjectionStateStorage::class] as $component) { - if ($componentBuilder->canHandle($projectionName, $component)) { - if (isset($components[$projectionName][$component])) { - throw ConfigurationException::create( - "Projection with name {$projectionName} is already registered for component {$component} with reference {$components[$projectionName][$component]}." - . ' You can only register one component of each type per projection. Please check your configuration.' - ); - } - - $reference = Uuid::uuid4()->toString(); - $moduleReferenceSearchService->store($reference, $componentBuilder); - $components[$projectionName][$component] = new Reference($reference); - } - } - } - } + $messagingConfiguration->registerServiceDefinition( + SinglePartitionProvider::class, + new Definition(SinglePartitionProvider::class) + ); $projectionRegistryMap = []; foreach ($projectionBuilders as $projectionBuilder) { @@ -98,10 +79,10 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $messagingConfiguration->registerServiceDefinition( $projectingManagerReference = ProjectingManager::class . ':' . $projectionName, new Definition(ProjectingManager::class, [ - $components[$projectionName][ProjectionStateStorage::class] ?? throw ConfigurationException::create("Projection with name {$projectionName} does not have projection state storage configured. Please check your configuration."), + new Reference(ProjectionStateStorageRegistry::class), new Reference($reference), - $components[$projectionName][StreamSource::class] ?? throw ConfigurationException::create("Projection with name {$projectionName} does not have stream source configured. Please check your configuration."), - $components[$projectionName][PartitionProvider::class] ?? new Definition(SinglePartitionProvider::class), + new Reference(StreamSourceRegistry::class), + new Reference(PartitionProviderRegistry::class), new Reference(StreamFilterRegistry::class), $projectionName, new Reference(TerminationListener::class), @@ -186,8 +167,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO public function canHandle($extensionObject): bool { return $extensionObject instanceof ServiceConfiguration - || $extensionObject instanceof ProjectionExecutorBuilder - || $extensionObject instanceof ProjectionComponentBuilder; + || $extensionObject instanceof ProjectionExecutorBuilder; } public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions, ?InterfaceToCallRegistry $interfaceToCallRegistry = null): array diff --git a/packages/Ecotone/src/Projecting/Config/ProjectionComponentBuilder.php b/packages/Ecotone/src/Projecting/Config/ProjectionComponentBuilder.php deleted file mode 100644 index e2ae02e62..000000000 --- a/packages/Ecotone/src/Projecting/Config/ProjectionComponentBuilder.php +++ /dev/null @@ -1,18 +0,0 @@ -findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) { + $projectionAttribute = $annotationFinder->getAttributeForClass($projectionClassName, ProjectionV2::class); + $allProjectionNames[] = $projectionAttribute->name; + } + + $userlandStateStorageReferences = []; + foreach ($annotationFinder->findAnnotatedClasses(StateStorageAttribute::class) as $storageClassName) { + $userlandStateStorageReferences[] = AnnotatedDefinitionReference::getReferenceForClassName($annotationFinder, $storageClassName); + } + + return new self($allProjectionNames, $userlandStateStorageReferences); + } + + public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void + { + $stateStorageReferences = ExtensionObjectResolver::resolve( + ProjectionStateStorageReference::class, + $extensionObjects + ); + + $userlandStorages = \array_map( + fn (string $reference) => new Reference($reference), + $this->userlandStateStorageReferences + ); + + $builtinStorages = \array_map( + fn (ProjectionStateStorageReference $ref) => new Reference($ref->getReferenceName()), + $stateStorageReferences + ); + + $messagingConfiguration->registerServiceDefinition( + InMemoryProjectionStateStorage::class, + new Definition(InMemoryProjectionStateStorage::class, [null]) + ); + + $messagingConfiguration->registerServiceDefinition( + ProjectionStateStorageRegistry::class, + new Definition(ProjectionStateStorageRegistry::class, [ + $userlandStorages, + $builtinStorages, + ]) + ); + } + + public function canHandle($extensionObject): bool + { + return $extensionObject instanceof ProjectionStateStorageReference; + } + + public function getModulePackageName(): string + { + return ModulePackageList::CORE_PACKAGE; + } +} + diff --git a/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php index 10fba9962..8e6a77865 100644 --- a/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php +++ b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php @@ -12,6 +12,7 @@ use Ecotone\EventSourcing\Attribute\FromAggregateStream; use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\Attribute\Stream; +use Ecotone\EventSourcing\EventStore; use Ecotone\Messaging\Attribute\ModuleAnnotation; use Ecotone\Messaging\Config\Annotation\AnnotationModule; use Ecotone\Messaging\Config\Configuration; @@ -26,6 +27,8 @@ use Ecotone\Modelling\Attribute\NamedEvent; use Ecotone\Modelling\Config\Routing\BusRoutingMapBuilder; use Ecotone\Projecting\Attribute\ProjectionV2; +use Ecotone\Projecting\Attribute\Streaming; +use Ecotone\Projecting\EventStoreAdapter\EventStreamingChannelAdapter; use Ecotone\Projecting\StreamFilter; use Ecotone\Projecting\StreamFilterRegistry; @@ -71,7 +74,8 @@ public static function collectStreamFilters(AnnotationFinder $annotationFinder, $streamFilters[$projectionName][] = self::resolveFromAggregateStream($annotationFinder, $aggregateStreamAttribute, $projectionName, $eventNames); } - if (! isset($streamFilters[$projectionName]) || $streamFilters[$projectionName] === []) { + $isStreamingProjection = $annotationFinder->findAttributeForClass($classname, Streaming::class) !== null; + if (! $isStreamingProjection && (! isset($streamFilters[$projectionName]) || $streamFilters[$projectionName] === [])) { throw ConfigurationException::create( "Projection '{$projectionName}' must have at least one #[FromStream] or #[FromAggregateStream] attribute defined on class {$classname}." ); @@ -185,6 +189,20 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO } } + foreach ($extensionObjects as $extensionObject) { + if ($extensionObject instanceof EventStreamingChannelAdapter) { + $projectionName = $extensionObject->getProjectionName(); + $filtersDefinition[$projectionName] = [ + new Definition(StreamFilter::class, [ + $extensionObject->fromStream, + $extensionObject->aggregateType, + EventStore::class, + $extensionObject->eventNames, + ]), + ]; + } + } + $messagingConfiguration->registerServiceDefinition( StreamFilterRegistry::class, new Definition(StreamFilterRegistry::class, [$filtersDefinition]) @@ -193,7 +211,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO public function canHandle($extensionObject): bool { - return false; + return $extensionObject instanceof EventStreamingChannelAdapter; } public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions, ?InterfaceToCallRegistry $interfaceToCallRegistry = null): array diff --git a/packages/Ecotone/src/Projecting/Config/StreamSourceRegistryModule.php b/packages/Ecotone/src/Projecting/Config/StreamSourceRegistryModule.php new file mode 100644 index 000000000..fd23dcba3 --- /dev/null +++ b/packages/Ecotone/src/Projecting/Config/StreamSourceRegistryModule.php @@ -0,0 +1,89 @@ +findAnnotatedClasses(StreamSourceAttribute::class) as $sourceClassName) { + $userlandStreamSourceReferences[] = AnnotatedDefinitionReference::getReferenceForClassName($annotationFinder, $sourceClassName); + } + + return new self($userlandStreamSourceReferences); + } + + public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void + { + $streamSourceReferences = ExtensionObjectResolver::resolve( + StreamSourceReference::class, + $extensionObjects + ); + + $userlandSources = array_map( + fn (string $reference) => new Reference($reference), + $this->userlandStreamSourceReferences + ); + + $builtinSources = array_map( + fn (StreamSourceReference $ref) => new Reference($ref->getReferenceName()), + $streamSourceReferences + ); + + $messagingConfiguration->registerServiceDefinition( + StreamSourceRegistry::class, + new Definition(StreamSourceRegistry::class, [ + $userlandSources, + $builtinSources, + ]) + ); + } + + public function canHandle($extensionObject): bool + { + return $extensionObject instanceof StreamSourceReference + || ($extensionObject instanceof StreamSource && $extensionObject instanceof DefinedObject); + } + + public function getModulePackageName(): string + { + return ModulePackageList::CORE_PACKAGE; + } +} + diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php index bfef18135..9a1ca6317 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php @@ -32,12 +32,12 @@ public function projectionName(): string public function asyncChannelName(): ?string { - return null; // Channel adapters are always polling-based + return null; } public function partitionHeader(): ?string { - return null; // Channel adapters don't support partitioning + return null; } public function automaticInitialization(): bool @@ -52,17 +52,16 @@ public function eventLoadingBatchSize(): int public function backfillPartitionBatchSize(): int { - return 100; // Default value, streaming channel adapters don't support partitioned backfill + return 100; } public function backfillAsyncChannelName(): ?string { - return null; // Streaming channel adapters don't support async backfill + return null; } public function compile(MessagingContainerBuilder $builder): Definition|Reference { - // Create the projection executor that forwards events to the streaming channel return new Definition( EventStoreChannelAdapterProjection::class, [ diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php index 970d8ad86..9c2e50564 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php @@ -23,17 +23,24 @@ class InMemoryEventStoreStreamSource implements StreamSource { /** + * @param array|null $projectionNames Projection names this source handles, null means all projections * @param array $eventNames Event names to filter by, empty array means no filtering */ public function __construct( private InMemoryEventStore $eventStore, + private ?array $projectionNames = null, private ?string $streamName = null, private ?string $partitionHeader = null, private array $eventNames = [], ) { } - public function load(?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function canHandle(string $projectionName): bool + { + return $this->projectionNames === null || \in_array($projectionName, $this->projectionNames, true); + } + + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage { // Position is 0-based index into the global event array (like InMemoryStreamSource) $from = $lastPosition !== null ? (int) $lastPosition : 0; diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSourceBuilder.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSourceBuilder.php deleted file mode 100644 index 1c22693a0..000000000 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSourceBuilder.php +++ /dev/null @@ -1,50 +0,0 @@ - $eventNames Event names to filter by, empty array means no filtering - */ - public function __construct( - private ?array $projectionNames = null, - private ?string $streamName = null, - private ?string $partitionHeader = null, - private array $eventNames = [], - ) { - } - - public function canHandle(string $projectionName, string $component): bool - { - return $component === StreamSource::class - && ($this->projectionNames === null || in_array($projectionName, $this->projectionNames, true)); - } - - public function compile(MessagingContainerBuilder $builder): Definition|Reference - { - return new Definition( - InMemoryEventStoreStreamSource::class, - [ - Reference::to(InMemoryEventStore::class), - $this->streamName, - $this->partitionHeader, - $this->eventNames, - ] - ); - } -} diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorage.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorage.php index 5416b6f3e..bcfe26e16 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorage.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorage.php @@ -20,6 +20,19 @@ class InMemoryProjectionStateStorage implements ProjectionStateStorage */ private array $projectionStates = []; + /** + * @param string[]|null $projectionNames + */ + public function __construct( + private ?array $projectionNames = null, + ) { + } + + public function canHandle(string $projectionName): bool + { + return $this->projectionNames === null || \in_array($projectionName, $this->projectionNames, true); + } + public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState { $key = $this->getKey($projectionName, $partitionKey); diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorageBuilder.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorageBuilder.php deleted file mode 100644 index 1c66b9778..000000000 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorageBuilder.php +++ /dev/null @@ -1,31 +0,0 @@ -projectionNames === null || in_array($projectionName, $this->projectionNames, true)); - } -} diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSource.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSource.php index 0a4f7e7fc..fb0aadba4 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSource.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSource.php @@ -14,14 +14,22 @@ class InMemoryStreamSource implements StreamSource { /** + * @param string[]|null $handledProjectionNames null means handles all projections * @param Event[] $events */ public function __construct( + private ?array $handledProjectionNames = null, private ?string $partitionHeader = null, private array $events = [], ) { } + public function canHandle(string $projectionName): bool + { + return $this->handledProjectionNames === null + || \in_array($projectionName, $this->handledProjectionNames, true); + } + public function append(Event ...$events): void { foreach ($events as $event) { @@ -29,7 +37,7 @@ public function append(Event ...$events): void } } - public function load(?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage { $from = $lastPosition !== null ? (int) $lastPosition : 0; diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSourceBuilder.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSourceBuilder.php deleted file mode 100644 index d4ce99958..000000000 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSourceBuilder.php +++ /dev/null @@ -1,39 +0,0 @@ -projectionNames === null || in_array($projectionName, $this->projectionNames, true)); - } - - public function getDefinition(): Definition - { - return new Definition(InMemoryStreamSource::class); - } -} diff --git a/packages/Ecotone/src/Projecting/PartitionProvider.php b/packages/Ecotone/src/Projecting/PartitionProvider.php index 39f66314a..f93ec52c7 100644 --- a/packages/Ecotone/src/Projecting/PartitionProvider.php +++ b/packages/Ecotone/src/Projecting/PartitionProvider.php @@ -9,6 +9,14 @@ interface PartitionProvider { + /** + * Returns whether this provider can handle the given projection. + * + * @param string $projectionName The name of the projection + * @return bool True if this provider can handle the projection + */ + public function canHandle(string $projectionName): bool; + /** * Returns the total count of partitions for the given stream filter. * For non-partitioned projections, returns 1. diff --git a/packages/Ecotone/src/Projecting/PartitionProviderReference.php b/packages/Ecotone/src/Projecting/PartitionProviderReference.php new file mode 100644 index 000000000..6519a5f52 --- /dev/null +++ b/packages/Ecotone/src/Projecting/PartitionProviderReference.php @@ -0,0 +1,41 @@ +referenceName; + } + + /** + * @return string[] + */ + public function getPartitionedProjectionNames(): array + { + return $this->partitionedProjectionNames; + } +} + diff --git a/packages/Ecotone/src/Projecting/PartitionProviderRegistry.php b/packages/Ecotone/src/Projecting/PartitionProviderRegistry.php new file mode 100644 index 000000000..95e82b058 --- /dev/null +++ b/packages/Ecotone/src/Projecting/PartitionProviderRegistry.php @@ -0,0 +1,33 @@ +providers as $provider) { + if ($provider->canHandle($projectionName)) { + return $provider; + } + } + + throw new RuntimeException("No partition provider found for projection: {$projectionName}"); + } +} + diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index 955c946d7..93eb76d6b 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -16,19 +16,21 @@ class ProjectingManager { private const DEFAULT_BACKFILL_PARTITION_BATCH_SIZE = 100; + private ?ProjectionStateStorage $projectionStateStorage = null; + public function __construct( - private ProjectionStateStorage $projectionStateStorage, - private ProjectorExecutor $projectorExecutor, - private StreamSource $streamSource, - private PartitionProvider $partitionProvider, - private StreamFilterRegistry $streamFilterRegistry, - private string $projectionName, - private TerminationListener $terminationListener, - private MessagingEntrypoint $messagingEntrypoint, - private int $eventLoadingBatchSize = 1000, - private bool $automaticInitialization = true, - private int $backfillPartitionBatchSize = self::DEFAULT_BACKFILL_PARTITION_BATCH_SIZE, - private ?string $backfillAsyncChannelName = null, + private ProjectionStateStorageRegistry $projectionStateStorageRegistry, + private ProjectorExecutor $projectorExecutor, + private StreamSourceRegistry $streamSourceRegistry, + private PartitionProviderRegistry $partitionProviderRegistry, + private StreamFilterRegistry $streamFilterRegistry, + private string $projectionName, + private TerminationListener $terminationListener, + private MessagingEntrypoint $messagingEntrypoint, + private int $eventLoadingBatchSize = 1000, + private bool $automaticInitialization = true, + private int $backfillPartitionBatchSize = self::DEFAULT_BACKFILL_PARTITION_BATCH_SIZE, + private ?string $backfillAsyncChannelName = null, ) { if ($eventLoadingBatchSize < 1) { throw new InvalidArgumentException('Event loading batch size must be at least 1'); @@ -38,6 +40,14 @@ public function __construct( } } + private function getProjectionStateStorage(): ProjectionStateStorage + { + if ($this->projectionStateStorage === null) { + $this->projectionStateStorage = $this->projectionStateStorageRegistry->getFor($this->projectionName); + } + return $this->projectionStateStorage; + } + public function execute(?string $partitionKeyValue = null, bool $manualInitialization = false): void { do { @@ -50,7 +60,7 @@ public function execute(?string $partitionKeyValue = null, bool $manualInitializ */ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitialize): int { - $transaction = $this->projectionStateStorage->beginTransaction(); + $transaction = $this->getProjectionStateStorage()->beginTransaction(); try { $projectionState = $this->loadOrInitializePartitionState($partitionKeyValue, $canInitialize); if ($projectionState === null) { @@ -58,7 +68,8 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial return 0; } - $streamPage = $this->streamSource->load($projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); + $streamSource = $this->streamSourceRegistry->getFor($this->projectionName); + $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); $userState = $projectionState->userState; $processedEvents = 0; @@ -79,7 +90,7 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial $projectionState = $projectionState->withStatus(ProjectionInitializationStatus::INITIALIZED); } - $this->projectionStateStorage->savePartition($projectionState); + $this->getProjectionStateStorage()->savePartition($projectionState); $transaction->commit(); return $processedEvents; } catch (Throwable $e) { @@ -90,12 +101,12 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial public function loadState(?string $partitionKey = null): ProjectionPartitionState { - return $this->projectionStateStorage->loadPartition($this->projectionName, $partitionKey); + return $this->getProjectionStateStorage()->loadPartition($this->projectionName, $partitionKey); } public function getPartitionProvider(): PartitionProvider { - return $this->partitionProvider; + return $this->partitionProviderRegistry->getPartitionProviderFor($this->projectionName); } public function getProjectionName(): string @@ -105,14 +116,14 @@ public function getProjectionName(): string public function init(): void { - $this->projectionStateStorage->init($this->projectionName); + $this->getProjectionStateStorage()->init($this->projectionName); $this->projectorExecutor->init(); } public function delete(): void { - $this->projectionStateStorage->delete($this->projectionName); + $this->getProjectionStateStorage()->delete($this->projectionName); $this->projectorExecutor->delete(); } @@ -133,7 +144,7 @@ public function prepareBackfill(): void private function prepareBackfillForFilter(StreamFilter $streamFilter): void { - $totalPartitions = $this->partitionProvider->count($streamFilter); + $totalPartitions = $this->getPartitionProvider()->count($streamFilter); if ($totalPartitions === 0) { return; @@ -184,10 +195,10 @@ public function backfill(): void private function loadOrInitializePartitionState(?string $partitionKey, bool $canInitialize): ?ProjectionPartitionState { - $projectionState = $this->projectionStateStorage->loadPartition($this->projectionName, $partitionKey); + $storage = $this->getProjectionStateStorage(); + $projectionState = $storage->loadPartition($this->projectionName, $partitionKey); if (! $canInitialize && $projectionState?->status === ProjectionInitializationStatus::UNINITIALIZED) { - // Projection is being initialized by another process return null; } if ($projectionState) { @@ -195,12 +206,11 @@ private function loadOrInitializePartitionState(?string $partitionKey, bool $can } if ($canInitialize) { - $projectionState = $this->projectionStateStorage->initPartition($this->projectionName, $partitionKey); + $projectionState = $storage->initPartition($this->projectionName, $partitionKey); if ($projectionState) { $this->projectorExecutor->init(); } else { - // Someone else initialized it in the meantime, reload the state - $projectionState = $this->projectionStateStorage->loadPartition($this->projectionName, $partitionKey); + $projectionState = $storage->loadPartition($this->projectionName, $partitionKey); } return $projectionState; } diff --git a/packages/Ecotone/src/Projecting/ProjectionStateStorage.php b/packages/Ecotone/src/Projecting/ProjectionStateStorage.php index 04d0ed935..a1bbcf4eb 100644 --- a/packages/Ecotone/src/Projecting/ProjectionStateStorage.php +++ b/packages/Ecotone/src/Projecting/ProjectionStateStorage.php @@ -9,8 +9,9 @@ interface ProjectionStateStorage { + public function canHandle(string $projectionName): bool; public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState; - public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState; // Returns created state or null if already exists + public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState; public function savePartition(ProjectionPartitionState $projectionState): void; public function delete(string $projectionName): void; public function init(string $projectionName): void; diff --git a/packages/Ecotone/src/Projecting/ProjectionStateStorageReference.php b/packages/Ecotone/src/Projecting/ProjectionStateStorageReference.php new file mode 100644 index 000000000..131cdd4c7 --- /dev/null +++ b/packages/Ecotone/src/Projecting/ProjectionStateStorageReference.php @@ -0,0 +1,30 @@ +referenceName; + } +} + diff --git a/packages/Ecotone/src/Projecting/ProjectionStateStorageRegistry.php b/packages/Ecotone/src/Projecting/ProjectionStateStorageRegistry.php new file mode 100644 index 000000000..2a45e9f0e --- /dev/null +++ b/packages/Ecotone/src/Projecting/ProjectionStateStorageRegistry.php @@ -0,0 +1,40 @@ +userlandStorages as $storage) { + if ($storage->canHandle($projectionName)) { + return $storage; + } + } + + foreach ($this->builtinStorages as $storage) { + if ($storage->canHandle($projectionName)) { + return $storage; + } + } + + throw new RuntimeException("No projection state storage found for projection: {$projectionName}"); + } +} + diff --git a/packages/Ecotone/src/Projecting/SinglePartitionProvider.php b/packages/Ecotone/src/Projecting/SinglePartitionProvider.php index 5eccfbca2..1fabadab9 100644 --- a/packages/Ecotone/src/Projecting/SinglePartitionProvider.php +++ b/packages/Ecotone/src/Projecting/SinglePartitionProvider.php @@ -9,6 +9,19 @@ class SinglePartitionProvider implements PartitionProvider { + /** + * @param string[] $nonPartitionedProjectionNames + */ + public function __construct( + private array $nonPartitionedProjectionNames, + ) { + } + + public function canHandle(string $projectionName): bool + { + return \in_array($projectionName, $this->nonPartitionedProjectionNames, true); + } + public function count(StreamFilter $filter): int { return 1; @@ -16,8 +29,6 @@ public function count(StreamFilter $filter): int public function partitions(StreamFilter $filter, ?int $limit = null, int $offset = 0): iterable { - // Global projection has a single null partition - // If offset is 0 and limit allows at least 1, yield the single partition if ($offset === 0 && ($limit === null || $limit >= 1)) { yield null; } diff --git a/packages/Ecotone/src/Projecting/StreamFilter.php b/packages/Ecotone/src/Projecting/StreamFilter.php index ca2ea12a4..37556bda7 100644 --- a/packages/Ecotone/src/Projecting/StreamFilter.php +++ b/packages/Ecotone/src/Projecting/StreamFilter.php @@ -21,7 +21,7 @@ final class StreamFilter public function __construct( public readonly string $streamName, public readonly ?string $aggregateType = null, - public readonly string $eventStoreReferenceName = EventStore::class, + public readonly ?string $eventStoreReferenceName = EventStore::class, public readonly array $eventNames = [], ) { } diff --git a/packages/Ecotone/src/Projecting/StreamSource.php b/packages/Ecotone/src/Projecting/StreamSource.php index e9a4760d3..3d78189a7 100644 --- a/packages/Ecotone/src/Projecting/StreamSource.php +++ b/packages/Ecotone/src/Projecting/StreamSource.php @@ -9,5 +9,7 @@ interface StreamSource { - public function load(?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage; + public function canHandle(string $projectionName): bool; + + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage; } diff --git a/packages/Ecotone/src/Projecting/StreamSourceReference.php b/packages/Ecotone/src/Projecting/StreamSourceReference.php new file mode 100644 index 000000000..54cd98fc7 --- /dev/null +++ b/packages/Ecotone/src/Projecting/StreamSourceReference.php @@ -0,0 +1,30 @@ +referenceName; + } +} + diff --git a/packages/Ecotone/src/Projecting/StreamSourceRegistry.php b/packages/Ecotone/src/Projecting/StreamSourceRegistry.php new file mode 100644 index 000000000..2dee52139 --- /dev/null +++ b/packages/Ecotone/src/Projecting/StreamSourceRegistry.php @@ -0,0 +1,40 @@ +userlandSources as $source) { + if ($source->canHandle($projectionName)) { + return $source; + } + } + + foreach ($this->builtinSources as $source) { + if ($source->canHandle($projectionName)) { + return $source; + } + } + + throw new RuntimeException("No stream source found for projection: {$projectionName}"); + } +} + diff --git a/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php b/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php index 5040ad372..5ac6a8519 100644 --- a/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php +++ b/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php @@ -12,10 +12,11 @@ use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Modelling\Event; +use Ecotone\Projecting\Attribute; use Ecotone\Projecting\Attribute\Polling; use Ecotone\Projecting\Attribute\ProjectionV2; -use Ecotone\Projecting\InMemory\InMemoryProjectionStateStorageBuilder; -use Ecotone\Projecting\InMemory\InMemoryStreamSourceBuilder; +use Ecotone\Projecting\StreamPage; +use Ecotone\Projecting\StreamSource; use Ecotone\Test\LicenceTesting; use PHPUnit\Framework\TestCase; @@ -74,7 +75,7 @@ public function onEvent(object $event): void $this->assertEquals(['id' => 2, 'name' => 'Event 2'], $projection->events[1]); } - public function test_does_not_register_in_memory_stream_source_when_custom_stream_source_is_provided(): void + public function test_custom_userland_stream_source_is_used_when_provided(): void { $testEvent = new class () { public function __construct(public int $id = 0, public string $name = '') @@ -94,29 +95,45 @@ public function onEvent(object $event): void } }; - // And a custom stream source - $customStreamSource = new InMemoryStreamSourceBuilder(); + $customStreamSource = new #[Attribute\StreamSource] class implements StreamSource { + private array $events = []; + + public function append(Event ...$events): void + { + foreach ($events as $event) { + $this->events[] = $event; + } + } + + public function canHandle(string $projectionName): bool + { + return true; + } + + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + { + $from = $lastPosition !== null ? (int) $lastPosition : 0; + $events = array_slice($this->events, $from, $count); + $to = $from + count($events); + return new StreamPage($events, (string) $to); + } + }; - // When bootstrapping with a custom stream source $ecotone = EcotoneLite::bootstrapFlowTesting( - [$projection::class, $testEvent::class], - [$projection], + [$projection::class, $testEvent::class, $customStreamSource::class], + [$projection, $customStreamSource], configuration: ServiceConfiguration::createWithDefaults() ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) - ->withExtensionObjects([$customStreamSource, new InMemoryProjectionStateStorageBuilder()]) ->withLicenceKey(LicenceTesting::VALID_LICENCE) ); - // And adding events to the custom stream source $customStreamSource->append( Event::create(new ($testEvent::class)(1, 'Event 1')), Event::create(new ($testEvent::class)(2, 'Event 2')), ); - // When running the polling projection $ecotone->run('test_projection_poller', ExecutionPollingMetadata::createWithTestingSetup()); - // Then the projection should have consumed events from the custom stream source $this->assertEquals(2, $projection->callCount, 'Event handler should have been called 2 times'); $this->assertCount(2, $projection->events, 'Projection should have consumed 2 events'); $this->assertEquals(['id' => 1, 'name' => 'Event 1'], $projection->events[0]); diff --git a/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php b/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php index 1dec00e45..2703e841a 100644 --- a/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php +++ b/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php @@ -37,7 +37,7 @@ public function test_event_streaming_projection_consuming_from_streaming_channel $positionTracker = new InMemoryConsumerPositionTracker(); // Given a projection that consumes from streaming channel - $projection = new #[ProjectionV2('user_projection'), FromStream('test_stream'), Streaming('streaming_channel')] class { + $projection = new #[ProjectionV2('user_projection'), Streaming('streaming_channel')] class { public array $projectedUsers = []; #[EventHandler] @@ -87,7 +87,7 @@ public function test_event_streaming_projection_with_multiple_event_handlers_rou $positionTracker = new InMemoryConsumerPositionTracker(); // Given a projection with two event handlers routed by event names - $projection = new #[ProjectionV2('order_projection'), FromStream('test_stream'), Streaming('streaming_channel')] class { + $projection = new #[ProjectionV2('order_projection'), Streaming('streaming_channel')] class { public array $createdOrders = []; public array $completedOrders = []; @@ -151,7 +151,7 @@ public function test_event_streaming_projection_with_event_store_channel_adapter $positionTracker = new InMemoryConsumerPositionTracker(); // Given a projection that consumes from streaming channel - $projection = new #[ProjectionV2('product_projection'), FromStream('test_stream'), Streaming('event_stream')] class { + $projection = new #[ProjectionV2('product_projection'), Streaming('event_stream')] class { public array $projectedProducts = []; #[EventHandler] @@ -212,7 +212,7 @@ public function test_two_event_streaming_projections_consuming_from_same_channel $positionTracker = new InMemoryConsumerPositionTracker(); // Given two projections consuming from the same streaming channel - $productListProjection = new #[ProjectionV2('product_list_projection'), FromStream('test_stream'), Streaming('event_stream')] class { + $productListProjection = new #[ProjectionV2('product_list_projection'), Streaming('event_stream')] class { public array $productList = []; #[EventHandler] @@ -222,7 +222,7 @@ public function onProductRegistered(ProductRegistered $event): void } }; - $productPriceProjection = new #[ProjectionV2('product_price_projection'), FromStream('test_stream'), Streaming('event_stream')] class { + $productPriceProjection = new #[ProjectionV2('product_price_projection'), Streaming('event_stream')] class { public array $productPrices = []; #[EventHandler] @@ -305,7 +305,7 @@ public function onProductRegistered(ProductRegistered $event): void }; // Given an event streaming projection (processes events in polling mode from streaming channel) - $eventStreamingProjection = new #[ProjectionV2('streaming_product_list'), FromStream('test_stream'), Streaming('event_stream')] class { + $eventStreamingProjection = new #[ProjectionV2('streaming_product_list'), Streaming('event_stream')] class { public array $productList = []; #[EventHandler] diff --git a/packages/Ecotone/tests/Projecting/ProjectingTest.php b/packages/Ecotone/tests/Projecting/ProjectingTest.php index 71f83118b..4070c5fda 100644 --- a/packages/Ecotone/tests/Projecting/ProjectingTest.php +++ b/packages/Ecotone/tests/Projecting/ProjectingTest.php @@ -21,13 +21,21 @@ use Ecotone\Messaging\MessageHeaders; use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Modelling\Event; +use Ecotone\Projecting\Attribute; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\ProjectionDeployment; use Ecotone\Projecting\Attribute\ProjectionExecution; use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\Attribute\ProjectionV2; -use Ecotone\Projecting\InMemory\InMemoryProjectionStateStorageBuilder; -use Ecotone\Projecting\InMemory\InMemoryStreamSourceBuilder; +use Ecotone\Projecting\NoOpTransaction; +use Ecotone\Projecting\PartitionProvider; +use Ecotone\Projecting\ProjectionInitializationStatus; +use Ecotone\Projecting\ProjectionPartitionState; +use Ecotone\Projecting\ProjectionStateStorage; +use Ecotone\Projecting\StreamFilter; +use Ecotone\Projecting\StreamPage; +use Ecotone\Projecting\StreamSource; +use Ecotone\Projecting\Transaction; use Ecotone\Test\LicenceTesting; use PHPUnit\Framework\Attributes\RequiresPhpExtension; use PHPUnit\Framework\TestCase; @@ -71,7 +79,6 @@ public function handle(array $event): void public function test_partitioned_projection(): void { - // Given a partitioned projection $projection = new #[ProjectionV2('test'), FromStream('test_stream'), Partitioned('partitionHeader')] class { public array $handledEvents = []; #[EventHandler('*')] @@ -80,14 +87,44 @@ public function handle(array $event): void $this->handledEvents[] = $event; } }; + + $streamSource = new #[Attribute\StreamSource] class implements StreamSource { + private array $events = []; + private string $partitionHeader = 'id'; + + public function append(Event ...$events): void + { + foreach ($events as $event) { + $this->events[] = $event; + } + } + + public function canHandle(string $projectionName): bool + { + return true; + } + + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + { + $from = $lastPosition !== null ? (int) $lastPosition : 0; + if ($partitionKey) { + $events = array_filter($this->events, fn (Event $event) => $event->getMetadata()[$this->partitionHeader] === $partitionKey); + $events = array_values($events); + } else { + $events = $this->events; + } + $events = array_slice($events, $from, $count); + $to = $from + count($events); + return new StreamPage($events, (string) $to); + } + }; + $ecotone = EcotoneLite::bootstrapFlowTesting( - [$projection::class], - [$projection], + [$projection::class, $streamSource::class], + [$projection, $streamSource], configuration: ServiceConfiguration::createWithDefaults() ->withSkippedModulePackageNames(ModulePackageList::allPackages()) ->withLicenceKey(LicenceTesting::VALID_LICENCE) - ->addExtensionObject($streamSource = new InMemoryStreamSourceBuilder(partitionField: 'id')) - ->addExtensionObject(new InMemoryProjectionStateStorageBuilder()) ); $streamSource->append( @@ -96,16 +133,13 @@ public function handle(array $event): void Event::createWithType('test-event', ['name' => 'Test'], ['id' => '1']), ); - // When event is published, triggering the projection $ecotone->publishEventWithRoutingKey('trigger', metadata: ['partitionHeader' => '1']); - // Then only events from partition 1 are handled $this->assertCount(2, $projection->handledEvents); } public function test_asynchronous_partitioned_projection(): void { - // Given a partitioned async projection $projection = new #[ProjectionV2('test'), FromStream('test_stream'), Partitioned('partitionHeader'), Asynchronous('async')] class { public array $handledEvents = []; #[EventHandler('*')] @@ -114,15 +148,45 @@ public function handle(array $event): void $this->handledEvents[] = $event; } }; + + $streamSource = new #[Attribute\StreamSource] class implements StreamSource { + private array $events = []; + private string $partitionHeader = 'id'; + + public function append(Event ...$events): void + { + foreach ($events as $event) { + $this->events[] = $event; + } + } + + public function canHandle(string $projectionName): bool + { + return true; + } + + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + { + $from = $lastPosition !== null ? (int) $lastPosition : 0; + if ($partitionKey) { + $events = array_filter($this->events, fn (Event $event) => $event->getMetadata()[$this->partitionHeader] === $partitionKey); + $events = array_values($events); + } else { + $events = $this->events; + } + $events = array_slice($events, $from, $count); + $to = $from + count($events); + return new StreamPage($events, (string) $to); + } + }; + $ecotone = EcotoneLite::bootstrapFlowTesting( - [$projection::class], - [$projection], + [$projection::class, $streamSource::class], + [$projection, $streamSource], configuration: ServiceConfiguration::createWithDefaults() ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) ->withLicenceKey(LicenceTesting::VALID_LICENCE) - ->addExtensionObject($streamSource = new InMemoryStreamSourceBuilder(partitionField: 'id')) ->addExtensionObject(SimpleMessageChannelBuilder::createQueueChannel('async')) - ->addExtensionObject(new InMemoryProjectionStateStorageBuilder()) ); $streamSource->append( @@ -131,10 +195,8 @@ public function handle(array $event): void Event::createWithType('test-event', ['name' => 'Test'], ['id' => '1']), ); - // When event is published, triggering the projection $ecotone->publishEventWithRoutingKey('trigger', metadata: ['partitionHeader' => '1']); - // Then no event is handled until async channel is run $this->assertCount(0, $projection->handledEvents); $ecotone->run('async', ExecutionPollingMetadata::createWithTestingSetup()); $this->assertCount(2, $projection->handledEvents); @@ -482,12 +544,24 @@ public function handle(array $event): void { } }; + + $streamSource = new #[Attribute\StreamSource] class implements StreamSource { + public function canHandle(string $projectionName): bool + { + return true; + } + + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + { + return new StreamPage([], '0'); + } + }; + EcotoneLite::bootstrapFlowTesting( - [$projection::class], - [$projection], + [$projection::class, $streamSource::class], + [$projection, $streamSource], configuration: ServiceConfiguration::createWithDefaults() ->withSkippedModulePackageNames(ModulePackageList::allPackages()) - ->addExtensionObject(new InMemoryStreamSourceBuilder()) ); } @@ -645,4 +719,250 @@ public function handle(array $event): void $pcntlTerminationFlag->disable(); } } + + public function test_userland_partition_provider_is_used_during_backfill(): void + { + $userlandPartitionProvider = new #[Attribute\PartitionProvider] class implements PartitionProvider { + public function canHandle(string $projectionName): bool + { + return $projectionName === 'userland_backfill_projection'; + } + + public function count(StreamFilter $filter): int + { + return 3; + } + + public function partitions(StreamFilter $filter, ?int $limit = null, int $offset = 0): iterable + { + $partitions = ['partition-a', 'partition-b', 'partition-c']; + $partitions = array_slice($partitions, $offset, $limit); + yield from $partitions; + } + }; + + $projection = new #[ProjectionV2('userland_backfill_projection'), FromStream('test_stream'), Partitioned('partitionHeader'), Attribute\ProjectionBackfill(backfillPartitionBatchSize: 2, asyncChannelName: 'backfill_async')] class { + public array $processedEvents = []; + #[EventHandler('*')] + public function handle(array $event): void + { + $this->processedEvents[] = $event; + } + }; + + $streamSource = new #[Attribute\StreamSource] class implements StreamSource { + private array $events = []; + private string $partitionHeader = 'partitionId'; + + public function append(Event ...$events): void + { + foreach ($events as $event) { + $this->events[] = $event; + } + } + + public function canHandle(string $projectionName): bool + { + return true; + } + + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + { + $from = $lastPosition !== null ? (int) $lastPosition : 0; + if ($partitionKey) { + $events = array_filter($this->events, fn (Event $event) => $event->getMetadata()[$this->partitionHeader] === $partitionKey); + $events = array_values($events); + } else { + $events = $this->events; + } + $events = array_slice($events, $from, $count); + $to = $from + count($events); + return new StreamPage($events, (string) $to); + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class, $userlandPartitionProvider::class, $streamSource::class], + [$projection, $userlandPartitionProvider, $streamSource], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ->addExtensionObject(SimpleMessageChannelBuilder::createQueueChannel('backfill_async')), + addInMemoryStateStoredRepository: false, + testConfiguration: \Ecotone\Lite\Test\TestConfiguration::createWithDefaults()->withSpyOnChannel('backfill_async') + ); + + $streamSource->append( + Event::createWithType('test-event', ['name' => 'Test'], ['partitionId' => 'partition-a']), + Event::createWithType('test-event', ['name' => 'Test'], ['partitionId' => 'partition-b']), + Event::createWithType('test-event', ['name' => 'Test'], ['partitionId' => 'partition-c']), + ); + + $ecotone->initializeProjection('userland_backfill_projection'); + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'userland_backfill_projection']); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('backfill_async'); + self::assertCount(2, $messages, 'Expected 2 batches for 3 partitions with batch size 2'); + } + + public function test_non_handled_projection_falls_back_to_single_partition_during_backfill(): void + { + $userlandPartitionProvider = new #[Attribute\PartitionProvider] class implements PartitionProvider { + public function canHandle(string $projectionName): bool + { + return $projectionName === 'only_this_projection'; + } + + public function count(StreamFilter $filter): int + { + return 10; + } + + public function partitions(StreamFilter $filter, ?int $limit = null, int $offset = 0): iterable + { + yield from range(1, 10); + } + }; + + $projection = new #[ProjectionV2('different_projection'), FromStream('test_stream'), Attribute\ProjectionBackfill(asyncChannelName: 'backfill_async')] class { + public array $processedEvents = []; + #[EventHandler('*')] + public function handle(array $event): void + { + $this->processedEvents[] = $event; + } + }; + + $streamSource = new #[Attribute\StreamSource] class implements StreamSource { + public function canHandle(string $projectionName): bool + { + return true; + } + + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + { + return new StreamPage([], '0'); + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class, $userlandPartitionProvider::class, $streamSource::class], + [$projection, $userlandPartitionProvider, $streamSource], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ->addExtensionObject(SimpleMessageChannelBuilder::createQueueChannel('backfill_async')), + addInMemoryStateStoredRepository: false, + testConfiguration: \Ecotone\Lite\Test\TestConfiguration::createWithDefaults()->withSpyOnChannel('backfill_async') + ); + + $ecotone->initializeProjection('different_projection'); + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'different_projection']); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('backfill_async'); + self::assertCount(1, $messages, 'SinglePartitionProvider should produce exactly 1 batch'); + } + + public function test_userland_state_storage_is_prioritized_over_built_in(): void + { + $userlandStorage = new #[Attribute\StateStorage] class implements ProjectionStateStorage { + public bool $wasUsed = false; + private array $projectionStates = []; + + public function canHandle(string $projectionName): bool + { + return $projectionName === 'userland_storage_projection'; + } + + public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState + { + $this->wasUsed = true; + $key = $projectionName . ($partitionKey ? '-' . $partitionKey : ''); + return $this->projectionStates[$key] ?? null; + } + + public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState + { + $this->wasUsed = true; + $key = $projectionName . ($partitionKey ? '-' . $partitionKey : ''); + if (! isset($this->projectionStates[$key])) { + $this->projectionStates[$key] = new ProjectionPartitionState($projectionName, $partitionKey, null, null, ProjectionInitializationStatus::UNINITIALIZED); + return $this->projectionStates[$key]; + } + return null; + } + + public function savePartition(ProjectionPartitionState $projectionState): void + { + $this->wasUsed = true; + $key = $projectionState->projectionName . ($projectionState->partitionKey ? '-' . $projectionState->partitionKey : ''); + $this->projectionStates[$key] = $projectionState; + } + + public function delete(string $projectionName): void + { + $this->wasUsed = true; + } + + public function init(string $projectionName): void + { + $this->wasUsed = true; + } + + public function beginTransaction(): Transaction + { + return new NoOpTransaction(); + } + }; + + $projection = new #[ProjectionV2('userland_storage_projection'), FromStream('test_stream')] class { + public array $processedEvents = []; + #[EventHandler('*')] + public function handle(array $event): void + { + $this->processedEvents[] = $event; + } + }; + + $streamSource = new #[Attribute\StreamSource] class implements StreamSource { + private array $events = []; + + public function append(Event ...$events): void + { + foreach ($events as $event) { + $this->events[] = $event; + } + } + + public function canHandle(string $projectionName): bool + { + return true; + } + + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + { + $from = $lastPosition !== null ? (int) $lastPosition : 0; + $events = array_slice($this->events, $from, $count); + $to = $from + count($events); + return new StreamPage($events, (string) $to); + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class, $userlandStorage::class, $streamSource::class], + [$projection, $userlandStorage, $streamSource], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackages()) + ->withLicenceKey(LicenceTesting::VALID_LICENCE), + addInMemoryStateStoredRepository: false, + ); + + $streamSource->append( + Event::createWithType('test-event', ['name' => 'Test']), + ); + + $ecotone->initializeProjection('userland_storage_projection'); + + self::assertTrue($userlandStorage->wasUsed, 'Userland state storage should be prioritized and used'); + } } diff --git a/packages/Ecotone/tests/Projecting/ProjectionApiTest.php b/packages/Ecotone/tests/Projecting/ProjectionApiTest.php index d14292a25..9fb2e3ecb 100644 --- a/packages/Ecotone/tests/Projecting/ProjectionApiTest.php +++ b/packages/Ecotone/tests/Projecting/ProjectionApiTest.php @@ -5,8 +5,13 @@ namespace Test\Ecotone\Projecting; use Ecotone\Lite\EcotoneLite; +use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder; use Ecotone\Messaging\Config\ConfigurationException; +use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; +use Ecotone\Messaging\Consumer\ConsumerPositionTracker; +use Ecotone\Messaging\Consumer\InMemory\InMemoryConsumerPositionTracker; +use Ecotone\Messaging\Conversion\MediaType; use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\Polling; @@ -80,4 +85,52 @@ public function handle(array $event): void ->withLicenceKey(LicenceTesting::VALID_LICENCE) ); } + + public function test_streaming_projection_does_not_require_from_stream_attribute(): void + { + $positionTracker = new InMemoryConsumerPositionTracker(); + + $projection = new #[ProjectionV2('streaming_projection'), Streaming('streaming_channel')] class { + public array $events = []; + + #[EventHandler('*')] + public function handle(array $event): void + { + $this->events[] = $event; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection, ConsumerPositionTracker::class => $positionTracker], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createStreamingChannel('streaming_channel', conversionMediaType: MediaType::createApplicationXPHP()), + ]) + ); + + $this->assertNotNull($ecotone); + } + + public function test_non_streaming_projection_requires_from_stream_attribute(): void + { + $projection = new #[ProjectionV2('non_streaming_projection')] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + + $this->expectException(ConfigurationException::class); + $this->expectExceptionMessage("Projection 'non_streaming_projection' must have at least one #[FromStream] or #[FromAggregateStream] attribute"); + + EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ); + } } diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index 4783f43a3..447bbea14 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -14,38 +14,47 @@ use Ecotone\Dbal\Database\DbalTableManagerReference; use Ecotone\EventSourcing\Database\ProjectionStateTableManager; use Ecotone\EventSourcing\EventSourcingConfiguration; -use Ecotone\EventSourcing\Projecting\AggregateIdPartitionProviderBuilder; -use Ecotone\EventSourcing\Projecting\PartitionState\DbalProjectionStateStorageBuilder; -use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreAggregateStreamSourceBuilder; -use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreGlobalStreamSourceBuilder; -use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreMultiStreamSourceBuilder; +use Ecotone\EventSourcing\PdoStreamTableNameProvider; +use Ecotone\EventSourcing\Projecting\AggregateIdPartitionProvider; +use Ecotone\EventSourcing\Projecting\PartitionState\DbalProjectionStateStorage; +use Ecotone\Projecting\ProjectionStateStorageReference; +use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreAggregateStreamSource; +use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreGlobalStreamSource; use Ecotone\Messaging\Attribute\ModuleAnnotation; use Ecotone\Messaging\Config\Annotation\AnnotationModule; use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ExtensionObjectResolver; use Ecotone\Messaging\Config\Configuration; use Ecotone\Messaging\Config\ConfigurationException; use Ecotone\Messaging\Config\Container\Definition; +use Ecotone\Messaging\Config\Container\Reference; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ModuleReferenceSearchService; use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Handler\InterfaceToCallRegistry; +use Ecotone\Messaging\Scheduling\Duration; +use Ecotone\Messaging\Scheduling\EcotoneClockInterface; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\ProjectionV2; -use Ecotone\Projecting\Config\ProjectionComponentBuilder; use Ecotone\Projecting\Config\StreamFilterRegistryModule; use Ecotone\Projecting\EventStoreAdapter\EventStreamingChannelAdapter; +use Ecotone\Projecting\PartitionProviderReference; use Ecotone\Projecting\StreamFilter; +use Ecotone\Projecting\StreamFilterRegistry; +use Ecotone\Projecting\StreamSourceReference; +use Enqueue\Dbal\DbalConnectionFactory; #[ModuleAnnotation] class ProophProjectingModule implements AnnotationModule { /** - * @param ProjectionComponentBuilder[] $extensions * @param string[] $projectionNames + * @param string[] $partitionedProjectionNames + * @param string[] $globalStreamProjectionNames */ public function __construct( - private array $extensions, private array $projectionNames, + private array $partitionedProjectionNames, + private array $globalStreamProjectionNames, ) { } @@ -53,7 +62,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I { $allStreamFilters = StreamFilterRegistryModule::collectStreamFilters($annotationRegistrationService, $interfaceToCallRegistry); - $extensions = self::resolveConfigs($annotationRegistrationService, $allStreamFilters); + [$partitionedProjectionNames, $globalStreamProjectionNames] = self::resolveProjectionTypes($annotationRegistrationService, $allStreamFilters); $projectionNames = []; foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) { @@ -62,21 +71,22 @@ public static function create(AnnotationFinder $annotationRegistrationService, I } return new self( - $extensions, $projectionNames, + $partitionedProjectionNames, + $globalStreamProjectionNames, ); } /** * @param array $allStreamFilters - * @return list + * @return array{list, list} */ - private static function resolveConfigs( + private static function resolveProjectionTypes( AnnotationFinder $annotationRegistrationService, array $allStreamFilters, ): array { - $extensions = []; - $partitionProviders = []; + $partitionedProjectionNames = []; + $globalStreamProjectionNames = []; foreach ($allStreamFilters as $projectionName => $streamFilters) { $projectionClass = null; @@ -101,85 +111,152 @@ private static function resolveConfigs( ); } - $sources = []; foreach ($streamFilters as $streamFilter) { if ($isPartitioned && ! $streamFilter->aggregateType) { throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); } if ($isPartitioned) { - $sourceIdentifier = $streamFilter->streamName . '.' . $streamFilter->aggregateType; - $sources[$sourceIdentifier] = new EventStoreAggregateStreamSourceBuilder( - $projectionName, - $streamFilter, - ); - if (! isset($partitionProviders[$projectionName])) { - $partitionProviders[$projectionName] = new AggregateIdPartitionProviderBuilder($projectionName); + if (!\in_array($projectionName, $partitionedProjectionNames, true)) { + $partitionedProjectionNames[] = $projectionName; } } else { - $sources[$streamFilter->streamName] = new EventStoreGlobalStreamSourceBuilder( - $streamFilter, - [$projectionName] - ); + if (!\in_array($projectionName, $globalStreamProjectionNames, true)) { + $globalStreamProjectionNames[] = $projectionName; + } } } - if (count($sources) > 1) { - $extensions[] = new EventStoreMultiStreamSourceBuilder( - $sources, - [$projectionName], - ); - } else { - $extensions[] = current($sources); - } - $extensions = [...$extensions, ...array_values($partitionProviders)]; } - return $extensions; + return [$partitionedProjectionNames, $globalStreamProjectionNames]; } public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void { $dbalConfiguration = ExtensionObjectResolver::resolveUnique(DbalConfiguration::class, $extensionObjects, DbalConfiguration::createWithDefaults()); + $eventSourcingConfiguration = ExtensionObjectResolver::resolveUnique(EventSourcingConfiguration::class, $extensionObjects, EventSourcingConfiguration::createWithDefaults()); + foreach ($extensionObjects as $extensionObject) { + if ($extensionObject instanceof EventStreamingChannelAdapter) { + $this->globalStreamProjectionNames[] = $extensionObject->getProjectionName(); + } + } + + $hasProjections = $this->projectionNames !== [] || $this->globalStreamProjectionNames !== []; $messagingConfiguration->registerServiceDefinition( ProjectionStateTableManager::class, new Definition(ProjectionStateTableManager::class, [ ProjectionStateTableManager::DEFAULT_TABLE_NAME, - $this->projectionNames !== [], + $hasProjections, $dbalConfiguration->isAutomaticTableInitializationEnabled(), ]) ); + + if ($this->partitionedProjectionNames !== [] && ! $eventSourcingConfiguration->isInMemory()) { + $messagingConfiguration->registerServiceDefinition( + AggregateIdPartitionProvider::class, + new Definition(AggregateIdPartitionProvider::class, [ + new Reference(DbalConnectionFactory::class), + new Reference(PdoStreamTableNameProvider::class), + $this->partitionedProjectionNames, + ]) + ); + } + + if (! $eventSourcingConfiguration->isInMemory()) { + $this->registerGlobalStreamSource($messagingConfiguration); + $this->registerAggregateStreamSource($messagingConfiguration); + $this->registerDbalProjectionStateStorage($messagingConfiguration); + } } - public function canHandle($extensionObject): bool + private function registerDbalProjectionStateStorage(Configuration $messagingConfiguration): void { - return $extensionObject instanceof DbalConfiguration; + $hasProjections = $this->projectionNames !== [] || $this->globalStreamProjectionNames !== []; + if (! $hasProjections) { + return; + } + + $messagingConfiguration->registerServiceDefinition( + DbalProjectionStateStorage::class, + new Definition(DbalProjectionStateStorage::class, [ + new Reference(DbalConnectionFactory::class), + new Reference(ProjectionStateTableManager::class), + ]) + ); } - public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions): array + private function registerGlobalStreamSource(Configuration $messagingConfiguration): void { - $eventSourcingConfiguration = ExtensionObjectResolver::resolveUnique(EventSourcingConfiguration::class, $serviceExtensions, EventSourcingConfiguration::createWithDefaults()); - $extensions = $eventSourcingConfiguration->isInMemory() ? [] : [...$this->extensions]; + if ($this->globalStreamProjectionNames === []) { + return; + } - foreach ($serviceExtensions as $extensionObject) { - if (! ($extensionObject instanceof EventStreamingChannelAdapter)) { - continue; - } + $messagingConfiguration->registerServiceDefinition( + EventStoreGlobalStreamSource::class, + new Definition(EventStoreGlobalStreamSource::class, [ + new Reference(DbalConnectionFactory::class), + new Reference(EcotoneClockInterface::class), + new Reference(PdoStreamTableNameProvider::class), + new Reference(StreamFilterRegistry::class), + $this->globalStreamProjectionNames, + 5_000, + new Definition(Duration::class, [60 * 1_000_000]), + ]) + ); + } - $projectionName = $extensionObject->getProjectionName(); - $extensions[] = new EventStoreGlobalStreamSourceBuilder( - new StreamFilter($extensionObject->fromStream), - [$projectionName] - ); + private function registerAggregateStreamSource(Configuration $messagingConfiguration): void + { + if ($this->partitionedProjectionNames === []) { + return; } + $messagingConfiguration->registerServiceDefinition( + EventStoreAggregateStreamSource::class, + new Definition(EventStoreAggregateStreamSource::class, [ + new Reference('Ecotone\EventSourcing\EventStore'), + new Reference(StreamFilterRegistry::class), + $this->partitionedProjectionNames, + ]) + ); + } + + public function canHandle($extensionObject): bool + { + return $extensionObject instanceof DbalConfiguration + || $extensionObject instanceof EventSourcingConfiguration + || $extensionObject instanceof EventStreamingChannelAdapter; + } + + public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions): array + { + $eventSourcingConfiguration = ExtensionObjectResolver::resolveUnique(EventSourcingConfiguration::class, $serviceExtensions, EventSourcingConfiguration::createWithDefaults()); + $extensions = []; + $extensions[] = new DbalTableManagerReference(ProjectionStateTableManager::class); $eventStreamingChannelAdapters = ExtensionObjectResolver::resolve(EventStreamingChannelAdapter::class, $serviceExtensions); if (($this->projectionNames || $eventStreamingChannelAdapters) && ! $eventSourcingConfiguration->isInMemory()) { - $projectionNames = array_unique([...$this->projectionNames, ...array_map(fn (EventStreamingChannelAdapter $adapter) => $adapter->getProjectionName(), $eventStreamingChannelAdapters)]); + $extensions[] = new ProjectionStateStorageReference(DbalProjectionStateStorage::class); + } + + if ($this->partitionedProjectionNames !== [] && ! $eventSourcingConfiguration->isInMemory()) { + $extensions[] = new PartitionProviderReference(AggregateIdPartitionProvider::class, $this->partitionedProjectionNames); + } - $extensions[] = new DbalProjectionStateStorageBuilder($projectionNames); + if (! $eventSourcingConfiguration->isInMemory()) { + $globalStreamProjectionNames = array_unique([ + ...$this->globalStreamProjectionNames, + ...array_map(fn (EventStreamingChannelAdapter $adapter) => $adapter->getProjectionName(), $eventStreamingChannelAdapters), + ]); + + if ($globalStreamProjectionNames !== []) { + $extensions[] = new StreamSourceReference(EventStoreGlobalStreamSource::class); + } + if ($this->partitionedProjectionNames !== []) { + $extensions[] = new StreamSourceReference(EventStoreAggregateStreamSource::class); + } } return $extensions; diff --git a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php index af4047800..8772c9f2c 100644 --- a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php +++ b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php @@ -20,12 +20,21 @@ class AggregateIdPartitionProvider implements PartitionProvider { + /** + * @param array $partitionedProjections List of projection names this provider handles + */ public function __construct( private DbalConnectionFactory|MultiTenantConnectionFactory $connectionFactory, - private PdoStreamTableNameProvider $tableNameProvider + private PdoStreamTableNameProvider $tableNameProvider, + private array $partitionedProjections = [], ) { } + public function canHandle(string $projectionName): bool + { + return \in_array($projectionName, $this->partitionedProjections, true); + } + public function count(StreamFilter $filter): int { $connection = $this->getConnection(); diff --git a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php deleted file mode 100644 index 42285b468..000000000 --- a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php +++ /dev/null @@ -1,36 +0,0 @@ -handledProjectionName === $projectionName; - } - - public function compile(MessagingContainerBuilder $builder): Definition|Reference - { - return new Definition(AggregateIdPartitionProvider::class, [ - Reference::to(DbalConnectionFactory::class), - Reference::to(PdoStreamTableNameProvider::class), - ]); - } -} diff --git a/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php b/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php index ff8c65c2e..b5352d457 100644 --- a/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php +++ b/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php @@ -29,12 +29,21 @@ class DbalProjectionStateStorage implements ProjectionStateStorage /** @var array Track initialization per connection */ private array $initialized = []; + /** + * @param string[]|null $projectionNames + */ public function __construct( private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory, private ProjectionStateTableManager $tableManager, + private ?array $projectionNames = null, ) { } + public function canHandle(string $projectionName): bool + { + return $this->projectionNames === null || \in_array($projectionName, $this->projectionNames, true); + } + public function getTableName(): string { return $this->tableManager->getTableName(); diff --git a/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorageBuilder.php b/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorageBuilder.php deleted file mode 100644 index fe7708f5a..000000000 --- a/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorageBuilder.php +++ /dev/null @@ -1,42 +0,0 @@ -handledProjectionNames, true); - } -} diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php index 336de6319..8873cd3fc 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php @@ -13,33 +13,48 @@ use Ecotone\EventSourcing\EventStore\Operator; use Ecotone\Messaging\MessageHeaders; use Ecotone\Messaging\Support\Assert; -use Ecotone\Projecting\StreamFilter; +use Ecotone\Projecting\StreamFilterRegistry; use Ecotone\Projecting\StreamPage; use Ecotone\Projecting\StreamSource; use RuntimeException; +use function count; + class EventStoreAggregateStreamSource implements StreamSource { + /** + * @param string[] $handledProjectionNames + */ public function __construct( private EventStore $eventStore, - private StreamFilter $streamFilter, + private StreamFilterRegistry $streamFilterRegistry, + private array $handledProjectionNames, ) { } - public function load(?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function canHandle(string $projectionName): bool + { + return \in_array($projectionName, $this->handledProjectionNames, true); + } + + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage { Assert::notNull($partitionKey, 'Partition key cannot be null for aggregate stream source'); - if (! $this->eventStore->hasStream($this->streamFilter->streamName)) { + $streamFilters = $this->streamFilterRegistry->provide($projectionName); + Assert::isTrue(count($streamFilters) > 0, "No stream filter found for projection: {$projectionName}"); + $streamFilter = $streamFilters[0]; + + if (! $this->eventStore->hasStream($streamFilter->streamName)) { return new StreamPage([], $lastPosition ?? ''); } $metadataMatcher = new MetadataMatcher(); - if ($this->streamFilter->aggregateType !== null) { + if ($streamFilter->aggregateType !== null) { $metadataMatcher = $metadataMatcher->withMetadataMatch( MessageHeaders::EVENT_AGGREGATE_TYPE, Operator::EQUALS, - $this->streamFilter->aggregateType + $streamFilter->aggregateType ); } $metadataMatcher = $metadataMatcher->withMetadataMatch( @@ -53,17 +68,17 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = (int)$lastPosition + 1 ); - if ($this->streamFilter->eventNames !== []) { + if ($streamFilter->eventNames !== []) { $metadataMatcher = $metadataMatcher->withMetadataMatch( 'event_name', Operator::IN, - $this->streamFilter->eventNames, + $streamFilter->eventNames, FieldType::MESSAGE_PROPERTY ); } $events = $this->eventStore->load( - $this->streamFilter->streamName, + $streamFilter->streamName, 1, $count, $metadataMatcher, @@ -72,6 +87,9 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = return new StreamPage($events, $this->createPositionFrom($lastPosition, $events)); } + /** + * @param array $events + */ private function createPositionFrom(?string $lastPosition, array $events): string { $lastEvent = end($events); diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php deleted file mode 100644 index e24b29193..000000000 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php +++ /dev/null @@ -1,45 +0,0 @@ -handledProjectionName === $projectionName; - } - - public function compile(MessagingContainerBuilder $builder): Definition|Reference - { - return new Definition( - EventStoreAggregateStreamSource::class, - [ - new Reference($this->streamFilter->eventStoreReferenceName), - new Definition(StreamFilter::class, [ - $this->streamFilter->streamName, - $this->streamFilter->aggregateType, - $this->streamFilter->eventStoreReferenceName, - $this->streamFilter->eventNames, - ]), - ], - ); - } -} diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php index f80bf1d2a..97d5140c7 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php @@ -20,7 +20,7 @@ use Ecotone\Messaging\Scheduling\Duration; use Ecotone\Messaging\Scheduling\EcotoneClockInterface; use Ecotone\Messaging\Support\Assert; -use Ecotone\Projecting\StreamFilter; +use Ecotone\Projecting\StreamFilterRegistry; use Ecotone\Projecting\StreamPage; use Ecotone\Projecting\StreamSource; use Enqueue\Dbal\DbalConnectionFactory; @@ -31,16 +31,25 @@ class EventStoreGlobalStreamSource implements StreamSource { + /** + * @param string[] $handledProjectionNames + */ public function __construct( private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory, private EcotoneClockInterface $clock, - private StreamFilter $streamFilter, private PdoStreamTableNameProvider $tableNameProvider, + private StreamFilterRegistry $streamFilterRegistry, + private array $handledProjectionNames, private int $maxGapOffset = 5_000, private ?Duration $gapTimeout = null, ) { } + public function canHandle(string $projectionName): bool + { + return \in_array($projectionName, $this->handledProjectionNames, true); + } + private function getConnection(): Connection { if ($this->connectionFactory instanceof MultiTenantConnectionFactory) { @@ -50,13 +59,24 @@ private function getConnection(): Connection return $this->connectionFactory->createContext()->getDbalConnection(); } - public function load(?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage { Assert::null($partitionKey, 'Partition key is not supported for EventStoreGlobalStreamSource'); - $connection = $this->getConnection(); + $streamFilters = $this->streamFilterRegistry->provide($projectionName); + Assert::isTrue(count($streamFilters) > 0, "No stream filter found for projection: {$projectionName}"); - $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamFilter->streamName); + if (count($streamFilters) === 1) { + return $this->loadFromSingleStream($streamFilters[0], $lastPosition, $count); + } + + return $this->loadFromMultipleStreams($streamFilters, $lastPosition, $count); + } + + private function loadFromSingleStream(\Ecotone\Projecting\StreamFilter $streamFilter, ?string $lastPosition, int $count): StreamPage + { + $connection = $this->getConnection(); + $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($streamFilter->streamName); if (empty($lastPosition) && ! SchemaManagerCompatibility::tableExists($connection, $proophStreamTable)) { return new StreamPage([], ''); @@ -97,12 +117,85 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = $tracking->cleanByMaxOffset($this->maxGapOffset); - $this->cleanGapsByTimeout($tracking, $connection); + $this->cleanGapsByTimeout($tracking, $connection, $proophStreamTable); return new StreamPage($events, (string) $tracking); } - private function cleanGapsByTimeout(GapAwarePosition $tracking, Connection $connection): void + /** + * @param \Ecotone\Projecting\StreamFilter[] $streamFilters + */ + private function loadFromMultipleStreams(array $streamFilters, ?string $lastPosition, int $count): StreamPage + { + $positions = $this->decodeMultiStreamPositions($lastPosition); + + $orderIndex = []; + $i = 0; + $newPositions = []; + $all = []; + + foreach ($streamFilters as $streamFilter) { + $streamName = $streamFilter->streamName; + $orderIndex[$streamName] = $i++; + + $streamPosition = $positions[$streamName] ?? null; + $limit = (int) ceil($count / max(1, count($streamFilters))) + 5; + + $streamPage = $this->loadFromSingleStream($streamFilter, $streamPosition, $limit); + $newPositions[$streamName] = $streamPage->lastPosition; + + foreach ($streamPage->events as $event) { + $all[] = [$streamName, $event]; + } + } + + usort($all, function (array $aTuple, array $bTuple) use ($orderIndex): int { + [$aStream, $a] = $aTuple; + [$bStream, $b] = $bTuple; + if ($aStream === $bStream) { + return $a->no <=> $b->no; + } + if ($a->timestamp === $b->timestamp) { + return $orderIndex[$aStream] <=> $orderIndex[$bStream]; + } + return $a->timestamp <=> $b->timestamp; + }); + + $events = array_map(fn (array $tuple) => $tuple[1], $all); + + return new StreamPage($events, $this->encodeMultiStreamPositions($newPositions)); + } + + private function encodeMultiStreamPositions(array $positions): string + { + $encoded = ''; + foreach ($positions as $stream => $pos) { + $encoded .= "{$stream}={$pos};"; + } + return $encoded; + } + + /** + * @return array + */ + private function decodeMultiStreamPositions(?string $position): array + { + $result = []; + if ($position === null || $position === '') { + return $result; + } + $pairs = explode(';', $position); + foreach ($pairs as $pair) { + if ($pair === '') { + continue; + } + [$stream, $pos] = explode('=', $pair, 2); + $result[$stream] = $pos; + } + return $result; + } + + private function cleanGapsByTimeout(GapAwarePosition $tracking, Connection $connection, string $proophStreamTable): void { if ($this->gapTimeout === null) { return; @@ -115,8 +208,6 @@ private function cleanGapsByTimeout(GapAwarePosition $tracking, Connection $conn $minGap = $gaps[0]; $maxGap = $gaps[count($gaps) - 1]; - $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamFilter->streamName); - $interleavedEvents = $connection->executeQuery(<<handledProjectionNames, true); - } - - public function compile(MessagingContainerBuilder $builder): Definition|Reference - { - return new Definition( - EventStoreGlobalStreamSource::class, - [ - Reference::to(DbalConnectionFactory::class), - Reference::to(EcotoneClockInterface::class), - new Definition(StreamFilter::class, [ - $this->streamFilter->streamName, - $this->streamFilter->aggregateType, - $this->streamFilter->eventStoreReferenceName, - $this->streamFilter->eventNames, - ]), - Reference::to(PdoStreamTableNameProvider::class), - 5_000, - new Definition(Duration::class, [60], 'seconds'), - ], - ); - } -} diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php deleted file mode 100644 index b43ca6af1..000000000 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSource.php +++ /dev/null @@ -1,93 +0,0 @@ - $sources map of logical stream name => stream source - */ - public function __construct( - private array $sources, - ) { - } - - public function load(?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage - { - $positions = $this->decodePositions($lastPosition); - - $orderIndex = []; - $i = 0; - $newPositions = []; - $all = []; - foreach ($this->sources as $stream => $source) { - $orderIndex[$stream] = $i++; - - $limit = (int)ceil($count / max(1, count($this->sources))) + 5; - - $page = $source->load($positions[$stream] ?? null, $limit, $partitionKey); - - $newPositions[$stream] = $page->lastPosition; - foreach ($page->events as $event) { - $all[] = [$stream, $event]; - } - } - - usort($all, function (array $aTuple, array $bTuple) use ($orderIndex): int { - [$aStream, $a] = $aTuple; - [$bStream, $b] = $bTuple; - if ($aStream === $bStream) { - return $a->no <=> $b->no; - } - if ($a->timestamp === $b->timestamp) { - return $orderIndex[$aStream] <=> $orderIndex[$bStream]; - } - return $a->timestamp <=> $b->timestamp; - }); - - $events = array_map(fn (array $tuple) => $tuple[1], $all); - - return new StreamPage($events, $this->encodePositions($newPositions)); - } - - /** - * Encodes map as: stream=position:g1,g2;stream2=position:...; - */ - private function encodePositions(array $positions): string - { - $encoded = ''; - foreach ($positions as $stream => $pos) { - $encoded .= "$stream=$pos;"; - } - return $encoded; - } - - /** - * Decodes the map encoded by encodePositions. - * Returns array key is stream name, value is position (opaque string) - */ - private function decodePositions(?string $position): array - { - $result = []; - if ($position === null || $position === '') { - return $result; - } - $pairs = explode(';', $position); - foreach ($pairs as $pair) { - if ($pair === '') { - continue; - } - [$stream, $pos] = explode('=', $pair, 2); - $result[$stream] = $pos; - } - return $result; - } -} diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSourceBuilder.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSourceBuilder.php deleted file mode 100644 index 2268f6850..000000000 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreMultiStreamSourceBuilder.php +++ /dev/null @@ -1,46 +0,0 @@ - $streamToSourceBuilder - * @param string[] $handledProjectionNames - */ - public function __construct( - private array $streamToSourceBuilder, - private array $handledProjectionNames, - ) { - } - - public function canHandle(string $projectionName, string $component): bool - { - return $component === StreamSource::class && in_array($projectionName, $this->handledProjectionNames, true); - } - - public function compile(MessagingContainerBuilder $builder): Definition|Reference - { - $sourcesDefinitions = array_map(function ($sourceBuilder) use ($builder) { - return $sourceBuilder->compile($builder); - }, $this->streamToSourceBuilder); - - return new Definition( - EventStoreMultiStreamSource::class, - [ - $sourcesDefinitions, - ], - ); - } -} diff --git a/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php b/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php index c24337882..88f14fbe3 100644 --- a/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php @@ -22,6 +22,7 @@ use Ecotone\Projecting\ProjectingManager; use Ecotone\Projecting\ProjectionRegistry; use Ecotone\Projecting\StreamFilter; +use Ecotone\Projecting\StreamFilterRegistry; use Ecotone\Test\LicenceTesting; use Enqueue\Dbal\DbalConnectionFactory; use Psr\Clock\ClockInterface; @@ -110,12 +111,18 @@ public function test_gaps_are_added_to_position(): void public function test_max_gap_offset_cleaning(): void { + $projectionName = 'test_projection'; + $streamFilterRegistry = new StreamFilterRegistry([ + $projectionName => [new StreamFilter(Ticket::STREAM_NAME)], + ]); + // Create a stream source with small max gap offset $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, + $streamFilterRegistry, + [$projectionName], maxGapOffset: 3, // Only keep gaps within 3 positions gapTimeout: null ); @@ -124,7 +131,7 @@ public function test_max_gap_offset_cleaning(): void $tracking = new GapAwarePosition(10, [2, 5, 7, 9]); // Execute - $result = $streamSource->load((string) $tracking, 100); + $result = $streamSource->load($projectionName, (string) $tracking, 100); // Verify: Only gaps within 3 positions should remain (7, 9) $newTracking = GapAwarePosition::fromString($result->lastPosition); @@ -133,6 +140,11 @@ public function test_max_gap_offset_cleaning(): void public function test_gap_timeout_cleaning(): void { + $projectionName = 'test_projection'; + $streamFilterRegistry = new StreamFilterRegistry([ + $projectionName => [new StreamFilter(Ticket::STREAM_NAME)], + ]); + // Create events with specific timestamps $now = self::$clock->now()->getTimestamp(); @@ -152,13 +164,14 @@ public function test_gap_timeout_cleaning(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, + $streamFilterRegistry, + [$projectionName], gapTimeout: Duration::seconds(5) ); // Execute - $result = $streamSource->load(null, 100); + $result = $streamSource->load($projectionName, null, 100); // All gaps should be present initially $tracking = GapAwarePosition::fromString($result->lastPosition); @@ -168,7 +181,7 @@ public function test_gap_timeout_cleaning(): void self::$clock->sleep(Duration::seconds(2)); // Execute - $result = $streamSource->load(null, 100); + $result = $streamSource->load($projectionName, null, 100); // Verify: Gaps 2, 4 should be removed (old timestamps), gap 6 should remain (recent timestamps) $newTracking = GapAwarePosition::fromString($result->lastPosition); @@ -176,18 +189,24 @@ public function test_gap_timeout_cleaning(): void // Delay 4 more second to exceed timeout for all gaps (6 seconds since insertion of the last event) self::$clock->sleep(Duration::seconds(4)); - $result = $streamSource->load($result->lastPosition, 100); + $result = $streamSource->load($projectionName, $result->lastPosition, 100); $newTracking = GapAwarePosition::fromString($result->lastPosition); self::assertSame([], $newTracking->getGaps()); } public function test_gap_cleaning_noop_when_no_gaps(): void { + $projectionName = 'test_projection'; + $streamFilterRegistry = new StreamFilterRegistry([ + $projectionName => [new StreamFilter(Ticket::STREAM_NAME)], + ]); + $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, + $streamFilterRegistry, + [$projectionName], maxGapOffset: 1000, gapTimeout: Duration::seconds(5) ); @@ -196,7 +215,7 @@ public function test_gap_cleaning_noop_when_no_gaps(): void $tracking = new GapAwarePosition(10, []); // Execute - $result = $streamSource->load((string) $tracking, 100); + $result = $streamSource->load($projectionName, (string) $tracking, 100); // Verify: No gaps should remain $newTracking = GapAwarePosition::fromString($result->lastPosition); @@ -205,11 +224,17 @@ public function test_gap_cleaning_noop_when_no_gaps(): void public function test_gap_cleaning_noop_when_timeout_disabled(): void { + $projectionName = 'test_projection'; + $streamFilterRegistry = new StreamFilterRegistry([ + $projectionName => [new StreamFilter(Ticket::STREAM_NAME)], + ]); + $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, + $streamFilterRegistry, + [$projectionName], maxGapOffset: 1000, gapTimeout: null // No timeout ); @@ -218,7 +243,7 @@ public function test_gap_cleaning_noop_when_timeout_disabled(): void $tracking = new GapAwarePosition(10, [2, 5, 7]); // Execute - $result = $streamSource->load((string) $tracking, 100); + $result = $streamSource->load($projectionName, (string) $tracking, 100); // Verify: All gaps should remain (no timeout cleaning) $newTracking = GapAwarePosition::fromString($result->lastPosition); diff --git a/packages/PdoEventSourcing/tests/Projecting/Partitioned/SynchronousEventDrivenProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/Partitioned/SynchronousEventDrivenProjectionTest.php index d45123db2..5dde37d2e 100644 --- a/packages/PdoEventSourcing/tests/Projecting/Partitioned/SynchronousEventDrivenProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/Partitioned/SynchronousEventDrivenProjectionTest.php @@ -12,13 +12,18 @@ use Ecotone\EventSourcing\Attribute\ProjectionReset; use Ecotone\Lite\EcotoneLite; use Ecotone\Lite\Test\FlowTestSupport; +use Ecotone\Lite\Test\TestConfiguration; +use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\MessageHeaders; use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Modelling\Attribute\QueryHandler; +use Ecotone\Projecting\Attribute; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\ProjectionV2; +use Ecotone\Projecting\PartitionProvider; +use Ecotone\Projecting\StreamFilter; use Ecotone\Test\LicenceTesting; use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\CloseTicket; use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\RegisterTicket; @@ -318,6 +323,46 @@ public function reset(): void }; } + public function test_userland_partition_provider_is_prioritized_over_builtin_during_backfill(): void + { + $userlandPartitionProvider = new #[Attribute\PartitionProvider] class implements PartitionProvider { + public function canHandle(string $projectionName): bool + { + return $projectionName === 'userland_backfill_projection'; + } + + public function count(StreamFilter $filter): int + { + return 7; + } + + public function partitions(StreamFilter $filter, ?int $limit = null, int $offset = 0): iterable + { + $partitions = ['u1', 'u2', 'u3', 'u4', 'u5', 'u6', 'u7']; + $partitions = array_slice($partitions, $offset, $limit); + yield from $partitions; + } + }; + + $projection = new #[ProjectionV2('userland_backfill_projection'), FromAggregateStream(Ticket::class), Partitioned, Attribute\ProjectionBackfill(backfillPartitionBatchSize: 3, asyncChannelName: 'backfill_channel')] class { + #[EventHandler] + public function handle(TicketWasRegistered $event): void + { + } + }; + + $ecotone = $this->bootstrapEcotoneWithAsyncChannel( + [$projection::class, $userlandPartitionProvider::class], + [$projection, $userlandPartitionProvider] + ); + + $ecotone->initializeProjection('userland_backfill_projection'); + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'userland_backfill_projection']); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('backfill_channel'); + self::assertCount(3, $messages, 'Expected 3 batches for 7 partitions with batch size 3 (3+3+1)'); + } + private function bootstrapEcotone(array $classesToResolve, array $services): FlowTestSupport { return EcotoneLite::bootstrapFlowTestingWithEventStore( @@ -333,4 +378,22 @@ classesToResolve: array_merge($classesToResolve, [Ticket::class, TicketEventConv licenceKey: LicenceTesting::VALID_LICENCE, ); } + + private function bootstrapEcotoneWithAsyncChannel(array $classesToResolve, array $services): FlowTestSupport + { + return EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: array_merge($classesToResolve, [Ticket::class, TicketEventConverter::class]), + containerOrAvailableServices: array_merge($services, [new TicketEventConverter(), self::getConnectionFactory()]), + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::DBAL_PACKAGE, + ModulePackageList::EVENT_SOURCING_PACKAGE, + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])), + runForProductionEventStore: true, + enableAsynchronousProcessing: [SimpleMessageChannelBuilder::createQueueChannel('backfill_channel')], + licenceKey: LicenceTesting::VALID_LICENCE, + testConfiguration: TestConfiguration::createWithDefaults()->withSpyOnChannel('backfill_channel'), + ); + } }