diff --git a/Monorepo/Benchmark/ProjectingBenchmark.php b/Monorepo/Benchmark/ProjectingBenchmark.php index 840e1a8b0..46137c926 100644 --- a/Monorepo/Benchmark/ProjectingBenchmark.php +++ b/Monorepo/Benchmark/ProjectingBenchmark.php @@ -174,7 +174,7 @@ public function bench_ecotone_projection_backfill(): void Assert::assertEquals([], self::$ecotone->getQueryBus()->sendWithRouting('product.getPriceChange', self::$expectedProductIds[0]) ); - $projectionManager->backfill(); + $projectionManager->prepareBackfill(); Assert::assertEquals([ new PriceChange(100, 0), new PriceChange(120, 20), diff --git a/Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php b/Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php deleted file mode 100644 index 3b56d4390..000000000 --- a/Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php +++ /dev/null @@ -1,22 +0,0 @@ -getGateway(ProjectionRegistry::class)->has($name)) { - $this->getGateway(ProjectionRegistry::class)->get($name)->backfill(); + $this->getGateway(ProjectionRegistry::class)->get($name)->prepareBackfill(); } else { $this->getGateway(ProjectionManager::class)->triggerProjection($name); } diff --git a/packages/Ecotone/src/Messaging/Config/ModuleClassList.php b/packages/Ecotone/src/Messaging/Config/ModuleClassList.php index 91f219a75..aaa3787d1 100644 --- a/packages/Ecotone/src/Messaging/Config/ModuleClassList.php +++ b/packages/Ecotone/src/Messaging/Config/ModuleClassList.php @@ -68,6 +68,7 @@ use Ecotone\Projecting\Config\ProjectingAttributeModule; use Ecotone\Projecting\Config\ProjectingConsoleCommands; use Ecotone\Projecting\Config\ProjectingModule; +use Ecotone\Projecting\Config\StreamFilterRegistryModule; use Ecotone\Projecting\EventStoreAdapter\EventStoreAdapterModule; use Ecotone\Redis\Configuration\RedisMessageConsumerModule; use Ecotone\Redis\Configuration\RedisMessagePublisherModule; @@ -115,6 +116,7 @@ class ModuleClassList EventSourcedRepositoryModule::class, ProjectingModule::class, ProjectingAttributeModule::class, + StreamFilterRegistryModule::class, EventStoreAdapterModule::class, /** Attribute based configurations */ diff --git a/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php b/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php new file mode 100644 index 000000000..200f1a2ab --- /dev/null +++ b/packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php @@ -0,0 +1,38 @@ +backfillPartitionBatchSize < 1) { + throw new InvalidArgumentException('Backfill partition batch size must be at least 1'); + } + } +} + diff --git a/packages/Ecotone/src/Projecting/Attribute/ProjectionBatchSize.php b/packages/Ecotone/src/Projecting/Attribute/ProjectionBatchSize.php deleted file mode 100644 index 2f9a2863f..000000000 --- a/packages/Ecotone/src/Projecting/Attribute/ProjectionBatchSize.php +++ /dev/null @@ -1,18 +0,0 @@ -projectionRegistry->get($projectionName); + $streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName); + + foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) { + $projectingManager->execute($partition, true); + if ($this->terminationListener->shouldTerminate()) { + break; + } + } + } +} + diff --git a/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php b/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php index 67670bcf6..a2224250f 100644 --- a/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php +++ b/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php @@ -25,7 +25,8 @@ class EcotoneProjectionExecutorBuilder implements ProjectionExecutorBuilder { - private const DEFAULT_BATCH_SIZE = 1_000; + private const DEFAULT_EVENT_LOADING_BATCH_SIZE = 1_000; + private const DEFAULT_BACKFILL_PARTITION_BATCH_SIZE = 100; /** * @param AnnotatedDefinition[] $projectionEventHandlers @@ -41,7 +42,9 @@ public function __construct( private ?string $flushChannel = null, private array $projectionEventHandlers = [], private ?string $asyncChannelName = null, - private ?int $batchSize = null, + private ?int $eventLoadingBatchSize = null, + private ?int $backfillPartitionBatchSize = null, + private ?string $backfillAsyncChannelName = null, ) { if ($this->partitionHeader && ! $this->automaticInitialization) { throw new ConfigurationException("Cannot set partition header for projection {$this->projectionName} with automatic initialization disabled"); @@ -93,9 +96,19 @@ public function automaticInitialization(): bool return $this->automaticInitialization; } - public function batchSize(): int + public function eventLoadingBatchSize(): int { - return $this->batchSize ?? self::DEFAULT_BATCH_SIZE; + return $this->eventLoadingBatchSize ?? self::DEFAULT_EVENT_LOADING_BATCH_SIZE; + } + + public function backfillPartitionBatchSize(): int + { + return $this->backfillPartitionBatchSize ?? self::DEFAULT_BACKFILL_PARTITION_BATCH_SIZE; + } + + public function backfillAsyncChannelName(): ?string + { + return $this->backfillAsyncChannelName; } public function compile(MessagingContainerBuilder $builder): Definition|Reference diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php index cca66d24e..2c8ce7e9f 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php @@ -34,7 +34,8 @@ use Ecotone\Modelling\Attribute\NamedEvent; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\Polling; -use Ecotone\Projecting\Attribute\ProjectionBatchSize; +use Ecotone\Projecting\Attribute\ProjectionBackfill; +use Ecotone\Projecting\Attribute\ProjectionExecution; use Ecotone\Projecting\Attribute\ProjectionDeployment; use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\Attribute\ProjectionV2; @@ -77,7 +78,8 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $eventStreamingProjections = []; foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) { $projectionAttribute = $annotationRegistrationService->getAttributeForClass($projectionClassName, ProjectionV2::class); - $batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBatchSize::class); + $batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionExecution::class); + $backfillAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBackfill::class); $pollingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Polling::class); $streamingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Streaming::class); $projectionDeployment = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionDeployment::class); @@ -88,7 +90,16 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $automaticInitialization = self::resolveAutomaticInitialization($partitionAttribute, $projectionDeployment); $isLive = $projectionDeployment?->live ?? true; - $projectionBuilder = new EcotoneProjectionExecutorBuilder($projectionAttribute->name, $partitionHeaderName, $automaticInitialization, $isLive, $namedEvents, batchSize: $batchSizeAttribute?->batchSize); + $projectionBuilder = new EcotoneProjectionExecutorBuilder( + $projectionAttribute->name, + $partitionHeaderName, + $automaticInitialization, + $isLive, + $namedEvents, + eventLoadingBatchSize: $batchSizeAttribute?->eventLoadingBatchSize, + backfillPartitionBatchSize: $backfillAttribute?->backfillPartitionBatchSize, + backfillAsyncChannelName: $backfillAttribute?->asyncChannelName, + ); $asynchronousChannelName = self::getProjectionAsynchronousChannel($annotationRegistrationService, $projectionClassName); $isPolling = $pollingAttribute !== null; diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php b/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php index 62d48c85c..a90ad9951 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php @@ -42,7 +42,7 @@ public function backfillProjection(string $name): void if (! $this->registry->has($name)) { throw new InvalidArgumentException("There is no projection with name {$name}"); } - $this->registry->get($name)->backfill(); + $this->registry->get($name)->prepareBackfill(); } #[ConsoleCommand('ecotone:projection:delete')] diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 6569de0d4..2ad37e20e 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -20,18 +20,22 @@ use Ecotone\Messaging\Config\ModuleReferenceSearchService; use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Endpoint\Interceptor\TerminationListener; +use Ecotone\Messaging\Gateway\MessagingEntrypoint; use Ecotone\Messaging\Handler\InterfaceToCallRegistry; use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\HeaderBuilder; +use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\PayloadBuilder; use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\ValueBuilder; use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvokerBuilder; use Ecotone\Messaging\Handler\ServiceActivator\MessageProcessorActivatorBuilder; +use Ecotone\Projecting\BackfillExecutorHandler; use Ecotone\Projecting\InMemory\InMemoryProjectionRegistry; -use Ecotone\Projecting\NullPartitionProvider; +use Ecotone\Projecting\SinglePartitionProvider; use Ecotone\Projecting\PartitionProvider; use Ecotone\Projecting\ProjectingHeaders; use Ecotone\Projecting\ProjectingManager; use Ecotone\Projecting\ProjectionRegistry; use Ecotone\Projecting\ProjectionStateStorage; +use Ecotone\Projecting\StreamFilterRegistry; use Ecotone\Projecting\StreamSource; use Ramsey\Uuid\Uuid; @@ -97,11 +101,15 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $components[$projectionName][ProjectionStateStorage::class] ?? throw ConfigurationException::create("Projection with name {$projectionName} does not have projection state storage configured. Please check your configuration."), new Reference($reference), $components[$projectionName][StreamSource::class] ?? throw ConfigurationException::create("Projection with name {$projectionName} does not have stream source configured. Please check your configuration."), - $components[$projectionName][PartitionProvider::class] ?? new Definition(NullPartitionProvider::class), + $components[$projectionName][PartitionProvider::class] ?? new Definition(SinglePartitionProvider::class), + new Reference(StreamFilterRegistry::class), $projectionName, new Reference(TerminationListener::class), - $projectionBuilder->batchSize(), // batchSize + new Reference(MessagingEntrypoint::class), + $projectionBuilder->eventLoadingBatchSize(), $projectionBuilder->automaticInitialization(), + $projectionBuilder->backfillPartitionBatchSize(), + $projectionBuilder->backfillAsyncChannelName(), ]) ); $projectionRegistryMap[$projectionName] = new Reference($projectingManagerReference); @@ -142,6 +150,35 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO new Definition(InMemoryProjectionRegistry::class, [$projectionRegistryMap]) ); + // Register BackfillExecutorHandler and its message handler + $messagingConfiguration->registerServiceDefinition( + BackfillExecutorHandler::class, + new Definition(BackfillExecutorHandler::class, [ + new Reference(ProjectionRegistry::class), + new Reference(TerminationListener::class), + ]) + ); + + $messagingConfiguration->registerMessageHandler( + MessageProcessorActivatorBuilder::create() + ->chainInterceptedProcessor( + MethodInvokerBuilder::create( + BackfillExecutorHandler::class, + InterfaceToCallReference::create(BackfillExecutorHandler::class, 'executeBackfillBatch'), + [ + PayloadBuilder::create('projectionName'), + HeaderBuilder::createOptional('limit', 'backfill.limit'), + HeaderBuilder::createOptional('offset', 'backfill.offset'), + HeaderBuilder::createOptional('streamName', 'backfill.streamName'), + HeaderBuilder::createOptional('aggregateType', 'backfill.aggregateType'), + HeaderBuilder::createOptional('eventStoreReferenceName', 'backfill.eventStoreReferenceName'), + ], + ) + ) + ->withEndpointId('backfill_executor_handler') + ->withInputChannelName(BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL) + ); + // Register console commands $messagingConfiguration->registerServiceDefinition(ProjectingConsoleCommands::class, new Definition(ProjectingConsoleCommands::class, [new Reference(ProjectionRegistry::class)])); } diff --git a/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php b/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php index 620c84979..e51f3dda5 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php @@ -15,5 +15,7 @@ public function projectionName(): string; public function asyncChannelName(): ?string; public function partitionHeader(): ?string; public function automaticInitialization(): bool; - public function batchSize(): int; + public function eventLoadingBatchSize(): int; + public function backfillPartitionBatchSize(): int; + public function backfillAsyncChannelName(): ?string; } diff --git a/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php new file mode 100644 index 000000000..a58732ca3 --- /dev/null +++ b/packages/Ecotone/src/Projecting/Config/StreamFilterRegistryModule.php @@ -0,0 +1,209 @@ + $streamFilters */ + public function __construct(private array $streamFilters) + { + } + + public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static + { + return new self(self::collectStreamFilters($annotationRegistrationService, $interfaceToCallRegistry)); + } + + /** + * @return array Map of projection name to stream filters + */ + public static function collectStreamFilters(AnnotationFinder $annotationFinder, ?InterfaceToCallRegistry $interfaceToCallRegistry = null): array + { + $projectionEventNames = $interfaceToCallRegistry !== null + ? self::collectProjectionEventNames($annotationFinder, $interfaceToCallRegistry) + : []; + + $streamFilters = []; + + foreach ($annotationFinder->findAnnotatedClasses(ProjectionV2::class) as $classname) { + $projectionAttribute = $annotationFinder->getAttributeForClass($classname, ProjectionV2::class); + $projectionName = $projectionAttribute->name; + $eventNames = $projectionEventNames[$projectionName] ?? []; + + foreach ($annotationFinder->getAnnotationsForClass($classname, FromStream::class) as $streamAttribute) { + $streamFilters[$projectionName][] = new StreamFilter( + $streamAttribute->stream, + $streamAttribute->aggregateType, + $streamAttribute->eventStoreReferenceName, + $eventNames, + ); + } + + foreach ($annotationFinder->getAnnotationsForClass($classname, FromAggregateStream::class) as $aggregateStreamAttribute) { + $streamFilters[$projectionName][] = self::resolveFromAggregateStream($annotationFinder, $aggregateStreamAttribute, $projectionName, $eventNames); + } + + if (! isset($streamFilters[$projectionName]) || $streamFilters[$projectionName] === []) { + throw ConfigurationException::create( + "Projection '{$projectionName}' must have at least one #[FromStream] or #[FromAggregateStream] attribute defined on class {$classname}." + ); + } + } + + return $streamFilters; + } + + /** + * @param array $namedEvents Map of class name to named event name + * @return array> Map of projection name to event names (empty array means no filtering) + */ + public static function collectProjectionEventNames( + AnnotationFinder $annotationFinder, + InterfaceToCallRegistry $interfaceToCallRegistry, + ): array { + $namedEvents = []; + foreach ($annotationFinder->findAnnotatedClasses(NamedEvent::class) as $className) { + $attribute = $annotationFinder->getAttributeForClass($className, NamedEvent::class); + $namedEvents[$className] = $attribute->getName(); + } + + $projectionEventNames = []; + $disabledFiltering = []; + $routingMapBuilder = new BusRoutingMapBuilder(); + + foreach ($annotationFinder->findCombined(ProjectionV2::class, EventHandler::class) as $projectionEventHandler) { + /** @var ProjectionV2 $projectionAttribute */ + $projectionAttribute = $projectionEventHandler->getAnnotationForClass(); + $projectionName = $projectionAttribute->name; + + if (! isset($projectionEventNames[$projectionName])) { + $projectionEventNames[$projectionName] = []; + } + + if (isset($disabledFiltering[$projectionName])) { + continue; + } + + $routes = $routingMapBuilder->getRoutesFromAnnotatedFinding($projectionEventHandler, $interfaceToCallRegistry); + foreach ($routes as $route) { + if ($route === '*' || $route === 'object') { + $projectionEventNames[$projectionName] = []; + $disabledFiltering[$projectionName] = true; + break; + } + + if (str_contains($route, '*')) { + throw ConfigurationException::create( + "Projection {$projectionName} uses glob pattern '{$route}' which is not allowed. " . + 'For query optimization, event handlers must use explicit event names. Use union type parameters instead.' + ); + } + + if (class_exists($route) && isset($namedEvents[$route])) { + $projectionEventNames[$projectionName][] = $namedEvents[$route]; + } else { + $projectionEventNames[$projectionName][] = $route; + } + } + } + + foreach ($projectionEventNames as $projectionName => $eventNames) { + if (! isset($disabledFiltering[$projectionName]) && $eventNames !== []) { + $projectionEventNames[$projectionName] = array_values(array_unique($eventNames)); + } + } + + return $projectionEventNames; + } + + private static function resolveFromAggregateStream( + AnnotationFinder $annotationFinder, + FromAggregateStream $attribute, + string $projectionName, + array $eventNames = [] + ): StreamFilter { + $aggregateClass = $attribute->aggregateClass; + + $eventSourcingAggregateAttribute = $annotationFinder->findAttributeForClass($aggregateClass, EventSourcingAggregate::class); + if ($eventSourcingAggregateAttribute === null) { + throw ConfigurationException::create("Class {$aggregateClass} referenced in #[FromAggregateStream] for projection {$projectionName} must be an EventSourcingAggregate."); + } + + $streamName = $aggregateClass; + if (class_exists(Stream::class)) { + $streamAttribute = $annotationFinder->findAttributeForClass($aggregateClass, Stream::class); + $streamName = $streamAttribute?->getName() ?? $aggregateClass; + } + + $aggregateType = $aggregateClass; + $aggregateTypeAttribute = $annotationFinder->findAttributeForClass($aggregateClass, AggregateType::class); + $aggregateType = $aggregateTypeAttribute?->getName() ?? $aggregateClass; + + return new StreamFilter($streamName, $aggregateType, $attribute->eventStoreReferenceName, $eventNames); + } + + public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void + { + $filtersDefinition = []; + foreach ($this->streamFilters as $projectionName => $filters) { + $filtersDefinition[$projectionName] = []; + foreach ($filters as $filter) { + $filtersDefinition[$projectionName][] = new Definition(StreamFilter::class, [ + $filter->streamName, + $filter->aggregateType, + $filter->eventStoreReferenceName, + $filter->eventNames, + ]); + } + } + + $messagingConfiguration->registerServiceDefinition( + StreamFilterRegistry::class, + new Definition(StreamFilterRegistry::class, [$filtersDefinition]) + ); + } + + public function canHandle($extensionObject): bool + { + return false; + } + + public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions, ?InterfaceToCallRegistry $interfaceToCallRegistry = null): array + { + return []; + } + + public function getModulePackageName(): string + { + return ModulePackageList::CORE_PACKAGE; + } +} + diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php index 243cf3387..bfef18135 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php @@ -45,11 +45,21 @@ public function automaticInitialization(): bool return true; } - public function batchSize(): int + public function eventLoadingBatchSize(): int { return $this->channelAdapter->batchSize; } + public function backfillPartitionBatchSize(): int + { + return 100; // Default value, streaming channel adapters don't support partitioned backfill + } + + public function backfillAsyncChannelName(): ?string + { + return null; // Streaming channel adapters don't support async backfill + } + public function compile(MessagingContainerBuilder $builder): Definition|Reference { // Create the projection executor that forwards events to the streaming channel diff --git a/packages/Ecotone/src/Projecting/NullPartitionProvider.php b/packages/Ecotone/src/Projecting/NullPartitionProvider.php deleted file mode 100644 index 97ae79d3b..000000000 --- a/packages/Ecotone/src/Projecting/NullPartitionProvider.php +++ /dev/null @@ -1,16 +0,0 @@ - Partition keys + */ + public function partitions(StreamFilter $filter, ?int $limit = null, int $offset = 0): iterable; } diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index 769c38c48..955c946d7 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -8,23 +8,33 @@ namespace Ecotone\Projecting; use Ecotone\Messaging\Endpoint\Interceptor\TerminationListener; +use Ecotone\Messaging\Gateway\MessagingEntrypoint; use InvalidArgumentException; use Throwable; class ProjectingManager { + private const DEFAULT_BACKFILL_PARTITION_BATCH_SIZE = 100; + public function __construct( private ProjectionStateStorage $projectionStateStorage, private ProjectorExecutor $projectorExecutor, private StreamSource $streamSource, private PartitionProvider $partitionProvider, + private StreamFilterRegistry $streamFilterRegistry, private string $projectionName, private TerminationListener $terminationListener, - private int $batchSize = 1000, + private MessagingEntrypoint $messagingEntrypoint, + private int $eventLoadingBatchSize = 1000, private bool $automaticInitialization = true, + private int $backfillPartitionBatchSize = self::DEFAULT_BACKFILL_PARTITION_BATCH_SIZE, + private ?string $backfillAsyncChannelName = null, ) { - if ($batchSize < 1) { - throw new InvalidArgumentException('Batch size must be at least 1'); + if ($eventLoadingBatchSize < 1) { + throw new InvalidArgumentException('Event loading batch size must be at least 1'); + } + if ($backfillPartitionBatchSize < 1) { + throw new InvalidArgumentException('Backfill partition batch size must be at least 1'); } } @@ -48,7 +58,7 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial return 0; } - $streamPage = $this->streamSource->load($projectionState->lastPosition, $this->batchSize, $partitionKeyValue); + $streamPage = $this->streamSource->load($projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); $userState = $projectionState->userState; $processedEvents = 0; @@ -83,6 +93,16 @@ public function loadState(?string $partitionKey = null): ProjectionPartitionStat return $this->projectionStateStorage->loadPartition($this->projectionName, $partitionKey); } + public function getPartitionProvider(): PartitionProvider + { + return $this->partitionProvider; + } + + public function getProjectionName(): string + { + return $this->projectionName; + } + public function init(): void { $this->projectionStateStorage->init($this->projectionName); @@ -97,16 +117,71 @@ public function delete(): void $this->projectorExecutor->delete(); } - public function backfill(): void + /** + * Prepares backfill by calculating batches and sending messages to BackfillExecutorHandler. + * Each batch message contains a limit and offset for processing a subset of partitions. + * This enables the backfill to be executed synchronously or asynchronously depending on configuration. + */ + public function prepareBackfill(): void { - foreach ($this->partitionProvider->partitions() as $partition) { - $this->execute($partition, true); - if ($this->terminationListener->shouldTerminate()) { - break; - } + $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); + + foreach ($streamFilters as $streamFilter) { + $this->prepareBackfillForFilter($streamFilter); } } + private function prepareBackfillForFilter(StreamFilter $streamFilter): void + { + $totalPartitions = $this->partitionProvider->count($streamFilter); + + if ($totalPartitions === 0) { + return; + } + + $numberOfBatches = (int) ceil($totalPartitions / $this->backfillPartitionBatchSize); + + for ($batch = 0; $batch < $numberOfBatches; $batch++) { + $offset = $batch * $this->backfillPartitionBatchSize; + + $headers = [ + 'backfill.limit' => $this->backfillPartitionBatchSize, + 'backfill.offset' => $offset, + 'backfill.streamName' => $streamFilter->streamName, + 'backfill.aggregateType' => $streamFilter->aggregateType, + 'backfill.eventStoreReferenceName' => $streamFilter->eventStoreReferenceName, + ]; + + $this->sendBackfillMessage($headers); + } + } + + private function sendBackfillMessage(array $headers): void + { + if ($this->backfillAsyncChannelName !== null) { + $this->messagingEntrypoint->sendWithHeaders( + $this->projectionName, + $headers, + $this->backfillAsyncChannelName, + BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL + ); + } else { + $this->messagingEntrypoint->sendWithHeaders( + $this->projectionName, + $headers, + BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL + ); + } + } + + /** + * @deprecated Use prepareBackfill() instead. This method is kept for backward compatibility. + */ + public function backfill(): void + { + $this->prepareBackfill(); + } + private function loadOrInitializePartitionState(?string $partitionKey, bool $canInitialize): ?ProjectionPartitionState { $projectionState = $this->projectionStateStorage->loadPartition($this->projectionName, $partitionKey); diff --git a/packages/Ecotone/src/Projecting/SinglePartitionProvider.php b/packages/Ecotone/src/Projecting/SinglePartitionProvider.php new file mode 100644 index 000000000..5eccfbca2 --- /dev/null +++ b/packages/Ecotone/src/Projecting/SinglePartitionProvider.php @@ -0,0 +1,25 @@ += 1)) { + yield null; + } + } +} diff --git a/packages/Ecotone/src/Projecting/StreamFilter.php b/packages/Ecotone/src/Projecting/StreamFilter.php new file mode 100644 index 000000000..212efe65b --- /dev/null +++ b/packages/Ecotone/src/Projecting/StreamFilter.php @@ -0,0 +1,29 @@ + $eventNames Event names to filter, empty array means no filtering + */ + public function __construct( + public readonly string $streamName, + public readonly ?string $aggregateType = null, + public readonly string $eventStoreReferenceName = EventStore::class, + public readonly array $eventNames = [], + ) { + } +} + diff --git a/packages/Ecotone/src/Projecting/StreamFilterRegistry.php b/packages/Ecotone/src/Projecting/StreamFilterRegistry.php new file mode 100644 index 000000000..2773d14ec --- /dev/null +++ b/packages/Ecotone/src/Projecting/StreamFilterRegistry.php @@ -0,0 +1,32 @@ + $filters key is projection name + */ + public function __construct( + private array $filters = [], + ) { + } + + /** + * @return StreamFilter[] + */ + public function provide(string $projectionName): array + { + return $this->filters[$projectionName] ?? []; + } +} + diff --git a/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php b/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php index 974058340..5040ad372 100644 --- a/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php +++ b/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php @@ -4,6 +4,7 @@ namespace Test\Ecotone\Lite; +use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Attribute\Converter; use Ecotone\Messaging\Config\ModulePackageList; @@ -25,15 +26,13 @@ class InMemoryEventStoreRegistrationTest extends TestCase { public function test_registers_in_memory_event_store_stream_source_when_pdo_event_sourcing_is_disabled(): void { - // Given a test event class $testEvent = new class () { public function __construct(public int $id = 0, public string $name = '') { } }; - // Given a polling projection (polling projections read from stream sources) - $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller')] class ($testEvent) { + $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller'), FromStream('test_stream')] class ($testEvent) { public array $events = []; public int $callCount = 0; private string $eventClass; @@ -77,18 +76,13 @@ public function onEvent(object $event): void public function test_does_not_register_in_memory_stream_source_when_custom_stream_source_is_provided(): void { - // This test verifies that when a custom stream source is provided, - // the InMemoryEventStoreStreamSourceBuilder is NOT registered - - // Given a test event class $testEvent = new class () { public function __construct(public int $id = 0, public string $name = '') { } }; - // Given a polling projection - $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller')] class { + $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller'), FromStream('test_stream')] class { public array $events = []; public int $callCount = 0; diff --git a/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php b/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php index 866ae678a..1dec00e45 100644 --- a/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php +++ b/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php @@ -7,6 +7,7 @@ namespace Test\Ecotone\Projecting; +use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder; use Ecotone\Messaging\Config\ModulePackageList; @@ -36,7 +37,7 @@ public function test_event_streaming_projection_consuming_from_streaming_channel $positionTracker = new InMemoryConsumerPositionTracker(); // Given a projection that consumes from streaming channel - $projection = new #[ProjectionV2('user_projection'), Streaming('streaming_channel')] class { + $projection = new #[ProjectionV2('user_projection'), FromStream('test_stream'), Streaming('streaming_channel')] class { public array $projectedUsers = []; #[EventHandler] @@ -86,7 +87,7 @@ public function test_event_streaming_projection_with_multiple_event_handlers_rou $positionTracker = new InMemoryConsumerPositionTracker(); // Given a projection with two event handlers routed by event names - $projection = new #[ProjectionV2('order_projection'), Streaming('streaming_channel')] class { + $projection = new #[ProjectionV2('order_projection'), FromStream('test_stream'), Streaming('streaming_channel')] class { public array $createdOrders = []; public array $completedOrders = []; @@ -150,7 +151,7 @@ public function test_event_streaming_projection_with_event_store_channel_adapter $positionTracker = new InMemoryConsumerPositionTracker(); // Given a projection that consumes from streaming channel - $projection = new #[ProjectionV2('product_projection'), Streaming('event_stream')] class { + $projection = new #[ProjectionV2('product_projection'), FromStream('test_stream'), Streaming('event_stream')] class { public array $projectedProducts = []; #[EventHandler] @@ -211,7 +212,7 @@ public function test_two_event_streaming_projections_consuming_from_same_channel $positionTracker = new InMemoryConsumerPositionTracker(); // Given two projections consuming from the same streaming channel - $productListProjection = new #[ProjectionV2('product_list_projection'), Streaming('event_stream')] class { + $productListProjection = new #[ProjectionV2('product_list_projection'), FromStream('test_stream'), Streaming('event_stream')] class { public array $productList = []; #[EventHandler] @@ -221,7 +222,7 @@ public function onProductRegistered(ProductRegistered $event): void } }; - $productPriceProjection = new #[ProjectionV2('product_price_projection'), Streaming('event_stream')] class { + $productPriceProjection = new #[ProjectionV2('product_price_projection'), FromStream('test_stream'), Streaming('event_stream')] class { public array $productPrices = []; #[EventHandler] @@ -293,7 +294,7 @@ public function test_event_driven_projection_combined_with_event_streaming_proje $positionTracker = new InMemoryConsumerPositionTracker(); // Given an event-driven projection (catches up from stream when triggered) - $eventDrivenProjection = new #[ProjectionV2('event_driven_product_count')] class { + $eventDrivenProjection = new #[ProjectionV2('event_driven_product_count'), FromStream('test_stream')] class { public int $productCount = 0; #[EventHandler] @@ -304,7 +305,7 @@ public function onProductRegistered(ProductRegistered $event): void }; // Given an event streaming projection (processes events in polling mode from streaming channel) - $eventStreamingProjection = new #[ProjectionV2('streaming_product_list'), Streaming('event_stream')] class { + $eventStreamingProjection = new #[ProjectionV2('streaming_product_list'), FromStream('test_stream'), Streaming('event_stream')] class { public array $productList = []; #[EventHandler] diff --git a/packages/Ecotone/tests/Projecting/ProjectingTest.php b/packages/Ecotone/tests/Projecting/ProjectingTest.php index 11a09cc60..e162381d0 100644 --- a/packages/Ecotone/tests/Projecting/ProjectingTest.php +++ b/packages/Ecotone/tests/Projecting/ProjectingTest.php @@ -7,6 +7,7 @@ namespace Test\Ecotone\Projecting; +use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\Attribute\ProjectionInitialization; use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Attribute\Asynchronous; @@ -21,7 +22,7 @@ use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Modelling\Event; use Ecotone\Projecting\Attribute\Partitioned; -use Ecotone\Projecting\Attribute\ProjectionBatchSize; +use Ecotone\Projecting\Attribute\ProjectionExecution; use Ecotone\Projecting\Attribute\ProjectionDeployment; use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\Attribute\ProjectionV2; @@ -40,7 +41,7 @@ class ProjectingTest extends TestCase public function test_asynchronous_projection(): void { // Given an asynchronous projection - $projection = new #[ProjectionV2('test'), Asynchronous('async')] class { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), Asynchronous('async')] class { public array $handledEvents = []; #[EventHandler('*')] public function handle(array $event): void @@ -71,7 +72,7 @@ public function handle(array $event): void public function test_partitioned_projection(): void { // Given a partitioned projection - $projection = new #[ProjectionV2('test'), Partitioned('partitionHeader')] class { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), Partitioned('partitionHeader')] class { public array $handledEvents = []; #[EventHandler('*')] public function handle(array $event): void @@ -105,7 +106,7 @@ public function handle(array $event): void public function test_asynchronous_partitioned_projection(): void { // Given a partitioned async projection - $projection = new #[ProjectionV2('test'), Partitioned('partitionHeader'), Asynchronous('async')] class { + $projection = new #[ProjectionV2('test'), FromStream('test_stream'), Partitioned('partitionHeader'), Asynchronous('async')] class { public array $handledEvents = []; #[EventHandler('*')] public function handle(array $event): void @@ -141,7 +142,7 @@ public function handle(array $event): void public function test_it_can_init_projection_lifecycle_state(): void { - $projection = new #[ProjectionV2('projection_with_lifecycle')] class { + $projection = new #[ProjectionV2('projection_with_lifecycle'), FromStream('test_stream')] class { public const TICKET_CREATED = 'ticket.created'; private bool $initialized = false; public array $projectedEvents = []; @@ -184,7 +185,7 @@ public function init(): void public function test_it_skips_execution_when_automatic_initialization_is_off_and_not_initialized(): void { - $projection = new #[ProjectionV2('projection_with_manual_initialization'), ProjectionDeployment(manualKickOff: true)] class { + $projection = new #[ProjectionV2('projection_with_manual_initialization'), FromStream('test_stream'), ProjectionDeployment(manualKickOff: true)] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; @@ -224,7 +225,7 @@ public function on(array $event): void public function test_init_partition_concurrency_protection(): void { - $projection = new #[ProjectionV2('concurrent_projection')] class { + $projection = new #[ProjectionV2('concurrent_projection'), FromStream('test_stream')] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -267,7 +268,7 @@ public function init(): void public function test_auto_initialization_mode_processes_events(): void { - $projection = new #[ProjectionV2('auto_projection'), ProjectionDeployment(manualKickOff: false)] class { + $projection = new #[ProjectionV2('auto_projection'), FromStream('test_stream'), ProjectionDeployment(manualKickOff: false)] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -307,7 +308,7 @@ public function init(): void public function test_skip_initialization_mode_skips_events_when_not_initialized(): void { - $projection = new #[ProjectionV2('skip_projection'), ProjectionDeployment(manualKickOff: true)] class { + $projection = new #[ProjectionV2('skip_projection'), FromStream('test_stream'), ProjectionDeployment(manualKickOff: true)] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -347,7 +348,7 @@ public function init(): void public function test_skip_mode_with_multiple_events(): void { - $projection = new #[ProjectionV2('skip_multiple_events'), ProjectionDeployment(manualKickOff: true)] class { + $projection = new #[ProjectionV2('skip_multiple_events'), FromStream('test_stream'), ProjectionDeployment(manualKickOff: true)] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -390,7 +391,7 @@ public function init(): void public function test_auto_mode_with_multiple_events(): void { - $projection = new #[ProjectionV2('auto_multiple_events'), ProjectionDeployment(manualKickOff: false)] class { + $projection = new #[ProjectionV2('auto_multiple_events'), FromStream('test_stream'), ProjectionDeployment(manualKickOff: false)] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -432,7 +433,7 @@ public function init(): void public function test_projection_with_partitioned_events(): void { - $projection = new #[ProjectionV2('partitioned_auto_projection'), Partitioned('tenantId')] class { + $projection = new #[ProjectionV2('partitioned_auto_projection'), FromStream('test_stream'), Partitioned('tenantId')] class { public const TICKET_CREATED = 'ticket.created'; public array $projectedEvents = []; public int $initCallCount = 0; @@ -475,7 +476,7 @@ public function test_it_throws_exception_when_no_licence(): void $this->expectException(ConfigurationException::class); $this->expectExceptionMessage('Projections are part of Ecotone Enterprise. To use projections, please acquire an enterprise licence.'); - $projection = new #[ProjectionV2('test')] class { + $projection = new #[ProjectionV2('test'), FromStream('test_stream')] class { #[EventHandler('*')] public function handle(array $event): void { @@ -490,10 +491,29 @@ public function handle(array $event): void ); } + public function test_it_throws_exception_when_no_stream_defined(): void + { + $this->expectException(ConfigurationException::class); + $this->expectExceptionMessage("Projection 'test_no_stream' must have at least one #[FromStream] or #[FromAggregateStream] attribute defined"); + + $projection = new #[ProjectionV2('test_no_stream')] class { + #[EventHandler('*')] + public function handle(array $event): void + { + } + }; + EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + configuration: ServiceConfiguration::createWithDefaults() + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ); + } + public function test_it_with_event_handler_priority(): void { $db = []; - $projectionA = new #[ProjectionV2('A')] class ($db) { + $projectionA = new #[ProjectionV2('A'), FromStream('test_stream')] class ($db) { public function __construct(private array &$db) { } @@ -509,7 +529,7 @@ public function handleHighPriority(array $event): void $this->db[] = 'projectionA-with-priority'; } }; - $projectionB = new #[ProjectionV2('B')] class ($db) { + $projectionB = new #[ProjectionV2('B'), FromStream('test_stream')] class ($db) { public function __construct(private array &$db) { } @@ -551,7 +571,7 @@ public function handleHighPriority(array $event): void public function test_it_can_flush_by_batches(): void { - $projection = new #[ProjectionV2('batch_projection')] class () { + $projection = new #[ProjectionV2('batch_projection'), FromStream('test_stream')] class () { public array $processingEvents = []; public array $flushedEvents = []; #[EventHandler('*')] @@ -592,7 +612,7 @@ public function flush(): void #[RequiresPhpExtension('posix')] public function test_pcntl_signals_handling(): void { - $projection = new #[ProjectionV2('signals_projection'), ProjectionBatchSize(2)] class () { + $projection = new #[ProjectionV2('signals_projection'), FromStream('test_stream'), ProjectionExecution(eventLoadingBatchSize: 2)] class () { public array $processedEvents = []; #[EventHandler('*')] public function handle(array $event): void diff --git a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php index 4b874580b..2b723ac34 100644 --- a/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php +++ b/packages/PdoEventSourcing/src/Config/ProophProjectingModule.php @@ -10,10 +10,6 @@ use Ecotone\AnnotationFinder\AnnotationFinder; use Ecotone\Dbal\Configuration\DbalConfiguration; use Ecotone\Dbal\Database\DbalTableManagerReference; -use Ecotone\EventSourcing\Attribute\AggregateType; -use Ecotone\EventSourcing\Attribute\FromAggregateStream; -use Ecotone\EventSourcing\Attribute\FromStream; -use Ecotone\EventSourcing\Attribute\Stream; use Ecotone\EventSourcing\Database\ProjectionStateTableManager; use Ecotone\EventSourcing\EventSourcingConfiguration; use Ecotone\EventSourcing\Projecting\AggregateIdPartitionProviderBuilder; @@ -31,14 +27,12 @@ use Ecotone\Messaging\Config\ModuleReferenceSearchService; use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Handler\InterfaceToCallRegistry; -use Ecotone\Modelling\Attribute\EventHandler; -use Ecotone\Modelling\Attribute\EventSourcingAggregate; -use Ecotone\Modelling\Attribute\NamedEvent; -use Ecotone\Modelling\Config\Routing\BusRoutingMapBuilder; use Ecotone\Projecting\Attribute\Partitioned; use Ecotone\Projecting\Attribute\ProjectionV2; use Ecotone\Projecting\Config\ProjectionComponentBuilder; +use Ecotone\Projecting\Config\StreamFilterRegistryModule; use Ecotone\Projecting\EventStoreAdapter\EventStreamingChannelAdapter; +use Ecotone\Projecting\StreamFilter; #[ModuleAnnotation] class ProophProjectingModule implements AnnotationModule @@ -55,15 +49,9 @@ public function __construct( public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static { - $namedEvents = []; - foreach ($annotationRegistrationService->findAnnotatedClasses(NamedEvent::class) as $className) { - $attribute = $annotationRegistrationService->getAttributeForClass($className, NamedEvent::class); - $namedEvents[$className] = $attribute->getName(); - } - - $projectionEventNames = self::collectProjectionEventNames($annotationRegistrationService, $interfaceToCallRegistry, $namedEvents); + $allStreamFilters = StreamFilterRegistryModule::collectStreamFilters($annotationRegistrationService, $interfaceToCallRegistry); - $extensions = self::resolveConfigs($annotationRegistrationService, $projectionEventNames); + $extensions = self::resolveConfigs($annotationRegistrationService, $allStreamFilters); $projectionNames = []; foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) { @@ -78,68 +66,61 @@ public static function create(AnnotationFinder $annotationRegistrationService, I } /** - * Resolve stream configurations from FromStream attributes. - * + * @param array $allStreamFilters * @return list */ private static function resolveConfigs( AnnotationFinder $annotationRegistrationService, - array $projectionEventNames + array $allStreamFilters, ): array { $extensions = []; $partitionProviders = []; - foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $classname) { - $projectionAttribute = $annotationRegistrationService->getAttributeForClass($classname, ProjectionV2::class); - $streamAttributes = [ - ...$annotationRegistrationService->getAnnotationsForClass($classname, FromStream::class), - ...\array_map( - fn (FromAggregateStream $aggregateStreamAttribute) => self::resolveFromAggregateStream($annotationRegistrationService, $aggregateStreamAttribute, $projectionAttribute->name), - $annotationRegistrationService->getAnnotationsForClass($classname, FromAggregateStream::class) - ), - ]; - $partitionedAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class); + foreach ($allStreamFilters as $projectionName => $streamFilters) { + $projectionClass = null; + foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $classname) { + $projectionAttribute = $annotationRegistrationService->getAttributeForClass($classname, ProjectionV2::class); + if ($projectionAttribute->name === $projectionName) { + $projectionClass = $classname; + break; + } + } - if (empty($streamAttributes)) { + if ($projectionClass === null) { continue; } - $projectionName = $projectionAttribute->name; + $partitionedAttribute = $annotationRegistrationService->findAttributeForClass($projectionClass, Partitioned::class); $isPartitioned = $partitionedAttribute !== null; - // @todo: Partitioned projections cannot be declared with multiple streams because the current partition provider cannot merge partitions from multiple streams. - if ($isPartitioned && count($streamAttributes) > 1) { + if ($isPartitioned && \count($streamFilters) > 1) { throw ConfigurationException::create( "Partitioned projection {$projectionName} cannot declare multiple streams. Use a single aggregate stream or remove #[Partitioned]." ); } $sources = []; - foreach ($streamAttributes as $streamAttribute) { - if ($isPartitioned && ! $streamAttribute->aggregateType) { + foreach ($streamFilters as $streamFilter) { + if ($isPartitioned && ! $streamFilter->aggregateType) { throw ConfigurationException::create("Aggregate type must be provided for projection {$projectionName} as partition header name is provided"); } if ($isPartitioned) { - $sourceIdentifier = $streamAttribute->stream.'.'.$streamAttribute->aggregateType; + $sourceIdentifier = $streamFilter->streamName . '.' . $streamFilter->aggregateType; $sources[$sourceIdentifier] = new EventStoreAggregateStreamSourceBuilder( $projectionName, - $streamAttribute->aggregateType, - $streamAttribute->stream, - $projectionEventNames[$projectionName] ?? [], - ); - $partitionProviders[$streamAttribute->stream] ??= new AggregateIdPartitionProviderBuilder( - $projectionName, - $streamAttribute->aggregateType, - $streamAttribute->stream, + $streamFilter, ); + if (! isset($partitionProviders[$projectionName])) { + $partitionProviders[$projectionName] = new AggregateIdPartitionProviderBuilder($projectionName); + } } else { - $sources[$streamAttribute->stream] = new EventStoreGlobalStreamSourceBuilder( - $streamAttribute->stream, + $sources[$streamFilter->streamName] = new EventStoreGlobalStreamSourceBuilder( + $streamFilter, [$projectionName] ); } } - if (count($sources) > 1) { + if (\count($sources) > 1) { $extensions[] = new EventStoreMultiStreamSourceBuilder( $sources, [$projectionName], @@ -153,30 +134,6 @@ private static function resolveConfigs( return $extensions; } - /** - * Resolve stream configurations from FromAggregateStream attributes. - */ - private static function resolveFromAggregateStream( - AnnotationFinder $annotationRegistrationService, - FromAggregateStream $aggregateStreamAttribute, - string $projectionName - ): FromStream { - $aggregateClass = $aggregateStreamAttribute->aggregateClass; - - $eventSourcingAggregateAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, EventSourcingAggregate::class); - if ($eventSourcingAggregateAttribute === null) { - throw ConfigurationException::create("Class {$aggregateClass} referenced in #[AggregateStream] for projection {$projectionName} must be an EventSourcingAggregate. Add #[EventSourcingAggregate] attribute to the class."); - } - - $streamAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, Stream::class); - $streamName = $streamAttribute?->getName() ?? $aggregateClass; - - $aggregateTypeAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, AggregateType::class); - $aggregateType = $aggregateTypeAttribute?->getName() ?? $aggregateClass; - - return new FromStream($streamName, $aggregateType, $aggregateStreamAttribute->eventStoreReferenceName); - } - public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void { $dbalConfiguration = ExtensionObjectResolver::resolveUnique(DbalConfiguration::class, $extensionObjects, DbalConfiguration::createWithDefaults()); @@ -198,7 +155,8 @@ public function canHandle($extensionObject): bool public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions): array { - $extensions = [...$this->extensions]; + $eventSourcingConfiguration = ExtensionObjectResolver::resolveUnique(EventSourcingConfiguration::class, $serviceExtensions, EventSourcingConfiguration::createWithDefaults()); + $extensions = $eventSourcingConfiguration->isInMemory() ? [] : [...$this->extensions]; foreach ($serviceExtensions as $extensionObject) { if (! ($extensionObject instanceof EventStreamingChannelAdapter)) { @@ -207,14 +165,13 @@ public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, $projectionName = $extensionObject->getProjectionName(); $extensions[] = new EventStoreGlobalStreamSourceBuilder( - $extensionObject->fromStream, + new StreamFilter($extensionObject->fromStream), [$projectionName] ); } $extensions[] = new DbalTableManagerReference(ProjectionStateTableManager::class); - $eventSourcingConfiguration = ExtensionObjectResolver::resolveUnique(EventSourcingConfiguration::class, $serviceExtensions, EventSourcingConfiguration::createWithDefaults()); $eventStreamingChannelAdapters = ExtensionObjectResolver::resolve(EventStreamingChannelAdapter::class, $serviceExtensions); if (($this->projectionNames || $eventStreamingChannelAdapters) && ! $eventSourcingConfiguration->isInMemory()) { @@ -230,69 +187,4 @@ public function getModulePackageName(): string { return ModulePackageList::EVENT_SOURCING_PACKAGE; } - - /** - * Collect event names for each partitioned projection. - * Returns empty array for projections that use catch-all patterns or object types. - * - * @param array $namedEvents Map of class name to named event name - * @return array> Map of projection name to event names (empty array means no filtering) - */ - private static function collectProjectionEventNames( - AnnotationFinder $annotationRegistrationService, - InterfaceToCallRegistry $interfaceToCallRegistry, - array $namedEvents - ): array { - $projectionEventNames = []; - $disabledFiltering = []; - $routingMapBuilder = new BusRoutingMapBuilder(); - - foreach ($annotationRegistrationService->findCombined(ProjectionV2::class, EventHandler::class) as $projectionEventHandler) { - /** @var ProjectionV2 $projectionAttribute */ - $projectionAttribute = $projectionEventHandler->getAnnotationForClass(); - $projectionName = $projectionAttribute->name; - - if (! isset($projectionEventNames[$projectionName])) { - $projectionEventNames[$projectionName] = []; - } - - if (isset($disabledFiltering[$projectionName])) { - continue; - } - - $routes = $routingMapBuilder->getRoutesFromAnnotatedFinding($projectionEventHandler, $interfaceToCallRegistry); - foreach ($routes as $route) { - // Check for catch-all pattern - disable filtering by keeping empty array - if ($route === '*' || $route === 'object') { - $projectionEventNames[$projectionName] = []; - $disabledFiltering[$projectionName] = true; - break; - } - - // Check for glob patterns (containing * but not exactly *) - if (str_contains($route, '*')) { - throw ConfigurationException::create( - "Projection {$projectionName} uses glob pattern '{$route}' which is not allowed. " . - 'For query optimization, event handlers must use explicit event names. Use union type parameters instead.' - ); - } - - // Check if route is a class with NamedEvent annotation - if (class_exists($route) && isset($namedEvents[$route])) { - $projectionEventNames[$projectionName][] = $namedEvents[$route]; - } else { - $projectionEventNames[$projectionName][] = $route; - } - } - } - - // Deduplicate event names (skip disabled ones which are empty arrays) - foreach ($projectionEventNames as $projectionName => $eventNames) { - if (! isset($disabledFiltering[$projectionName]) && $eventNames !== []) { - $projectionEventNames[$projectionName] = array_values(array_unique($eventNames)); - } - } - - return $projectionEventNames; - } } diff --git a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php index 28272c69c..af4047800 100644 --- a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php +++ b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProvider.php @@ -14,6 +14,7 @@ use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory; use Ecotone\EventSourcing\PdoStreamTableNameProvider; use Ecotone\Projecting\PartitionProvider; +use Ecotone\Projecting\StreamFilter; use Enqueue\Dbal\DbalConnectionFactory; use RuntimeException; @@ -21,19 +22,54 @@ class AggregateIdPartitionProvider implements PartitionProvider { public function __construct( private DbalConnectionFactory|MultiTenantConnectionFactory $connectionFactory, - private string $aggregateType, - private string $streamName, private PdoStreamTableNameProvider $tableNameProvider ) { } - public function partitions(): iterable + public function count(StreamFilter $filter): int { $connection = $this->getConnection(); $platform = $connection->getDatabasePlatform(); // Resolve table name at runtime using the provider - $streamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + $streamTable = $this->tableNameProvider->generateTableNameForStream($filter->streamName); + + // Build platform-specific count query + if ($platform instanceof PostgreSQLPlatform) { + // PostgreSQL: Use JSONB operators + $result = $connection->executeQuery(<<>'_aggregate_id') + FROM {$streamTable} + WHERE metadata->>'_aggregate_type' = ? + SQL, [$filter->aggregateType]); + } elseif ($platform instanceof MySQLPlatform || $platform instanceof MariaDBPlatform) { + // MySQL/MariaDB: Use generated indexed columns for better performance + $result = $connection->executeQuery(<<aggregateType]); + } else { + throw new RuntimeException('Unsupported database platform: ' . get_class($platform)); + } + + return (int) $result->fetchOne(); + } + + public function partitions(StreamFilter $filter, ?int $limit = null, int $offset = 0): iterable + { + $connection = $this->getConnection(); + $platform = $connection->getDatabasePlatform(); + + // Resolve table name at runtime using the provider + $streamTable = $this->tableNameProvider->generateTableNameForStream($filter->streamName); + + // Build pagination clause + $limitClause = ''; + if ($limit !== null) { + $limitClause = " LIMIT {$limit}"; + } + $offsetClause = $offset > 0 ? " OFFSET {$offset}" : ''; // Build platform-specific query if ($platform instanceof PostgreSQLPlatform) { @@ -42,14 +78,18 @@ public function partitions(): iterable SELECT DISTINCT metadata->>'_aggregate_id' AS aggregate_id FROM {$streamTable} WHERE metadata->>'_aggregate_type' = ? - SQL, [$this->aggregateType]); + ORDER BY aggregate_id + {$limitClause}{$offsetClause} + SQL, [$filter->aggregateType]); } elseif ($platform instanceof MySQLPlatform || $platform instanceof MariaDBPlatform) { // MySQL/MariaDB: Use generated indexed columns for better performance $query = $connection->executeQuery(<<aggregateType]); + ORDER BY aggregate_id + {$limitClause}{$offsetClause} + SQL, [$filter->aggregateType]); } else { throw new RuntimeException('Unsupported database platform: ' . get_class($platform)); } diff --git a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php index 8223da7ed..42285b468 100644 --- a/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php +++ b/packages/PdoEventSourcing/src/Projecting/AggregateIdPartitionProviderBuilder.php @@ -17,7 +17,7 @@ class AggregateIdPartitionProviderBuilder implements ProjectionComponentBuilder { - public function __construct(public readonly string $handledProjectionName, public readonly ?string $aggregateType, private string $streamName) + public function __construct(public readonly string $handledProjectionName) { } @@ -30,8 +30,6 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc { return new Definition(AggregateIdPartitionProvider::class, [ Reference::to(DbalConnectionFactory::class), - $this->aggregateType, - $this->streamName, Reference::to(PdoStreamTableNameProvider::class), ]); } diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php index 35f53d069..336de6319 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php @@ -13,20 +13,16 @@ use Ecotone\EventSourcing\EventStore\Operator; use Ecotone\Messaging\MessageHeaders; use Ecotone\Messaging\Support\Assert; +use Ecotone\Projecting\StreamFilter; use Ecotone\Projecting\StreamPage; use Ecotone\Projecting\StreamSource; use RuntimeException; class EventStoreAggregateStreamSource implements StreamSource { - /** - * @param array $eventNames Event names to filter by, empty array means no filtering - */ public function __construct( - private EventStore $eventStore, - private string $streamName, - private ?string $aggregateType, - private array $eventNames = [], + private EventStore $eventStore, + private StreamFilter $streamFilter, ) { } @@ -34,17 +30,16 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = { Assert::notNull($partitionKey, 'Partition key cannot be null for aggregate stream source'); - if (! $this->eventStore->hasStream($this->streamName)) { + if (! $this->eventStore->hasStream($this->streamFilter->streamName)) { return new StreamPage([], $lastPosition ?? ''); } $metadataMatcher = new MetadataMatcher(); - if ($this->aggregateType !== null) { - // @todo: watch out ! Prooph's event store has an index on (aggregate_type, aggregate_id). Not adding aggregate type here will result in a full table scan + if ($this->streamFilter->aggregateType !== null) { $metadataMatcher = $metadataMatcher->withMetadataMatch( MessageHeaders::EVENT_AGGREGATE_TYPE, Operator::EQUALS, - $this->aggregateType + $this->streamFilter->aggregateType ); } $metadataMatcher = $metadataMatcher->withMetadataMatch( @@ -58,17 +53,17 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = (int)$lastPosition + 1 ); - if ($this->eventNames !== []) { + if ($this->streamFilter->eventNames !== []) { $metadataMatcher = $metadataMatcher->withMetadataMatch( 'event_name', Operator::IN, - $this->eventNames, + $this->streamFilter->eventNames, FieldType::MESSAGE_PROPERTY ); } $events = $this->eventStore->load( - $this->streamName, + $this->streamFilter->streamName, 1, $count, $metadataMatcher, diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php index 7b6909bcf..e24b29193 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSourceBuilder.php @@ -7,23 +7,18 @@ namespace Ecotone\EventSourcing\Projecting\StreamSource; -use Ecotone\EventSourcing\EventStore; use Ecotone\Messaging\Config\Container\Definition; use Ecotone\Messaging\Config\Container\MessagingContainerBuilder; use Ecotone\Messaging\Config\Container\Reference; use Ecotone\Projecting\Config\ProjectionComponentBuilder; +use Ecotone\Projecting\StreamFilter; use Ecotone\Projecting\StreamSource; class EventStoreAggregateStreamSourceBuilder implements ProjectionComponentBuilder { - /** - * @param array $eventNames Event names to filter, empty array means no filtering - */ public function __construct( public readonly string $handledProjectionName, - public ?string $aggregateType, - private string $streamName, - private array $eventNames = [], + public readonly StreamFilter $streamFilter, ) { } @@ -37,10 +32,13 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc return new Definition( EventStoreAggregateStreamSource::class, [ - new Reference(EventStore::class), - $this->streamName, - $this->aggregateType, - $this->eventNames, + new Reference($this->streamFilter->eventStoreReferenceName), + new Definition(StreamFilter::class, [ + $this->streamFilter->streamName, + $this->streamFilter->aggregateType, + $this->streamFilter->eventStoreReferenceName, + $this->streamFilter->eventNames, + ]), ], ); } diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php index 8fed841e1..8ebc2beb2 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php @@ -18,7 +18,7 @@ use Ecotone\Messaging\Scheduling\Duration; use Ecotone\Messaging\Scheduling\EcotoneClockInterface; use Ecotone\Messaging\Support\Assert; -use Ecotone\Modelling\Event; +use Ecotone\Projecting\StreamFilter; use Ecotone\Projecting\StreamPage; use Ecotone\Projecting\StreamSource; use Enqueue\Dbal\DbalConnectionFactory; @@ -31,7 +31,7 @@ class EventStoreGlobalStreamSource implements StreamSource public function __construct( private DbalConnectionFactory|ManagerRegistryConnectionFactory|MultiTenantConnectionFactory $connectionFactory, private EcotoneClockInterface $clock, - private string $streamName, + private StreamFilter $streamFilter, private PdoStreamTableNameProvider $tableNameProvider, private int $maxGapOffset = 5_000, private ?Duration $gapTimeout = null, @@ -53,8 +53,7 @@ public function load(?string $lastPosition, int $count, ?string $partitionKey = $connection = $this->getConnection(); - // Resolve table name at runtime using the provider - $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamFilter->streamName); if (empty($lastPosition) && ! SchemaManagerCompatibility::tableExists($connection, $proophStreamTable)) { return new StreamPage([], ''); @@ -111,12 +110,10 @@ private function cleanGapsByTimeout(GapAwarePosition $tracking, Connection $conn } $minGap = $gaps[0]; - $maxGap = $gaps[count($gaps) - 1]; + $maxGap = $gaps[\count($gaps) - 1]; - // Resolve table name at runtime - $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamName); + $proophStreamTable = $this->tableNameProvider->generateTableNameForStream($this->streamFilter->streamName); - // Query interleaved events in the gap range $interleavedEvents = $connection->executeQuery(<<clock->now()->sub($this->gapTimeout)->unixTime()->inSeconds(); - // Find the highest position with timestamp < timeThreshold - $cutoffPosition = $minGap; // default: keep all gaps + $cutoffPosition = $minGap; foreach ($interleavedEvents as $event) { $interleavedEventPosition = $event['no']; $timestamp = $this->getTimestamp($event['created_at']); if ($timestamp > $timestampThreshold) { - // Event is recent, do not remove any gaps below this position break; } - if (in_array($interleavedEventPosition, $gaps, true)) { - // This position is a gap that could be filled, stop cleaning + if (\in_array($interleavedEventPosition, $gaps, true)) { break; } if ($timestamp < $timestampThreshold && $interleavedEventPosition > $cutoffPosition) { - $cutoffPosition = $interleavedEventPosition + 1; // Remove gaps below this position + $cutoffPosition = $interleavedEventPosition + 1; } } diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php index 39975353d..7455cc258 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSourceBuilder.php @@ -14,13 +14,14 @@ use Ecotone\Messaging\Scheduling\Duration; use Ecotone\Messaging\Scheduling\EcotoneClockInterface; use Ecotone\Projecting\Config\ProjectionComponentBuilder; +use Ecotone\Projecting\StreamFilter; use Ecotone\Projecting\StreamSource; use Enqueue\Dbal\DbalConnectionFactory; class EventStoreGlobalStreamSourceBuilder implements ProjectionComponentBuilder { public function __construct( - private string $streamName, + private StreamFilter $streamFilter, private array $handledProjectionNames, ) { } @@ -37,7 +38,12 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc [ Reference::to(DbalConnectionFactory::class), Reference::to(EcotoneClockInterface::class), - $this->streamName, + new Definition(StreamFilter::class, [ + $this->streamFilter->streamName, + $this->streamFilter->aggregateType, + $this->streamFilter->eventStoreReferenceName, + $this->streamFilter->eventNames, + ]), Reference::to(PdoStreamTableNameProvider::class), 5_000, new Definition(Duration::class, [60], 'seconds'), diff --git a/packages/PdoEventSourcing/tests/InMemory/InMemoryEventStoreRegistrationTest.php b/packages/PdoEventSourcing/tests/InMemory/InMemoryEventStoreRegistrationTest.php index 749f84721..b840f0580 100644 --- a/packages/PdoEventSourcing/tests/InMemory/InMemoryEventStoreRegistrationTest.php +++ b/packages/PdoEventSourcing/tests/InMemory/InMemoryEventStoreRegistrationTest.php @@ -4,6 +4,7 @@ namespace Test\Ecotone\EventSourcing\InMemory; +use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\EventSourcingConfiguration; use Ecotone\Lite\EcotoneLite; use Ecotone\Messaging\Attribute\Converter; @@ -24,8 +25,7 @@ class InMemoryEventStoreRegistrationTest extends TestCase { public function test_registers_in_memory_event_store_stream_source_when_pdo_event_sourcing_is_in_memory_mode(): void { - // Given a polling projection (polling projections read from stream sources) - $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller')] class { + $projection = new #[ProjectionV2('test_projection'), Polling('test_projection_poller'), FromStream('test_stream')] class { public array $events = []; public int $callCount = 0; diff --git a/packages/PdoEventSourcing/tests/Projecting/BackfillProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/BackfillProjectionTest.php new file mode 100644 index 000000000..f7f7922fd --- /dev/null +++ b/packages/PdoEventSourcing/tests/Projecting/BackfillProjectionTest.php @@ -0,0 +1,329 @@ +connection->executeStatement( + "INSERT INTO {$this->tableName()} VALUES (?,?)", + [$event->getTicketId(), $event->getTicketType()] + ); + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement( + "CREATE TABLE IF NOT EXISTS {$this->tableName()} (ticket_id VARCHAR(36) PRIMARY KEY, ticket_type VARCHAR(25))" + ); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->connection->executeStatement("DROP TABLE IF EXISTS {$this->tableName()}"); + } + + #[ProjectionReset] + public function reset(): void + { + $this->connection->executeStatement("DELETE FROM {$this->tableName()}"); + } + + public function getTickets(): array + { + return $this->connection->executeQuery("SELECT * FROM {$this->tableName()} ORDER BY ticket_id ASC")->fetchAllAssociative(); + } +} + +/** + * licence Enterprise + * @internal + */ +final class BackfillProjectionTest extends ProjectingTestCase +{ + public function test_throws_exception_when_backfill_batch_size_is_less_than_one(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Backfill partition batch size must be at least 1'); + + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('batch0_projection'), + Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), + ProjectionBackfill(backfillPartitionBatchSize: 0), + FromStream(stream: Ticket::class, aggregateType: Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + protected function tableName(): string + { + return 'batch0_tickets'; + } + }; + + $this->bootstrapEcotone([$projection::class], [$projection], true); + } + + public function test_partitioned_projection_async_backfill_with_batch_of_2_processes_5_partitions_in_3_runs(): void + { + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('batch2_async_projection'), + Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), + ProjectionBackfill(backfillPartitionBatchSize: 2, asyncChannelName: 'backfill_channel'), + FromStream(stream: Ticket::class, aggregateType: Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + #[QueryHandler('getBackfillTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'batch2_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone( + [$projection::class], + [$projection], + [SimpleMessageChannelBuilder::createQueueChannel('backfill_channel')], + TestConfiguration::createWithDefaults()->withSpyOnChannel('backfill_channel') + ); + + $this->createPartitions($ecotone, 5); + + $ecotone->deleteProjection('batch2_async_projection') + ->initializeProjection('batch2_async_projection'); + + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'batch2_async_projection']); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('backfill_channel'); + self::assertCount(3, $messages); + + $ecotone->run('backfill_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + $ecotone->run('backfill_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(4, $ecotone->sendQueryWithRouting('getBackfillTickets')); + + $ecotone->run('backfill_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(5, $ecotone->sendQueryWithRouting('getBackfillTickets')); + } + + public function test_partitioned_projection_async_backfill_with_batch_of_5_completes_in_single_run(): void + { + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('batch5_async_projection'), + Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), + ProjectionBackfill(backfillPartitionBatchSize: 5, asyncChannelName: 'backfill_channel'), + FromStream(stream: Ticket::class, aggregateType: Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + #[QueryHandler('getBackfillTickets5')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'batch5_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone( + [$projection::class], + [$projection], + [SimpleMessageChannelBuilder::createQueueChannel('backfill_channel')], + TestConfiguration::createWithDefaults()->withSpyOnChannel('backfill_channel') + ); + + $this->createPartitions($ecotone, 5); + + $ecotone->deleteProjection('batch5_async_projection') + ->initializeProjection('batch5_async_projection'); + + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'batch5_async_projection']); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('backfill_channel'); + self::assertCount(1, $messages); + + $ecotone->run('backfill_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(5, $ecotone->sendQueryWithRouting('getBackfillTickets5')); + } + + public function test_partitioned_projection_sync_backfill_processes_all_partitions_immediately(): void + { + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('sync_partitioned_projection'), + Partitioned(MessageHeaders::EVENT_AGGREGATE_ID), + ProjectionBackfill(backfillPartitionBatchSize: 2), + FromStream(stream: Ticket::class, aggregateType: Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + #[QueryHandler('getSyncBackfillTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'sync_partitioned_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $ecotone->deleteProjection('sync_partitioned_projection') + ->initializeProjection('sync_partitioned_projection'); + + $this->createPartitions($ecotone, 5); + + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'sync_partitioned_projection']); + + self::assertCount(5, $ecotone->sendQueryWithRouting('getSyncBackfillTickets')); + } + + public function test_global_projection_async_backfill_processes_all_events_after_running_channel(): void + { + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('global_async_projection'), + ProjectionBackfill(asyncChannelName: 'backfill_global_channel'), + FromStream(Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + #[QueryHandler('getGlobalAsyncTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'global_async_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone( + [$projection::class], + [$projection], + [SimpleMessageChannelBuilder::createQueueChannel('backfill_global_channel')], + TestConfiguration::createWithDefaults()->withSpyOnChannel('backfill_global_channel') + ); + + $this->createTickets($ecotone, 3); + + $ecotone->deleteProjection('global_async_projection') + ->initializeProjection('global_async_projection'); + + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'global_async_projection']); + + self::assertCount(0, $ecotone->sendQueryWithRouting('getGlobalAsyncTickets')); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('backfill_global_channel'); + self::assertCount(1, $messages); + + $ecotone->run('backfill_global_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(3, $ecotone->sendQueryWithRouting('getGlobalAsyncTickets')); + } + + public function test_global_projection_sync_backfill_processes_all_events_immediately(): void + { + $connection = $this->getConnection(); + $projection = new #[ + ProjectionV2('global_sync_projection'), + ProjectionBackfill(), + FromStream(Ticket::class) + ] class ($connection) extends AbstractTicketProjection { + #[QueryHandler('getGlobalSyncTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'global_sync_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $ecotone->deleteProjection('global_sync_projection') + ->initializeProjection('global_sync_projection'); + + $this->createTickets($ecotone, 3); + + $ecotone->runConsoleCommand('ecotone:projection:backfill', ['name' => 'global_sync_projection']); + + self::assertCount(3, $ecotone->sendQueryWithRouting('getGlobalSyncTickets')); + } + + private function createPartitions(FlowTestSupport $ecotone, int $count): void + { + for ($i = 1; $i <= $count; $i++) { + $ecotone->sendCommand(new RegisterTicket((string)$i, "User{$i}", "type{$i}")); + } + } + + private function createTickets(FlowTestSupport $ecotone, int $count): void + { + for ($i = 1; $i <= $count; $i++) { + $ecotone->sendCommand(new RegisterTicket((string)$i, "User{$i}", 'alert')); + } + } + + private function bootstrapEcotone(array $classesToResolve, array $services, bool|array $channels, ?TestConfiguration $testConfiguration = null): FlowTestSupport + { + return EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [...$classesToResolve, Ticket::class, TicketEventConverter::class], + containerOrAvailableServices: [...$services, new TicketEventConverter(), self::getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::DBAL_PACKAGE, + ModulePackageList::EVENT_SOURCING_PACKAGE, + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])), + runForProductionEventStore: true, + enableAsynchronousProcessing: $channels, + licenceKey: LicenceTesting::VALID_LICENCE, + testConfiguration: $testConfiguration, + ); + } +} diff --git a/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php b/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php index 4f0e17308..c24337882 100644 --- a/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/GapAwarePositionIntegrationTest.php @@ -21,6 +21,7 @@ use Ecotone\Projecting\Attribute\ProjectionV2; use Ecotone\Projecting\ProjectingManager; use Ecotone\Projecting\ProjectionRegistry; +use Ecotone\Projecting\StreamFilter; use Ecotone\Test\LicenceTesting; use Enqueue\Dbal\DbalConnectionFactory; use Psr\Clock\ClockInterface; @@ -113,7 +114,7 @@ public function test_max_gap_offset_cleaning(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - Ticket::STREAM_NAME, + new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, maxGapOffset: 3, // Only keep gaps within 3 positions gapTimeout: null @@ -151,7 +152,7 @@ public function test_gap_timeout_cleaning(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - Ticket::STREAM_NAME, + new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, gapTimeout: Duration::seconds(5) ); @@ -185,7 +186,7 @@ public function test_gap_cleaning_noop_when_no_gaps(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - Ticket::STREAM_NAME, + new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, maxGapOffset: 1000, gapTimeout: Duration::seconds(5) @@ -207,7 +208,7 @@ public function test_gap_cleaning_noop_when_timeout_disabled(): void $streamSource = new EventStoreGlobalStreamSource( self::$connectionFactory, self::$clock, - Ticket::STREAM_NAME, + new StreamFilter(Ticket::STREAM_NAME), self::$tableNameProvider, maxGapOffset: 1000, gapTimeout: null // No timeout diff --git a/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php b/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php index 8d9202580..45c5ae03a 100644 --- a/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/ProophIntegrationTest.php @@ -17,7 +17,7 @@ use Ecotone\Messaging\MessageHeaders; use Ecotone\Modelling\Attribute\EventHandler; use Ecotone\Projecting\Attribute\Partitioned; -use Ecotone\Projecting\Attribute\ProjectionBatchSize; +use Ecotone\Projecting\Attribute\ProjectionExecution; use Ecotone\Projecting\Attribute\ProjectionDeployment; use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\Attribute\ProjectionV2; @@ -358,7 +358,7 @@ public function init(): void public function test_it_handles_batches(): void { $connectionFactory = self::getConnectionFactory(); - $projection = new #[ProjectionV2(self::NAME), ProjectionDeployment(manualKickOff: true), FromStream(Ticket::STREAM_NAME), ProjectionBatchSize(3)] class ($connectionFactory->establishConnection()) extends DbalTicketProjection { + $projection = new #[ProjectionV2(self::NAME), ProjectionDeployment(manualKickOff: true), FromStream(Ticket::STREAM_NAME), ProjectionExecution(eventLoadingBatchSize: 3)] class ($connectionFactory->establishConnection()) extends DbalTicketProjection { public const NAME = 'batch_projection'; public int $flushCallCount = 0; #[ProjectionFlush]