Skip to content

Commit 254f0aa

Browse files
committed
Fix deltaLakeS3/deltaLakeAzure
1 parent 167e2c2 commit 254f0aa

File tree

10 files changed

+118
-9
lines changed

10 files changed

+118
-9
lines changed

src/Access/Common/AccessType.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ namespace AccessTypeObjects
2323
enum class Source : uint8_t
2424
{
2525
#define APPLY_FOR_SOURCE(M) \
26-
M(UNDEFINED, "") \
2726
M(FILE, "File") \
2827
M(URL, "") \
2928
M(REMOTE, "Distributed") \

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
288288
{"IcebergAzure", "icebergAzure"},
289289
{"IcebergHDFS", "icebergHDFS"},
290290
{"DeltaLake", "deltaLake"},
291+
{"DeltaLakeS3", "deltaLakeS3"},
292+
{"DeltaLakeAzure", "deltaLakeAzure"},
291293
{"Hudi", "hudi"}
292294
};
293295

@@ -407,6 +409,8 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
407409
{"icebergAzure", "icebergAzureCluster"},
408410
{"icebergHDFS", "icebergHDFSCluster"},
409411
{"deltaLake", "deltaLakeCluster"},
412+
{"deltaLakeS3", "deltaLakeS3Cluster"},
413+
{"deltaLakeAzure", "deltaLakeAzureCluster"},
410414
{"hudi", "hudiCluster"},
411415
};
412416

src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,20 @@ struct DeltaLakeClusterDefinition
162162
static constexpr auto non_clustered_storage_engine_name = DeltaLakeDefinition::storage_engine_name;
163163
};
164164

165+
struct DeltaLakeS3ClusterDefinition
166+
{
167+
static constexpr auto name = "deltaLakeS3Cluster";
168+
static constexpr auto storage_engine_name = "DeltaLakeS3Cluster";
169+
static constexpr auto non_clustered_storage_engine_name = DeltaLakeS3Definition::storage_engine_name;
170+
};
171+
172+
struct DeltaLakeAzureClusterDefinition
173+
{
174+
static constexpr auto name = "deltaLakeAzureCluster";
175+
static constexpr auto storage_engine_name = "DeltaLakeAzureCluster";
176+
static constexpr auto non_clustered_storage_engine_name = DeltaLakeAzureDefinition::storage_engine_name;
177+
};
178+
165179
struct HudiClusterDefinition
166180
{
167181
static constexpr auto name = "hudiCluster";

src/Storages/ObjectStorage/registerStorageObjectStorage.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ void registerStorageIceberg(StorageFactory & factory)
220220
.supports_settings = true,
221221
.supports_sort_order = true,
222222
.supports_schema_inference = true,
223-
.source_access_type = AccessTypeObjects::Source::UNDEFINED,
223+
.source_access_type = AccessTypeObjects::Source::S3,
224224
.has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin,
225225
});
226226

@@ -298,7 +298,7 @@ void registerStorageIceberg(StorageFactory & factory)
298298
#if USE_PARQUET && USE_DELTA_KERNEL_RS
299299
void registerStorageDeltaLake(StorageFactory & factory)
300300
{
301-
#if USE_AWS_S3
301+
# if USE_AWS_S3
302302
factory.registerStorage(
303303
DeltaLakeDefinition::storage_engine_name,
304304
[&](const StorageFactory::Arguments & args)

src/TableFunctions/TableFunctionObjectStorage.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,11 @@ template class TableFunctionObjectStorage<IcebergHDFSClusterDefinition, StorageH
312312

313313
#if USE_PARQUET && USE_AWS_S3 && USE_DELTA_KERNEL_RS
314314
template class TableFunctionObjectStorage<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration, true>;
315+
template class TableFunctionObjectStorage<DeltaLakeS3ClusterDefinition, StorageS3DeltaLakeConfiguration, true>;
316+
#endif
317+
318+
#if USE_PARQUET && USE_AWS_S3 && USE_DELTA_KERNEL_RS
319+
template class TableFunctionObjectStorage<DeltaLakeAzureClusterDefinition, StorageAzureDeltaLakeConfiguration, true>;
315320
#endif
316321

317322
#if USE_AWS_S3

src/TableFunctions/TableFunctionObjectStorage.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,13 @@ using TableFunctionIcebergHDFS = TableFunctionObjectStorage<IcebergHDFSDefinitio
138138
using TableFunctionIcebergLocal = TableFunctionObjectStorage<IcebergLocalDefinition, StorageLocalIcebergConfiguration, true>;
139139
#endif
140140
#if USE_PARQUET && USE_DELTA_KERNEL_RS
141-
#if USE_AWS_S3
141+
# if USE_AWS_S3
142142
using TableFunctionDeltaLake = TableFunctionObjectStorage<DeltaLakeDefinition, StorageS3DeltaLakeConfiguration, true>;
143143
using TableFunctionDeltaLakeS3 = TableFunctionObjectStorage<DeltaLakeS3Definition, StorageS3DeltaLakeConfiguration, true>;
144-
#endif
145-
#if USE_AZURE_BLOB_STORAGE
144+
# endif
145+
# if USE_AZURE_BLOB_STORAGE
146146
using TableFunctionDeltaLakeAzure = TableFunctionObjectStorage<DeltaLakeAzureDefinition, StorageAzureDeltaLakeConfiguration, true>;
147-
#endif
147+
# endif
148148
// New alias for local Delta Lake table function
149149
using TableFunctionDeltaLakeLocal = TableFunctionObjectStorage<DeltaLakeLocalDefinition, StorageLocalDeltaLakeConfiguration, true>;
150150
#endif

src/TableFunctions/TableFunctionObjectStorageCluster.cpp

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,19 +184,37 @@ void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
184184
}
185185
#endif
186186

187-
#if USE_AWS_S3
188187
#if USE_PARQUET && USE_DELTA_KERNEL_RS
189188
void registerTableFunctionDeltaLakeCluster(TableFunctionFactory & factory)
190189
{
190+
# if USE_AWS_S3
191191
factory.registerFunction<TableFunctionDeltaLakeCluster>(
192192
{.documentation
193193
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
194194
.examples{{DeltaLakeClusterDefinition::name, "SELECT * FROM deltaLakeCluster(cluster, url, access_key_id, secret_access_key)", ""}},
195195
.category = FunctionDocumentation::Category::TableFunction},
196196
.allow_readonly = false});
197+
198+
factory.registerFunction<TableFunctionDeltaLakeS3Cluster>(
199+
{.documentation
200+
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
201+
.examples{{DeltaLakeS3ClusterDefinition::name, "SELECT * FROM deltaLakeS3Cluster(cluster, url, access_key_id, secret_access_key)", ""}},
202+
.category = FunctionDocumentation::Category::TableFunction},
203+
.allow_readonly = false});
204+
# endif
205+
206+
# if USE_AZURE_BLOB_STORAGE
207+
factory.registerFunction<TableFunctionDeltaLakeAzureCluster>(
208+
{.documentation
209+
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
210+
.examples{{DeltaLakeAzureClusterDefinition::name, "SELECT * FROM deltaLakeAzureCluster(cluster, url, access_key_id, secret_access_key)", ""}},
211+
.category = FunctionDocumentation::Category::TableFunction},
212+
.allow_readonly = false});
213+
# endif
197214
}
198215
#endif
199216

217+
#if USE_AWS_S3
200218
void registerTableFunctionHudiCluster(TableFunctionFactory & factory)
201219
{
202220
factory.registerFunction<TableFunctionHudiCluster>(

src/TableFunctions/TableFunctionObjectStorageCluster.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster<Iceber
7878

7979
#if USE_AWS_S3 && USE_PARQUET && USE_DELTA_KERNEL_RS
8080
using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration, true>;
81+
using TableFunctionDeltaLakeS3Cluster = TableFunctionObjectStorageCluster<DeltaLakeS3ClusterDefinition, StorageS3DeltaLakeConfiguration, true>;
82+
#endif
83+
84+
#if USE_AZURE_BLOB_STORAGE && USE_PARQUET && USE_DELTA_KERNEL_RS
85+
using TableFunctionDeltaLakeAzureCluster = TableFunctionObjectStorageCluster<DeltaLakeAzureClusterDefinition, StorageAzureDeltaLakeConfiguration, true>;
8186
#endif
8287

8388
#if USE_AWS_S3

src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,20 @@ struct DeltaLakeClusterFallbackDefinition
7474
static constexpr auto storage_engine_cluster_name = "DeltaLakeS3Cluster";
7575
};
7676

77+
struct DeltaLakeS3ClusterFallbackDefinition
78+
{
79+
static constexpr auto name = "deltaLakeS3";
80+
static constexpr auto storage_engine_name = "S3";
81+
static constexpr auto storage_engine_cluster_name = "DeltaLakeS3Cluster";
82+
};
83+
84+
struct DeltaLakeAzureClusterFallbackDefinition
85+
{
86+
static constexpr auto name = "deltaLakeAzure";
87+
static constexpr auto storage_engine_name = "Azure";
88+
static constexpr auto storage_engine_cluster_name = "DeltaLakeAzureCluster";
89+
};
90+
7791
struct HudiClusterFallbackDefinition
7892
{
7993
static constexpr auto name = "hudi";
@@ -165,6 +179,11 @@ using TableFunctionIcebergHDFSClusterFallback = TableFunctionObjectStorageCluste
165179

166180
#if USE_AWS_S3 && USE_PARQUET && USE_DELTA_KERNEL_RS
167181
using TableFunctionDeltaLakeClusterFallback = TableFunctionObjectStorageClusterFallback<DeltaLakeClusterFallbackDefinition, TableFunctionDeltaLakeCluster>;
182+
using TableFunctionDeltaLakeS3ClusterFallback = TableFunctionObjectStorageClusterFallback<DeltaLakeS3ClusterFallbackDefinition, TableFunctionDeltaLakeS3Cluster>;
183+
#endif
184+
185+
#if USE_AZURE_BLOB_STORAGE && USE_PARQUET && USE_DELTA_KERNEL_RS
186+
using TableFunctionDeltaLakeAzureClusterFallback = TableFunctionObjectStorageClusterFallback<DeltaLakeAzureClusterFallbackDefinition, TableFunctionDeltaLakeAzureCluster>;
168187
#endif
169188

170189
#if USE_AWS_S3
@@ -337,7 +356,8 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
337356
);
338357
#endif
339358

340-
#if USE_AWS_S3 && USE_PARQUET && USE_DELTA_KERNEL_RS
359+
#if USE_PARQUET && USE_DELTA_KERNEL_RS
360+
# if USE_AWS_S3
341361
factory.registerFunction<TableFunctionDeltaLakeClusterFallback>(
342362
{
343363
.documentation = {
@@ -358,6 +378,49 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
358378
.allow_readonly = false
359379
}
360380
);
381+
factory.registerFunction<TableFunctionDeltaLakeS3ClusterFallback>(
382+
{
383+
.documentation = {
384+
.description=R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster or from single node.)",
385+
.examples{
386+
{
387+
"deltaLakeS3",
388+
"SELECT * FROM deltaLakeS3(url, access_key_id, secret_access_key)", ""
389+
},
390+
{
391+
"deltaLakeS3",
392+
"SELECT * FROM deltaLakeS3(url, access_key_id, secret_access_key) "
393+
"SETTINGS object_storage_cluster='cluster'", ""
394+
},
395+
},
396+
.category = FunctionDocumentation::Category::TableFunction
397+
},
398+
.allow_readonly = false
399+
}
400+
);
401+
# endif
402+
# if USE_AZURE_BLOB_STORAGE
403+
factory.registerFunction<TableFunctionDeltaLakeAzureClusterFallback>(
404+
{
405+
.documentation = {
406+
.description=R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster or from single node.)",
407+
.examples{
408+
{
409+
"deltaLakeAzure",
410+
"SELECT * FROM deltaLakeAzure(url, access_key_id, secret_access_key)", ""
411+
},
412+
{
413+
"deltaLakeAzure",
414+
"SELECT * FROM deltaLakeAzure(url, access_key_id, secret_access_key) "
415+
"SETTINGS object_storage_cluster='cluster'", ""
416+
},
417+
},
418+
.category = FunctionDocumentation::Category::TableFunction
419+
},
420+
.allow_readonly = false
421+
}
422+
);
423+
# endif
361424
#endif
362425

363426
#if USE_AWS_S3

tests/integration/test_storage_iceberg/test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import json
23
import os
34
import subprocess
45
import time

0 commit comments

Comments
 (0)