Skip to content

Commit 446df3b

Browse files
authored
Chore/thrift performance (#1867)
* Reorganized Thrift related namespaces * Thrift building blocks cleanup * Simplified Thrift Protocol and Transport * Improved debug code precision * Optimize MemoryBuffer and Compact Protocol * Optimized Thrift PhpFileStream and BufferedTransports * Fixed cleanining page containers after flush * Fixed issue with non cleaning page coumn chunk buidlers properly after flushing * Covered Thrift objects with unit tests * Added parquet schema conversion to DSL
1 parent 49f2cc7 commit 446df3b

File tree

110 files changed

+2743
-234
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

110 files changed

+2743
-234
lines changed

.php-cs-fixer.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
__DIR__ . '/tools/rector/src',
3333
])
3434
->exclude([
35-
'Flow/Parquet/Thrift',
35+
'Flow/Parquet/ThriftModel',
3636
'Flow/CLI/Tests/Integration',
3737
'Flow/ETL/Tests/Unit/Loader',
3838
'Flow/ETL/Tests/Unit/Exception'

composer.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -456,8 +456,8 @@
456456
"./tools/phpdocumentor/vendor/bin/phpdoc --config=./phpdoc/bridge.symfony.http-foundation.xml"
457457
],
458458
"build:parquet:thrift": [
459-
"grep -q 'namespace php Flow.Parquet.Thrift' src/lib/parquet/src/Flow/Parquet/Resources/Thrift/parquet.thrift || { echo \"Flow php namespace not found in thrift definition!\"; exit 1; }\n",
460-
"rm src/lib/parquet/src/Flow/Parquet/Thrift/*.php",
459+
"grep -q 'namespace php Flow.Parquet.ThriftModel' src/lib/parquet/src/Flow/Parquet/Resources/Thrift/parquet.thrift || { echo \"Flow php namespace not found in thrift definition!\"; exit 1; }\n",
460+
"rm src/lib/parquet/src/Flow/Parquet/ThriftModel/*.php",
461461
"thrift --gen php --out src/lib/parquet/src src/lib/parquet/src/Flow/Parquet/Resources/Thrift/parquet.thrift",
462462
"@cs:php:fix"
463463
],

phpstan.neon

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ parameters:
6565
- src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/SearchParams.php
6666
- src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/PointInTime.php
6767
- src/adapter/etl-adapter-excel/src/Flow/ETL/Adapter/Excel/Sheet/SheetsManager.php
68-
- src/lib/parquet/src/Flow/Parquet/ThriftStream/*
6968
- src/lib/parquet/src/Flow/Parquet/Thrift/*
69+
- src/lib/parquet/src/Flow/Parquet/ThriftModel/*
7070
- src/lib/parquet/src/Flow/Parquet/BinaryReader/*
7171
- src/lib/parquet/src/Flow/Parquet/Dremel/ColumnData/DefinitionConverter.php
7272

rector.src.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
])
1818
->withSkip([
1919
StringClassNameToClassConstantRector::class,
20-
__DIR__ . '/src/lib/parquet/src/Flow/Parquet/Thrift/*',
20+
__DIR__ . '/src/lib/parquet/src/Flow/Parquet/ThriftModel/*',
2121
])
2222
->withCache(__DIR__ . '/var/rector/src')
2323
->withImportNames(importShortClasses: false, removeUnusedImports: true)

src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/functions.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,15 @@ function empty_generator() : \Generator
9393
{
9494
yield from [];
9595
}
96+
97+
#[DocumentationDSL(module: Module::PARQUET, type: DSLType::HELPER)]
98+
function schema_to_parquet(Schema $schema) : \Flow\Parquet\ParquetFile\Schema
99+
{
100+
return (new SchemaConverter())->toParquet($schema);
101+
}
102+
103+
#[DocumentationDSL(module: Module::PARQUET, type: DSLType::HELPER)]
104+
function schema_from_parquet(\Flow\Parquet\ParquetFile\Schema $schema) : Schema
105+
{
106+
return (new SchemaConverter())->toFlow($schema);
107+
}

src/core/etl/tests/Flow/ETL/Tests/Double/FakeStaticOrdersExtractor.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,15 @@
44

55
namespace Flow\ETL\Tests\Double;
66

7-
use function Flow\ETL\DSL\{array_to_rows, datetime_schema, float_schema, list_schema, schema, string_schema, structure_schema, uuid_schema};
7+
use function Flow\ETL\DSL\{array_to_rows,
8+
datetime_schema,
9+
float_schema,
10+
integer_schema,
11+
list_schema,
12+
schema,
13+
string_schema,
14+
structure_schema,
15+
uuid_schema};
816
use function Flow\Types\DSL\{type_float, type_integer, type_list, type_string, type_structure};
917
use Flow\ETL\{Extractor, FlowContext, Schema};
1018

@@ -17,6 +25,7 @@ public function __construct(private int $count = 1_000)
1725
public static function schema() : Schema
1826
{
1927
return schema(
28+
integer_schema('index'),
2029
uuid_schema('order_id'),
2130
datetime_schema('created_at'),
2231
datetime_schema('updated_at', true),
@@ -65,6 +74,7 @@ public function rawData() : \Generator
6574

6675
for ($i = 0; $i < $this->count; $i++) {
6776
yield [
77+
'index' => $i,
6878
'order_id' => '254d61c5-22c8-4407-83a2-76f1cab53af2',
6979
'created_at' => new \DateTimeImmutable('2025-01-01 12:00:00'),
7080
'updated_at' => \random_int(0, 1) === 1 ? new \DateTimeImmutable('2025-01-01 12:10:00') : null,

src/lib/parquet/src/Flow/Parquet/ParquetFile.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@
1111
ParquetFile\Metadata,
1212
ParquetFile\Page\ColumnPageHeader,
1313
ParquetFile\Schema,
14-
Reader\PageReader};
14+
Reader\PageReader,
15+
Thrift\CompactProtocol,
16+
Thrift\MemoryBuffer};
1517
use Flow\Parquet\Exception\{InvalidArgumentException, RuntimeException};
1618
use Flow\Parquet\ParquetFile\Data\DataConverter;
1719
use Flow\Parquet\ParquetFile\Schema\{Column, FlatColumn};
1820
use Flow\Parquet\ParquetFile\Schema\NestedColumn;
1921
use Flow\Parquet\Reader\{ColumnChunkReader, ColumnChunkViewer};
20-
use Flow\Parquet\Thrift\FileMetaData;
21-
use Thrift\Protocol\TCompactProtocol;
22-
use Thrift\Transport\TMemoryBuffer;
22+
use Flow\Parquet\ThriftModel\FileMetaData;
2323

2424
final class ParquetFile
2525
{
@@ -64,8 +64,8 @@ public function metadata() : Metadata
6464

6565
$thriftMetadata = new FileMetaData();
6666
$thriftMetadata->read(
67-
new TCompactProtocol(
68-
new TMemoryBuffer($metadata)
67+
new CompactProtocol(
68+
new MemoryBuffer($metadata)
6969
)
7070
);
7171

src/lib/parquet/src/Flow/Parquet/ParquetFile/Metadata.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
use Flow\Parquet\Options;
88
use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
9-
use Flow\Parquet\Thrift\FileMetaData;
9+
use Flow\Parquet\ThriftModel\FileMetaData;
1010

1111
final readonly class Metadata
1212
{

src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DataPageHeader.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public function __construct(
1616
) {
1717
}
1818

19-
public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeader $thrift) : self
19+
public static function fromThrift(\Flow\Parquet\ThriftModel\DataPageHeader $thrift) : self
2020
{
2121
return new self(
2222
Encodings::from($thrift->encoding),
@@ -41,9 +41,9 @@ public function repetitionLevelEncoding() : Encodings
4141
return $this->repetitionLevelEncoding;
4242
}
4343

44-
public function toThrift() : \Flow\Parquet\Thrift\DataPageHeader
44+
public function toThrift() : \Flow\Parquet\ThriftModel\DataPageHeader
4545
{
46-
return new \Flow\Parquet\Thrift\DataPageHeader([
46+
return new \Flow\Parquet\ThriftModel\DataPageHeader([
4747
'num_values' => $this->valuesCount,
4848
'encoding' => $this->encoding->value,
4949
'definition_level_encoding' => $this->definitionLevelEncoding->value,

src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DataPageHeaderV2.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public function __construct(
2222
) {
2323
}
2424

25-
public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeaderV2 $thrift, Options $options) : self
25+
public static function fromThrift(\Flow\Parquet\ThriftModel\DataPageHeaderV2 $thrift, Options $options) : self
2626
{
2727
return new self(
2828
$thrift->num_values,
@@ -61,9 +61,9 @@ public function statistics(Options $options) : ?StatisticsReader
6161
return new StatisticsReader($this->statistics, $options);
6262
}
6363

64-
public function toThrift() : \Flow\Parquet\Thrift\DataPageHeaderV2
64+
public function toThrift() : \Flow\Parquet\ThriftModel\DataPageHeaderV2
6565
{
66-
return new \Flow\Parquet\Thrift\DataPageHeaderV2([
66+
return new \Flow\Parquet\ThriftModel\DataPageHeaderV2([
6767
'num_values' => $this->valuesCount,
6868
'num_nulls' => $this->nullsCount,
6969
'num_rows' => $this->rowsCount,

0 commit comments

Comments
 (0)