Commit 9386ed5

bpolaszek and claude authored
Feat: Batch transform support (BatchTransformerInterface) (#60)
--------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 26fdf50 commit 9386ed5

14 files changed, +834 -8 lines changed

CLAUDE.md

Lines changed: 170 additions & 0 deletions
@@ -0,0 +1,170 @@
+# CLAUDE.md
+
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+
+## Project Overview
+
+`bentools/etl` is a PHP library implementing the Extract/Transform/Load pattern for data processing workflows. It's designed to be flexible, event-driven, and support both synchronous and asynchronous (ReactPHP) processing.
+
+**Core concept:** Extract data from a source, apply transformations, and load results into a destination.
+
+## Commands
+
+### Testing & Quality
+```bash
+# Run all CI checks (PHP-CS-Fixer, PHPStan, Pest with coverage)
+composer ci:check
+
+# Run tests only
+vendor/bin/pest
+
+# Run tests with coverage
+vendor/bin/pest --coverage
+
+# Run a single test file
+vendor/bin/pest tests/Behavior/FlushTest.php
+
+# Run PHPStan type checking
+vendor/bin/phpstan analyse
+
+# Run code style fixer
+vendor/bin/php-cs-fixer fix
+```
+
+### Requirements
+- PHP >=8.2
+- Tests use Pest (not PHPUnit syntax)
+- 100% code coverage expected before PRs
+
+## Architecture
+
+### Core Components
+
+**EtlExecutor** (`src/EtlExecutor.php`)
+- Main entry point for building and executing ETL workflows
+- Uses builder pattern via `EtlBuilderTrait` to chain extractors, transformers, and loaders
+- Dispatches events at each lifecycle stage (init, extract, transform, load, flush, end)
+- Handles exceptions through dedicated event types (ExtractException, TransformException, etc.)
+
+**EtlState** (`src/EtlState.php`)
+- Immutable state object passed through the entire workflow
+- Tracks: current item, indices, flush timing, loaded items count, output
+- Contains context (arbitrary data), source, and destination
+- Version system for state updates during processing
+
+**EtlConfiguration** (`src/EtlConfiguration.php`)
+- Configuration object for flush frequency, batch size, and other options
+- `flushEvery` - Controls how often the loader flushes (default: INF)
+- `batchSize` - Controls how many items are grouped for batch transformation (default: 1)
+
+### Four Main Interfaces
+
+1. **ExtractorInterface** (`src/Extractor/`)
+   - `extract(EtlState $state): iterable` - Returns an iterable of items to process
+   - Built-in: CSV, JSON, FileExtractor, STDINExtractor, IterableExtractor, ReactStreamExtractor
+
+2. **TransformerInterface** (`src/Transformer/`)
+   - `transform(mixed $item, EtlState $state): mixed` - Transforms extracted items
+   - Return value can be a single value, an array, or a generator (yield)
+   - Yielded items generate multiple loads from a single extracted item
+   - Built-in: CallableTransformer, ChainTransformer, NullTransformer
+
+3. **BatchTransformerInterface** (`src/Transformer/`)
+   - `transform(array $items, EtlState $state): Generator` - Transforms a batch of items at once
+   - Separate interface from `TransformerInterface` (does NOT extend it)
+   - Activated when `batchSize` is set in `EtlConfiguration` and transformer implements this interface
+   - Each yielded value becomes an individual item for the load phase
+   - Built-in: CallableBatchTransformer
+
+4. **LoaderInterface** (`src/Loader/`)
+   - `load(mixed $item, EtlState $state): void` - Loads transformed items
+   - `flush(bool $isEarly, EtlState $state): mixed` - Called at flush frequency or end
+   - Built-in: InMemoryLoader, CSV, JSON, DoctrineORM, STDOUTLoader
+
+### Event System
+
+**Event dispatching** (`src/EventDispatcher/`)
+- Custom PSR-14 implementation with priority support
+- Events: InitEvent, StartEvent, ExtractEvent, TransformEvent, BeforeLoadEvent, LoadEvent, FlushEvent, EndEvent
+- Exception events: ExtractExceptionEvent, TransformExceptionEvent, LoadExceptionEvent, FlushExceptionEvent
+- Use `->on{EventName}(callable $listener, int $priority = 0)` on EtlExecutor
+
+**Control flow exceptions:**
+- `SkipRequest` - Skip current item, continue processing
+- `StopRequest` - Stop entire workflow immediately
+
+### Processors
+
+**ProcessorInterface** (`src/Processor/`)
+- `IterableProcessor` - Default synchronous processing
+- `ReactStreamProcessor` - Async processing with ReactPHP streams (experimental)
+
+### Recipes
+
+**Recipe** (`src/Recipe/`)
+- Reusable workflow configurations (combine extractors, transformers, loaders, event listeners)
+- `FilterRecipe` - Skip/exclude items based on callable filter
+- `LoggerRecipe` - PSR-3 logging integration
+
+### Utility Functions
+
+`src/functions.php` provides helper functions:
+- `extractFrom()` - Create executor starting with extractor
+- `transformWith()` - Create executor starting with transformer
+- `loadInto()` - Create executor starting with loader
+- `withRecipe()` - Create executor with recipe
+- `chain()` - Chain multiple extractors/transformers/loaders
+- `stdIn()` / `stdOut()` - STDIN/STDOUT helpers
+- `skipWhen()` - Conditional skip recipe
+
+## Key Patterns
+
+### Immutability & Cloning
+- EtlExecutor uses `ClonableTrait` - all builder methods return clones
+- EtlState has version tracking - always get latest via `$state->getLastVersion()`
+
+### Fluent Building
+```php
+$executor = (new EtlExecutor())
+    ->extractFrom($extractor)
+    ->transformWith($transformer)
+    ->loadInto($loader)
+    ->onTransform(fn($event) => /* ... */)
+    ->process($source, $destination);
+```
+
+### NextTick Callbacks
+- `$state->nextTick(callable $callback)` - Schedule callback after current item
+- Useful for deferring operations or cleanup
+- Consumed between items and guaranteed to run even if workflow stops
+
+### Batch Transform
+- Configure via `new EtlConfiguration(batchSize: N)` to group N items per batch
+- Requires a transformer implementing `BatchTransformerInterface` (separate from `TransformerInterface`)
+- Processing flow: items are chunked via `iterable_chunk()`, then for each chunk:
+  1. ExtractEvent fires per item (items can be skipped individually)
+  2. `transform(array $items, EtlState $state): Generator` is called once for the whole batch
+  3. Each yielded result goes through TransformEvent → Load individually
+- `nextTick` callbacks are consumed between batches, not between items within a batch
+- When `batchSize` is set but transformer is not `BatchTransformerInterface`, batching is ignored
+- Note: `$state->currentItemKey` during Transform/Load events points to the last item of the batch
+
+### Flush Timing
+- Configurable via `new EtlConfiguration(flushEvery: N)`
+- `flush()` called when: frequency threshold reached, or at end (with `$isEarly = false`)
+- Early flush = during processing, final flush = at termination
+
+## Testing Patterns
+
+- Tests are organized in `tests/Behavior/` and `tests/Unit/`
+- Use Pest syntax (`test()`, `expect()`, `it()`)
+- Mock with Mockery when needed
+- Coverage is tracked - don't reduce it
+
+## Development Notes
+
+- PHP 8.2+ features are welcome (readonly properties, enums, etc.)
+- Prefer immutability and value objects
+- Event listeners should be side-effect free when possible
+- Transformers returning generators (yield) allow 1-to-many transformations
+- Loaders can implement `ConditionalLoaderInterface` to skip certain items
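
Reviewer note: the three-step batch flow described under "Batch Transform" can be sketched in plain PHP as below. This is a minimal illustration that does not use the library: `runBatches()`, `doubleBatch()` and the `$shouldSkip` callback are hypothetical names, and the chunking is hand-rolled rather than delegated to `iterable_chunk()`.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for a batch transformer: receives a whole batch
 * and yields one result per input item.
 *
 * @param list<int> $items
 */
function doubleBatch(array $items): Generator
{
    foreach ($items as $item) {
        yield $item * 2;
    }
}

/**
 * Sketch of the flow: chunk, filter per item, transform once per chunk,
 * then load each yielded value individually.
 *
 * @param iterable<int|string, int> $source assumed to have unique keys
 *
 * @return array{loaded: list<int>, calls: int}
 */
function runBatches(iterable $source, int $batchSize, callable $shouldSkip): array
{
    $loaded = [];
    $calls = 0;
    $chunk = [];

    $processChunk = function (array $chunk) use (&$loaded, &$calls, $shouldSkip): void {
        // Step 1: per-item filtering, analogous to skipping during the extract event.
        $items = array_values(array_filter($chunk, fn (int $item): bool => !$shouldSkip($item)));
        if ([] === $items) {
            return;
        }
        // Step 2: a single transform call for the whole batch.
        ++$calls;
        // Step 3: each yielded value is loaded on its own.
        foreach (doubleBatch($items) as $result) {
            $loaded[] = $result;
        }
    };

    foreach ($source as $key => $item) {
        $chunk[$key] = $item;
        if (count($chunk) === $batchSize) {
            $processChunk($chunk);
            $chunk = [];
        }
    }
    if ([] !== $chunk) {
        $processChunk($chunk); // trailing partial batch
    }

    return ['loaded' => $loaded, 'calls' => $calls];
}

$result = runBatches([1, 2, 3, 4, 5], 2, fn (int $item): bool => 3 === $item);
```

With a batch size of 2 and item `3` skipped, the transformer is invoked three times (for `[1, 2]`, `[4]` and `[5]`), which shows that a skipped item shrinks its batch rather than shifting later items into it.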

README.md

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ Table of Contents
  - [Advanced Usage](doc/advanced_usage.md)
      - [Creating your own Extractor / Transformers / Loaders](doc/advanced_usage.md#creating-your-own-extractor--transformers--loaders)
      - [Difference between yield and return in transformers](doc/advanced_usage.md#difference-between-yield-and-return-in-transformers)
+     - [Batch transforms](doc/advanced_usage.md#batch-transforms)
      - [Next tick](doc/advanced_usage.md#next-tick)
      - [Chaining extractors / transformers / loaders](doc/advanced_usage.md#chaining-extractors--transformers--loaders)
      - [Reading from STDIN / Writing to STDOUT](doc/advanced_usage.md#reading-from-stdin--writing-to-stdout)

composer.json

Lines changed: 1 addition & 1 deletion
@@ -6,10 +6,10 @@
         "php": ">=8.2",
         "psr/event-dispatcher": "^1.0",
         "psr/log": "^3.0",
+        "bentools/iterable-functions": "^2.1",
         "symfony/options-resolver": "@stable"
     },
     "require-dev": {
-        "bentools/iterable-functions": "^2.1",
         "doctrine/orm": "^2.16",
         "friendsofphp/php-cs-fixer": "^3.35",
         "mockery/mockery": "^1.6",

doc/advanced_usage.md

Lines changed: 68 additions & 0 deletions
@@ -55,6 +55,74 @@ But the last transformer of the chain (or your only one transformer) is determin
 - If your transformer `yields` values, each yielded value will be passed to the loader (and the loader will be called for each yielded value).
 
 
+Batch transforms
+-----------------
+
+By default, transformers process items one-by-one. But sometimes you want to process multiple items at once, for example by sending concurrent HTTP requests instead of waiting for each response sequentially.
+
+The `BatchTransformerInterface` allows you to transform a batch of items in a single call:
+
+```php
+use BenTools\ETL\EtlConfiguration;
+use BenTools\ETL\EtlExecutor;
+use BenTools\ETL\EtlState;
+use BenTools\ETL\Transformer\CallableBatchTransformer;
+
+$executor = (new EtlExecutor())
+    ->extractFrom($urlExtractor)
+    ->transformWith(new CallableBatchTransformer(
+        function (array $items, EtlState $state) use ($httpClient): array {
+            // $items contains a batch of URLs (e.g., 10 at a time)
+            // Send all HTTP requests concurrently
+            $responses = $httpClient->sendConcurrent(
+                array_map(fn ($url) => new Request('GET', $url), $items)
+            );
+
+            return array_map(
+                fn ($response) => json_decode($response->getBody(), true),
+                $responses
+            );
+        }
+    ))
+    ->loadInto($loader)
+    ->withOptions(new EtlConfiguration(batchSize: 10))
+    ->process($urls);
+```
+
+The `batchSize` option in `EtlConfiguration` controls how many items are grouped into each batch. Each batch is passed as an array to your transformer's `transform(array $items, EtlState $state): Generator` method.
+
+> [!NOTE]
+> `BatchTransformerInterface` is a **separate interface** from `TransformerInterface`: it does not extend it.
+> When `batchSize` is configured but the transformer does not implement `BatchTransformerInterface`, batching is silently ignored.
+
+You can also implement `BatchTransformerInterface` directly for more complex use cases:
+
+```php
+use BenTools\ETL\Transformer\BatchTransformerInterface;
+use Generator;
+
+final class ConcurrentApiTransformer implements BatchTransformerInterface
+{
+    public function __construct(
+        private readonly HttpClientInterface $httpClient,
+    ) {}
+
+    public function transform(array $items, EtlState $state): Generator
+    {
+        $responses = $this->httpClient->sendConcurrent(
+            array_map(fn ($item) => new Request('GET', $item['url']), $items)
+        );
+
+        foreach ($responses as $i => $response) {
+            yield [...$items[$i], 'data' => json_decode($response->getBody(), true)];
+        }
+    }
+}
+```
+
+> [!TIP]
+> Each value yielded by the generator becomes an individual item for the load phase, so you can also implement fan-out (yielding more items than inputs).
+
 Next tick
 ---------
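
Reviewer note: the fan-out mentioned in the tip can be illustrated without any HTTP client or library class. The sketch below uses a hypothetical `splitIntoWords()` generator that has the same shape as a batch callback (an array in, yielded values out) and is not part of `bentools/etl`.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical batch callback: splits each input line into words,
 * yielding more items than it received (fan-out).
 *
 * @param list<string> $lines
 */
function splitIntoWords(array $lines): Generator
{
    foreach ($lines as $line) {
        foreach (explode(' ', $line) as $word) {
            yield $word;
        }
    }
}

// Discard the generator keys: a plain list of yielded values is enough here.
$words = iterator_to_array(splitIntoWords(['hello world', 'foo bar baz']), false);
```

Two input items produce five yielded values here; per the documentation above, each such value would reach the loader as its own item.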

src/EtlConfiguration.php

Lines changed: 6 additions & 0 deletions
@@ -14,16 +14,22 @@
 class EtlConfiguration
 {
     public readonly float|int $flushFrequency;
+    public readonly int $batchSize;
 
     public function __construct(
         float|int $flushEvery = INF,
+        int $batchSize = 1,
     ) {
         if (INF !== $flushEvery && is_float($flushEvery)) {
             throw new InvalidArgumentException('Expected \\INF or int, float given.');
         }
         if ($flushEvery < 1) {
             throw new InvalidArgumentException(sprintf('Expected positive integer > 0, got %d', $flushEvery));
         }
+        if ($batchSize < 1) {
+            throw new InvalidArgumentException(sprintf('Expected positive integer > 0 for batchSize, got %d', $batchSize));
+        }
         $this->flushFrequency = $flushEvery;
+        $this->batchSize = $batchSize;
     }
 }
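
Reviewer note: the new `batchSize` guard can be exercised in isolation. The sketch below copies the validation rules from the hunk above into a hypothetical `SketchConfiguration` class so that it runs without the library installed; the real `EtlConfiguration` may differ in namespace and in details not visible in this diff.

```php
<?php

declare(strict_types=1);

// Stand-in mirroring the validation shown in the diff; not the library class itself.
final class SketchConfiguration
{
    public readonly float|int $flushFrequency;
    public readonly int $batchSize;

    public function __construct(float|int $flushEvery = INF, int $batchSize = 1)
    {
        if (INF !== $flushEvery && is_float($flushEvery)) {
            throw new InvalidArgumentException('Expected \\INF or int, float given.');
        }
        if ($flushEvery < 1) {
            throw new InvalidArgumentException(sprintf('Expected positive integer > 0, got %d', $flushEvery));
        }
        if ($batchSize < 1) {
            throw new InvalidArgumentException(sprintf('Expected positive integer > 0 for batchSize, got %d', $batchSize));
        }
        $this->flushFrequency = $flushEvery;
        $this->batchSize = $batchSize;
    }
}

// A valid combination: flush every 100 loaded items, transform in batches of 10.
$valid = new SketchConfiguration(flushEvery: 100, batchSize: 10);

// A batch size of zero is rejected by the new guard.
$message = null;
try {
    new SketchConfiguration(batchSize: 0);
} catch (InvalidArgumentException $e) {
    $message = $e->getMessage();
}
```

Because the default is `batchSize: 1`, existing callers that never pass the option still construct a valid configuration.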

src/EtlExecutor.php

Lines changed: 73 additions & 1 deletion
@@ -28,6 +28,7 @@
 use BenTools\ETL\Loader\LoaderInterface;
 use BenTools\ETL\Processor\IterableProcessor;
 use BenTools\ETL\Processor\ProcessorInterface;
+use BenTools\ETL\Transformer\BatchTransformerInterface;
 use BenTools\ETL\Transformer\NullTransformer;
 use BenTools\ETL\Transformer\TransformerInterface;
 use Psr\EventDispatcher\EventDispatcherInterface;
@@ -61,7 +62,7 @@ final class EtlExecutor implements EventDispatcherInterface
      */
     public function __construct(
         public readonly ExtractorInterface $extractor = new IterableExtractor(),
-        public readonly TransformerInterface $transformer = new NullTransformer(),
+        public readonly TransformerInterface|BatchTransformerInterface $transformer = new NullTransformer(),
         public readonly LoaderInterface $loader = new InMemoryLoader(),
         public readonly EtlConfiguration $options = new EtlConfiguration(),
         public readonly ProcessorInterface $processor = new IterableProcessor(),
@@ -117,6 +118,43 @@ public function processItem(mixed $item, mixed $key, EtlState $state): void
         $this->load($itemsToLoad, $state);
     }
 
+    /**
+     * @param array<mixed, mixed> $chunk Key-value pairs from extraction with preserved keys
+     */
+    public function processItemBatch(array $chunk, EtlState $state): void
+    {
+        $items = [];
+        $isFirstItem = true;
+        $stopped = false;
+
+        foreach ($chunk as $key => $item) {
+            $state = $state->update($state->getLastVersion()->withUpdatedItemKey($key));
+
+            if ($isFirstItem && $state->currentItemIndex > 0) {
+                $this->consumeNextTick($state);
+            }
+            $isFirstItem = false;
+
+            try {
+                $items[] = $this->emitExtractEvent($state, $item);
+            } catch (SkipRequest) {
+                continue;
+            } catch (StopRequest) {
+                $stopped = true;
+                break;
+            }
+        }
+
+        if ([] !== $items) {
+            $itemsToLoad = $this->batchTransform($items, $state);
+            $this->load($itemsToLoad, $state->getLastVersion());
+        }
+
+        if ($stopped) {
+            throw new StopRequest();
+        }
+    }
+
     /**
      * @internal
      */
@@ -159,6 +197,40 @@ private function transform(mixed $item, EtlState $state): array
         return [];
     }
 
+    /**
+     * @param list<mixed> $items
+     *
+     * @return list<mixed>
+     */
+    private function batchTransform(array $items, EtlState $state): array
+    {
+        try {
+            assert($this->transformer instanceof BatchTransformerInterface);
+            $results = $this->transformer->transform($items, $state->getLastVersion());
+
+            $allItems = [];
+            foreach ($results as $singleItem) {
+                try {
+                    $singleResult = $this->emitTransformEvent(
+                        $state,
+                        TransformResult::create($singleItem),
+                    );
+                    array_push($allItems, ...$singleResult);
+                } catch (SkipRequest) {
+                    continue;
+                }
+            }
+
+            return $allItems;
+        } catch (SkipRequest|StopRequest $e) {
+            throw $e;
+        } catch (Throwable $e) {
+            TransformException::emit($this->eventDispatcher, $e, $state);
+        }
+
+        return [];
+    }
+
     /**
      * @param list<mixed> $items
      *
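
Reviewer note: the skip/stop handling in `processItemBatch()` has one subtle property: items collected before a stop request are still transformed and loaded, and only then is the stop rethrown. The sketch below reproduces just the collection loop in plain PHP. `SketchSkip`, `SketchStop` and `collectBatch()` are hypothetical stand-ins, not the library's `SkipRequest`/`StopRequest` classes, and the function returns a flag where the real method rethrows.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for the library's control-flow exceptions.
final class SketchSkip extends RuntimeException {}
final class SketchStop extends RuntimeException {}

/**
 * Collects items from a chunk, honouring skip/stop signals raised by $onExtract,
 * and reports whether a stop was requested.
 *
 * @param array<int|string, mixed> $chunk
 *
 * @return array{items: list<mixed>, stopped: bool}
 */
function collectBatch(array $chunk, callable $onExtract): array
{
    $items = [];
    $stopped = false;

    foreach ($chunk as $item) {
        try {
            $items[] = $onExtract($item);
        } catch (SketchSkip) {
            continue; // drop this item only
        } catch (SketchStop) {
            $stopped = true; // keep what was collected so far
            break;
        }
    }

    return ['items' => $items, 'stopped' => $stopped];
}

$batch = collectBatch(['a', 'b', 'c', 'd'], function (string $item): string {
    return match ($item) {
        'b' => throw new SketchSkip(),
        'c' => throw new SketchStop(),
        default => strtoupper($item),
    };
});
```

Here `'a'` survives, `'b'` is skipped, `'c'` triggers the stop, and `'d'` is never visited, so a caller following the diff's logic would transform and load the one remaining item before propagating the stop.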
