Skip to content

Commit a081b60

Browse files
authored
RetryLoader (#1845)
* Added Retry Loader * Updated DSL definitions
1 parent f3f2a4f commit a081b60

File tree

30 files changed

+1669
-6
lines changed

30 files changed

+1669
-6
lines changed

documentation/components/core/core.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,8 @@ For detailed information about specific DataFrame operations, see the following
105105
- **[Constraints](constraints.md)** - Data integrity constraints and business rules
106106
- **[Error Handling](error-handling.md)** - Error management strategies
107107

108+
### Reliability & Recovery
109+
- **[Retry Mechanisms](retry.md)** - Automatic retry for transient failures
110+
108111
### Output & Display
109112
- **[Display](display.md)** - Data visualization and output
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
# Retry Mechanisms
2+
3+
- [⬅️ Back](core.md)
4+
- [📚 API Reference](/documentation/api/core)
5+
6+
The Flow ETL framework provides robust retry mechanisms to handle transient failures during data loading operations.
7+
This is essential for building resilient data pipelines that can recover from temporary network issues, database
8+
connection problems, or resource availability conflicts.
9+
10+
## Overview
11+
12+
The retry system focuses on **loader operations** - the final step where processed data is written to its destination.
13+
When a loader encounters a temporary failure, the retry mechanism can automatically reattempt the operation according to
14+
configurable strategies.
15+
16+
## Key Components
17+
18+
### RetryLoader
19+
20+
The `RetryLoader` is a decorator that wraps any existing loader with retry capabilities. It implements the same `Loader`
21+
interface, making it transparent to use in your data pipelines.
22+
23+
```php
24+
<?php
25+
26+
use function Flow\ETL\DSL\{
27+
data_frame,
28+
from_array,
29+
write_with_retries,
30+
retry_any_throwable,
31+
delay_fixed,
32+
duration_milliseconds
33+
};
34+
35+
$dataFrame = data_frame()
36+
->read(from_array([
37+
['id' => 1, 'name' => 'John'],
38+
['id' => 2, 'name' => 'Jane']
39+
]))
40+
->write(write_with_retries(
41+
to_some_service(...),
42+
retry_any_throwable(3), // Retry up to 3 times
43+
delay_fixed(duration_milliseconds(500)) // Wait 500ms between retries
44+
))
45+
->run();
46+
```
47+
48+
## Retry Strategies
49+
50+
Retry strategies determine **when** to retry an operation based on the type of exception thrown.
51+
52+
### AnyThrowable Strategy
53+
54+
Retries on any thrown exception up to the specified limit:
55+
56+
```php
57+
use function Flow\ETL\DSL\retry_any_throwable;
58+
59+
$strategy = retry_any_throwable(5); // Retry up to 5 times on any exception
60+
```
61+
62+
### Specific Exception Types Strategy
63+
64+
Retries only for specified exception types, allowing you to be selective about which failures should trigger retries:
65+
66+
```php
67+
use function Flow\ETL\DSL\retry_on_exception_types;
68+
69+
$strategy = retry_on_exception_types([
70+
\PDOException::class, // Database connection issues
71+
\RuntimeException::class, // Runtime problems
72+
ConnectException::class, // Network connectivity issues
73+
], 3);
74+
```
75+
76+
This is useful when you want to retry transient failures but immediately fail on logic errors or data validation issues.
77+
78+
## Delay Factories
79+
80+
Delay factories determine **how long** to wait between retry attempts. Different strategies help avoid overwhelming
81+
failing services while providing appropriate backoff behavior.
82+
83+
### Fixed Delay
84+
85+
Wait a consistent amount of time between each retry:
86+
87+
```php
88+
use function Flow\ETL\DSL\{delay_fixed, duration_milliseconds, duration_seconds};
89+
90+
$delay = delay_fixed(duration_milliseconds(200)); // Wait 200ms between retries
91+
$delay = delay_fixed(duration_seconds(1)); // Wait 1 second between retries
92+
```
93+
94+
### Linear Backoff
95+
96+
Increase the delay by a fixed increment on each retry:
97+
98+
```php
99+
use function Flow\ETL\DSL\delay_linear;
100+
101+
// Start with 100ms, add 50ms each retry: 100ms, 150ms, 200ms, 250ms...
102+
$delay = delay_linear(
103+
duration_milliseconds(100), // Initial delay
104+
duration_milliseconds(50) // Increment per retry
105+
);
106+
```
107+
108+
### Exponential Backoff
109+
110+
Double (or multiply by a factor) the delay on each retry:
111+
112+
```php
113+
use function Flow\ETL\DSL\delay_exponential;
114+
115+
// Start with 100ms, double each retry: 100ms, 200ms, 400ms, 800ms...
116+
$delay = delay_exponential(
117+
duration_milliseconds(100), // Base delay
118+
2, // Multiplier
119+
duration_seconds(5) // Maximum delay cap
120+
);
121+
```
122+
123+
### Jitter
124+
125+
Add randomness to any delay strategy to prevent "thundering herd" problems when multiple processes retry simultaneously:
126+
127+
```php
128+
use function Flow\ETL\DSL\delay_jitter;
129+
130+
// Add ±20% random variation to a fixed delay
131+
$delay = delay_jitter(
132+
delay_fixed(duration_milliseconds(500)),
133+
0.2 // 20% jitter factor (0.0 to 1.0)
134+
);
135+
```
136+
137+
## Idempotent vs Non-Idempotent Operations
138+
139+
Understanding the difference between idempotent and non-idempotent operations is crucial for designing reliable retry
140+
mechanisms.
141+
142+
### Idempotent Operations (Recommended)
143+
144+
Idempotent operations can be safely repeated without causing unintended side effects. The same operation executed
145+
multiple times produces the same result.
146+
147+
**Examples of idempotent loader operations:**
148+
149+
- Database `UPSERT` (INSERT ON CONFLICT UPDATE)
150+
- File overwrites
151+
- HTTP PUT requests
152+
- Database UPDATE with specific WHERE clauses
153+
154+
```php
155+
// Idempotent: Safe to retry
156+
$loader = new DatabaseUpsertLoader($connection, 'users');
157+
$retryLoader = write_with_retries($loader, retry_any_throwable(5));
158+
```
159+
160+
### Non-Idempotent Operations (Use with Caution)
161+
162+
Non-idempotent operations may produce different results or unintended side effects when repeated.
163+
164+
**Examples of non-idempotent operations:**
165+
166+
- Database `INSERT` without conflict resolution
167+
- File appends
168+
- Counter increments
169+
170+
## Advanced Configuration
171+
172+
### Custom Sleep Implementation
173+
174+
For testing or special requirements, you can provide a custom sleep implementation:
175+
176+
```php
177+
use Flow\ETL\Time\FakeSleep;
178+
179+
$sleep = new FakeSleep(); // For testing - doesn't actually sleep
180+
$retryLoader = write_with_retries(
181+
$loader,
182+
retry_any_throwable(3),
183+
delay_fixed(duration_milliseconds(100)),
184+
$sleep
185+
);
186+
```
187+
188+
### Complete Configuration Example
189+
190+
```php
191+
<?php
192+
193+
use function Flow\ETL\DSL\{
194+
data_frame,
195+
from_array,
196+
write_with_retries,
197+
retry_on_exception_types,
198+
delay_jitter,
199+
delay_exponential,
200+
duration_milliseconds,
201+
duration_seconds
202+
};
203+
204+
$result = data_frame()
205+
->read(from_array($largeDataset))
206+
->write(write_with_retries(
207+
to_database($connection, 'transactions'),
208+
209+
// Only retry on specific transient failures
210+
retry_on_exception_types([
211+
\PDOException::class,
212+
\RuntimeException::class
213+
], 5),
214+
215+
// Exponential backoff with jitter
216+
delay_jitter(
217+
delay_exponential(
218+
duration_milliseconds(200), // Start with 200ms
219+
2, // Double each time
220+
duration_seconds(10) // Cap at 10 seconds
221+
),
222+
0.3 // 30% jitter to prevent thundering herd
223+
)
224+
))
225+
->run();
226+
```
227+
228+
## Error Information
229+
230+
When all retries are exhausted, a `FailedRetryException` is thrown containing detailed information about all attempts:
231+
232+
```php
233+
use Flow\ETL\Exception\FailedRetryException;
234+
235+
try {
236+
$dataFrame->write($retryLoader)->run();
237+
} catch (FailedRetryException $e) {
238+
echo "Failed after {$e->getRetriesRecord()->count()} attempts\n";
239+
240+
// Access individual retry attempts
241+
foreach ($e->getRetriesRecord()->all() as $retry) {
242+
echo "Attempt {$retry->attempt()}: {$retry->exception()->getMessage()}\n";
243+
echo "Timestamp: {$retry->timestamp()->format('Y-m-d H:i:s')}\n";
244+
}
245+
}
246+
```

src/core/etl/src/Flow/ETL/DSL/functions.php

Lines changed: 85 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@
3737
types as types_new
3838
};
3939
use Flow\Calculator\Rounding;
40-
use Flow\ETL\{
41-
Analyze,
40+
use Flow\ETL\{Analyze,
4241
Attribute\DocumentationDSL,
4342
Attribute\DocumentationExample,
4443
Attribute\Module,
@@ -63,6 +62,8 @@
6362
NativePHPRandomValueGenerator,
6463
Pipeline,
6564
RandomValueGenerator,
65+
Retry\DelayFactory,
66+
Retry\RetryStrategy,
6667
Row,
6768
Rows,
6869
Schema,
@@ -72,8 +73,7 @@
7273
Transformation,
7374
Transformer,
7475
Window,
75-
WithEntry
76-
};
76+
WithEntry};
7777
use Flow\ETL\ErrorHandler\{IgnoreError, SkipRows, ThrowError};
7878
use Flow\ETL\Exception\{InvalidArgumentException, RuntimeException, SchemaDefinitionNotFoundException};
7979
use Flow\ETL\Extractor\{CacheExtractor, ChainExtractor, ChunkExtractor, DataFrameExtractor, FilesExtractor, MemoryExtractor, PathPartitionsExtractor, PipelineExtractor, RowsExtractor, SequenceExtractor};
@@ -153,10 +153,13 @@
153153
use Flow\ETL\Function\ArraySort\Sort;
154154
use Flow\ETL\Function\Between\Boundary;
155155
use Flow\ETL\Function\MatchCases\MatchCondition;
156-
use Flow\ETL\Loader\{ArrayLoader, CallbackLoader, MemoryLoader, StreamLoader, TransformerLoader};
156+
use Flow\ETL\Loader\{ArrayLoader, CallbackLoader, MemoryLoader, RetryLoader, StreamLoader, TransformerLoader};
157157
use Flow\ETL\Loader\BranchingLoader;
158158
use Flow\ETL\Loader\StreamLoader\Output;
159159
use Flow\ETL\Memory\Memory;
160+
use Flow\ETL\Retry\DelayFactory\{Exponential, Jitter, Linear};
161+
use Flow\ETL\Retry\DelayFactory\{Fixed, Fixed\FixedMilliseconds};
162+
use Flow\ETL\Retry\RetryStrategy\{AnyThrowable, OnExceptionTypes};
160163
use Flow\ETL\Row\{Entries, EntryFactory, SortOrder};
161164
use Flow\ETL\Row\Entry\{BooleanEntry, DateEntry, DateTimeEntry, EnumEntry, FloatEntry, IntegerEntry, JsonEntry, ListEntry, MapEntry, StringEntry, StructureEntry, TimeEntry, UuidEntry, XMLElementEntry, XMLEntry};
162165
use Flow\ETL\Row\{Entry, EntryReference, Reference, References};
@@ -165,6 +168,7 @@
165168
use Flow\ETL\Schema\Formatter\{JsonSchemaFormatter, PHPSchemaFormatter};
166169
use Flow\ETL\Schema\Metadata;
167170
use Flow\ETL\Schema\Validator\{EvolvingValidator, SelectiveValidator, StrictValidator};
171+
use Flow\ETL\Time\{Duration, Sleep, SystemSleep};
168172
use Flow\ETL\Transformer\OrderEntries\{CombinedComparator, Comparator, NameComparator, Order, TypeComparator, TypePriorities};
169173
use Flow\ETL\Transformer\Rename\{RenameCaseEntryStrategy, RenameReplaceEntryStrategy};
170174
use Flow\Filesystem\{Filesystem, Local\NativeLocalFilesystem, Partition, Partitions, Path};
@@ -2270,3 +2274,79 @@ function match_condition(mixed $condition, mixed $then) : MatchCondition
22702274
{
22712275
return new MatchCondition($condition, $then);
22722276
}
2277+
2278+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
2279+
function retry_any_throwable(int $limit) : AnyThrowable
2280+
{
2281+
return new AnyThrowable($limit);
2282+
}
2283+
2284+
/**
2285+
* @param array<class-string<\Throwable>> $exception_types
2286+
*/
2287+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
2288+
function retry_on_exception_types(array $exception_types, int $limit) : OnExceptionTypes
2289+
{
2290+
return new OnExceptionTypes($exception_types, $limit);
2291+
}
2292+
2293+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
2294+
function delay_linear(Duration $delay, Duration $increment) : Linear
2295+
{
2296+
return new Linear($delay, $increment);
2297+
}
2298+
2299+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
2300+
function delay_exponential(Duration $base, int $multiplier = 2, ?Duration $max_delay = null) : Exponential
2301+
{
2302+
return new Exponential($base, $multiplier, $max_delay);
2303+
}
2304+
2305+
/**
2306+
* @param float $jitter_factor a value between 0 and 1 representing the maximum percentage of jitter to apply
2307+
*/
2308+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
2309+
function delay_jitter(DelayFactory $delay, float $jitter_factor) : Jitter
2310+
{
2311+
return new Jitter($delay, $jitter_factor);
2312+
}
2313+
2314+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
2315+
function delay_fixed(Duration $delay) : Fixed
2316+
{
2317+
return new Fixed($delay);
2318+
}
2319+
2320+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
2321+
function duration_seconds(int $seconds) : Duration
2322+
{
2323+
return Duration::fromSeconds($seconds);
2324+
}
2325+
2326+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
2327+
function duration_milliseconds(int $milliseconds) : Duration
2328+
{
2329+
return Duration::fromMilliseconds($milliseconds);
2330+
}
2331+
2332+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
2333+
function duration_microseconds(int $microseconds) : Duration
2334+
{
2335+
return Duration::fromMicroseconds($microseconds);
2336+
}
2337+
2338+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
2339+
function duration_minutes(int $minutes) : Duration
2340+
{
2341+
return Duration::fromMinutes($minutes);
2342+
}
2343+
2344+
#[DocumentationDSL(module: Module::CORE, type: DSLType::LOADER)]
2345+
function write_with_retries(
2346+
Loader $loader,
2347+
RetryStrategy $retry_strategy = new AnyThrowable(3),
2348+
DelayFactory $delay_factory = new FixedMilliseconds(200),
2349+
Sleep $sleep = new SystemSleep(),
2350+
) : RetryLoader {
2351+
return new RetryLoader($loader, $retry_strategy, $delay_factory, $sleep);
2352+
}

0 commit comments

Comments
 (0)