Skip to content

Commit 10f1a23

Browse files
authored
Merge pull request #1120 from Altinity/feature/antalya-25.8/iceberg_local_cluster
icebergLocalCluster table function
2 parents 91e6a82 + 3842dfd commit 10f1a23

17 files changed

+165
-41
lines changed

src/Databases/DataLake/DatabaseDataLake.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -434,17 +434,18 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
434434
configuration,
435435
configuration->createObjectStorage(context_copy, /* is_readonly */ false),
436436
StorageID(getDatabaseName(), name),
437-
/* columns */columns,
438-
/* constraints */ConstraintsDescription{},
439-
/* partition_by */nullptr,
437+
/* columns */ columns,
438+
/* constraints */ ConstraintsDescription{},
439+
/* partition_by */ nullptr,
440440
context_copy,
441-
/* comment */"",
441+
/* comment */ "",
442442
getFormatSettings(context_copy),
443443
LoadingStrictnessLevel::CREATE,
444444
getCatalog(),
445-
/* if_not_exists*/true,
446-
/* is_datalake_query*/true,
447-
/* lazy_init */true);
445+
/* if_not_exists */ true,
446+
/* is_datalake_query */ true,
447+
/* is_table_function */ true,
448+
/* lazy_init */ true);
448449
}
449450

450451
void DatabaseDataLake::dropTable( /// NOLINT

src/Storages/IStorageCluster.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ void IStorageCluster::read(
288288
return;
289289
}
290290

291+
updateConfigurationIfNeeded(context);
292+
291293
storage_snapshot->check(column_names);
292294

293295
const auto & settings = context->getSettingsRef();

src/Storages/IStorageCluster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ class IStorageCluster : public IStorage
9797
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName());
9898
}
9999

100+
/// Hook called from IStorageCluster::read() right before the requested columns are checked
/// against the storage snapshot. The base implementation does nothing; derived storages
/// may override it to refresh their configuration.
virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}
101+
100102
private:
101103
static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0);
102104

src/Storages/ObjectStorage/Local/Configuration.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,20 @@ StorageObjectStorageQuerySettings StorageLocalConfiguration::getQuerySettings(co
8181
.ignore_non_existent_file = false};
8282
}
8383

84+
/// Builds the argument list describing this local configuration as an ASTExpressionList:
/// the path first, followed by the optional format, structure and compression method.
///
/// A trailing run of "auto" values is omitted. An "auto" value that precedes an explicit
/// one is still emitted as a placeholder, so that every explicit value keeps its position
/// (previously an explicit structure with an "auto" format ended up in the format slot).
///
/// NOTE(review): assumes the consumer reads these arguments positionally in the order
/// (path, format, structure, compression_method) and accepts the literal "auto" for the
/// placeholders — confirm against StorageLocalConfiguration::fromAST().
ASTPtr StorageLocalConfiguration::createArgsWithAccessData() const
{
    auto arguments = std::make_shared<ASTExpressionList>();
    arguments->children.push_back(std::make_shared<ASTLiteral>(path.path));

    const std::vector<std::string> optional_values{getFormat(), getStructure(), getCompressionMethod()};

    /// Find how many optional values must be written: everything up to the last explicit one.
    size_t values_to_emit = optional_values.size();
    while (values_to_emit > 0 && optional_values[values_to_emit - 1] == "auto")
        --values_to_emit;

    for (size_t i = 0; i < values_to_emit; ++i)
        arguments->children.push_back(std::make_shared<ASTLiteral>(optional_values[i]));

    return arguments;
}
98+
99+
84100
}

src/Storages/ObjectStorage/Local/Configuration.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ class StorageLocalConfiguration : public StorageObjectStorageConfiguration
6060

6161
void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr, bool) override { }
6262

63+
ASTPtr createArgsWithAccessData() const override;
64+
6365
private:
6466
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
6567
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
9191
std::shared_ptr<DataLake::ICatalog> catalog,
9292
bool if_not_exists,
9393
bool is_datalake_query,
94+
bool is_table_function,
9495
bool lazy_init)
9596
: IStorageCluster(
9697
cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name)))
@@ -145,6 +146,10 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
145146
tryLogCurrentException(log_);
146147
}
147148

149+
// For tables, the configuration must be refreshed on every read,
150+
// because the data may have changed since the previous update.
151+
update_configuration_on_read_write = !is_table_function;
152+
148153
ColumnsDescription columns{columns_in_table_or_function_definition};
149154
std::string sample_path;
150155
if (need_resolve_columns_or_format)
@@ -295,6 +300,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
295300
{"IcebergS3", "icebergS3"},
296301
{"IcebergAzure", "icebergAzure"},
297302
{"IcebergHDFS", "icebergHDFS"},
303+
{"IcebergLocal", "icebergLocal"},
298304
{"DeltaLake", "deltaLake"},
299305
{"DeltaLakeS3", "deltaLakeS3"},
300306
{"DeltaLakeAzure", "deltaLakeAzure"},
@@ -416,6 +422,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
416422
{"icebergS3", "icebergS3Cluster"},
417423
{"icebergAzure", "icebergAzureCluster"},
418424
{"icebergHDFS", "icebergHDFSCluster"},
425+
{"icebergLocal", "icebergLocalCluster"},
419426
{"deltaLake", "deltaLakeCluster"},
420427
{"deltaLakeS3", "deltaLakeS3Cluster"},
421428
{"deltaLakeAzure", "deltaLakeAzureCluster"},
@@ -741,6 +748,18 @@ IDataLakeMetadata * StorageObjectStorageCluster::getExternalMetadata(ContextPtr
741748
return configuration->getExternalMetadata();
742749
}
743750

751+
/// Refreshes the storage configuration when this storage was flagged for it.
///
/// Does nothing unless `update_configuration_on_read_write` is set (the constructor sets it
/// to `!is_table_function`). Otherwise forwards to `configuration->update()` for the current
/// object storage, with both `if_not_updated_before` and
/// `check_consistent_with_previous_metadata` passed as false.
void StorageObjectStorageCluster::updateConfigurationIfNeeded(ContextPtr context)
{
    if (!update_configuration_on_read_write)
        return;

    configuration->update(
        object_storage,
        context,
        /* if_not_updated_before */ false,
        /* check_consistent_with_previous_metadata */ false);
}
762+
744763
void StorageObjectStorageCluster::checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const
745764
{
746765
if (getClusterName(context).empty())

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ class StorageObjectStorageCluster : public IStorageCluster
2525
std::shared_ptr<DataLake::ICatalog> catalog,
2626
bool if_not_exists,
2727
bool is_datalake_query,
28-
bool lazy_init = false);
28+
bool is_table_function,
29+
bool lazy_init);
2930

3031
std::string getName() const override;
3132

@@ -154,6 +155,8 @@ class StorageObjectStorageCluster : public IStorageCluster
154155
ContextPtr context,
155156
bool async_insert) override;
156157

158+
void updateConfigurationIfNeeded(ContextPtr context) override;
159+
157160
/*
158161
In case the table was created with `object_storage_cluster` setting,
159162
modify the AST query object so that it uses the table function implementation
@@ -176,6 +179,7 @@ class StorageObjectStorageCluster : public IStorageCluster
176179

177180
/// non-clustered storage to fall back on pure realisation if needed
178181
std::shared_ptr<StorageObjectStorage> pure_storage;
182+
bool update_configuration_on_read_write;
179183
};
180184

181185
}

src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ struct IcebergHDFSClusterDefinition
155155
static constexpr auto non_clustered_storage_engine_name = IcebergHDFSDefinition::storage_engine_name;
156156
};
157157

158+
/// Name constants for the cluster variant of the local Iceberg storage.
/// Used as the `Definition` template argument of TableFunctionObjectStorage.
struct IcebergLocalClusterDefinition
159+
{
160+
/// Table function name.
static constexpr auto name = "icebergLocalCluster";
161+
/// Storage engine name of the cluster variant.
static constexpr auto storage_engine_name = "IcebergLocalCluster";
162+
/// Engine name of the non-clustered counterpart, taken from IcebergLocalDefinition.
static constexpr auto non_clustered_storage_engine_name = IcebergLocalDefinition::storage_engine_name;
163+
};
164+
158165
struct DeltaLakeClusterDefinition
159166
{
160167
static constexpr auto name = "deltaLakeCluster";

src/Storages/ObjectStorage/registerStorageObjectStorage.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
9898
args.mode,
9999
configuration->getCatalog(context, args.query.attach),
100100
args.query.if_not_exists,
101-
/* is_datalake_query*/ false);
101+
/* is_datalake_query */ false,
102+
/* is_table_function */ false,
103+
/* lazy_init */ false);
102104
}
103105

104106
#endif

src/TableFunctions/TableFunctionObjectStorage.cpp

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,11 @@ StoragePtr TableFunctionObjectStorage<Definition, Configuration, is_data_lake>::
204204
/* comment */ String{},
205205
/* format_settings */ std::nullopt, /// No format_settings
206206
/* mode */ LoadingStrictnessLevel::CREATE,
207-
configuration->getCatalog(context, /*attach*/ false),
207+
configuration->getCatalog(context, /* attach */ false),
208208
/* if_not_exists */ false,
209-
/* is_datalake_query*/ false);
209+
/* is_datalake_query */ false,
210+
/* is_table_function */ true,
211+
/* lazy_init */ false);
210212

211213
storage->startup();
212214
return storage;
@@ -296,6 +298,7 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf
296298

297299
#if USE_AVRO
298300
template class TableFunctionObjectStorage<IcebergClusterDefinition, StorageIcebergConfiguration, true>;
301+
template class TableFunctionObjectStorage<IcebergLocalClusterDefinition, StorageLocalIcebergConfiguration, true>;
299302
#endif
300303

301304
#if USE_AVRO && USE_AWS_S3
@@ -334,13 +337,4 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
334337
.allow_readonly = false});
335338
}
336339
#endif
337-
338-
339-
/// Registers the data lake table functions in `factory`.
/// As written here, that is only registerTableFunctionIceberg(), and only when USE_AVRO is enabled.
void registerDataLakeTableFunctions(TableFunctionFactory & factory)
340-
{
341-
/// Presumably keeps `factory` from being reported as unused when USE_AVRO is off — confirm.
UNUSED(factory);
342-
#if USE_AVRO
343-
registerTableFunctionIceberg(factory);
344-
#endif
345-
}
346340
}

0 commit comments

Comments
 (0)