Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ parameters:
- src/lib/parquet/src/Flow/Parquet/ThriftStream/*
- src/lib/parquet/src/Flow/Parquet/Thrift/*
- src/lib/parquet/src/Flow/Parquet/BinaryReader/*
- src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnData/DefinitionConverter.php

tmpDir: var/phpstan/cache
1 change: 1 addition & 0 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<file name="src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/SearchParams.php"/>

<file name="src/lib/parquet-viewer/src/Flow/ParquetViewer/Command/ReadMetadataCommand.php" />
<file name="src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnData/DefinitionConverter.php" />

<directory name="src/cli/src/Flow/CLI/Command" />
<directory name="src/lib/parquet/src/Flow/Parquet/ThriftStream/" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow\ETL\Adapter\Parquet;

use function Flow\ETL\DSL\array_to_rows;
use function Flow\ETL\DSL\{array_to_row, rows};
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PathFiltering, Signal};
use Flow\ETL\{Exception\InvalidArgumentException, Extractor, FlowContext};
use Flow\Filesystem\{Path, SourceStream};
Expand Down Expand Up @@ -64,7 +64,7 @@ public function extract(FlowContext $context) : \Generator
$row['_input_file_uri'] = $fileData['stream']->path()->uri();
}

$signal = yield array_to_rows($row, $context->entryFactory(), $fileData['stream']->path()->partitions(), $flowSchema);
$signal = yield rows(array_to_row($row, $context->entryFactory(), $fileData['stream']->path()->partitions(), $flowSchema));

$this->incrementReturnedRows();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,43 +66,43 @@ private function flowListToParquetList(ListType $type) : ListElement
case ScalarType::class:
switch ($element->type()) {
case ScalarType::FLOAT:
return ListElement::float(!$type->nullable());
return ListElement::float(!$element->nullable());
case ScalarType::INTEGER:
return ListElement::int64(!$type->nullable());
return ListElement::int64(!$element->nullable());
case ScalarType::STRING:
return ListElement::string(!$type->nullable());
return ListElement::string(!$element->nullable());
case ScalarType::BOOLEAN:
return ListElement::boolean(!$type->nullable());
return ListElement::boolean(!$element->nullable());
}

break;
case DateTimeType::class:
return ListElement::datetime(!$type->nullable());
return ListElement::datetime(!$element->nullable());
case UuidType::class:
return ListElement::uuid(!$type->nullable());
return ListElement::uuid(!$element->nullable());
case JsonType::class:
return ListElement::json(!$type->nullable());
return ListElement::json(!$element->nullable());
case XMLType::class:
case XMLElementType::class:
return ListElement::string(!$type->nullable());
return ListElement::string(!$element->nullable());
case ObjectType::class:
$class = $element->class;

if ($class === \DateInterval::class) {
return ListElement::time(!$type->nullable());
return ListElement::time(!$element->nullable());
}

throw new \Flow\Parquet\Exception\RuntimeException($class . ' can\'t be converted to any parquet columns.');
case ListType::class:
return ListElement::list($this->flowListToParquetList($element), !$type->nullable());
return ListElement::list($this->flowListToParquetList($element), !$element->nullable());
case MapType::class:
// NOTE(review): every other arm visible in this hunk now passes !$element->nullable(),
// but this arm still passes !$type->nullable() (the enclosing list type, not the
// element). Looks like a missed replacement - confirm against intent.
return ListElement::map(
$this->flowMapKeyToParquetMapKey($element->key()),
$this->flowMapValueToParquetMapValue($element->value()),
!$type->nullable()
);
case StructureType::class:
return ListElement::structure($this->flowStructureToParquetStructureElements($element), !$type->nullable());
return ListElement::structure($this->flowStructureToParquetStructureElements($element), !$element->nullable());
}

throw new RuntimeException($element::class . ' is not supported.');
Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public function __construct()

public function bench_extract_10k() : void
{
foreach (from_parquet(__DIR__ . '/../Fixtures/orders_flow.parquet')->extract($this->context) as $rows) {
foreach (from_parquet(__DIR__ . '/Fixtures/orders_10k.parquet')->extract($this->context) as $rows) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public function __construct()
$this->outputPath = \tempnam(\sys_get_temp_dir(), 'etl_parquet_loader_bench') . '.parquet';
$this->rows = new Rows();

foreach (from_parquet(__DIR__ . '/../Fixtures/orders_flow.parquet')->extract($this->context) as $rows) {
foreach (from_parquet(__DIR__ . '/Fixtures/orders_10k.parquet')->extract($this->context) as $rows) {
$this->rows = $this->rows->merge($rows);
}
}
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ public function test_multifile_pagination_from_offset_bigger_than_total_rows() :

public function test_reading_file_from_given_offset() : void
{
$totalRows = (new Reader())->read(__DIR__ . '/../Fixtures/orders_flow.parquet')->metadata()->rowsNumber();
$totalRows = (new Reader())->read(__DIR__ . '/Fixtures/orders_1k.parquet')->metadata()->rowsNumber();

$extractor = (new ParquetExtractor(
Path::realpath(__DIR__ . '/../Fixtures/orders_flow.parquet'),
Path::realpath(__DIR__ . '/Fixtures/orders_1k.parquet'),
))->withOffset($totalRows - 100);

self::assertCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ final class ParquetExtractorTest extends TestCase
{
public function test_limit() : void
{
$extractor = new ParquetExtractor(\Flow\Filesystem\DSL\path(__DIR__ . '/../Fixtures/orders_flow.parquet'), Options::default());
$extractor = new ParquetExtractor(\Flow\Filesystem\DSL\path(__DIR__ . '/Fixtures/orders_1k.parquet'), Options::default());
$extractor->changeLimit(2);

self::assertCount(
Expand All @@ -26,10 +26,10 @@ public function test_limit() : void

public function test_reading_file_from_given_offset() : void
{
$totalRows = (new Reader())->read(__DIR__ . '/../Fixtures/orders_flow.parquet')->metadata()->rowsNumber();
$totalRows = (new Reader())->read(__DIR__ . '/Fixtures/orders_1k.parquet')->metadata()->rowsNumber();

$extractor = (new ParquetExtractor(
Path::realpath(__DIR__ . '/../Fixtures/orders_flow.parquet'),
Path::realpath(__DIR__ . '/Fixtures/orders_1k.parquet'),
))->withOffset($totalRows - 100);

self::assertCount(
Expand All @@ -40,7 +40,7 @@ public function test_reading_file_from_given_offset() : void

public function test_signal_stop() : void
{
$extractor = new ParquetExtractor(\Flow\Filesystem\DSL\path(__DIR__ . '/../Fixtures/orders_flow.parquet'), Options::default());
$extractor = new ParquetExtractor(\Flow\Filesystem\DSL\path(__DIR__ . '/Fixtures/orders_1k.parquet'), Options::default());

$generator = $extractor->extract(new FlowContext(Config::default()));

Expand Down
2 changes: 2 additions & 0 deletions src/cli/flow
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use Flow\CLI\Command\PipelineRunCommand;
use Flow\CLI\Command\FileSchemaCommand;
use Flow\CLI\FlowVersion;
use Flow\ParquetViewer\Command\ReadDataCommand;
use Flow\ParquetViewer\Command\ReadDDLCommand;
use Flow\ParquetViewer\Command\ReadMetadataCommand;
use Symfony\Component\Console\Application;

Expand Down Expand Up @@ -45,6 +46,7 @@ $application = new Application('Flow PHP - Data processing framework', $pharRunt

$application->add((new ReadDataCommand())->setName('parquet:read')->setAliases(['parquet:read:data']));
$application->add((new ReadMetadataCommand())->setName('parquet:read:metadata'));
$application->add((new ReadDDLCommand())->setName('parquet:read:ddl'));
$application->add((new PipelineRunCommand())->setName('pipeline:run')->setAliases(['run']));
$application->add((new FileReadCommand())->setName('file:read')->setAliases(['read']));
$application->add((new FileSchemaCommand())->setName('file:schema')->setAliases(['schema']));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ public function test_read_rows_parquet() : void

self::assertStringContainsString(
<<<'OUTPUT'
+----------------------+----------------------+----------------------+--------------+-------------+------------+----------------------+----------------------+----------------------+
| order_id | created_at | updated_at | cancelled_at | total_price | discount | customer | address | notes |
+----------------------+----------------------+----------------------+--------------+-------------+------------+----------------------+----------------------+----------------------+
| 9354bda0-02b7-3820-9 | 2023-10-02T23:59:16+ | 2023-10-10T11:43:41+ | | 170.0500031 | 32.0900002 | {"name":"Adah","last | {"street":"73121 Swi | [null,null,null] |
| 8a7c3f29-e669-3aba-8 | 2023-05-20T08:59:30+ | 2023-10-12T03:24:25+ | | 239.9400024 | 47.7900009 | {"name":"Kasandra"," | {"street":"5864 Kael | [null,null,null] |
| fd35921c-85ca-30c1-a | 2023-05-13T13:56:02+ | 2023-09-28T10:27:33+ | | 148.3800049 | 3.0799999 | {"name":"Shaina","la | {"street":"651 Okune | [null] |
| a86b1747-73d4-3ed8-b | 2023-10-03T00:27:46+ | 2023-10-10T07:59:28+ | | 384.4899902 | 7.8800001 | {"name":"Dane","last | {"street":"7465 Spor | [null,null,null,null |
| 10544dfc-405a-3913-9 | 2023-08-06T21:54:08+ | 2023-10-05T13:15:17+ | | 265.4400024 | 32.3699989 | {"name":"Mireille"," | {"street":"671 Korbi | [null] |
+----------------------+----------------------+----------------------+--------------+-------------+------------+----------------------+----------------------+----------------------+
+----------------------+----------------------+----------------------+------------+----------------------+-------------------+----------------------+----------------------+----------------------+
| order_id | created_at | updated_at | discount | email | customer | address | notes | items |
+----------------------+----------------------+----------------------+------------+----------------------+-------------------+----------------------+----------------------+----------------------+
| 1e4544ab-7c94-3d39-b | 2024-10-02T23:01:19+ | 2024-11-18T12:21:16+ | 26.2099991 | [email protected] | Rafaela Hartmann | {"street":"64610 Kat | ["Deleniti vitae dol | [{"sku":"SKU_0005"," |
| f1aba27a-3387-3e10-b | 2024-09-04T01:26:42+ | 2024-11-15T05:27:03+ | | [email protected] | Marjolaine Kohler | {"street":"14054 Ker | ["Nulla exercitation | [{"sku":"SKU_0003"," |
| c8d23f0b-c157-323f-8 | 2024-02-14T14:07:07+ | 2024-11-06T13:38:25+ | | rolfson.noble@hotmai | Loyce McLaughlin | {"street":"9058 Kess | ["Laborum molestiae | [{"sku":"SKU_0005"," |
| d6215090-cea0-3fd9-a | 2024-10-12T09:18:12+ | 2024-11-21T09:38:15+ | | [email protected] | Estelle Schinner | {"street":"68058 Dav | ["In dolore nam et s | [{"sku":"SKU_0003"," |
| ac622a00-7de2-3eb0-b | 2024-06-07T10:27:53+ | 2024-11-23T21:03:36+ | | [email protected] | Ethan Hodkiewicz | {"street":"84594 Vla | ["Vel ipsam id quos | [{"sku":"SKU_0004"," |
+----------------------+----------------------+----------------------+------------+----------------------+-------------------+----------------------+----------------------+----------------------+
5 rows
OUTPUT,
$tester->getDisplay()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public function test_count_rows_parquet() : void

$tester->assertCommandIsSuccessful();

self::assertSame('10000', $tester->getDisplay());
self::assertSame('1000', $tester->getDisplay());
}

public function test_count_rows_text() : void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,19 +228,19 @@ public function test_run_schema_with_table_output_on_parquet() : void

self::assertSame(
<<<'OUTPUT'
+--------------+-----------+----------+-------------+----------+
| name | type | nullable | scalar_type | metadata |
+--------------+-----------+----------+-------------+----------+
| order_id | uuid | false | | [] |
| created_at | datetime | false | | [] |
| updated_at | datetime | false | | [] |
| cancelled_at | scalar | true | string | [] |
| total_price | scalar | false | float | [] |
| discount | scalar | false | float | [] |
| customer | structure | false | | [] |
| address | structure | false | | [] |
| notes | json | false | | [] |
+--------------+-----------+----------+-------------+----------+
+------------+----------+----------+-------------+----------+
| name | type | nullable | scalar_type | metadata |
+------------+----------+----------+-------------+----------+
| order_id | uuid | false | | [] |
| created_at | datetime | false | | [] |
| updated_at | datetime | false | | [] |
| discount | scalar | true | string | [] |
| email | scalar | false | string | [] |
| customer | scalar | false | string | [] |
| address | map | false | | [] |
| notes | list | false | | [] |
| items | list | false | | [] |
+------------+----------+----------+-------------+----------+
9 rows

OUTPUT,
Expand Down
Binary file not shown.
83 changes: 75 additions & 8 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
if (!\function_exists('dd')) {
function dd(...$args) : void
{
try {
throw new RuntimeException();
} catch (Exception $e) {
$header = $e->getTrace()[0]['file'] . ':' . $e->getTrace()[0]['line'];
}
print PHP_EOL . $header . PHP_EOL;

foreach ($args as $arg) {
\var_dump($arg);
}
Expand All @@ -14,23 +21,83 @@ function dd(...$args) : void
}

if (!\function_exists('dj')) {
function dj(...$args) : void
// NOTE(review): this span is a rendered diff. The variadic signature above, `$output = []`,
// the `foreach ($args as $arg)` / `json_encode($arg)` pair and `\var_dump($output)` appear to be
// the pre-change lines, interleaved with the post-change implementation documented below.
/**
 * Debug helper: prints a header line, then a bracketed dump with one JSON-encoded entry per line.
 *
 * @param mixed $args value to dump; anything that is not an array is wrapped into a one-element array
 * @param int $indention number of spaces printed before the header and brackets; entries get two more
 * @param null|string $header line printed above the dump; when null it is built as "file:line" from the first trace frame
 */
function dj(mixed $args, int $indention = 0, ?string $header = null) : void
{
$output = [];
if (!\is_array($args)) {
$args = [$args];
}

foreach ($args as $arg) {
$output[] = \json_encode($arg);
if ($header === null) {
// An exception is thrown and caught only to read its trace: frame 0 carries the
// file and line of the call into this function.
try {
throw new RuntimeException();
} catch (Exception $e) {
$header = $e->getTrace()[0]['file'] . ':' . $e->getTrace()[0]['line'];
}
}

print PHP_EOL . \str_repeat(' ', $indention) . $header . PHP_EOL;
print \str_repeat(' ', $indention) . '[' . PHP_EOL;

// List-shaped input: entries are printed without their index.
if (\array_is_list($args)) {
foreach ($args as $i => $v) {
// Objects are rendered through __debugInfo() when defined, otherwise through
// __toString(); every other value goes straight to json_encode().
if (\is_object($v)) {
if (method_exists($v, '__debugInfo')) {
print \str_repeat(' ', $indention + 2) . \json_encode($v->__debugInfo()) . PHP_EOL;

continue;
}

if (method_exists($v, '__toString')) {
print \str_repeat(' ', $indention + 2) . \json_encode($v->__toString()) . PHP_EOL;

continue;
}
}

print \str_repeat(' ', $indention + 2) . \json_encode($v) . PHP_EOL;
}

// NOTE(review): this branch prints a blank line and returns, so the '[' printed above is
// never matched by a closing ']' for list input; only the keyed branch below prints it.
print PHP_EOL;

return;
}

\var_dump($output);
// Keyed input: same rendering rules as above, with each entry prefixed by "key: ".
foreach ($args as $i => $v) {
if (\is_object($v)) {
if (method_exists($v, '__debugInfo')) {
print \str_repeat(' ', $indention + 2) . $i . ': ' . \json_encode($v->__debugInfo()) . PHP_EOL;

continue;
}

if (method_exists($v, '__toString')) {
print \str_repeat(' ', $indention + 2) . $i . ': ' . \json_encode($v->__toString()) . PHP_EOL;

continue;
}
}

print \str_repeat(' ', $indention + 2) . $i . ': ' . \json_encode($v) . PHP_EOL;
}

print \str_repeat(' ', $indention) . ']' . PHP_EOL;
}
}

if (!\function_exists('djd')) {
function djd(...$args) : void
if (!\function_exists('ddj')) {
/**
 * Dump-and-die variant of dj(): prints the dump through dj(), then ends the script with exit status 1.
 *
 * @param mixed $args value to dump; anything that is not an array is wrapped into a one-element array
 * @param int $indention forwarded unchanged to dj()
 */
function ddj(mixed $args, int $indention = 0) : void
{
// The header is resolved here and handed to dj(), which only builds its own when it
// receives null; frame 0 of this trace carries the file and line of the call into ddj().
try {
throw new RuntimeException();
} catch (Exception $e) {
$header = $e->getTrace()[0]['file'] . ':' . $e->getTrace()[0]['line'];
}

if (!\is_array($args)) {
$args = [$args];
}

// NOTE(review): the variadic call on the next line appears to be the pre-change line of the
// rendered diff; the three-argument call after it is the post-change replacement.
\dj(...$args);
dj($args, $indention, $header);

exit(1);
}
Expand Down
11 changes: 9 additions & 2 deletions src/lib/dremel/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
# Dremel

Dremel is a library that provides pure PHP implementation of two algorithms described in the paper "Dremel: Interactive Analysis of Web-Scale Datasets".
The algorithms are used to shred and assemble nested data structures in a columnar format.
> [!WARNING]
> This library is not ready yet. Currently, the implementation of the algorithms described in [this document](http://www-cs-students.stanford.edu/~adityagp/courses/cs598/papers/dremel.pdf) is located directly in the flow-php/parquet repository.
> To be able to extract the algorithms that allow flattening and rebuilding nested data structures, we will need to move several elements from the parquet library to the dremel library.
> Here are some of them:
>
> - Schema
> - Repetitions
> - DremelShredder
> - DremelAssembler

> [!IMPORTANT]
> This repository is a subtree split from our monorepo. If you'd like to contribute, please visit our main monorepo [flow-php/flow](https://github.com/flow-php/flow).
Expand Down
15 changes: 0 additions & 15 deletions src/lib/dremel/src/Flow/Dremel/DataShredded.php

This file was deleted.

Loading
Loading