Skip to content

Commit 5d2d902

Browse files
authored
Merge branch 'antalya-25.8' into backports/antalya-25.8/87220
2 parents c64498e + 615b144 commit 5d2d902

21 files changed

+297
-43
lines changed

src/Databases/DataLake/DatabaseDataLake.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -434,17 +434,18 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
434434
configuration,
435435
configuration->createObjectStorage(context_copy, /* is_readonly */ false),
436436
StorageID(getDatabaseName(), name),
437-
/* columns */columns,
438-
/* constraints */ConstraintsDescription{},
439-
/* partition_by */nullptr,
437+
/* columns */ columns,
438+
/* constraints */ ConstraintsDescription{},
439+
/* partition_by */ nullptr,
440440
context_copy,
441-
/* comment */"",
441+
/* comment */ "",
442442
getFormatSettings(context_copy),
443443
LoadingStrictnessLevel::CREATE,
444444
getCatalog(),
445-
/* if_not_exists*/true,
446-
/* is_datalake_query*/true,
447-
/* lazy_init */true);
445+
/* if_not_exists */ true,
446+
/* is_datalake_query */ true,
447+
/* is_table_function */ true,
448+
/* lazy_init */ true);
448449
}
449450

450451
void DatabaseDataLake::dropTable( /// NOLINT

src/Parsers/ASTSystemQuery.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,13 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
163163

164164
print_keyword("SYSTEM") << " ";
165165
print_keyword(typeToString(type));
166-
if (!cluster.empty())
166+
167+
std::unordered_set<Type> queries_with_on_cluster_at_end = {
168+
Type::DROP_FILESYSTEM_CACHE,
169+
Type::SYNC_FILESYSTEM_CACHE,
170+
};
171+
172+
if (!queries_with_on_cluster_at_end.contains(type) && !cluster.empty())
167173
formatOnCluster(ostr, settings);
168174

169175
switch (type)
@@ -519,6 +525,9 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
519525
case Type::END:
520526
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown SYSTEM command");
521527
}
528+
529+
if (queries_with_on_cluster_at_end.contains(type) && !cluster.empty())
530+
formatOnCluster(ostr, settings);
522531
}
523532

524533

src/Storages/IStorage.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,7 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
412412
size_t /*max_block_size*/,
413413
size_t /*num_streams*/);
414414

415+
public:
415416
/// Should we process blocks of data returned by the storage in parallel
416417
/// even when the storage returned only one stream of data for reading?
417418
/// It is beneficial, for example, when you read from a file quickly,
@@ -422,7 +423,6 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
422423
/// useless).
423424
virtual bool parallelizeOutputAfterReading(ContextPtr) const { return !isSystemStorage(); }
424425

425-
public:
426426
/// Other version of read which adds reading step to query plan.
427427
/// Default implementation creates ReadFromStorageStep and uses usual read.
428428
/// Can be called after `shutdown`, but not after `drop`.

src/Storages/IStorageCluster.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ void IStorageCluster::read(
288288
return;
289289
}
290290

291+
updateConfigurationIfNeeded(context);
292+
291293
storage_snapshot->check(column_names);
292294

293295
const auto & settings = context->getSettingsRef();

src/Storages/IStorageCluster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ class IStorageCluster : public IStorage
9797
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName());
9898
}
9999

100+
virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}
101+
100102
private:
101103
static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0);
102104

src/Storages/ObjectStorage/Local/Configuration.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,20 @@ StorageObjectStorageQuerySettings StorageLocalConfiguration::getQuerySettings(co
8181
.ignore_non_existent_file = false};
8282
}
8383

84+
ASTPtr StorageLocalConfiguration::createArgsWithAccessData() const
85+
{
86+
auto arguments = std::make_shared<ASTExpressionList>();
87+
88+
arguments->children.push_back(std::make_shared<ASTLiteral>(path.path));
89+
if (getFormat() != "auto")
90+
arguments->children.push_back(std::make_shared<ASTLiteral>(getFormat()));
91+
if (getStructure() != "auto")
92+
arguments->children.push_back(std::make_shared<ASTLiteral>(getStructure()));
93+
if (getCompressionMethod() != "auto")
94+
arguments->children.push_back(std::make_shared<ASTLiteral>(getCompressionMethod()));
95+
96+
return arguments;
97+
}
98+
99+
84100
}

src/Storages/ObjectStorage/Local/Configuration.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ class StorageLocalConfiguration : public StorageObjectStorageConfiguration
6060

6161
void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr, bool) override { }
6262

63+
ASTPtr createArgsWithAccessData() const override;
64+
6365
private:
6466
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
6567
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
9191
std::shared_ptr<DataLake::ICatalog> catalog,
9292
bool if_not_exists,
9393
bool is_datalake_query,
94+
bool is_table_function,
9495
bool lazy_init)
9596
: IStorageCluster(
9697
cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name)))
@@ -145,6 +146,10 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
145146
tryLogCurrentException(log_);
146147
}
147148

149+
// For tables need to update configuration on each read
150+
// because data can be changed after previous update
151+
update_configuration_on_read_write = !is_table_function;
152+
148153
ColumnsDescription columns{columns_in_table_or_function_definition};
149154
std::string sample_path;
150155
if (need_resolve_columns_or_format)
@@ -295,6 +300,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
295300
{"IcebergS3", "icebergS3"},
296301
{"IcebergAzure", "icebergAzure"},
297302
{"IcebergHDFS", "icebergHDFS"},
303+
{"IcebergLocal", "icebergLocal"},
298304
{"DeltaLake", "deltaLake"},
299305
{"DeltaLakeS3", "deltaLakeS3"},
300306
{"DeltaLakeAzure", "deltaLakeAzure"},
@@ -416,6 +422,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
416422
{"icebergS3", "icebergS3Cluster"},
417423
{"icebergAzure", "icebergAzureCluster"},
418424
{"icebergHDFS", "icebergHDFSCluster"},
425+
{"icebergLocal", "icebergLocalCluster"},
419426
{"deltaLake", "deltaLakeCluster"},
420427
{"deltaLakeS3", "deltaLakeS3Cluster"},
421428
{"deltaLakeAzure", "deltaLakeAzureCluster"},
@@ -741,6 +748,18 @@ IDataLakeMetadata * StorageObjectStorageCluster::getExternalMetadata(ContextPtr
741748
return configuration->getExternalMetadata();
742749
}
743750

751+
void StorageObjectStorageCluster::updateConfigurationIfNeeded(ContextPtr context)
752+
{
753+
if (update_configuration_on_read_write)
754+
{
755+
configuration->update(
756+
object_storage,
757+
context,
758+
/* if_not_updated_before */false,
759+
/* check_consistent_with_previous_metadata */false);
760+
}
761+
}
762+
744763
void StorageObjectStorageCluster::checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const
745764
{
746765
if (getClusterName(context).empty())
@@ -905,4 +924,74 @@ bool StorageObjectStorageCluster::prefersLargeBlocks() const
905924
return IStorageCluster::prefersLargeBlocks();
906925
}
907926

927+
bool StorageObjectStorageCluster::supportsPartitionBy() const
928+
{
929+
if (pure_storage)
930+
return pure_storage->supportsPartitionBy();
931+
return IStorageCluster::supportsPartitionBy();
932+
}
933+
934+
bool StorageObjectStorageCluster::supportsSubcolumns() const
935+
{
936+
if (pure_storage)
937+
return pure_storage->supportsSubcolumns();
938+
return IStorageCluster::supportsSubcolumns();
939+
}
940+
941+
bool StorageObjectStorageCluster::supportsDynamicSubcolumns() const
942+
{
943+
if (pure_storage)
944+
return pure_storage->supportsDynamicSubcolumns();
945+
return IStorageCluster::supportsDynamicSubcolumns();
946+
}
947+
948+
bool StorageObjectStorageCluster::supportsTrivialCountOptimization(const StorageSnapshotPtr & snapshot, ContextPtr context) const
949+
{
950+
if (pure_storage)
951+
return pure_storage->supportsTrivialCountOptimization(snapshot, context);
952+
return IStorageCluster::supportsTrivialCountOptimization(snapshot, context);
953+
}
954+
955+
bool StorageObjectStorageCluster::supportsPrewhere() const
956+
{
957+
if (pure_storage)
958+
return pure_storage->supportsPrewhere();
959+
return IStorageCluster::supportsPrewhere();
960+
}
961+
962+
bool StorageObjectStorageCluster::canMoveConditionsToPrewhere() const
963+
{
964+
if (pure_storage)
965+
return pure_storage->canMoveConditionsToPrewhere();
966+
return IStorageCluster::canMoveConditionsToPrewhere();
967+
}
968+
969+
std::optional<NameSet> StorageObjectStorageCluster::supportedPrewhereColumns() const
970+
{
971+
if (pure_storage)
972+
return pure_storage->supportedPrewhereColumns();
973+
return IStorageCluster::supportedPrewhereColumns();
974+
}
975+
976+
IStorageCluster::ColumnSizeByName StorageObjectStorageCluster::getColumnSizes() const
977+
{
978+
if (pure_storage)
979+
return pure_storage->getColumnSizes();
980+
return IStorageCluster::getColumnSizes();
981+
}
982+
983+
bool StorageObjectStorageCluster::parallelizeOutputAfterReading(ContextPtr context) const
984+
{
985+
if (pure_storage)
986+
return pure_storage->parallelizeOutputAfterReading(context);
987+
return IStorageCluster::parallelizeOutputAfterReading(context);
988+
}
989+
990+
bool StorageObjectStorageCluster::supportsDelete() const
991+
{
992+
if (pure_storage)
993+
return pure_storage->supportsDelete();
994+
return IStorageCluster::supportsDelete();
995+
}
996+
908997
}

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ class StorageObjectStorageCluster : public IStorageCluster
2525
std::shared_ptr<DataLake::ICatalog> catalog,
2626
bool if_not_exists,
2727
bool is_datalake_query,
28-
bool lazy_init = false);
28+
bool is_table_function,
29+
bool lazy_init);
2930

3031
std::string getName() const override;
3132

@@ -132,6 +133,24 @@ class StorageObjectStorageCluster : public IStorageCluster
132133
ContextPtr /* context */) override;
133134
bool prefersLargeBlocks() const override;
134135

136+
bool supportsPartitionBy() const override;
137+
138+
bool supportsSubcolumns() const override;
139+
140+
bool supportsDynamicSubcolumns() const override;
141+
142+
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override;
143+
144+
/// Things required for PREWHERE.
145+
bool supportsPrewhere() const override;
146+
bool canMoveConditionsToPrewhere() const override;
147+
std::optional<NameSet> supportedPrewhereColumns() const override;
148+
ColumnSizeByName getColumnSizes() const override;
149+
150+
bool parallelizeOutputAfterReading(ContextPtr context) const override;
151+
152+
bool supportsDelete() const override;
153+
135154
private:
136155
void updateQueryToSendIfNeeded(
137156
ASTPtr & query,
@@ -154,6 +173,8 @@ class StorageObjectStorageCluster : public IStorageCluster
154173
ContextPtr context,
155174
bool async_insert) override;
156175

176+
void updateConfigurationIfNeeded(ContextPtr context) override;
177+
157178
/*
158179
In case the table was created with `object_storage_cluster` setting,
159180
modify the AST query object so that it uses the table function implementation
@@ -176,6 +197,7 @@ class StorageObjectStorageCluster : public IStorageCluster
176197

177198
/// non-clustered storage to fall back on pure realisation if needed
178199
std::shared_ptr<StorageObjectStorage> pure_storage;
200+
bool update_configuration_on_read_write;
179201
};
180202

181203
}

src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ struct IcebergHDFSClusterDefinition
155155
static constexpr auto non_clustered_storage_engine_name = IcebergHDFSDefinition::storage_engine_name;
156156
};
157157

158+
struct IcebergLocalClusterDefinition
159+
{
160+
static constexpr auto name = "icebergLocalCluster";
161+
static constexpr auto storage_engine_name = "IcebergLocalCluster";
162+
static constexpr auto non_clustered_storage_engine_name = IcebergLocalDefinition::storage_engine_name;
163+
};
164+
158165
struct DeltaLakeClusterDefinition
159166
{
160167
static constexpr auto name = "deltaLakeCluster";

0 commit comments

Comments
 (0)