Skip to content

Commit ee84f2b

Browse files
committed
icebergLocalCluster table function
1 parent 1a190de commit ee84f2b

10 files changed

+65
-11
lines changed

src/Storages/ObjectStorage/Local/Configuration.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,20 @@ StorageObjectStorageQuerySettings StorageLocalConfiguration::getQuerySettings(co
8181
.ignore_non_existent_file = false};
8282
}
8383

84+
ASTPtr StorageLocalConfiguration::createArgsWithAccessData() const
85+
{
86+
auto arguments = std::make_shared<ASTExpressionList>();
87+
88+
arguments->children.push_back(std::make_shared<ASTLiteral>(path.path));
89+
if (getFormat() != "auto")
90+
arguments->children.push_back(std::make_shared<ASTLiteral>(getFormat()));
91+
if (getStructure() != "auto")
92+
arguments->children.push_back(std::make_shared<ASTLiteral>(getStructure()));
93+
if (getCompressionMethod() != "auto")
94+
arguments->children.push_back(std::make_shared<ASTLiteral>(getCompressionMethod()));
95+
96+
return arguments;
97+
}
98+
99+
84100
}

src/Storages/ObjectStorage/Local/Configuration.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ class StorageLocalConfiguration : public StorageObjectStorageConfiguration
6060

6161
void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr, bool) override { }
6262

63+
ASTPtr createArgsWithAccessData() const override;
64+
6365
private:
6466
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
6567
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
295295
{"IcebergS3", "icebergS3"},
296296
{"IcebergAzure", "icebergAzure"},
297297
{"IcebergHDFS", "icebergHDFS"},
298+
{"IcebergLocal", "icebergLocal"},
298299
{"DeltaLake", "deltaLake"},
299300
{"DeltaLakeS3", "deltaLakeS3"},
300301
{"DeltaLakeAzure", "deltaLakeAzure"},
@@ -416,6 +417,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
416417
{"icebergS3", "icebergS3Cluster"},
417418
{"icebergAzure", "icebergAzureCluster"},
418419
{"icebergHDFS", "icebergHDFSCluster"},
420+
{"icebergLocal", "icebergLocalCluster"},
419421
{"deltaLake", "deltaLakeCluster"},
420422
{"deltaLakeS3", "deltaLakeS3Cluster"},
421423
{"deltaLakeAzure", "deltaLakeAzureCluster"},

src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ struct IcebergHDFSClusterDefinition
155155
static constexpr auto non_clustered_storage_engine_name = IcebergHDFSDefinition::storage_engine_name;
156156
};
157157

158+
struct IcebergLocalClusterDefinition
159+
{
160+
static constexpr auto name = "icebergLocalCluster";
161+
static constexpr auto storage_engine_name = "IcebergLocalCluster";
162+
static constexpr auto non_clustered_storage_engine_name = IcebergLocalDefinition::storage_engine_name;
163+
};
164+
158165
struct DeltaLakeClusterDefinition
159166
{
160167
static constexpr auto name = "deltaLakeCluster";

src/TableFunctions/TableFunctionObjectStorage.cpp

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf
296296

297297
#if USE_AVRO
298298
template class TableFunctionObjectStorage<IcebergClusterDefinition, StorageIcebergConfiguration, true>;
299+
template class TableFunctionObjectStorage<IcebergLocalClusterDefinition, StorageLocalIcebergConfiguration, true>;
299300
#endif
300301

301302
#if USE_AVRO && USE_AWS_S3
@@ -334,13 +335,4 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
334335
.allow_readonly = false});
335336
}
336337
#endif
337-
338-
339-
void registerDataLakeTableFunctions(TableFunctionFactory & factory)
340-
{
341-
UNUSED(factory);
342-
#if USE_AVRO
343-
registerTableFunctionIceberg(factory);
344-
#endif
345-
}
346338
}

src/TableFunctions/TableFunctionObjectStorageCluster.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
155155
.category = FunctionDocumentation::Category::TableFunction},
156156
.allow_readonly = false});
157157

158+
factory.registerFunction<TableFunctionIcebergLocalCluster>(
159+
{.documentation
160+
= {.description = R"(The table function can be used to read the Iceberg table stored on shared storage in parallel for many nodes in a specified cluster.)",
161+
.examples{{IcebergLocalClusterDefinition::name, "SELECT * FROM icebergLocalCluster(cluster, filename, format, [,compression])", ""}},
162+
.category = FunctionDocumentation::Category::TableFunction},
163+
.allow_readonly = false});
164+
158165
# if USE_AWS_S3
159166
factory.registerFunction<TableFunctionIcebergS3Cluster>(
160167
{.documentation

src/TableFunctions/TableFunctionObjectStorageCluster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDe
6262

6363
#if USE_AVRO
6464
using TableFunctionIcebergCluster = TableFunctionObjectStorageCluster<IcebergClusterDefinition, StorageIcebergConfiguration, true>;
65+
using TableFunctionIcebergLocalCluster = TableFunctionObjectStorageCluster<IcebergLocalClusterDefinition, StorageLocalIcebergConfiguration, true>;
6566
#endif
6667

6768
#if USE_AVRO && USE_AWS_S3

src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ struct IcebergHDFSClusterFallbackDefinition
6767
static constexpr auto storage_engine_cluster_name = "IcebergHDFSCluster";
6868
};
6969

70+
struct IcebergLocalClusterFallbackDefinition
71+
{
72+
static constexpr auto name = "icebergLocal";
73+
static constexpr auto storage_engine_name = "Local";
74+
static constexpr auto storage_engine_cluster_name = "IcebergLocalCluster";
75+
};
76+
7077
struct DeltaLakeClusterFallbackDefinition
7178
{
7279
static constexpr auto name = "deltaLake";
@@ -163,6 +170,7 @@ using TableFunctionHDFSClusterFallback = TableFunctionObjectStorageClusterFallba
163170

164171
#if USE_AVRO
165172
using TableFunctionIcebergClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergClusterFallbackDefinition, TableFunctionIcebergCluster>;
173+
using TableFunctionIcebergLocalClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergLocalClusterFallbackDefinition, TableFunctionIcebergLocalCluster>;
166174
#endif
167175

168176
#if USE_AVRO && USE_AWS_S3
@@ -286,6 +294,27 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
286294
.allow_readonly = false
287295
}
288296
);
297+
298+
factory.registerFunction<TableFunctionIcebergLocalClusterFallback>(
299+
{
300+
.documentation = {
301+
.description=R"(The table function can be used to read the Iceberg table stored on shared disk in parallel for many nodes in a specified cluster or from single node.)",
302+
.examples{
303+
{
304+
"icebergLocal",
305+
"SELECT * FROM icebergLocal(filename)", ""
306+
},
307+
{
308+
"icebergLocal",
309+
"SELECT * FROM icebergLocal(filename) "
310+
"SETTINGS object_storage_cluster='cluster'", ""
311+
},
312+
},
313+
.category = FunctionDocumentation::Category::TableFunction
314+
},
315+
.allow_readonly = false
316+
}
317+
);
289318
#endif
290319

291320
#if USE_AVRO && USE_AWS_S3

src/TableFunctions/registerTableFunctions.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ void registerTableFunctions()
6969
registerTableFunctionObjectStorage(factory);
7070
registerTableFunctionObjectStorageCluster(factory);
7171
registerTableFunctionObjectStorageClusterFallback(factory);
72-
registerDataLakeTableFunctions(factory);
7372
registerDataLakeClusterTableFunctions(factory);
7473

7574
#if USE_YTSAURUS

src/TableFunctions/registerTableFunctions.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
7070
void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
7171
void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
7272
void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & factory);
73-
void registerDataLakeTableFunctions(TableFunctionFactory & factory);
7473
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
7574

7675
void registerTableFunctionTimeSeries(TableFunctionFactory & factory);

0 commit comments

Comments
 (0)