Skip to content

Commit 9d83d9f

Browse files
Merge pull request ClickHouse#79777 from scanhex12/geo_parquet
Support geo parquet
2 parents 83ff78d + 019d768 commit 9d83d9f

File tree

14 files changed

+815
-11
lines changed

14 files changed

+815
-11
lines changed

src/IO/ReadHelpers.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1397,6 +1397,20 @@ inline void readBinaryEndian(T & x, ReadBuffer & buf)
13971397
transformEndianness<endian>(x);
13981398
}
13991399

1400+
template <typename T>
1401+
inline void readBinaryEndian(T & x, ReadBuffer & buf, std::endian endian)
1402+
{
1403+
switch (endian)
1404+
{
1405+
case std::endian::little:
1406+
readBinaryLittleEndian(x, buf);
1407+
break;
1408+
case std::endian::big:
1409+
readBinaryBigEndian(x, buf);
1410+
break;
1411+
}
1412+
}
1413+
14001414
template <typename T>
14011415
inline void readBinaryLittleEndian(T & x, ReadBuffer & buf)
14021416
{

src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "ArrowBlockInputFormat.h"
2+
#include <optional>
23

34
#if USE_ARROW
45

@@ -89,7 +90,7 @@ Chunk ArrowBlockInputFormat::read()
8990
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
9091
/// Otherwise fill the missing columns with zero values of its type.
9192
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
92-
res = arrow_column_to_ch_column->arrowTableToCHChunk(*table_result, (*table_result)->num_rows(), block_missing_values_ptr);
93+
res = arrow_column_to_ch_column->arrowTableToCHChunk(*table_result, (*table_result)->num_rows(), file_reader ? file_reader->metadata() : nullptr, block_missing_values_ptr);
9394

9495
/// There is no easy way to get original record batch size from Arrow metadata.
9596
/// Let's just use the number of bytes read from read buffer.
@@ -208,6 +209,7 @@ NamesAndTypesList ArrowSchemaReader::readSchema()
208209

209210
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
210211
*schema,
212+
file_reader ? file_reader->metadata() : nullptr,
211213
stream ? "ArrowStream" : "Arrow",
212214
format_settings.arrow.skip_columns_with_unsupported_types_in_schema_inference,
213215
format_settings.schema_inference_make_columns_nullable != 0);

src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "ArrowColumnToCHColumn.h"
2+
#include "Common/Exception.h"
23

34
#if USE_ARROW || USE_ORC || USE_PARQUET
45

@@ -22,6 +23,7 @@
2223
#include <Common/DateLUTImpl.h>
2324
#include <Processors/Chunk.h>
2425
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
26+
#include <Processors/Formats/Impl/ArrowGeoTypes.h>
2527
#include <Columns/ColumnString.h>
2628
#include <Columns/ColumnNullable.h>
2729
#include <Columns/ColumnArray.h>
@@ -37,6 +39,7 @@
3739
#include <algorithm>
3840
#include <arrow/builder.h>
3941
#include <arrow/array.h>
42+
#include <arrow/util/key_value_metadata.h>
4043
#include <boost/algorithm/string/case_conv.hpp>
4144

4245

@@ -466,6 +469,39 @@ static ColumnPtr readByteMapFromArrowColumn(const std::shared_ptr<arrow::Chunked
466469
return nullmap_column;
467470
}
468471

472+
/// Builds a ClickHouse column from an Arrow binary column whose values are serialized geometries.
/// Every value is parsed according to geo_metadata.encoding (WKB or WKT) and appended to a
/// GeoColumnBuilder created for geo_metadata.type.
/// Throws BAD_ARGUMENTS when a null value is encountered: nullable geometry columns are not supported.
static ColumnWithTypeAndName readColumnWithGeoData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, GeoColumnMetadata geo_metadata)
{
    auto column_builder = GeoColumnBuilder(column_name, geo_metadata.type);
    for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
    {
        const auto & chunk = dynamic_cast<const arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
        const int64_t chunk_length = chunk.length();

        for (int64_t value_i = 0; value_i != chunk_length; ++value_i)
        {
            /// Reject nulls before looking at the value bytes.
            if (chunk.IsNull(value_i))
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Geometry nullable columns are not supported");

            /// Read the value through the array's const data pointer. Buffer::mutable_data()
            /// must not be used here: Arrow returns a null pointer from it for immutable buffers.
            const auto value = chunk.GetView(value_i);

            /// ReadBuffer's constructor takes a non-const pointer, hence the const_cast.
            /// NOTE(review): the parsers are expected to only read from the buffer - confirm.
            ReadBuffer in_buffer(const_cast<char *>(value.data()), value.size(), 0);
            ArrowGeometricObject result_object;
            switch (geo_metadata.encoding)
            {
                case GeoEncoding::WKB:
                    result_object = parseWKBFormat(in_buffer);
                    break;
                case GeoEncoding::WKT:
                    result_object = parseWKTFormat(in_buffer);
                    break;
            }
            column_builder.appendObject(result_object);
        }
    }

    return column_builder.getResultColumn();
}
504+
469505
template <typename T>
470506
struct ArrowOffsetArray;
471507

@@ -762,6 +798,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
762798
DataTypePtr type_hint,
763799
bool is_nullable_column,
764800
bool is_map_nested_column,
801+
std::optional<GeoColumnMetadata> geo_metadata,
765802
const ReadColumnFromArrowColumnSettings & settings);
766803

767804
static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
@@ -770,6 +807,7 @@ static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
770807
std::unordered_map<String, ArrowColumnToCHColumn::DictionaryInfo> dictionary_infos,
771808
DataTypePtr type_hint,
772809
bool is_map_nested_column,
810+
std::optional<GeoColumnMetadata> geo_metadata,
773811
const ReadColumnFromArrowColumnSettings & settings)
774812
{
775813
switch (arrow_column->type()->id())
@@ -808,6 +846,10 @@ static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
808846
break;
809847
}
810848
}
849+
if (geo_metadata)
850+
{
851+
return readColumnWithGeoData(arrow_column, column_name, *geo_metadata);
852+
}
811853
return readColumnWithStringData<arrow::BinaryArray>(arrow_column, column_name);
812854
}
813855
case arrow::Type::FIXED_SIZE_BINARY:
@@ -892,6 +934,7 @@ static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
892934
nested_type_hint,
893935
false /*is_nullable_column*/,
894936
true /*is_map_nested_column*/,
937+
geo_metadata,
895938
settings);
896939
if (!nested_column.column)
897940
return {};
@@ -991,6 +1034,7 @@ static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
9911034
nested_type_hint,
9921035
is_nested_nullable_column,
9931036
false /*is_map_nested_column*/,
1037+
geo_metadata,
9941038
settings);
9951039
if (!nested_column.column)
9961040
return {};
@@ -1075,6 +1119,7 @@ static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
10751119
nested_type_hint,
10761120
field->nullable(),
10771121
false /*is_map_nested_column*/,
1122+
geo_metadata,
10781123
settings);
10791124
if (!column_with_type_and_name.column)
10801125
return {};
@@ -1114,6 +1159,7 @@ static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
11141159
nullptr /*nested_type_hint*/,
11151160
false /*is_nullable_column*/,
11161161
false /*is_map_nested_column*/,
1162+
geo_metadata,
11171163
settings);
11181164

11191165
if (!dict_column.column)
@@ -1205,9 +1251,10 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
12051251
DataTypePtr type_hint,
12061252
bool is_nullable_column,
12071253
bool is_map_nested_column,
1254+
std::optional<GeoColumnMetadata> geo_metadata,
12081255
const ReadColumnFromArrowColumnSettings & settings)
12091256
{
1210-
bool read_as_nullable_column = (arrow_column->null_count() || is_nullable_column || (type_hint && type_hint->isNullable())) && settings.allow_inferring_nullable_columns;
1257+
bool read_as_nullable_column = (arrow_column->null_count() || is_nullable_column || (type_hint && type_hint->isNullable())) && !geo_metadata && settings.allow_inferring_nullable_columns;
12111258
if (read_as_nullable_column &&
12121259
arrow_column->type()->id() != arrow::Type::LIST &&
12131260
arrow_column->type()->id() != arrow::Type::LARGE_LIST &&
@@ -1225,6 +1272,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
12251272
dictionary_infos,
12261273
nested_type_hint,
12271274
is_map_nested_column,
1275+
geo_metadata,
12281276
settings);
12291277

12301278
if (!nested_column.column)
@@ -1242,6 +1290,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
12421290
dictionary_infos,
12431291
type_hint,
12441292
is_map_nested_column,
1293+
geo_metadata,
12451294
settings);
12461295
}
12471296

@@ -1271,6 +1320,7 @@ static std::shared_ptr<arrow::ChunkedArray> createArrowColumn(const std::shared_
12711320

12721321
Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
12731322
const arrow::Schema & schema,
1323+
std::shared_ptr<const arrow::KeyValueMetadata> metadata,
12741324
const std::string & format_name,
12751325
bool skip_columns_with_unsupported_types,
12761326
bool allow_inferring_nullable_columns,
@@ -1288,6 +1338,9 @@ Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
12881338

12891339
ColumnsWithTypeAndName sample_columns;
12901340

1341+
auto geo_json = extractGeoMetadata(metadata);
1342+
std::unordered_map<String, GeoColumnMetadata> geo_columns = parseGeoMetadataEncoding(geo_json);
1343+
12911344
for (const auto & field : schema.fields())
12921345
{
12931346
/// Create empty arrow column by it's type and convert it to ClickHouse column.
@@ -1302,6 +1355,7 @@ Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
13021355
nullptr /*nested_type_hint*/,
13031356
field->nullable() /*is_nullable_column*/,
13041357
false /*is_map_nested_column*/,
1358+
geo_columns.contains(field->name()) ? std::optional(geo_columns[field->name()]) : std::nullopt,
13051359
settings);
13061360

13071361
if (sample_column.column)
@@ -1329,7 +1383,11 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
13291383
{
13301384
}
13311385

1332-
Chunk ArrowColumnToCHColumn::arrowTableToCHChunk(const std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values)
1386+
Chunk ArrowColumnToCHColumn::arrowTableToCHChunk(
1387+
const std::shared_ptr<arrow::Table> & table,
1388+
size_t num_rows,
1389+
std::shared_ptr<const arrow::KeyValueMetadata> metadata,
1390+
BlockMissingValues * block_missing_values)
13331391
{
13341392
NameToArrowColumn name_to_arrow_column;
13351393

@@ -1347,10 +1405,14 @@ Chunk ArrowColumnToCHColumn::arrowTableToCHChunk(const std::shared_ptr<arrow::Ta
13471405
name_to_arrow_column[std::move(column_name)] = {std::move(arrow_column), std::move(arrow_field)};
13481406
}
13491407

1350-
return arrowColumnsToCHChunk(name_to_arrow_column, num_rows, block_missing_values);
1408+
return arrowColumnsToCHChunk(name_to_arrow_column, num_rows, metadata, block_missing_values);
13511409
}
13521410

1353-
Chunk ArrowColumnToCHColumn::arrowColumnsToCHChunk(const NameToArrowColumn & name_to_arrow_column, size_t num_rows, BlockMissingValues * block_missing_values)
1411+
Chunk ArrowColumnToCHColumn::arrowColumnsToCHChunk(
1412+
const NameToArrowColumn & name_to_arrow_column,
1413+
size_t num_rows,
1414+
std::shared_ptr<const arrow::KeyValueMetadata> metadata,
1415+
BlockMissingValues * block_missing_values)
13541416
{
13551417
ReadColumnFromArrowColumnSettings settings
13561418
{
@@ -1367,6 +1429,9 @@ Chunk ArrowColumnToCHColumn::arrowColumnsToCHChunk(const NameToArrowColumn & nam
13671429

13681430
std::unordered_map<String, std::pair<BlockPtr, std::shared_ptr<NestedColumnExtractHelper>>> nested_tables;
13691431

1432+
auto geo_metadata = extractGeoMetadata(metadata);
1433+
std::unordered_map<String, GeoColumnMetadata> geo_columns = parseGeoMetadataEncoding(geo_metadata);
1434+
13701435
for (size_t column_i = 0, header_columns = header.columns(); column_i < header_columns; ++column_i)
13711436
{
13721437
const ColumnWithTypeAndName & header_column = header.getByPosition(column_i);
@@ -1408,6 +1473,7 @@ Chunk ArrowColumnToCHColumn::arrowColumnsToCHChunk(const NameToArrowColumn & nam
14081473
nested_table_type,
14091474
arrow_column.field->nullable() /*is_nullable_column*/,
14101475
false /*is_map_nested_column*/,
1476+
geo_columns.contains(header_column.name) ? std::optional(geo_columns[header_column.name]) : std::nullopt,
14111477
settings)
14121478
};
14131479

@@ -1448,6 +1514,7 @@ Chunk ArrowColumnToCHColumn::arrowColumnsToCHChunk(const NameToArrowColumn & nam
14481514
header_column.type,
14491515
arrow_column.field->nullable(),
14501516
false /*is_map_nested_column*/,
1517+
geo_columns.contains(header_column.name) ? std::optional(geo_columns[header_column.name]) : std::nullopt,
14511518
settings);
14521519
}
14531520

src/Processors/Formats/Impl/ArrowColumnToCHColumn.h

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#pragma once
22

3+
#include <cstdint>
4+
35
#include "config.h"
46

57
#if USE_ARROW || USE_ORC || USE_PARQUET
@@ -28,11 +30,16 @@ class ArrowColumnToCHColumn
2830
bool case_insensitive_matching_ = false,
2931
bool is_stream_ = false);
3032

31-
Chunk arrowTableToCHChunk(const std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values = nullptr);
33+
/// Converts an Arrow table into a ClickHouse Chunk.
/// `metadata` is the key-value metadata that is forwarded to arrowColumnsToCHChunk, where the
/// geo column descriptions are extracted from it. ArrowBlockInputFormat passes nullptr when it
/// has no file reader.
/// `block_missing_values` is optional and defaults to nullptr.
Chunk arrowTableToCHChunk(
34+
const std::shared_ptr<arrow::Table> & table,
35+
size_t num_rows,
36+
std::shared_ptr<const arrow::KeyValueMetadata> metadata,
37+
BlockMissingValues * block_missing_values = nullptr);
3238

3339
/// Transform arrow schema to ClickHouse header
3440
static Block arrowSchemaToCHHeader(
3541
const arrow::Schema & schema,
42+
std::shared_ptr<const arrow::KeyValueMetadata> metadata,
3643
const std::string & format_name,
3744
bool skip_columns_with_unsupported_types = false,
3845
bool allow_inferring_nullable_columns = true,
@@ -45,7 +52,6 @@ class ArrowColumnToCHColumn
4552
UInt64 dictionary_size;
4653
};
4754

48-
4955
private:
5056
struct ArrowColumn
5157
{
@@ -55,7 +61,11 @@ class ArrowColumnToCHColumn
5561

5662
using NameToArrowColumn = std::unordered_map<std::string, ArrowColumn>;
5763

58-
Chunk arrowColumnsToCHChunk(const NameToArrowColumn & name_to_arrow_column, size_t num_rows, BlockMissingValues * block_missing_values);
64+
/// Converts the given name -> Arrow column mapping into a ClickHouse Chunk.
/// `metadata` is passed to extractGeoMetadata / parseGeoMetadataEncoding to find out which
/// columns carry geo data; the per-column result is handed to the column readers.
/// NOTE(review): callers may pass a null `metadata` - confirm extractGeoMetadata tolerates it.
Chunk arrowColumnsToCHChunk(
65+
const NameToArrowColumn & name_to_arrow_column,
66+
size_t num_rows,
67+
std::shared_ptr<const arrow::KeyValueMetadata> metadata,
68+
BlockMissingValues * block_missing_values);
5969

6070
const Block & header;
6171
const std::string format_name;

0 commit comments

Comments
 (0)