Skip to content

Commit 180202f

Browse files
authored
EntryReference expressions (#362)
* First draft of reference expressions * Allow to chain EntryReference expressions * Simplified usage of Literal values * Fixed example * CS Fixes
1 parent 10e05cb commit 180202f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1937
-117
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
use function Flow\ETL\DSL\lit;
6+
use function Flow\ETL\DSL\ref;
7+
use Flow\ETL\DSL\Entry;
8+
use Flow\ETL\DSL\From;
9+
use Flow\ETL\DSL\To;
10+
use Flow\ETL\Flow;
11+
use Flow\ETL\Row;
12+
use Flow\ETL\Rows;
13+
14+
require __DIR__ . '/../../bootstrap.php';
15+
16+
(new Flow())
17+
->read(
18+
From::rows(new Rows(
19+
Row::with(Entry::int('a', 100), Entry::int('b', 100)),
20+
Row::with(Entry::int('a', 100), Entry::int('b', 200))
21+
))
22+
)
23+
->filter(ref('b')->divide(lit(2))->equals('a'))
24+
->withEntry('new_b', ref('b')->multiply(lit(2))->multiply(lit(5)))
25+
->write(To::output(false))
26+
->run();
27+
28+
(new Flow())
29+
->read(
30+
From::rows(new Rows(
31+
Row::with(Entry::int('a', 4), Entry::int('b', 5)),
32+
Row::with(Entry::int('a', 3), Entry::int('b', 6))
33+
))
34+
)
35+
->filter(ref('b')->mod(lit(2))->equals(lit(0)))
36+
->write(To::output(false))
37+
->run();
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
use function Flow\ETL\DSL\lit;
6+
use Flow\ETL\DSL\Entry;
7+
use Flow\ETL\DSL\From;
8+
use Flow\ETL\DSL\To;
9+
use Flow\ETL\Flow;
10+
use Flow\ETL\Row;
11+
use Flow\ETL\Rows;
12+
13+
require __DIR__ . '/../../bootstrap.php';
14+
15+
(new Flow())
16+
->read(
17+
From::rows(new Rows(
18+
Row::create(Entry::string('name', 'Norbert'))
19+
))
20+
)
21+
->withEntry('number', lit(1))
22+
->write(To::output(false))
23+
->run();
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
use function Flow\ETL\DSL\ref;
6+
use Flow\ETL\DSL\Entry;
7+
use Flow\ETL\DSL\From;
8+
use Flow\ETL\DSL\To;
9+
use Flow\ETL\Flow;
10+
use Flow\ETL\Row;
11+
use Flow\ETL\Rows;
12+
13+
require __DIR__ . '/../../bootstrap.php';
14+
15+
(new Flow())
16+
->read(
17+
From::rows(new Rows(
18+
Row::create(Entry::integer('a', 100), Entry::integer('b', 200))
19+
))
20+
)
21+
->write(To::output(false))
22+
->withEntry('c', ref('a')->plus('b'))
23+
->withEntry('d', ref('b')->minus('a'))
24+
->write(To::output(false))
25+
->run();

src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroLoader.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public function destination() : Path
6565

6666
public function load(Rows $rows, FlowContext $context) : void
6767
{
68-
if (\count($context->partitionEntries())) {
68+
if ($context->partitionEntries()->count()) {
6969
throw new RuntimeException('Partitioning is not supported yet');
7070
}
7171

src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVLoader.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public function load(Rows $rows, FlowContext $context) : void
8383

8484
$headers = $rows->first()->entries()->map(fn (Entry $entry) => $entry->name());
8585

86-
if (\count($context->partitionEntries())) {
87-
foreach ($rows->partitionBy(...$context->partitionEntries()) as $partition) {
86+
if ($context->partitionEntries()->count()) {
87+
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partition) {
8888
$this->write($partition->rows, $headers, $context, $partition->partitions);
8989
}
9090
} else {

src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JsonLoader.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public function destination() : Path
5959

6060
public function load(Rows $rows, FlowContext $context) : void
6161
{
62-
if (\count($context->partitionEntries())) {
63-
foreach ($rows->partitionBy(...$context->partitionEntries()) as $partitionedRows) {
62+
if ($context->partitionEntries()->count()) {
63+
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partitionedRows) {
6464
$this->write($partitionedRows->rows, $partitionedRows->partitions, $context);
6565
}
6666
} else {

src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextLoader.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ public function destination() : Path
5858

5959
public function load(Rows $rows, FlowContext $context) : void
6060
{
61-
if (\count($context->partitionEntries())) {
62-
foreach ($rows->partitionBy(...$context->partitionEntries()) as $partition) {
61+
if ($context->partitionEntries()->count()) {
62+
foreach ($rows->partitionBy(...$context->partitionEntries()->all()) as $partition) {
6363
foreach ($partition->rows as $row) {
6464
if ($row->entries()->count() > 1) {
6565
throw new RuntimeException(\sprintf('Text data loader supports only a single entry rows, and you have %d rows.', $row->entries()->count()));

src/core/etl/README.md

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,9 +447,15 @@ $flow->process(new Rows(...))
447447
->run();
448448
```
449449

450-
## Filter
450+
## Filtering
451451

452-
In order to quickly filter Rows `DataFrame::filter` shortcut function can be used.
452+
There are two ways to filter datasets in Flow.
453+
454+
### Callback Filtering
455+
456+
This is the most flexible but also the least developer-friendly approach.
457+
All you need to do is pass a callback `fn (Row $row) : bool` that will be executed against each row
458+
and that will tell the filter whether to keep or skip the row.
453459

454460
```php
455461
<?php
@@ -462,6 +468,32 @@ $flow->process(new Rows(...))
462468
->run();
463469
```
464470

471+
### DSL Filtering
472+
473+
This approach is much more developer-friendly, as your IDE can autocomplete all filtering functions for you.
474+
475+
```php
476+
<?php
477+
478+
(new Flow())
479+
->read(
480+
From::rows(new Rows(
481+
Row::with(Entry::int('a', 100), Entry::int('b', 100)),
482+
Row::with(Entry::int('a', 100), Entry::int('b', 200))
483+
))
484+
)
485+
->filter(ref('b')->divide(lit(2))->equals('a'))
486+
->withEntry('new_b', ref('b')->multiply(lit(2))->multiply(lit(5)))
487+
->write(To::output(false))
488+
->run();
489+
```
490+
491+
In general, all filtering functions are available as [Reference Expressions](/src/core/etl/src/Flow/ETL/Row/Reference/Expression.php).
492+
This means you can chain them, creating readable conditions that will significantly improve the maintainability of your
493+
data processing pipelines.
494+
495+
All possible expressions are available through [EntryReference.php](/src/core/etl/src/Flow/ETL/Row/EntryReference.php).
496+
465497
## Group By
466498

467499
Flow allows grouping that is similar to the one known from database engines.

src/core/etl/src/Flow/ETL/Async/Socket/Communication/Message.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
use Flow\ETL\FlowContext;
99
use Flow\ETL\Partition\PartitionFilter;
1010
use Flow\ETL\Pipeline\Pipes;
11-
use Flow\ETL\Row\EntryReference;
11+
use Flow\ETL\Row\References;
1212
use Flow\ETL\Rows;
1313
use Flow\Serializer\Serializable;
1414
use Flow\Serializer\Serializer;
@@ -22,7 +22,7 @@
2222
* cache?: Cache,
2323
* cache_id?: string,
2424
* rows?: Rows,
25-
* partition_entries?: array<EntryReference>,
25+
* partition_entries?: References,
2626
* partition_filter?: PartitionFilter
2727
* }
2828
* }>
@@ -37,7 +37,7 @@ final class Message implements Serializable
3737
* cache?: Cache,
3838
* cache_id?: string,
3939
* rows?: Rows,
40-
* partition_entries?: array<EntryReference>,
40+
* partition_entries?: References,
4141
* partition_filter?: PartitionFilter
4242
* } $payload
4343
*/
@@ -110,7 +110,7 @@ public static function setup(Pipes $pipes, FlowContext $context, string $cacheId
110110
* cache?: Cache,
111111
* cache_id?: string,
112112
* rows?: Rows,
113-
* partition_entries?: array<EntryReference>,
113+
* partition_entries?: References,
114114
* partition_filter?: PartitionFilter
115115
* }
116116
*/

src/core/etl/src/Flow/ETL/Async/Socket/Worker/ClientProtocol.php

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,22 @@ public function __construct(private readonly Processor $processor)
2626

2727
public function handle(string $id, Message $message, Server $server) : void
2828
{
29+
$payload = $message->payload();
30+
2931
switch ($message->type()) {
3032
case Protocol::SERVER_SETUP:
31-
$this->processor->setPipes($message->payload()['pipes'] ?? new Pipes([]));
32-
$this->processor->setPartitionEntries($message->payload()['partition_entries'] ?? []);
33-
$this->processor->setPartitionFilter($message->payload()['partition_filter'] ?? new NoopFilter());
34-
$this->cache = $message->payload()['cache'] ?? $this->cache;
35-
$this->cacheId = $message->payload()['cache_id'] ?? $this->cacheId;
33+
$refs = \array_key_exists('partition_entries', $payload) ? $payload['partition_entries']->all() : [];
34+
$this->processor->setPipes($payload['pipes'] ?? new Pipes([]));
35+
$this->processor->setPartitionEntries($refs);
36+
$this->processor->setPartitionFilter($payload['partition_filter'] ?? new NoopFilter());
37+
$this->cache = $payload['cache'] ?? $this->cache;
38+
$this->cacheId = $payload['cache_id'] ?? $this->cacheId;
3639

3740
$server->send(Message::fetch($id));
3841

3942
break;
4043
case Protocol::SERVER_PROCESS:
41-
$rows = $this->processor->process($message->payload()['rows'] ?? new Rows());
44+
$rows = $this->processor->process($payload['rows'] ?? new Rows());
4245

4346
$server->send(Message::fetch($id));
4447
$this->cache->add($this->cacheId, $rows);

0 commit comments

Comments
 (0)