Skip to content

Commit 3842dfd

Browse files
Authored merge commit: Merge branch 'antalya-25.8' into feature/antalya-25.8/iceberg_local_cluster
2 parents (54ce5bc + 91e6a82), commit 3842dfd

33 files changed: +293 additions, −91 deletions

src/Client/MultiplexedConnections.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuid
232232
void MultiplexedConnections::sendClusterFunctionReadTaskResponse(const ClusterFunctionReadTaskResponse & response)
233233
{
234234
std::lock_guard lock(cancel_mutex);
235-
if (cancelled)
235+
if (cancelled || !current_connection || !current_connection->isConnected())
236236
return;
237237
current_connection->sendClusterFunctionReadTaskResponse(response);
238238
}
@@ -241,7 +241,7 @@ void MultiplexedConnections::sendClusterFunctionReadTaskResponse(const ClusterFu
241241
void MultiplexedConnections::sendMergeTreeReadTaskResponse(const ParallelReadResponse & response)
242242
{
243243
std::lock_guard lock(cancel_mutex);
244-
if (cancelled)
244+
if (cancelled || !current_connection || !current_connection->isConnected())
245245
return;
246246
current_connection->sendMergeTreeReadTaskResponse(response);
247247
}
@@ -527,9 +527,12 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
527527

528528
void MultiplexedConnections::invalidateReplica(ReplicaState & state)
529529
{
530+
Connection * old_connection = state.connection;
530531
state.connection = nullptr;
531532
state.pool_entry = IConnectionPool::Entry();
532533
--active_connection_count;
534+
if (current_connection == old_connection)
535+
current_connection = nullptr;
533536
}
534537

535538
void MultiplexedConnections::setAsyncCallback(AsyncCallback async_callback)

src/Common/ProfileEvents.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,11 @@
299299
M(IcebergTrivialCountOptimizationApplied, "Trivial count optimization applied while reading from Iceberg", ValueType::Number) \
300300
M(IcebergVersionHintUsed, "Number of times version-hint.text has been used.", ValueType::Number) \
301301
M(IcebergMinMaxIndexPrunedFiles, "Number of skipped files by using MinMax index in Iceberg", ValueType::Number) \
302+
M(IcebergAvroFileParsing, "Number of times avro metadata files have been parsed.", ValueType::Number) \
303+
M(IcebergAvroFileParsingMicroseconds, "Time spent for parsing avro metadata files for Iceberg tables.", ValueType::Microseconds) \
304+
M(IcebergJsonFileParsing, "Number of times json metadata files have been parsed.", ValueType::Number) \
305+
M(IcebergJsonFileParsingMicroseconds, "Time spent for parsing json metadata files for Iceberg tables.", ValueType::Microseconds) \
306+
\
302307
M(JoinBuildTableRowCount, "Total number of rows in the build table for a JOIN operation.", ValueType::Number) \
303308
M(JoinProbeTableRowCount, "Total number of rows in the probe table for a JOIN operation.", ValueType::Number) \
304309
M(JoinResultRowCount, "Total number of rows in the result of a JOIN operation.", ValueType::Number) \
@@ -580,7 +585,9 @@ The server successfully detected this situation and will download merged part fr
580585
M(S3DeleteObjects, "Number of S3 API DeleteObject(s) calls.", ValueType::Number) \
581586
M(S3CopyObject, "Number of S3 API CopyObject calls.", ValueType::Number) \
582587
M(S3ListObjects, "Number of S3 API ListObjects calls.", ValueType::Number) \
588+
M(S3ListObjectsMicroseconds, "Time of S3 API ListObjects execution.", ValueType::Microseconds) \
583589
M(S3HeadObject, "Number of S3 API HeadObject calls.", ValueType::Number) \
590+
M(S3HeadObjectMicroseconds, "Time of S3 API HeadObject execution.", ValueType::Microseconds) \
584591
M(S3GetObjectAttributes, "Number of S3 API GetObjectAttributes calls.", ValueType::Number) \
585592
M(S3CreateMultipartUpload, "Number of S3 API CreateMultipartUpload calls.", ValueType::Number) \
586593
M(S3UploadPartCopy, "Number of S3 API UploadPartCopy calls.", ValueType::Number) \
@@ -634,6 +641,7 @@ The server successfully detected this situation and will download merged part fr
634641
M(AzureCopyObject, "Number of Azure blob storage API CopyObject calls", ValueType::Number) \
635642
M(AzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.", ValueType::Number) \
636643
M(AzureListObjects, "Number of Azure blob storage API ListObjects calls.", ValueType::Number) \
644+
M(AzureListObjectsMicroseconds, "Time of Azure blob storage API ListObjects execution.", ValueType::Microseconds) \
637645
M(AzureGetProperties, "Number of Azure blob storage API GetProperties calls.", ValueType::Number) \
638646
M(AzureCreateContainer, "Number of Azure blob storage API CreateContainer calls.", ValueType::Number) \
639647
\

src/Core/Settings.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6873,6 +6873,16 @@ Use roaring bitmap for iceberg positional deletes.
68736873
)", 0) \
68746874
DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"(
68756875
Overwrite file if it already exists when exporting a merge tree part
6876+
)", 0) \
6877+
DECLARE(Timezone, iceberg_timezone_for_timestamptz, "UTC", R"(
6878+
Timezone for Iceberg timestamptz field.
6879+
6880+
Possible values:
6881+
6882+
- Any valid timezone, e.g. `Europe/Berlin`, `UTC` or `Zulu`
6883+
- `` (empty value) - use session timezone
6884+
6885+
Default value is `UTC`.
68766886
)", 0) \
68776887
\
68786888
/* ####################################################### */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
4747
{"allow_retries_in_cluster_requests", false, false, "New setting"},
4848
{"object_storage_remote_initiator", false, false, "New setting."},
4949
{"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."},
50+
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}
5051
});
5152
addSettingsChanges(settings_changes_history, "25.8",
5253
{

src/Databases/DataLake/Common.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ std::vector<String> splitTypeArguments(const String & type_str)
6161
return args;
6262
}
6363

64-
DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix)
64+
DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix)
6565
{
6666
String name = trim(type_name);
6767

6868
if (name.starts_with("array<") && name.ends_with(">"))
6969
{
7070
String inner = name.substr(6, name.size() - 7);
71-
return std::make_shared<DB::DataTypeArray>(getType(inner, nullable));
71+
return std::make_shared<DB::DataTypeArray>(getType(inner, nullable, context));
7272
}
7373

7474
if (name.starts_with("map<") && name.ends_with(">"))
@@ -79,7 +79,7 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String &
7979
if (args.size() != 2)
8080
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Invalid data type {}", type_name);
8181

82-
return std::make_shared<DB::DataTypeMap>(getType(args[0], false), getType(args[1], nullable));
82+
return std::make_shared<DB::DataTypeMap>(getType(args[0], false, context), getType(args[1], nullable, context));
8383
}
8484

8585
if (name.starts_with("struct<") && name.ends_with(">"))
@@ -101,13 +101,13 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String &
101101
String full_field_name = prefix.empty() ? field_name : prefix + "." + field_name;
102102

103103
field_names.push_back(full_field_name);
104-
field_types.push_back(getType(field_type, nullable, full_field_name));
104+
field_types.push_back(getType(field_type, nullable, context, full_field_name));
105105
}
106106
return std::make_shared<DB::DataTypeTuple>(field_types, field_names);
107107
}
108108

109-
return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name))
110-
: DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name);
109+
return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context))
110+
: DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context);
111111
}
112112

113113
std::pair<std::string, std::string> parseTableName(const std::string & name)

src/Databases/DataLake/Common.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <Core/NamesAndTypes.h>
44
#include <Core/Types.h>
5+
#include <Interpreters/Context_fwd.h>
56

67
namespace DataLake
78
{
@@ -10,7 +11,7 @@ String trim(const String & str);
1011

1112
std::vector<String> splitTypeArguments(const String & type_str);
1213

13-
DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix = "");
14+
DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix = "");
1415

1516
/// Parse a string, containing at least one dot, into a two substrings:
1617
/// A.B.C.D.E -> A.B.C.D and E, where

src/Databases/DataLake/DatabaseDataLake.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
326326

327327
auto [namespace_name, table_name] = DataLake::parseTableName(name);
328328

329-
if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
329+
if (!catalog->tryGetTableMetadata(namespace_name, table_name, context_, table_metadata))
330330
return nullptr;
331331

332332
if (ignore_if_not_iceberg && !table_metadata.isDefaultReadableTable())
@@ -633,15 +633,15 @@ ASTPtr DatabaseDataLake::getCreateDatabaseQuery() const
633633

634634
ASTPtr DatabaseDataLake::getCreateTableQueryImpl(
635635
const String & name,
636-
ContextPtr /* context_ */,
636+
ContextPtr context_,
637637
bool /* throw_on_error */) const
638638
{
639639
auto catalog = getCatalog();
640640
auto table_metadata = DataLake::TableMetadata().withLocation().withSchema();
641641

642642
const auto [namespace_name, table_name] = DataLake::parseTableName(name);
643643

644-
if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
644+
if (!catalog->tryGetTableMetadata(namespace_name, table_name, context_, table_metadata))
645645
{
646646
throw Exception(
647647
ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY, "Table `{}` doesn't exist", name);

src/Databases/DataLake/GlueCatalog.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ bool GlueCatalog::existsTable(const std::string & database_name, const std::stri
276276
bool GlueCatalog::tryGetTableMetadata(
277277
const std::string & database_name,
278278
const std::string & table_name,
279+
DB::ContextPtr /* context_ */,
279280
TableMetadata & result) const
280281
{
281282
Aws::Glue::Model::GetTableRequest request;
@@ -372,7 +373,7 @@ bool GlueCatalog::tryGetTableMetadata(
372373
column_type = "timestamptz";
373374
}
374375

375-
schema.push_back({column.GetName(), getType(column_type, can_be_nullable)});
376+
schema.push_back({column.GetName(), getType(column_type, can_be_nullable, getContext())});
376377
}
377378
result.setSchema(schema);
378379
}
@@ -394,9 +395,10 @@ bool GlueCatalog::tryGetTableMetadata(
394395
void GlueCatalog::getTableMetadata(
395396
const std::string & database_name,
396397
const std::string & table_name,
398+
DB::ContextPtr context_,
397399
TableMetadata & result) const
398400
{
399-
if (!tryGetTableMetadata(database_name, table_name, result))
401+
if (!tryGetTableMetadata(database_name, table_name, context_, result))
400402
{
401403
throw DB::Exception(
402404
DB::ErrorCodes::DATALAKE_DATABASE_ERROR,

src/Databases/DataLake/GlueCatalog.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
4040
void getTableMetadata(
4141
const std::string & database_name,
4242
const std::string & table_name,
43+
DB::ContextPtr context_,
4344
TableMetadata & result) const override;
4445

4546
bool tryGetTableMetadata(
4647
const std::string & database_name,
4748
const std::string & table_name,
49+
DB::ContextPtr context_,
4850
TableMetadata & result) const override;
4951

5052
std::optional<StorageType> getStorageType() const override

src/Databases/DataLake/HiveCatalog.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,21 @@ bool HiveCatalog::existsTable(const std::string & namespace_name, const std::str
9696
return true;
9797
}
9898

99-
void HiveCatalog::getTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const
99+
void HiveCatalog::getTableMetadata(
100+
const std::string & namespace_name,
101+
const std::string & table_name,
102+
DB::ContextPtr context_,
103+
TableMetadata & result) const
100104
{
101-
if (!tryGetTableMetadata(namespace_name, table_name, result))
105+
if (!tryGetTableMetadata(namespace_name, table_name, context_, result))
102106
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from iceberg catalog");
103107
}
104108

105-
bool HiveCatalog::tryGetTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const
109+
bool HiveCatalog::tryGetTableMetadata(
110+
const std::string & namespace_name,
111+
const std::string & table_name,
112+
DB::ContextPtr context_,
113+
TableMetadata & result) const
106114
{
107115
Apache::Hadoop::Hive::Table table;
108116

@@ -130,7 +138,7 @@ bool HiveCatalog::tryGetTableMetadata(const std::string & namespace_name, const
130138
auto columns = table.sd.cols;
131139
for (const auto & column : columns)
132140
{
133-
schema.push_back({column.name, getType(column.type, true)});
141+
schema.push_back({column.name, getType(column.type, true, context_)});
134142
}
135143
result.setSchema(schema);
136144
}

0 commit comments

Comments (0)