Skip to content

Commit 841ad67

Browse files
authored
Allow to apply custom transformations before writing to a final destination (#1408)
* Allow to apply custom transformations before writing to a final destination
* Updated DSL definitions
1 parent bd2c728 commit 841ad67

File tree

7 files changed

+140
-14
lines changed

7 files changed

+140
-14
lines changed

src/core/etl/src/Flow/ETL/DSL/functions.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@
129129
RandomValueGenerator,
130130
Row,
131131
Rows,
132+
Transformation,
132133
Transformer,
133134
Window};
134135
use Flow\Filesystem\Stream\Mode;
@@ -329,13 +330,13 @@ function to_stream(string $uri, int|bool $truncate = 20, Output $output = Output
329330
}
330331

331332
#[DocumentationDSL(module: Module::CORE, type: DSLType::LOADER)]
332-
function to_transformation(Transformer $transformer, Loader $loader) : TransformerLoader
333+
function to_transformation(Transformer|Transformation $transformer, Loader $loader) : TransformerLoader
333334
{
334335
return new TransformerLoader($transformer, $loader);
335336
}
336337

337338
#[DocumentationDSL(module: Module::CORE, type: DSLType::LOADER)]
338-
function to_branch(ScalarFunction $condition, Loader $loader) : Loader
339+
function to_branch(ScalarFunction $condition, Loader $loader) : Loader\BranchingLoader
339340
{
340341
return new Loader\BranchingLoader($condition, $loader);
341342
}

src/core/etl/src/Flow/ETL/Loader/BranchingLoader.php

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@
44

55
namespace Flow\ETL\Loader;
66

7+
use function Flow\ETL\DSL\{df, from_rows};
78
use Flow\ETL\Function\ScalarFunction;
89
use Flow\ETL\Transformer\ScalarFunctionFilterTransformer;
9-
use Flow\ETL\{FlowContext, Loader, Rows};
10+
use Flow\ETL\{FlowContext, Loader, Rows, Transformation};
1011

11-
final readonly class BranchingLoader implements Closure, Loader, OverridingLoader
12+
final class BranchingLoader implements Closure, Loader, OverridingLoader
1213
{
14+
private ?Transformation $transformation = null;
15+
1316
public function __construct(
14-
private ScalarFunction $condition,
15-
private Loader $loader,
17+
private readonly ScalarFunction $condition,
18+
private readonly Loader $loader,
1619
) {
1720
}
1821

@@ -25,8 +28,17 @@ public function closure(FlowContext $context) : void
2528

2629
public function load(Rows $rows, FlowContext $context) : void
2730
{
31+
$rows = (new ScalarFunctionFilterTransformer($this->condition))->transform($rows, $context);
32+
33+
if ($this->transformation) {
34+
$rows = df($context->config)
35+
->read(from_rows($rows))
36+
->with($this->transformation)
37+
->fetch();
38+
}
39+
2840
$this->loader->load(
29-
(new ScalarFunctionFilterTransformer($this->condition))->transform($rows, $context),
41+
$rows,
3042
$context
3143
);
3244
}
@@ -37,4 +49,11 @@ public function loaders() : array
3749
$this->loader,
3850
];
3951
}
52+
53+
public function withTransformation(Transformation $transformation) : self
54+
{
55+
$this->transformation = $transformation;
56+
57+
return $this;
58+
}
4059
}

src/core/etl/src/Flow/ETL/Loader/TransformerLoader.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,26 @@
44

55
namespace Flow\ETL\Loader;
66

7-
use Flow\ETL\{FlowContext, Loader, Rows, Transformer};
7+
use function Flow\ETL\DSL\{df, from_rows};
8+
use Flow\ETL\{FlowContext, Loader, Rows, Transformation, Transformer};
89

910
final readonly class TransformerLoader implements Loader, OverridingLoader
1011
{
1112
public function __construct(
12-
private Transformer $transformer,
13+
private Transformer|Transformation $transformer,
1314
private Loader $loader,
1415
) {
1516
}
1617

1718
public function load(Rows $rows, FlowContext $context) : void
1819
{
19-
$this->loader->load($this->transformer->transform($rows, $context), $context);
20+
if ($this->transformer instanceof Transformer) {
21+
$rows = $this->transformer->transform($rows, $context);
22+
} else {
23+
$rows = df()->from(from_rows($rows))->with($this->transformer)->fetch();
24+
}
25+
26+
$this->loader->load($rows, $context);
2027
}
2128

2229
public function loaders() : array

src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/BranchingTest.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use function Flow\ETL\DSL\{df, from_array, lit, ref, to_branch, to_memory};
88
use Flow\ETL\Memory\ArrayMemory;
99
use Flow\ETL\Tests\FlowIntegrationTestCase;
10+
use Flow\ETL\{DataFrame, Transformation};
1011

1112
final class BranchingTest extends FlowIntegrationTestCase
1213
{
@@ -52,4 +53,57 @@ public function test_branching() : void
5253
$memoryBC->dump(),
5354
);
5455
}
56+
57+
public function test_branching_with_transformation() : void
58+
{
59+
df()
60+
->read(from_array([
61+
['id' => 1, 'group' => 'A'],
62+
['id' => 2, 'group' => 'B'],
63+
['id' => 3, 'group' => 'A'],
64+
['id' => 4, 'group' => 'B'],
65+
['id' => 5, 'group' => 'A'],
66+
['id' => 6, 'group' => 'C'],
67+
]))
68+
->write(
69+
to_branch(
70+
ref('group')->equals(lit('A')),
71+
to_memory($memoryA = new ArrayMemory()),
72+
)->withTransformation(new class implements Transformation {
73+
public function transform(DataFrame $dataFrame) : DataFrame
74+
{
75+
return $dataFrame->withEntry('group_name', lit('A'));
76+
}
77+
})
78+
)
79+
->write(
80+
to_branch(
81+
ref('group')->isIn(lit(['B', 'C'])),
82+
to_memory($memoryBC = new ArrayMemory()),
83+
)->withTransformation(new class implements Transformation {
84+
public function transform(DataFrame $dataFrame) : DataFrame
85+
{
86+
return $dataFrame->withEntry('group_name', lit('BC'));
87+
}
88+
})
89+
)
90+
->run();
91+
92+
self::assertSame(
93+
[
94+
['id' => 1, 'group' => 'A', 'group_name' => 'A'],
95+
['id' => 3, 'group' => 'A', 'group_name' => 'A'],
96+
['id' => 5, 'group' => 'A', 'group_name' => 'A'],
97+
],
98+
$memoryA->dump(),
99+
);
100+
self::assertSame(
101+
[
102+
['id' => 2, 'group' => 'B', 'group_name' => 'BC'],
103+
['id' => 4, 'group' => 'B', 'group_name' => 'BC'],
104+
['id' => 6, 'group' => 'C', 'group_name' => 'BC'],
105+
],
106+
$memoryBC->dump(),
107+
);
108+
}
55109
}

src/core/etl/tests/Flow/ETL/Tests/Unit/Function/NotTest.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
namespace Flow\ETL\Tests\Unit\Function;
66

77
use function Flow\ETL\DSL\row;
8-
use function Flow\ETL\DSL\{int_entry, json_entry, lit, not, ref};
8+
use function Flow\ETL\DSL\{int_entry, json_entry, lit, not, ref, string_entry, type_integer};
99
use Flow\ETL\Tests\FlowTestCase;
1010

1111
final class NotTest extends FlowTestCase
@@ -30,4 +30,20 @@ public function test_not_expression_on_is_in_expression() : void
3030
not(ref('value')->isIn(ref('array')))->eval(row(json_entry('array', [1, 2, 3]), int_entry('value', 10)))
3131
);
3232
}
33+
34+
public function test_not_expression_with_and_operator() : void
35+
{
36+
self::assertTrue(
37+
not(ref('value')->isNull()->or(ref('value')->isType(type_integer())))->eval(row(string_entry('value', '10')))
38+
);
39+
self::assertFalse(
40+
not(ref('value')->isNull()->or(ref('value')->isType(type_integer())))->eval(row(string_entry('value', null)))
41+
);
42+
self::assertTrue(
43+
not(ref('value')->isNull()->and(ref('value')->size()->between(1, 10)))->eval(row(string_entry('value', 'abcd')))
44+
);
45+
self::assertTrue(
46+
not(ref('value')->isNull()->or(ref('value')->size()->equals(1)))->eval(row(string_entry('value', 'abcd')))
47+
);
48+
}
3349
}

src/core/etl/tests/Flow/ETL/Tests/Unit/Loader/TransformerLoaderTest.php

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
namespace Flow\ETL\Tests\Unit\Loader;
66

77
use function Flow\ETL\DSL\{config, rows};
8-
use function Flow\ETL\DSL\{flow_context, to_transformation};
9-
use Flow\ETL\{Loader, Tests\FlowTestCase, Transformer};
8+
use function Flow\ETL\DSL\{df, flow_context, from_array, ref, to_memory, to_transformation, type_string};
9+
use Flow\ETL\{DataFrame, Loader, Memory\ArrayMemory, Tests\FlowTestCase, Transformation, Transformer};
1010

1111
final class TransformerLoaderTest extends FlowTestCase
1212
{
@@ -28,4 +28,33 @@ public function test_transformer_loader() : void
2828

2929
$transformer->load(rows(), flow_context(config()));
3030
}
31+
32+
public function test_transformer_loader_with_transformation() : void
33+
{
34+
df()
35+
->read(from_array([
36+
['id' => 1],
37+
['id' => 2],
38+
['id' => 3],
39+
]))
40+
->write(to_transformation(
41+
new class implements Transformation {
42+
public function transform(DataFrame $dataFrame) : DataFrame
43+
{
44+
return $dataFrame->withEntry('id_string', ref('id')->cast(type_string()));
45+
}
46+
},
47+
to_memory($memory = new ArrayMemory())
48+
))
49+
->run();
50+
51+
self::assertEquals(
52+
[
53+
['id' => 1, 'id_string' => '1'],
54+
['id' => 2, 'id_string' => '2'],
55+
['id' => 3, 'id_string' => '3'],
56+
],
57+
$memory->dump()
58+
);
59+
}
3160
}

web/landing/resources/dsl.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)