Skip to content

Commit cc2bc01

Browse files
authored
Feat: Async Batched Backfill (#606)
* feat: async batched projection backfills * stream filtering * backfill batch * naming * fixes * async backfill * fixes * use stream filter * fixes * fixes
1 parent 3a0594c commit cc2bc01

38 files changed

+1126
-327
lines changed

Monorepo/Benchmark/ProjectingBenchmark.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public function bench_ecotone_projection_backfill(): void
174174
Assert::assertEquals([],
175175
self::$ecotone->getQueryBus()->sendWithRouting('product.getPriceChange', self::$expectedProductIds[0])
176176
);
177-
$projectionManager->backfill();
177+
$projectionManager->prepareBackfill();
178178
Assert::assertEquals([
179179
new PriceChange(100, 0),
180180
new PriceChange(120, 20),

Monorepo/ExampleAppEventSourcing/EcotoneProjection/EcotoneConfiguration.php

Lines changed: 0 additions & 22 deletions
This file was deleted.

Monorepo/ExampleAppEventSourcing/EcotoneProjection/PriceChangeOverTimeProjectionWithEcotoneProjection.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@
44
*/
55
namespace Monorepo\ExampleAppEventSourcing\EcotoneProjection;
66

7+
use Ecotone\EventSourcing\Attribute\FromAggregateStream;
78
use Ecotone\EventSourcing\Attribute\ProjectionDelete;
89
use Ecotone\Modelling\Attribute\EventHandler;
910
use Ecotone\Modelling\Attribute\QueryHandler;
11+
use Ecotone\Projecting\Attribute\ProjectionV2;
1012
use Monorepo\ExampleAppEventSourcing\Common\Event\PriceWasChanged;
1113
use Monorepo\ExampleAppEventSourcing\Common\Event\ProductWasRegistered;
1214
use Monorepo\ExampleAppEventSourcing\Common\PriceChange;
15+
use Monorepo\ExampleAppEventSourcing\Common\Product;
1316

14-
#[\Ecotone\Projecting\Attribute\ProjectionV2("price_change_over_time")]
17+
#[ProjectionV2("price_change_over_time")]
18+
#[FromAggregateStream(Product::class)]
1519
class PriceChangeOverTimeProjectionWithEcotoneProjection
1620
{
1721
public const NAME = "price_change_over_time";

packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php renamed to packages/Ecotone/src/EventSourcing/Attribute/FromAggregateStream.php

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,37 @@
11
<?php
22

3+
/*
4+
* licence Enterprise
5+
*/
36
declare(strict_types=1);
47

58
namespace Ecotone\EventSourcing\Attribute;
69

710
use Attribute;
811
use Ecotone\EventSourcing\EventStore;
9-
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
1012

11-
/*
13+
/**
1214
* Configures a projection to read from an aggregate's event stream.
13-
* Automatically reads Stream and AggregateType attributes from the aggregate class.
14-
*
15-
* This simplifies projection configuration by avoiding duplication of stream
16-
* and aggregate type configuration that is already defined on the aggregate.
15+
* Automatically resolves Stream and AggregateType from the aggregate class.
1716
*
1817
* Example usage:
1918
* ```php
2019
* #[ProjectionV2('order_list')]
21-
* #[AggregateStream(Order::class)]
20+
* #[FromAggregateStream(Order::class)]
2221
* class OrderListProjection { ... }
2322
* ```
24-
*
25-
* licence Enterprise
2623
*/
2724
#[Attribute(Attribute::TARGET_CLASS | Attribute::IS_REPEATABLE)]
28-
class FromAggregateStream
25+
readonly class FromAggregateStream
2926
{
3027
/**
31-
* @param class-string $aggregateClass The aggregate class to read Stream and AggregateType from.
28+
* @param class-string $aggregateClass The aggregate class to read stream info from.
3229
* Must be an EventSourcingAggregate.
33-
* @param string $eventStoreReferenceName Reference name for the event store
3430
*/
3531
public function __construct(
36-
public readonly string $aggregateClass,
37-
public readonly string $eventStoreReferenceName = EventStore::class
32+
public string $aggregateClass,
33+
public string $eventStoreReferenceName = EventStore::class
3834
) {
3935
}
4036
}
37+

packages/PdoEventSourcing/src/Attribute/FromStream.php renamed to packages/Ecotone/src/EventSourcing/Attribute/FromStream.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
use Attribute;
1111
use Ecotone\EventSourcing\EventStore;
1212

13+
/**
14+
* Configures a projection to read from a specific event stream.
15+
*/
1316
#[Attribute(Attribute::TARGET_CLASS | Attribute::IS_REPEATABLE)]
1417
readonly class FromStream
1518
{
@@ -20,3 +23,4 @@ public function __construct(
2023
) {
2124
}
2225
}
26+

packages/Ecotone/src/Lite/Test/FlowTestSupport.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public function triggerProjection(string|array $projectionName): self
237237

238238
foreach ($projectionName as $name) {
239239
if ($this->getGateway(ProjectionRegistry::class)->has($name)) {
240-
$this->getGateway(ProjectionRegistry::class)->get($name)->backfill();
240+
$this->getGateway(ProjectionRegistry::class)->get($name)->prepareBackfill();
241241
} else {
242242
$this->getGateway(ProjectionManager::class)->triggerProjection($name);
243243
}

packages/Ecotone/src/Messaging/Config/ModuleClassList.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
use Ecotone\Projecting\Config\ProjectingAttributeModule;
6969
use Ecotone\Projecting\Config\ProjectingConsoleCommands;
7070
use Ecotone\Projecting\Config\ProjectingModule;
71+
use Ecotone\Projecting\Config\StreamFilterRegistryModule;
7172
use Ecotone\Projecting\EventStoreAdapter\EventStoreAdapterModule;
7273
use Ecotone\Redis\Configuration\RedisMessageConsumerModule;
7374
use Ecotone\Redis\Configuration\RedisMessagePublisherModule;
@@ -115,6 +116,7 @@ class ModuleClassList
115116
EventSourcedRepositoryModule::class,
116117
ProjectingModule::class,
117118
ProjectingAttributeModule::class,
119+
StreamFilterRegistryModule::class,
118120
EventStoreAdapterModule::class,
119121

120122
/** Attribute based configurations */
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
/*
4+
* licence Enterprise
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Ecotone\Projecting\Attribute;
9+
10+
use Attribute;
11+
use InvalidArgumentException;
12+
13+
/**
14+
* Configure projection backfill settings.
15+
* This attribute controls how partitions are batched during backfill operations.
16+
*/
17+
#[Attribute(Attribute::TARGET_CLASS)]
18+
class ProjectionBackfill
19+
{
20+
public function __construct(
21+
/**
22+
* Number of partitions to process in a single batch during backfill.
23+
* Must be at least 1.
24+
*/
25+
public readonly int $backfillPartitionBatchSize = 100,
26+
/**
27+
* Async channel name for backfill operations.
28+
* When set, backfill batches are sent to this channel first, then routed to the backfill handler.
29+
* When null, backfill executes synchronously.
30+
*/
31+
public readonly ?string $asyncChannelName = null,
32+
) {
33+
if ($this->backfillPartitionBatchSize < 1) {
34+
throw new InvalidArgumentException('Backfill partition batch size must be at least 1');
35+
}
36+
}
37+
}
38+

packages/Ecotone/src/Projecting/Attribute/ProjectionBatchSize.php

Lines changed: 0 additions & 18 deletions
This file was deleted.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
/*
4+
* licence Enterprise
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Ecotone\Projecting\Attribute;
9+
10+
use Attribute;
11+
12+
#[Attribute(Attribute::TARGET_CLASS)]
13+
class ProjectionExecution
14+
{
15+
public function __construct(
16+
/**
17+
* Configure the batch size for loading events during projection execution.
18+
* * This controls how many events are loaded from the stream in a single batch.
19+
20+
*/
21+
public readonly int $eventLoadingBatchSize
22+
)
23+
{
24+
}
25+
}

0 commit comments

Comments
 (0)