From d1c368d7e043c8266aa3fb29449188e5aab031b5 Mon Sep 17 00:00:00 2001 From: Benoit POLASZEK Date: Tue, 10 Feb 2026 11:12:03 +0100 Subject: [PATCH 1/3] Feat: Batch transform support (BatchTransformerInterface) Allow transforming multiple items at once instead of one-by-one, enabling use cases like concurrent HTTP requests. Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 170 +++++++++++ composer.json | 2 +- src/EtlConfiguration.php | 6 + src/EtlExecutor.php | 74 ++++- src/Internal/EtlBuilderTrait.php | 7 +- src/Processor/IterableProcessor.php | 20 +- src/Transformer/BatchTransformerInterface.php | 20 ++ src/Transformer/CallableBatchTransformer.php | 25 ++ src/functions.php | 3 +- tests/Behavior/BatchTransformTest.php | 288 ++++++++++++++++++ tests/Unit/EtlConfigurationTest.php | 8 + .../CallableBatchTransformerTest.php | 52 ++++ 12 files changed, 667 insertions(+), 8 deletions(-) create mode 100644 CLAUDE.md create mode 100644 src/Transformer/BatchTransformerInterface.php create mode 100644 src/Transformer/CallableBatchTransformer.php create mode 100644 tests/Behavior/BatchTransformTest.php create mode 100644 tests/Unit/Transformer/CallableBatchTransformerTest.php diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..bda3a7c --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,170 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +`bentools/etl` is a PHP library implementing the Extract/Transform/Load pattern for data processing workflows. It's designed to be flexible, event-driven, and support both synchronous and asynchronous (ReactPHP) processing. + +**Core concept:** Extract data from a source, apply transformations, and load results into a destination. + +## Commands + +### Testing & Quality +```bash +# Run all CI checks (PHP-CS-Fixer, PHPStan, Pest with coverage) +composer ci:check + +# Run tests only +vendor/bin/pest + +# Run tests with coverage +vendor/bin/pest --coverage + +# Run a single test file +vendor/bin/pest tests/Behavior/FlushTest.php + +# Run PHPStan type checking +vendor/bin/phpstan analyse + +# Run code style fixer +vendor/bin/php-cs-fixer fix +``` + +### Requirements +- PHP >=8.2 +- Tests use Pest (not PHPUnit syntax) +- 100% code coverage expected before PRs + +## Architecture + +### Core Components + +**EtlExecutor** (`src/EtlExecutor.php`) +- Main entry point for building and executing ETL workflows +- Uses builder pattern via `EtlBuilderTrait` to chain extractors, transformers, and loaders +- Dispatches events at each lifecycle stage (init, extract, transform, load, flush, end) +- Handles exceptions through dedicated event types (ExtractException, TransformException, etc.) + +**EtlState** (`src/EtlState.php`) +- Immutable state object passed through the entire workflow +- Tracks: current item, indices, flush timing, loaded items count, output +- Contains context (arbitrary data), source, and destination +- Version system for state updates during processing + +**EtlConfiguration** (`src/EtlConfiguration.php`) +- Configuration object for flush frequency, batch size, and other options +- `flushEvery` - Controls how often the loader flushes (default: INF) +- `batchSize` - Controls how many items are grouped for batch transformation (default: 1) + +### Three Main Interfaces + +1. **ExtractorInterface** (`src/Extractor/`) + - `extract(EtlState $state): iterable` - Returns an iterable of items to process + - Built-in: CSV, JSON, FileExtractor, STDINExtractor, IterableExtractor, ReactStreamExtractor + +2. **TransformerInterface** (`src/Transformer/`) + - `transform(mixed $item, EtlState $state): mixed` - Transforms extracted items + - Return value can be a single value, an array, or a generator (yield) + - Yielded items generate multiple loads from a single extracted item + - Built-in: CallableTransformer, ChainTransformer, NullTransformer + +3. **BatchTransformerInterface** (`src/Transformer/`) + - `transform(array $items, EtlState $state): Generator` - Transforms a batch of items at once + - Separate interface from `TransformerInterface` (does NOT extend it) + - Activated when `batchSize` is set in `EtlConfiguration` and transformer implements this interface + - Each yielded value becomes an individual item for the load phase + - Built-in: CallableBatchTransformer + +4. **LoaderInterface** (`src/Loader/`) + - `load(mixed $item, EtlState $state): void` - Loads transformed items + - `flush(bool $isEarly, EtlState $state): mixed` - Called at flush frequency or end + - Built-in: InMemoryLoader, CSV, JSON, DoctrineORM, STDOUTLoader + +### Event System + +**Event dispatching** (`src/EventDispatcher/`) +- Custom PSR-14 implementation with priority support +- Events: InitEvent, StartEvent, ExtractEvent, TransformEvent, BeforeLoadEvent, LoadEvent, FlushEvent, EndEvent +- Exception events: ExtractExceptionEvent, TransformExceptionEvent, LoadExceptionEvent, FlushExceptionEvent +- Use `->on{EventName}(callable $listener, int $priority = 0)` on EtlExecutor + +**Control flow exceptions:** +- `SkipRequest` - Skip current item, continue processing +- `StopRequest` - Stop entire workflow immediately + +### Processors + +**ProcessorInterface** (`src/Processor/`) +- `IterableProcessor` - Default synchronous processing +- `ReactStreamProcessor` - Async processing with ReactPHP streams (experimental) + +### Recipes + +**Recipe** (`src/Recipe/`) +- Reusable workflow configurations (combine extractors, transformers, loaders, event listeners) +- `FilterRecipe` - Skip/exclude items based on callable filter +- `LoggerRecipe` - PSR-3 logging integration + +### Utility Functions + +`src/functions.php` provides helper functions: +- `extractFrom()` - Create executor starting with extractor +- `transformWith()` - Create executor starting with transformer +- `loadInto()` - Create executor starting with loader +- `withRecipe()` - Create executor with recipe +- `chain()` - Chain multiple extractors/transformers/loaders +- `stdIn()` / `stdOut()` - STDIN/STDOUT helpers +- `skipWhen()` - Conditional skip recipe + +## Key Patterns + +### Immutability & Cloning +- EtlExecutor uses `ClonableTrait` - all builder methods return clones +- EtlState has version tracking - always get latest via `$state->getLastVersion()` + +### Fluent Building +```php +$executor = (new EtlExecutor()) + ->extractFrom($extractor) + ->transformWith($transformer) + ->loadInto($loader) + ->onTransform(fn($event) => /* ... */) + ->process($source, $destination); +``` + +### NextTick Callbacks +- `$state->nextTick(callable $callback)` - Schedule callback after current item +- Useful for deferring operations or cleanup +- Consumed between items and guaranteed to run even if workflow stops + +### Batch Transform +- Configure via `new EtlConfiguration(batchSize: N)` to group N items per batch +- Requires a transformer implementing `BatchTransformerInterface` (separate from `TransformerInterface`) +- Processing flow: items are chunked via `iterable_chunk()`, then for each chunk: + 1. ExtractEvent fires per item (items can be skipped individually) + 2. `transform(array $items, EtlState $state): Generator` is called once for the whole batch + 3. Each yielded result goes through TransformEvent → Load individually +- `nextTick` callbacks are consumed between batches, not between items within a batch +- When `batchSize` is set but transformer is not `BatchTransformerInterface`, batching is ignored +- Note: `$state->currentItemKey` during Transform/Load events points to the last item of the batch + +### Flush Timing +- Configurable via `new EtlConfiguration(flushEvery: N)` +- `flush()` called when: frequency threshold reached, or at end (with `$isEarly = false`) +- Early flush = during processing, final flush = at termination + +## Testing Patterns + +- Tests are organized in `tests/Behavior/` and `tests/Unit/` +- Use Pest syntax (`test()`, `expect()`, `it()`) +- Mock with Mockery when needed +- Coverage is tracked - don't reduce it + +## Development Notes + +- PHP 8.2+ features are welcome (readonly properties, enums, etc.) +- Prefer immutability and value objects +- Event listeners should be side-effect free when possible +- Transformers returning generators (yield) allow 1-to-many transformations +- Loaders can implement `ConditionalLoaderInterface` to skip certain items diff --git a/composer.json b/composer.json index cdd0b61..174636d 100644 --- a/composer.json +++ b/composer.json @@ -6,10 +6,10 @@ "php": ">=8.2", "psr/event-dispatcher": "^1.0", "psr/log": "^3.0", + "bentools/iterable-functions": "^2.1", "symfony/options-resolver": "@stable" }, "require-dev": { - "bentools/iterable-functions": "^2.1", "doctrine/orm": "^2.16", "friendsofphp/php-cs-fixer": "^3.35", "mockery/mockery": "^1.6", diff --git a/src/EtlConfiguration.php b/src/EtlConfiguration.php index df775fa..46ff13d 100644 --- a/src/EtlConfiguration.php +++ b/src/EtlConfiguration.php @@ -14,9 +14,11 @@ class EtlConfiguration { public readonly float|int $flushFrequency; + public readonly int $batchSize; public function __construct( float|int $flushEvery = INF, + int $batchSize = 1, ) { if (INF !== $flushEvery && is_float($flushEvery)) { throw new InvalidArgumentException('Expected \\INF or int, float given.'); @@ -24,6 +26,10 @@ public function __construct( if ($flushEvery < 1) { throw new InvalidArgumentException(sprintf('Expected positive integer > 0, got %d', $flushEvery)); } + if ($batchSize < 1) { + throw new InvalidArgumentException(sprintf('Expected positive integer > 0 for batchSize, got %d', $batchSize)); + } $this->flushFrequency = $flushEvery; + $this->batchSize = $batchSize; } } diff --git a/src/EtlExecutor.php b/src/EtlExecutor.php index f618184..6edd50b 100644 --- a/src/EtlExecutor.php +++ b/src/EtlExecutor.php @@ -28,6 +28,7 @@ use BenTools\ETL\Loader\LoaderInterface; use BenTools\ETL\Processor\IterableProcessor; use BenTools\ETL\Processor\ProcessorInterface; +use BenTools\ETL\Transformer\BatchTransformerInterface; use BenTools\ETL\Transformer\NullTransformer; use BenTools\ETL\Transformer\TransformerInterface; use Psr\EventDispatcher\EventDispatcherInterface; @@ -61,7 +62,7 @@ final class EtlExecutor implements EventDispatcherInterface */ public function __construct( public readonly ExtractorInterface $extractor = new IterableExtractor(), - public readonly TransformerInterface $transformer = new NullTransformer(), + public readonly TransformerInterface|BatchTransformerInterface $transformer = new NullTransformer(), public readonly LoaderInterface $loader = new InMemoryLoader(), public readonly EtlConfiguration $options = new EtlConfiguration(), public readonly ProcessorInterface $processor = new IterableProcessor(), @@ -117,6 +118,43 @@ public function processItem(mixed $item, mixed $key, EtlState $state): void $this->load($itemsToLoad, $state); } + /** + * @param array $chunk Key-value pairs from extraction with preserved keys + */ + public function processItemBatch(array $chunk, EtlState $state): void + { + $items = []; + $isFirstItem = true; + $stopped = false; + + foreach ($chunk as $key => $item) { + $state = $state->update($state->getLastVersion()->withUpdatedItemKey($key)); + + if ($isFirstItem && $state->currentItemIndex > 0) { + $this->consumeNextTick($state); + } + $isFirstItem = false; + + try { + $items[] = $this->emitExtractEvent($state, $item); + } catch (SkipRequest) { + continue; + } catch (StopRequest) { + $stopped = true; + break; + } + } + + if ([] !== $items) { + $itemsToLoad = $this->batchTransform($items, $state); + $this->load($itemsToLoad, $state->getLastVersion()); + } + + if ($stopped) { + throw new StopRequest(); + } + } + /** * @internal */ @@ -159,6 +197,40 @@ private function transform(mixed $item, EtlState $state): array return []; } + /** + * @param list $items + * + * @return list + */ + private function batchTransform(array $items, EtlState $state): array + { + try { + assert($this->transformer instanceof BatchTransformerInterface); + $results = $this->transformer->transform($items, $state->getLastVersion()); + + $allItems = []; + foreach ($results as $singleItem) { + try { + $singleResult = $this->emitTransformEvent( + $state, + TransformResult::create($singleItem), + ); + array_push($allItems, ...$singleResult); + } catch (SkipRequest) { + continue; + } + } + + return $allItems; + } catch (SkipRequest|StopRequest $e) { + throw $e; + } catch (Throwable $e) { + TransformException::emit($this->eventDispatcher, $e, $state); + } + + return []; + } + /** * @param list $items * diff --git a/src/Internal/EtlBuilderTrait.php b/src/Internal/EtlBuilderTrait.php index 4360b31..2016754 100644 --- a/src/Internal/EtlBuilderTrait.php +++ b/src/Internal/EtlBuilderTrait.php @@ -14,6 +14,7 @@ use BenTools\ETL\Loader\LoaderInterface; use BenTools\ETL\Processor\ProcessorInterface; use BenTools\ETL\Recipe\Recipe; +use BenTools\ETL\Transformer\BatchTransformerInterface; use BenTools\ETL\Transformer\CallableTransformer; use BenTools\ETL\Transformer\ChainTransformer; use BenTools\ETL\Transformer\TransformerInterface; @@ -53,9 +54,13 @@ public function extractFrom( } public function transformWith( - TransformerInterface|callable $transformer, + TransformerInterface|BatchTransformerInterface|callable $transformer, TransformerInterface|callable ...$transformers, ): self { + if ($transformer instanceof BatchTransformerInterface) { + return $this->cloneWith(['transformer' => $transformer]); + } + $transformers = [$transformer, ...$transformers]; foreach ($transformers as $t => $_transformer) { diff --git a/src/Processor/IterableProcessor.php b/src/Processor/IterableProcessor.php index f19b70f..836b50f 100644 --- a/src/Processor/IterableProcessor.php +++ b/src/Processor/IterableProcessor.php @@ -8,9 +8,11 @@ use BenTools\ETL\EtlState; use BenTools\ETL\Exception\ExtractException; use BenTools\ETL\Exception\SkipRequest; +use BenTools\ETL\Transformer\BatchTransformerInterface; use Generator; use Throwable; +use function BenTools\IterableFunctions\iterable_chunk; use function is_iterable; /** @@ -28,10 +30,20 @@ public function supports(mixed $extracted): bool */ public function process(EtlExecutor $executor, EtlState $state, mixed $items): EtlState { - foreach ($this->extract($executor, $state, $items) as $key => $item) { - try { - $executor->processItem($item, $key, $state); - } catch (SkipRequest) { + if ($executor->transformer instanceof BatchTransformerInterface) { + $batchSize = $executor->options->batchSize; + foreach (iterable_chunk($this->extract($executor, $state, $items), $batchSize, true) as $chunk) { + try { + $executor->processItemBatch($chunk, $state); + } catch (SkipRequest) { + } + } + } else { + foreach ($this->extract($executor, $state, $items) as $key => $item) { + try { + $executor->processItem($item, $key, $state); + } catch (SkipRequest) { + } } } diff --git a/src/Transformer/BatchTransformerInterface.php b/src/Transformer/BatchTransformerInterface.php new file mode 100644 index 0000000..58dcab0 --- /dev/null +++ b/src/Transformer/BatchTransformerInterface.php @@ -0,0 +1,20 @@ + $items + */ + public function transform(array $items, EtlState $state): Generator; +} diff --git a/src/Transformer/CallableBatchTransformer.php b/src/Transformer/CallableBatchTransformer.php new file mode 100644 index 0000000..e4df6d4 --- /dev/null +++ b/src/Transformer/CallableBatchTransformer.php @@ -0,0 +1,25 @@ +closure = $callback(...); + } + + public function transform(array $items, EtlState $state): Generator + { + yield from ($this->closure)($items, $state); + } +} diff --git a/src/functions.php b/src/functions.php index b4ef7ae..4e9a86e 100644 --- a/src/functions.php +++ b/src/functions.php @@ -15,6 +15,7 @@ use BenTools\ETL\Recipe\FilterRecipe; use BenTools\ETL\Recipe\FilterRecipeMode; use BenTools\ETL\Recipe\Recipe; +use BenTools\ETL\Transformer\BatchTransformerInterface; use BenTools\ETL\Transformer\ChainTransformer; use BenTools\ETL\Transformer\TransformerInterface; use Iterator; @@ -61,7 +62,7 @@ function extractFrom(ExtractorInterface|callable $extractor, ExtractorInterface| } function transformWith( - TransformerInterface|callable $transformer, + TransformerInterface|BatchTransformerInterface|callable $transformer, TransformerInterface|callable ...$transformers, ): EtlExecutor { return (new EtlExecutor())->transformWith(...func_get_args()); diff --git a/tests/Behavior/BatchTransformTest.php b/tests/Behavior/BatchTransformTest.php new file mode 100644 index 0000000..5c30dc8 --- /dev/null +++ b/tests/Behavior/BatchTransformTest.php @@ -0,0 +1,288 @@ +transformWith(new CallableBatchTransformer( + fn (array $items) => array_map(strtoupper(...), $items), + )) + ->withOptions(new EtlConfiguration(batchSize: 2)); + + // When + $report = $executor->process($items); + + // Then + expect($report->output)->toBe(['FOO', 'BAR', 'BAZ', 'QUX']); +}); + +it('handles partial last batch', function () { + // Given + $items = ['a', 'b', 'c', 'd', 'e', 'f', 'g']; + + $batchSizes = []; + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + function (array $items) use (&$batchSizes) { + $batchSizes[] = count($items); + + return array_map(strtoupper(...), $items); + }, + )) + ->withOptions(new EtlConfiguration(batchSize: 3)); + + // When + $report = $executor->process($items); + + // Then + expect($report->output)->toBe(['A', 'B', 'C', 'D', 'E', 'F', 'G']); + expect($batchSizes)->toBe([3, 3, 1]); +}); + +it('handles empty input', function () { + // Given + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + fn (array $items) => array_map(strtoupper(...), $items), + )) + ->withOptions(new EtlConfiguration(batchSize: 5)); + + // When + $report = $executor->process([]); + + // Then + expect($report->output)->toBeNull(); +}); + +it('skips items during extract phase', function () { + // Given + $items = ['foo', 'bar', 'baz', 'qux']; + + $batchSizes = []; + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + function (array $items) use (&$batchSizes) { + $batchSizes[] = count($items); + + return array_map(strtoupper(...), $items); + }, + )) + ->onExtract(function (ExtractEvent $event) { + if ('bar' === $event->item) { + $event->state->skip(); + } + }) + ->withOptions(new EtlConfiguration(batchSize: 2)); + + // When + $report = $executor->process($items); + + // Then - 'bar' was skipped, so batch 1 only has 'foo', batch 2 has 'baz'+'qux' + expect($report->output)->toBe(['FOO', 'BAZ', 'QUX']); + expect($batchSizes)->toBe([1, 2]); +}); + +it('stops processing during extract phase', function () { + // Given + $items = ['foo', 'bar', 'baz', 'qux']; + + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + fn (array $items) => array_map(strtoupper(...), $items), + )) + ->onExtract(function (ExtractEvent $event) { + if ('baz' === $event->item) { + $event->state->stop(); + } + }) + ->withOptions(new EtlConfiguration(batchSize: 4)); + + // When + $report = $executor->process($items); + + // Then - stopped at 'baz', only 'foo' and 'bar' were processed + expect($report->output)->toBe(['FOO', 'BAR']); +}); + +it('skips items during transform phase', function () { + // Given + $items = ['foo', 'bar', 'baz']; + + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + fn (array $items) => array_map(strtoupper(...), $items), + )) + ->onTransform(function (TransformEvent $event) { + if ('BAR' === [...$event->transformResult][0]) { + $event->state->skip(); + } + }) + ->withOptions(new EtlConfiguration(batchSize: 3)); + + // When + $report = $executor->process($items); + + // Then + expect($report->output)->toBe(['FOO', 'BAZ']); +}); + +it('supports fan-out in batch mode', function () { + // Given + $items = ['foo', 'bar']; + + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + function (array $items): Generator { + foreach ($items as $item) { + yield $item; + yield strtoupper($item); + } + }, + )) + ->withOptions(new EtlConfiguration(batchSize: 2)); + + // When + $report = $executor->process($items); + + // Then + expect($report->output)->toBe(['foo', 'FOO', 'bar', 'BAR']); +}); + +it('works with batchSize of 1', function () { + // Given + $items = ['foo', 'bar']; + + $batchSizes = []; + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + function (array $items) use (&$batchSizes) { + $batchSizes[] = count($items); + + return array_map(strtoupper(...), $items); + }, + )) + ->withOptions(new EtlConfiguration(batchSize: 1)); + + // When + $report = $executor->process($items); + + // Then - batchSize=1 still uses batch path with chunks of 1 + expect($report->output)->toBe(['FOO', 'BAR']); + expect($batchSizes)->toBe([1, 1]); +}); + +it('ignores batchSize when transformer is not BatchTransformerInterface', function () { + // Given + $items = ['foo', 'bar', 'baz']; + + $executor = (new EtlExecutor()) + ->transformWith(fn (mixed $item) => strtoupper($item)) + ->withOptions(new EtlConfiguration(batchSize: 2)); + + // When + $report = $executor->process($items); + + // Then - works normally, no batching + expect($report->output)->toBe(['FOO', 'BAR', 'BAZ']); +}); + +it('works with flushFrequency', function () { + // Given + $items = ['a', 'b', 'c', 'd', 'e', 'f']; + + $flushCount = 0; + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + fn (array $items) => array_map(strtoupper(...), $items), + )) + ->onFlush(function () use (&$flushCount) { + ++$flushCount; + }) + ->withOptions(new EtlConfiguration(batchSize: 3, flushEvery: 3)); + + // When + $report = $executor->process($items); + + // Then + expect($report->output)->toBe(['A', 'B', 'C', 'D', 'E', 'F']); + expect($flushCount)->toBe(3); // flush at 3 items + flush at 6 items + final flush +}); + +it('fires extract events for each item individually', function () { + // Given + $items = ['foo', 'bar', 'baz']; + + $extractedItems = []; + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + fn (array $items) => array_map(strtoupper(...), $items), + )) + ->onExtract(function (ExtractEvent $event) use (&$extractedItems) { + $extractedItems[] = $event->item; + }) + ->withOptions(new EtlConfiguration(batchSize: 3)); + + // When + $executor->process($items); + + // Then - ExtractEvent fires per item, not per batch + expect($extractedItems)->toBe(['foo', 'bar', 'baz']); +}); + +it('fires transform events for each result individually', function () { + // Given + $items = ['foo', 'bar']; + + $transformedItems = []; + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + fn (array $items) => array_map(strtoupper(...), $items), + )) + ->onTransform(function (TransformEvent $event) use (&$transformedItems) { + $transformedItems[] = [...$event->transformResult][0]; + }) + ->withOptions(new EtlConfiguration(batchSize: 2)); + + // When + $executor->process($items); + + // Then - TransformEvent fires per result, not per batch + expect($transformedItems)->toBe(['FOO', 'BAR']); +}); + +it('handles batchSize larger than total items', function () { + // Given + $items = ['foo', 'bar']; + + $batchSizes = []; + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + function (array $items) use (&$batchSizes) { + $batchSizes[] = count($items); + + return array_map(strtoupper(...), $items); + }, + )) + ->withOptions(new EtlConfiguration(batchSize: 100)); + + // When + $report = $executor->process($items); + + // Then - single batch with all items + expect($report->output)->toBe(['FOO', 'BAR']); + expect($batchSizes)->toBe([2]); +}); diff --git a/tests/Unit/EtlConfigurationTest.php b/tests/Unit/EtlConfigurationTest.php index 3451332..9000bfc 100644 --- a/tests/Unit/EtlConfigurationTest.php +++ b/tests/Unit/EtlConfigurationTest.php @@ -14,3 +14,11 @@ it('denies negative values', function () { new EtlConfiguration(flushEvery: -10); })->throws(InvalidArgumentException::class); + +it('denies negative batchSize values', function () { + new EtlConfiguration(batchSize: -1); +})->throws(InvalidArgumentException::class); + +it('denies zero batchSize', function () { + new EtlConfiguration(batchSize: 0); +})->throws(InvalidArgumentException::class); diff --git a/tests/Unit/Transformer/CallableBatchTransformerTest.php b/tests/Unit/Transformer/CallableBatchTransformerTest.php new file mode 100644 index 0000000..d2c8bc8 --- /dev/null +++ b/tests/Unit/Transformer/CallableBatchTransformerTest.php @@ -0,0 +1,52 @@ + array_map(strtoupper(...), $items), + ); + + // When + $transformed = $transformer->transform(['foo', 'bar'], $state); + + // Then + expect([...$transformed])->toBe(['FOO', 'BAR']); +}); + +it('handles single-item batches', function () { + // Given + $state = new EtlState(); + $transformer = new CallableBatchTransformer( + fn (array $items) => array_map(strtoupper(...), $items), + ); + + // When + $transformed = $transformer->transform(['foo'], $state); + + // Then + expect([...$transformed])->toBe(['FOO']); +}); + +it('handles empty batches', function () { + // Given + $state = new EtlState(); + $transformer = new CallableBatchTransformer( + fn (array $items) => array_map(strtoupper(...), $items), + ); + + // When + $transformed = $transformer->transform([], $state); + + // Then + expect([...$transformed])->toBe([]); +}); From d5d9041702ccc9e4a8cb595ee7ac3efba04b31e9 Mon Sep 17 00:00:00 2001 From: Benoit POLASZEK Date: Tue, 10 Feb 2026 11:24:34 +0100 Subject: [PATCH 2/3] Docs: Add batch transform documentation Co-Authored-By: Claude Opus 4.6 --- README.md | 1 + doc/advanced_usage.md | 68 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/README.md b/README.md index 06658ca..e76aaeb 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ Table of Contents - [Advanced Usage](doc/advanced_usage.md) - [Creating your own Extractor / Transformers / Loaders](doc/advanced_usage.md#creating-your-own-extractor--transformers--loaders) - [Difference between yield and return in transformers](doc/advanced_usage.md#difference-between-yield-and-return-in-transformers) + - [Batch transforms](doc/advanced_usage.md#batch-transforms) - [Next tick](doc/advanced_usage.md#next-tick) - [Chaining extractors / transformers / loaders](doc/advanced_usage.md#chaining-extractors--transformers--loaders) - [Reading from STDIN / Writing to STDOUT](doc/advanced_usage.md#reading-from-stdin--writing-to-stdout) diff --git a/doc/advanced_usage.md b/doc/advanced_usage.md index f24b9de..a2e1fce 100644 --- a/doc/advanced_usage.md +++ b/doc/advanced_usage.md @@ -55,6 +55,74 @@ But the last transformer of the chain (or your only one transformer) is determin - If your transformer `yields` values, each yielded value will be passed to the loader (and the loader will be called for each yielded value). +Batch transforms +----------------- + +By default, transformers process items one-by-one. But sometimes you want to process multiple items at once — for example, sending concurrent HTTP requests instead of waiting for each response sequentially. + +The `BatchTransformerInterface` allows you to transform a batch of items in a single call: + +```php +use BenTools\ETL\EtlConfiguration; +use BenTools\ETL\EtlExecutor; +use BenTools\ETL\EtlState; +use BenTools\ETL\Transformer\CallableBatchTransformer; + +$executor = (new EtlExecutor()) + ->extractFrom($urlExtractor) + ->transformWith(new CallableBatchTransformer( + function (array $items, EtlState $state): array { + // $items contains a batch of URLs (e.g., 10 at a time) + // Send all HTTP requests concurrently + $responses = $httpClient->sendConcurrent( + array_map(fn ($url) => new Request('GET', $url), $items) + ); + + return array_map( + fn ($response) => json_decode($response->getBody(), true), + $responses + ); + } + )) + ->loadInto($loader) + ->withOptions(new EtlConfiguration(batchSize: 10)) + ->process($urls); +``` + +The `batchSize` option in `EtlConfiguration` controls how many items are grouped into each batch. Each batch is passed as an array to your transformer's `transform(array $items, EtlState $state): Generator` method. + +> [!NOTE] +> `BatchTransformerInterface` is a **separate interface** from `TransformerInterface` — it does not extend it. +> When `batchSize` is configured but the transformer does not implement `BatchTransformerInterface`, batching is silently ignored. + +You can also implement `BatchTransformerInterface` directly for more complex use cases: + +```php +use BenTools\ETL\Transformer\BatchTransformerInterface; +use Generator; + +final class ConcurrentApiTransformer implements BatchTransformerInterface +{ + public function __construct( + private readonly HttpClientInterface $httpClient, + ) {} + + public function transform(array $items, EtlState $state): Generator + { + $responses = $this->httpClient->sendConcurrent( + array_map(fn ($item) => new Request('GET', $item['url']), $items) + ); + + foreach ($responses as $i => $response) { + yield [...$items[$i], 'data' => json_decode($response->getBody(), true)]; + } + } +} +``` + +> [!TIP] +> Each value yielded by the generator becomes an individual item for the load phase, so you can also implement fan-out (yielding more items than inputs). + Next tick --------- From c92c63695afb7be54a202f69ef3a46d5c040b933 Mon Sep 17 00:00:00 2001 From: Benoit POLASZEK Date: Tue, 10 Feb 2026 11:26:41 +0100 Subject: [PATCH 3/3] Tests: Cover batchTransform exception handling paths Co-Authored-By: Claude Opus 4.6 --- tests/Behavior/BatchTransformTest.php | 98 +++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/tests/Behavior/BatchTransformTest.php b/tests/Behavior/BatchTransformTest.php index 5c30dc8..f020c0e 100644 --- a/tests/Behavior/BatchTransformTest.php +++ b/tests/Behavior/BatchTransformTest.php @@ -6,10 +6,14 @@ use BenTools\ETL\EtlConfiguration; use BenTools\ETL\EtlExecutor; +use BenTools\ETL\EtlState; use BenTools\ETL\EventDispatcher\Event\ExtractEvent; use BenTools\ETL\EventDispatcher\Event\TransformEvent; +use BenTools\ETL\EventDispatcher\Event\TransformExceptionEvent; +use BenTools\ETL\Exception\TransformException; use BenTools\ETL\Transformer\CallableBatchTransformer; use Generator; +use RuntimeException; use function strtoupper; @@ -286,3 +290,97 @@ function (array $items) use (&$batchSizes) { expect($report->output)->toBe(['FOO', 'BAR']); expect($batchSizes)->toBe([2]); }); + +it('resumes processing when transform exception is dismissed', function () { + // Given + $items = ['foo', 'bar', 'baz', 'qux']; + + $loaded = []; + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + function (array $items) { + if (in_array('baz', $items, true)) { + throw new RuntimeException('Batch failed'); + } + + return array_map(strtoupper(...), $items); + }, + )) + ->loadInto(function (string $item) use (&$loaded) { + $loaded[] = $item; + }) + ->onTransformException(function (TransformExceptionEvent $event) { + $event->removeException(); + }) + ->withOptions(new EtlConfiguration(batchSize: 2)); + + // When + $report = $executor->process($items); + + // Then - first batch OK, second batch exception dismissed (returns []), third batch continues + expect($loaded)->toBe(['FOO', 'BAR']); +}); + +it('wraps transformer exceptions into TransformException', function () { + // Given + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + function (array $items) { + throw new RuntimeException('API is down'); + }, + )) + ->withOptions(new EtlConfiguration(batchSize: 2)); + + // When / Then + $executor->process(['foo', 'bar']); +})->throws(TransformException::class, 'Error during transformation.'); + +it('propagates StopRequest thrown by batch transformer', function () { + // Given + $items = ['foo', 'bar', 'baz', 'qux']; + + $loaded = []; + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + function (array $items, EtlState $state) { + if (in_array('baz', $items, true)) { + $state->stop(); + } + + return array_map(strtoupper(...), $items); + }, + )) + ->loadInto(function (string $item) use (&$loaded) { + $loaded[] = $item; + }) + ->withOptions(new EtlConfiguration(batchSize: 2)); + + // When + $report = $executor->process($items); + + // Then - first batch processed, second batch triggers stop + expect($loaded)->toBe(['FOO', 'BAR']); +}); + +it('propagates SkipRequest thrown by batch transformer', function () { + // Given + $items = ['foo', 'bar', 'baz', 'qux']; + + $executor = (new EtlExecutor()) + ->transformWith(new CallableBatchTransformer( + function (array $items, EtlState $state) { + if (in_array('baz', $items, true)) { + $state->skip(); + } + + return array_map(strtoupper(...), $items); + }, + )) + ->withOptions(new EtlConfiguration(batchSize: 2)); + + // When + $report = $executor->process($items); + + // Then - first batch loaded, second batch skipped entirely + expect($report->output)->toBe(['FOO', 'BAR']); +});