170 changes: 170 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,170 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

`bentools/etl` is a PHP library implementing the Extract/Transform/Load pattern for data processing workflows. It's designed to be flexible, event-driven, and support both synchronous and asynchronous (ReactPHP) processing.

**Core concept:** Extract data from a source, apply transformations, and load results into a destination.

## Commands

### Testing & Quality
```bash
# Run all CI checks (PHP-CS-Fixer, PHPStan, Pest with coverage)
composer ci:check

# Run tests only
vendor/bin/pest

# Run tests with coverage
vendor/bin/pest --coverage

# Run a single test file
vendor/bin/pest tests/Behavior/FlushTest.php

# Run PHPStan type checking
vendor/bin/phpstan analyse

# Run code style fixer
vendor/bin/php-cs-fixer fix
```

### Requirements
- PHP >=8.2
- Tests use Pest (not PHPUnit syntax)
- 100% code coverage expected before PRs

## Architecture

### Core Components

**EtlExecutor** (`src/EtlExecutor.php`)
- Main entry point for building and executing ETL workflows
- Uses builder pattern via `EtlBuilderTrait` to chain extractors, transformers, and loaders
- Dispatches events at each lifecycle stage (init, extract, transform, load, flush, end)
- Handles exceptions through dedicated event types (ExtractException, TransformException, etc.)

**EtlState** (`src/EtlState.php`)
- Immutable state object passed through the entire workflow
- Tracks: current item, indices, flush timing, loaded items count, output
- Contains context (arbitrary data), source, and destination
- Version system for state updates during processing

**EtlConfiguration** (`src/EtlConfiguration.php`)
- Configuration object for flush frequency, batch size, and other options
- `flushEvery` - Controls how often the loader flushes (default: INF)
- `batchSize` - Controls how many items are grouped for batch transformation (default: 1)

### Four Main Interfaces

1. **ExtractorInterface** (`src/Extractor/`)
   - `extract(EtlState $state): iterable` - Returns an iterable of items to process
   - Built-in: CSV, JSON, FileExtractor, STDINExtractor, IterableExtractor, ReactStreamExtractor

2. **TransformerInterface** (`src/Transformer/`)
   - `transform(mixed $item, EtlState $state): mixed` - Transforms extracted items
   - Return value can be a single value, an array, or a generator (yield)
   - Yielded items generate multiple loads from a single extracted item
   - Built-in: CallableTransformer, ChainTransformer, NullTransformer

3. **BatchTransformerInterface** (`src/Transformer/`)
   - `transform(array $items, EtlState $state): Generator` - Transforms a batch of items at once
   - Separate interface from `TransformerInterface` (does NOT extend it)
   - Activated when `batchSize` is set in `EtlConfiguration` and transformer implements this interface
   - Each yielded value becomes an individual item for the load phase
   - Built-in: CallableBatchTransformer

4. **LoaderInterface** (`src/Loader/`)
   - `load(mixed $item, EtlState $state): void` - Loads transformed items
   - `flush(bool $isEarly, EtlState $state): mixed` - Called at flush frequency or end
   - Built-in: InMemoryLoader, CSV, JSON, DoctrineORM, STDOUTLoader

### Event System

**Event dispatching** (`src/EventDispatcher/`)
- Custom PSR-14 implementation with priority support
- Events: InitEvent, StartEvent, ExtractEvent, TransformEvent, BeforeLoadEvent, LoadEvent, FlushEvent, EndEvent
- Exception events: ExtractExceptionEvent, TransformExceptionEvent, LoadExceptionEvent, FlushExceptionEvent
- Use `->on{EventName}(callable $listener, int $priority = 0)` on EtlExecutor

**Control flow exceptions:**
- `SkipRequest` - Skip current item, continue processing
- `StopRequest` - Stop entire workflow immediately

### Processors

**ProcessorInterface** (`src/Processor/`)
- `IterableProcessor` - Default synchronous processing
- `ReactStreamProcessor` - Async processing with ReactPHP streams (experimental)

### Recipes

**Recipe** (`src/Recipe/`)
- Reusable workflow configurations (combine extractors, transformers, loaders, event listeners)
- `FilterRecipe` - Skip/exclude items based on callable filter
- `LoggerRecipe` - PSR-3 logging integration

### Utility Functions

`src/functions.php` provides helper functions:
- `extractFrom()` - Create executor starting with extractor
- `transformWith()` - Create executor starting with transformer
- `loadInto()` - Create executor starting with loader
- `withRecipe()` - Create executor with recipe
- `chain()` - Chain multiple extractors/transformers/loaders
- `stdIn()` / `stdOut()` - STDIN/STDOUT helpers
- `skipWhen()` - Conditional skip recipe

## Key Patterns

### Immutability & Cloning
- EtlExecutor uses `ClonableTrait` - all builder methods return clones
- EtlState has version tracking - always get latest via `$state->getLastVersion()`

### Fluent Building
```php
$executor = (new EtlExecutor())
    ->extractFrom($extractor)
    ->transformWith($transformer)
    ->loadInto($loader)
    ->onTransform(fn($event) => /* ... */)
    ->process($source, $destination);
```

### NextTick Callbacks
- `$state->nextTick(callable $callback)` - Schedule callback after current item
- Useful for deferring operations or cleanup
- Consumed between items and guaranteed to run even if workflow stops

### Batch Transform
- Configure via `new EtlConfiguration(batchSize: N)` to group N items per batch
- Requires a transformer implementing `BatchTransformerInterface` (separate from `TransformerInterface`)
- Processing flow: items are chunked via `iterable_chunk()`, then for each chunk:
  1. ExtractEvent fires per item (items can be skipped individually)
  2. `transform(array $items, EtlState $state): Generator` is called once for the whole batch
  3. Each yielded result goes through TransformEvent → Load individually
- `nextTick` callbacks are consumed between batches, not between items within a batch
- When `batchSize` is set but transformer is not `BatchTransformerInterface`, batching is ignored
- Note: `$state->currentItemKey` during Transform/Load events points to the last item of the batch
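
The batch flow described above can be sketched in plain PHP. This is a minimal illustration that does not use the library: `chunkItems()` and `processInBatches()` are hypothetical names, the `$shouldSkip` callable stands in for an ExtractEvent listener that skips an item, and keys, events, and `nextTick` handling are left out.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for iterable_chunk(): groups items into lists of at most $size.
 * Unlike the real flow, keys are not preserved here.
 */
function chunkItems(iterable $items, int $size): Generator
{
    $chunk = [];
    foreach ($items as $item) {
        $chunk[] = $item;
        if (count($chunk) === $size) {
            yield $chunk;
            $chunk = [];
        }
    }
    if ([] !== $chunk) {
        yield $chunk; // last, possibly shorter, chunk
    }
}

/**
 * Hypothetical sketch of the three steps; not the real EtlExecutor.
 *
 * @param callable(mixed): bool      $shouldSkip     plays the role of an ExtractEvent listener
 * @param callable(array): Generator $batchTransform plays the role of BatchTransformerInterface::transform()
 *
 * @return list<mixed> the items that reached the "load" step
 */
function processInBatches(iterable $source, int $batchSize, callable $shouldSkip, callable $batchTransform): array
{
    $loaded = [];
    foreach (chunkItems($source, $batchSize) as $chunk) {
        // Step 1: each item is examined individually and may be skipped.
        $kept = [];
        foreach ($chunk as $item) {
            if (!$shouldSkip($item)) {
                $kept[] = $item;
            }
        }
        if ([] === $kept) {
            continue; // nothing left to transform in this chunk
        }
        // Step 2: one transform call for the whole batch.
        foreach ($batchTransform($kept) as $result) {
            // Step 3: each yielded result is loaded individually.
            $loaded[] = $result;
        }
    }

    return $loaded;
}

$calls = 0;
$loaded = processInBatches(
    [1, 2, 3, 4, 5],
    2,
    static fn (int $n): bool => 3 === $n,
    static function (array $items) use (&$calls): Generator {
        ++$calls;
        foreach ($items as $n) {
            yield $n * 10;
        }
    },
);
```

Here item `3` is skipped before its batch is transformed, so the transform callable runs three times (for `[1, 2]`, `[4]`, and `[5]`) and four results reach the load step.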

### Flush Timing
- Configurable via `new EtlConfiguration(flushEvery: N)`
- `flush()` called when: frequency threshold reached, or at end (with `$isEarly = false`)
- Early flush = during processing, final flush = at termination
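
The difference between early and final flushes can be illustrated with a small stand-alone sketch. `BufferingLoaderSketch` and `runWithFlushEvery()` are hypothetical names, not the library's `LoaderInterface` or executor, and the exact threshold bookkeeping in the real implementation may differ.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical loader that buffers items and records every flush it receives.
 */
final class BufferingLoaderSketch
{
    /** @var list<mixed> */
    private array $pending = [];

    /** @var list<array{early: bool, items: list<mixed>}> */
    public array $flushes = [];

    public function load(mixed $item): void
    {
        $this->pending[] = $item;
    }

    public function flush(bool $isEarly): void
    {
        $this->flushes[] = ['early' => $isEarly, 'items' => $this->pending];
        $this->pending = [];
    }
}

/**
 * Assumed behaviour: flush early every $flushEvery loaded items, then flush once at the end.
 * With INF the early branch never triggers, so only the final flush happens.
 */
function runWithFlushEvery(iterable $items, float|int $flushEvery, BufferingLoaderSketch $loader): void
{
    $sinceLastFlush = 0;
    foreach ($items as $item) {
        $loader->load($item);
        if (++$sinceLastFlush >= $flushEvery) {
            $loader->flush(true); // early flush, during processing
            $sinceLastFlush = 0;
        }
    }
    $loader->flush(false); // final flush, at termination
}

$loader = new BufferingLoaderSketch();
runWithFlushEvery(['a', 'b', 'c', 'd', 'e'], 2, $loader);
```

With five items and a frequency of 2, the sketch records two early flushes (`['a', 'b']` and `['c', 'd']`) followed by a final flush containing `['e']`.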

## Testing Patterns

- Tests are organized in `tests/Behavior/` and `tests/Unit/`
- Use Pest syntax (`test()`, `expect()`, `it()`)
- Mock with Mockery when needed
- Coverage is tracked - don't reduce it

## Development Notes

- PHP 8.2+ features are welcome (readonly properties, enums, etc.)
- Prefer immutability and value objects
- Event listeners should be side-effect free when possible
- Transformers returning generators (yield) allow 1-to-many transformations
- Loaders can implement `ConditionalLoaderInterface` to skip certain items
1 change: 1 addition & 0 deletions README.md
@@ -23,6 +23,7 @@ Table of Contents
- [Advanced Usage](doc/advanced_usage.md)
- [Creating your own Extractor / Transformers / Loaders](doc/advanced_usage.md#creating-your-own-extractor--transformers--loaders)
- [Difference between yield and return in transformers](doc/advanced_usage.md#difference-between-yield-and-return-in-transformers)
- [Batch transforms](doc/advanced_usage.md#batch-transforms)
- [Next tick](doc/advanced_usage.md#next-tick)
- [Chaining extractors / transformers / loaders](doc/advanced_usage.md#chaining-extractors--transformers--loaders)
- [Reading from STDIN / Writing to STDOUT](doc/advanced_usage.md#reading-from-stdin--writing-to-stdout)
2 changes: 1 addition & 1 deletion composer.json
@@ -6,10 +6,10 @@
"php": ">=8.2",
"psr/event-dispatcher": "^1.0",
"psr/log": "^3.0",
+"bentools/iterable-functions": "^2.1",
"symfony/options-resolver": "@stable"
},
"require-dev": {
-"bentools/iterable-functions": "^2.1",
"doctrine/orm": "^2.16",
"friendsofphp/php-cs-fixer": "^3.35",
"mockery/mockery": "^1.6",
68 changes: 68 additions & 0 deletions doc/advanced_usage.md
@@ -55,6 +55,74 @@ But the last transformer of the chain (or your only one transformer) is determin
- If your transformer `yields` values, each yielded value will be passed to the loader (and the loader will be called for each yielded value).


Batch transforms
-----------------

By default, transformers process items one-by-one. But sometimes you want to process multiple items at once — for example, sending concurrent HTTP requests instead of waiting for each response sequentially.

The `BatchTransformerInterface` allows you to transform a batch of items in a single call:

```php
use BenTools\ETL\EtlConfiguration;
use BenTools\ETL\EtlExecutor;
use BenTools\ETL\EtlState;
use BenTools\ETL\Transformer\CallableBatchTransformer;

// $httpClient and Request stand in for whichever HTTP client you use.
$executor = (new EtlExecutor())
    ->extractFrom($urlExtractor)
    ->transformWith(new CallableBatchTransformer(
        // The closure must import $httpClient explicitly with `use`.
        function (array $items, EtlState $state) use ($httpClient): array {
            // $items contains a batch of URLs (e.g., 10 at a time)
            // Send all HTTP requests concurrently
            $responses = $httpClient->sendConcurrent(
                array_map(fn ($url) => new Request('GET', $url), $items)
            );

            return array_map(
                fn ($response) => json_decode((string) $response->getBody(), true),
                $responses
            );
        }
    ))
    ->loadInto($loader)
    ->withOptions(new EtlConfiguration(batchSize: 10))
    ->process($urls);
```

The `batchSize` option in `EtlConfiguration` controls how many items are grouped into each batch. Each batch is passed as an array to your transformer's `transform(array $items, EtlState $state): Generator` method.

> [!NOTE]
> `BatchTransformerInterface` is a **separate interface** from `TransformerInterface` — it does not extend it.
> When `batchSize` is configured but the transformer does not implement `BatchTransformerInterface`, batching is silently ignored.

You can also implement `BatchTransformerInterface` directly for more complex use cases:

```php
use BenTools\ETL\EtlState;
use BenTools\ETL\Transformer\BatchTransformerInterface;
use Generator;

// HttpClientInterface and Request stand in for whichever HTTP client you use.
final class ConcurrentApiTransformer implements BatchTransformerInterface
{
    public function __construct(
        private readonly HttpClientInterface $httpClient,
    ) {}

    public function transform(array $items, EtlState $state): Generator
    {
        $responses = $this->httpClient->sendConcurrent(
            array_map(fn ($item) => new Request('GET', $item['url']), $items)
        );

        // Merge each decoded response back into the item it was requested for.
        foreach ($responses as $i => $response) {
            yield [...$items[$i], 'data' => json_decode((string) $response->getBody(), true)];
        }
    }
}
```

> [!TIP]
> Each value yielded by the generator becomes an individual item for the load phase, so you can also implement fan-out (yielding more items than inputs).
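
As a rough illustration of fan-out, the generator below yields one row per tag, so three input items produce four output rows. It is a free-standing sketch: `explodeTags()` and the `id`/`tags` item shape are hypothetical, and the `EtlState` argument is omitted so that it runs without the library. The same loop could form the body of a `BatchTransformerInterface::transform()` method.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical fan-out generator: one yielded row per tag of each input item.
 *
 * @param list<array{id: int, tags: list<string>}> $items
 */
function explodeTags(array $items): Generator
{
    foreach ($items as $item) {
        foreach ($item['tags'] as $tag) {
            yield ['id' => $item['id'], 'tag' => $tag];
        }
    }
}

$rows = iterator_to_array(explodeTags([
    ['id' => 1, 'tags' => ['a', 'b']],
    ['id' => 2, 'tags' => []],          // yields nothing
    ['id' => 3, 'tags' => ['c', 'd']],
]), false);
```

An item with no tags yields nothing, so a generator can also produce fewer items than it received.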

Next tick
---------

6 changes: 6 additions & 0 deletions src/EtlConfiguration.php
@@ -14,16 +14,22 @@
class EtlConfiguration
{
    public readonly float|int $flushFrequency;
    public readonly int $batchSize;

    public function __construct(
        float|int $flushEvery = INF,
        int $batchSize = 1,
    ) {
        if (INF !== $flushEvery && is_float($flushEvery)) {
            throw new InvalidArgumentException('Expected \\INF or int, float given.');
        }
        if ($flushEvery < 1) {
            throw new InvalidArgumentException(sprintf('Expected positive integer > 0, got %d', $flushEvery));
        }
        if ($batchSize < 1) {
            throw new InvalidArgumentException(sprintf('Expected positive integer > 0 for batchSize, got %d', $batchSize));
        }
        $this->flushFrequency = $flushEvery;
        $this->batchSize = $batchSize;
    }
}
74 changes: 73 additions & 1 deletion src/EtlExecutor.php
@@ -28,6 +28,7 @@
use BenTools\ETL\Loader\LoaderInterface;
use BenTools\ETL\Processor\IterableProcessor;
use BenTools\ETL\Processor\ProcessorInterface;
use BenTools\ETL\Transformer\BatchTransformerInterface;
use BenTools\ETL\Transformer\NullTransformer;
use BenTools\ETL\Transformer\TransformerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
@@ -61,7 +62,7 @@ final class EtlExecutor implements EventDispatcherInterface
     */
    public function __construct(
        public readonly ExtractorInterface $extractor = new IterableExtractor(),
-       public readonly TransformerInterface $transformer = new NullTransformer(),
+       public readonly TransformerInterface|BatchTransformerInterface $transformer = new NullTransformer(),
        public readonly LoaderInterface $loader = new InMemoryLoader(),
        public readonly EtlConfiguration $options = new EtlConfiguration(),
        public readonly ProcessorInterface $processor = new IterableProcessor(),
@@ -117,6 +118,43 @@ public function processItem(mixed $item, mixed $key, EtlState $state): void
        $this->load($itemsToLoad, $state);
    }

    /**
     * @param array<mixed, mixed> $chunk Key-value pairs from extraction with preserved keys
     */
    public function processItemBatch(array $chunk, EtlState $state): void
    {
        $items = [];
        $isFirstItem = true;
        $stopped = false;

        foreach ($chunk as $key => $item) {
            $state = $state->update($state->getLastVersion()->withUpdatedItemKey($key));

            if ($isFirstItem && $state->currentItemIndex > 0) {
                $this->consumeNextTick($state);
            }
            $isFirstItem = false;

            try {
                $items[] = $this->emitExtractEvent($state, $item);
            } catch (SkipRequest) {
                continue;
            } catch (StopRequest) {
                $stopped = true;
                break;
            }
        }

        if ([] !== $items) {
            $itemsToLoad = $this->batchTransform($items, $state);
            $this->load($itemsToLoad, $state->getLastVersion());
        }

        if ($stopped) {
            throw new StopRequest();
        }
    }

    /**
     * @internal
     */
@@ -159,6 +197,40 @@ private function transform(mixed $item, EtlState $state): array
        return [];
    }

    /**
     * @param list<mixed> $items
     *
     * @return list<mixed>
     */
    private function batchTransform(array $items, EtlState $state): array
    {
        try {
            assert($this->transformer instanceof BatchTransformerInterface);
            $results = $this->transformer->transform($items, $state->getLastVersion());

            $allItems = [];
            foreach ($results as $singleItem) {
                try {
                    $singleResult = $this->emitTransformEvent(
                        $state,
                        TransformResult::create($singleItem),
                    );
                    array_push($allItems, ...$singleResult);
                } catch (SkipRequest) {
                    continue;
                }
            }

            return $allItems;
        } catch (SkipRequest|StopRequest $e) {
            throw $e;
        } catch (Throwable $e) {
            TransformException::emit($this->eventDispatcher, $e, $state);
        }

        return [];
    }

    /**
     * @param list<mixed> $items
     *