|
6 | 6 |
|
7 | 7 | use BenTools\ETL\EtlConfiguration; |
8 | 8 | use BenTools\ETL\EtlExecutor; |
| 9 | +use BenTools\ETL\EtlState; |
9 | 10 | use BenTools\ETL\EventDispatcher\Event\ExtractEvent; |
10 | 11 | use BenTools\ETL\EventDispatcher\Event\TransformEvent; |
| 12 | +use BenTools\ETL\EventDispatcher\Event\TransformExceptionEvent; |
| 13 | +use BenTools\ETL\Exception\TransformException; |
11 | 14 | use BenTools\ETL\Transformer\CallableBatchTransformer; |
12 | 15 | use Generator; |
| 16 | +use RuntimeException; |
13 | 17 |
|
14 | 18 | use function strtoupper; |
15 | 19 |
|
@@ -286,3 +290,97 @@ function (array $items) use (&$batchSizes) { |
286 | 290 | expect($report->output)->toBe(['FOO', 'BAR']); |
287 | 291 | expect($batchSizes)->toBe([2]); |
288 | 292 | }); |
| 293 | + |
| 294 | +it('resumes processing when transform exception is dismissed', function () { |
| 295 | + // Given |
| 296 | + $items = ['foo', 'bar', 'baz', 'qux']; |
| 297 | + |
| 298 | + $loaded = []; |
| 299 | + $executor = (new EtlExecutor()) |
| 300 | + ->transformWith(new CallableBatchTransformer( |
| 301 | + function (array $items) { |
| 302 | + if (in_array('baz', $items, true)) { |
| 303 | + throw new RuntimeException('Batch failed'); |
| 304 | + } |
| 305 | + |
| 306 | + return array_map(strtoupper(...), $items); |
| 307 | + }, |
| 308 | + )) |
| 309 | + ->loadInto(function (string $item) use (&$loaded) { |
| 310 | + $loaded[] = $item; |
| 311 | + }) |
| 312 | + ->onTransformException(function (TransformExceptionEvent $event) { |
| 313 | + $event->removeException(); |
| 314 | + }) |
| 315 | + ->withOptions(new EtlConfiguration(batchSize: 2)); |
| 316 | + |
| 317 | + // When |
| 318 | + $report = $executor->process($items); |
| 319 | + |
| 320 | + // Then - first batch OK, second batch exception dismissed (returns []), third batch continues |
| 321 | + expect($loaded)->toBe(['FOO', 'BAR']); |
| 322 | +}); |
| 323 | + |
| 324 | +it('wraps transformer exceptions into TransformException', function () { |
| 325 | + // Given |
| 326 | + $executor = (new EtlExecutor()) |
| 327 | + ->transformWith(new CallableBatchTransformer( |
| 328 | + function (array $items) { |
| 329 | + throw new RuntimeException('API is down'); |
| 330 | + }, |
| 331 | + )) |
| 332 | + ->withOptions(new EtlConfiguration(batchSize: 2)); |
| 333 | + |
| 334 | + // When / Then |
| 335 | + $executor->process(['foo', 'bar']); |
| 336 | +})->throws(TransformException::class, 'Error during transformation.'); |
| 337 | + |
| 338 | +it('propagates StopRequest thrown by batch transformer', function () { |
| 339 | + // Given |
| 340 | + $items = ['foo', 'bar', 'baz', 'qux']; |
| 341 | + |
| 342 | + $loaded = []; |
| 343 | + $executor = (new EtlExecutor()) |
| 344 | + ->transformWith(new CallableBatchTransformer( |
| 345 | + function (array $items, EtlState $state) { |
| 346 | + if (in_array('baz', $items, true)) { |
| 347 | + $state->stop(); |
| 348 | + } |
| 349 | + |
| 350 | + return array_map(strtoupper(...), $items); |
| 351 | + }, |
| 352 | + )) |
| 353 | + ->loadInto(function (string $item) use (&$loaded) { |
| 354 | + $loaded[] = $item; |
| 355 | + }) |
| 356 | + ->withOptions(new EtlConfiguration(batchSize: 2)); |
| 357 | + |
| 358 | + // When |
| 359 | + $report = $executor->process($items); |
| 360 | + |
| 361 | + // Then - first batch processed, second batch triggers stop |
| 362 | + expect($loaded)->toBe(['FOO', 'BAR']); |
| 363 | +}); |
| 364 | + |
| 365 | +it('propagates SkipRequest thrown by batch transformer', function () { |
| 366 | + // Given |
| 367 | + $items = ['foo', 'bar', 'baz', 'qux']; |
| 368 | + |
| 369 | + $executor = (new EtlExecutor()) |
| 370 | + ->transformWith(new CallableBatchTransformer( |
| 371 | + function (array $items, EtlState $state) { |
| 372 | + if (in_array('baz', $items, true)) { |
| 373 | + $state->skip(); |
| 374 | + } |
| 375 | + |
| 376 | + return array_map(strtoupper(...), $items); |
| 377 | + }, |
| 378 | + )) |
| 379 | + ->withOptions(new EtlConfiguration(batchSize: 2)); |
| 380 | + |
| 381 | + // When |
| 382 | + $report = $executor->process($items); |
| 383 | + |
| 384 | + // Then - first batch loaded, second batch skipped entirely |
| 385 | + expect($report->output)->toBe(['FOO', 'BAR']); |
| 386 | +}); |
0 commit comments