Skip to content

Commit 410f2f3

Browse files
authored
Merge pull request #1083 from Altinity/antalya_25_8_always_support_import_on_object_storage
2 parents 693da52 + 3f18cdc commit 410f2f3

9 files changed

+110
-12
lines changed

src/Storages/MergeTree/IMergeTreeDataPart.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,42 @@ String IMergeTreeDataPart::MinMaxIndex::getFileColumnName(const String & column_
279279
return stream_name;
280280
}
281281

282+
/// Materializes the min-max index as a Block.
///
/// For every min-max column of the table's partition key (names and types are taken from
/// `data`), the result gets one column containing two values: the left bound of the
/// corresponding hyperrectangle range followed by its right bound. The range is copied and
/// passed through shrinkToIncludedIfPossible() before the bounds are read.
///
/// Throws LOGICAL_ERROR if the index has not been initialized.
Block IMergeTreeDataPart::MinMaxIndex::getBlock(const MergeTreeData & data) const
{
    if (!initialized)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to get block from uninitialized MinMax index.");

    const auto metadata_snapshot = data.getInMemoryMetadataPtr();
    const auto & partition_key = metadata_snapshot->getPartitionKey();

    const auto column_names = data.getMinMaxColumnsNames(partition_key);
    const auto column_types = data.getMinMaxColumnsTypes(partition_key);

    Block result;

    for (size_t idx = 0, total = column_types.size(); idx < total; ++idx)
    {
        const auto & type = column_types[idx];
        const auto & name = column_names[idx];

        const auto values = type->createColumn();

        /// Work on a copy so the stored hyperrectangle is left untouched.
        auto bounds = hyperrectangle.at(idx);
        bounds.shrinkToIncludedIfPossible();

        const auto & lower = bounds.left;
        const auto & upper = bounds.right;

        /// Row 0 holds the left bound, row 1 the right bound.
        values->insert(lower);
        values->insert(upper);

        result.insert(ColumnWithTypeAndName(values->getPtr(), type, name));
    }

    return result;
}
317+
282318
void IMergeTreeDataPart::incrementStateMetric(MergeTreeDataPartState state_) const
283319
{
284320
switch (state_)

src/Storages/MergeTree/IMergeTreeDataPart.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,8 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
362362
static String getFileColumnName(const String & column_name, const MergeTreeSettingsPtr & storage_settings_);
363363
/// For Load
364364
static String getFileColumnName(const String & column_name, const Checksums & checksums_);
365+
366+
/// Returns a Block with one column per min-max index column; each column holds the
/// left bound of the stored range followed by its right bound.
/// Throws LOGICAL_ERROR if the index is not initialized.
Block getBlock(const MergeTreeData & data) const;
365367
};
366368

367369
using MinMaxIndexPtr = std::shared_ptr<MinMaxIndex>;

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6268,16 +6268,13 @@ void MergeTreeData::exportPartToTableImpl(
62686268

62696269
MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;
62706270

6271-
NamesAndTypesList partition_columns;
6271+
Block block_with_partition_values;
62726272
if (metadata_snapshot->hasPartitionKey())
62736273
{
6274-
const auto & partition_key = metadata_snapshot->getPartitionKey();
6275-
if (!partition_key.column_names.empty())
6276-
partition_columns = partition_key.expression->getRequiredColumnsWithTypes();
6274+
/// TODO(arthur): do I need to initialize minmax_idx here? getBlock() throws LOGICAL_ERROR if it is uninitialized.
6275+
block_with_partition_values = manifest.data_part->minmax_idx->getBlock(*this);
62776276
}
62786277

6279-
auto block_with_partition_values = manifest.data_part->partition.getBlockWithPartitionValues(partition_columns);
6280-
62816278
auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, getContext());
62826279
if (!destination_storage)
62836280
{

src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <Storages/PartitionedSink.h>
55
#include <Poco/String.h>
66
#include <Functions/generateSnowflakeID.h>
7+
#include <boost/algorithm/string/replace.hpp>
78

89
namespace DB
910
{
@@ -19,12 +20,15 @@ namespace DB
1920

2021
struct ObjectStorageWildcardFilePathGenerator : ObjectStorageFilePathGenerator
2122
{
23+
/// Placeholder in the raw path that getPathForWrite() replaces with the file name override.
static constexpr const char * FILE_WILDCARD = "{_file}";
2224
explicit ObjectStorageWildcardFilePathGenerator(const std::string & raw_path_) : raw_path(raw_path_) {}
2325

2426
using ObjectStorageFilePathGenerator::getPathForWrite; // Bring base class overloads into scope
25-
std::string getPathForWrite(const std::string & partition_id, const std::string & /* file_name_override */) const override
27+
std::string getPathForWrite(const std::string & partition_id, const std::string & file_name_override) const override
2628
{
27-
return PartitionedSink::replaceWildcards(raw_path, partition_id);
29+
const auto partition_replaced_path = PartitionedSink::replaceWildcards(raw_path, partition_id);
30+
const auto final_path = boost::replace_all_copy(partition_replaced_path, FILE_WILDCARD, file_name_override);
31+
return final_path;
2832
}
2933

3034
std::string getPathForRead() const override

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,13 @@ bool StorageObjectStorage::optimize(
468468

469469
bool StorageObjectStorage::supportsImport() const
470470
{
471-
return configuration->partition_strategy != nullptr && configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE;
471+
if (!configuration->partition_strategy)
472+
return false;
473+
474+
if (configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::WILDCARD)
475+
return configuration->getRawPath().hasExportFilenameWildcard();
476+
477+
return configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE;
472478
}
473479

474480
SinkToStoragePtr StorageObjectStorage::import(

src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ bool StorageObjectStorageConfiguration::Path::hasPartitionWildcard() const
159159
return path.find(PARTITION_ID_WILDCARD) != String::npos;
160160
}
161161

162+
bool StorageObjectStorageConfiguration::Path::hasExportFilenameWildcard() const
163+
{
164+
return path.find(ObjectStorageWildcardFilePathGenerator::FILE_WILDCARD) != String::npos;
165+
}
166+
167+
162168
bool StorageObjectStorageConfiguration::Path::hasGlobsIgnorePartitionWildcard() const
163169
{
164170
if (!hasPartitionWildcard())

src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class StorageObjectStorageConfiguration
6262
std::string path;
6363

6464
bool hasPartitionWildcard() const;
65+
/// True if the path contains ObjectStorageWildcardFilePathGenerator::FILE_WILDCARD.
bool hasExportFilenameWildcard() const;
6566
bool hasGlobsIgnorePartitionWildcard() const;
6667
bool hasGlobs() const;
6768
std::string cutGlobs(bool supports_partial_prefix) const;

tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,20 @@
1414
2 2020
1515
3 2020
1616
4 2021
17+
---- Export 2020_1_1_0 and 2021_2_2_0 to wildcard table
18+
---- Both data parts should appear
19+
1 2020
20+
2 2020
21+
3 2020
22+
4 2021
23+
---- Export the same part again, it should be idempotent
24+
1 2020
25+
2 2020
26+
3 2020
27+
4 2021
28+
---- Export 2020_1_1_0 and 2021_2_2_0 to wildcard table with partition expression with function
29+
---- Both data parts should appear
30+
1 2020
31+
2 2020
32+
3 2020
33+
4 2021

tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,21 @@
11
#!/usr/bin/env bash
2-
# Tags: no-fasttest
32

43
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
54
# shellcheck source=../shell_config.sh
65
. "$CURDIR"/../shell_config.sh
76

87
mt_table="mt_table_${RANDOM}"
8+
mt_table_partition_expression_with_function="mt_table_partition_expression_with_function_${RANDOM}"
99
s3_table="s3_table_${RANDOM}"
10+
s3_table_wildcard="s3_table_wildcard_${RANDOM}"
11+
s3_table_wildcard_partition_expression_with_function="s3_table_wildcard_partition_expression_with_function_${RANDOM}"
1012
mt_table_roundtrip="mt_table_roundtrip_${RANDOM}"
1113

1214
# Runs the SQL statement given as the first argument through $CLICKHOUSE_CLIENT.
# $CLICKHOUSE_CLIENT is intentionally left unquoted so it is word-split exactly as before.
query() {
    local sql="$1"
    $CLICKHOUSE_CLIENT --query "$sql"
}
1517

16-
query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip"
18+
query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function"
1719

1820
query "CREATE TABLE $mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()"
1921
query "CREATE TABLE $s3_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table', format=Parquet, partition_strategy='hive') PARTITION BY year"
@@ -36,4 +38,31 @@ query "CREATE TABLE $mt_table_roundtrip ENGINE = MergeTree() PARTITION BY year O
3638
echo "---- Data in roundtrip MergeTree table (should match s3_table)"
3739
query "SELECT * FROM $s3_table ORDER BY id"
3840

39-
query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip"
41+
query "CREATE TABLE $s3_table_wildcard (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table_wildcard/{_partition_id}/{_file}.parquet', format=Parquet, partition_strategy='wildcard') PARTITION BY year"
42+
43+
echo "---- Export 2020_1_1_0 and 2021_2_2_0 to wildcard table"
44+
query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table_wildcard SETTINGS allow_experimental_export_merge_tree_part = 1"
45+
query "ALTER TABLE $mt_table EXPORT PART '2021_2_2_0' TO TABLE $s3_table_wildcard SETTINGS allow_experimental_export_merge_tree_part = 1"
46+
47+
echo "---- Both data parts should appear"
48+
query "SELECT * FROM s3(s3_conn, filename='$s3_table_wildcard/**.parquet') ORDER BY id"
49+
50+
echo "---- Export the same part again, it should be idempotent"
51+
query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table_wildcard SETTINGS allow_experimental_export_merge_tree_part = 1"
52+
53+
query "SELECT * FROM s3(s3_conn, filename='$s3_table_wildcard/**.parquet') ORDER BY id"
54+
55+
query "CREATE TABLE $mt_table_partition_expression_with_function (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY toString(year) ORDER BY tuple()"
56+
query "CREATE TABLE $s3_table_wildcard_partition_expression_with_function (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table_wildcard_partition_expression_with_function/{_partition_id}/{_file}.parquet', format=Parquet, partition_strategy='wildcard') PARTITION BY toString(year)"
57+
58+
# insert
59+
query "INSERT INTO $mt_table_partition_expression_with_function VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)"
60+
61+
echo "---- Export 2020_1_1_0 and 2021_2_2_0 to wildcard table with partition expression with function"
62+
query "ALTER TABLE $mt_table_partition_expression_with_function EXPORT PART 'cb217c742dc7d143b61583011996a160_1_1_0' TO TABLE $s3_table_wildcard_partition_expression_with_function SETTINGS allow_experimental_export_merge_tree_part = 1"
63+
query "ALTER TABLE $mt_table_partition_expression_with_function EXPORT PART '3be6d49ecf9749a383964bc6fab22d10_2_2_0' TO TABLE $s3_table_wildcard_partition_expression_with_function SETTINGS allow_experimental_export_merge_tree_part = 1"
64+
65+
echo "---- Both data parts should appear"
66+
query "SELECT * FROM s3(s3_conn, filename='$s3_table_wildcard_partition_expression_with_function/**.parquet') ORDER BY id"
67+
68+
query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function"

0 commit comments

Comments
 (0)