Skip to content

Commit 58eb474

Browse files
committed
timezone_for_iceberg_timestamptz setting
1 parent edfc8d0 commit 58eb474

File tree

10 files changed

+45
-17
lines changed

10 files changed

+45
-17
lines changed

src/Core/Settings.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6873,6 +6873,16 @@ Use roaring bitmap for iceberg positional deletes.
68736873
)", 0) \
68746874
DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"(
68756875
Overwrite file if it already exists when exporting a merge tree part
6876+
)", 0) \
6877+
DECLARE(Timezone, timezone_for_iceberg_timestamptz, "UTC", R"(
6878+
Timezone for Iceberg timestamptz field.
6879+
6880+
Possible values:
6881+
6882+
- Any valid timezone, e.g. `Europe/Berlin`, `UTC` or `Zulu`
6883+
- `` (empty value) - use session timezone
6884+
6885+
Default value is `UTC`.
68766886
)", 0) \
68776887
\
68786888
/* ####################################################### */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
4747
{"allow_retries_in_cluster_requests", false, false, "New setting"},
4848
{"object_storage_remote_initiator", false, false, "New setting."},
4949
{"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."},
50+
{"timezone_for_iceberg_timestamptz", "UTC", "UTC", "New setting."}
5051
});
5152
addSettingsChanges(settings_changes_history, "25.8",
5253
{

src/Databases/DataLake/Common.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ std::vector<String> splitTypeArguments(const String & type_str)
6161
return args;
6262
}
6363

64-
DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix)
64+
DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix)
6565
{
6666
String name = trim(type_name);
6767

6868
if (name.starts_with("array<") && name.ends_with(">"))
6969
{
7070
String inner = name.substr(6, name.size() - 7);
71-
return std::make_shared<DB::DataTypeArray>(getType(inner, nullable));
71+
return std::make_shared<DB::DataTypeArray>(getType(inner, nullable, context));
7272
}
7373

7474
if (name.starts_with("map<") && name.ends_with(">"))
@@ -79,7 +79,7 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String &
7979
if (args.size() != 2)
8080
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Invalid data type {}", type_name);
8181

82-
return std::make_shared<DB::DataTypeMap>(getType(args[0], false), getType(args[1], nullable));
82+
return std::make_shared<DB::DataTypeMap>(getType(args[0], false, context), getType(args[1], nullable, context));
8383
}
8484

8585
if (name.starts_with("struct<") && name.ends_with(">"))
@@ -101,13 +101,13 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String &
101101
String full_field_name = prefix.empty() ? field_name : prefix + "." + field_name;
102102

103103
field_names.push_back(full_field_name);
104-
field_types.push_back(getType(field_type, nullable, full_field_name));
104+
field_types.push_back(getType(field_type, nullable, context, full_field_name));
105105
}
106106
return std::make_shared<DB::DataTypeTuple>(field_types, field_names);
107107
}
108108

109-
return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name))
110-
: DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name);
109+
return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context))
110+
: DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context);
111111
}
112112

113113
std::pair<std::string, std::string> parseTableName(const std::string & name)

src/Databases/DataLake/Common.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <Core/NamesAndTypes.h>
44
#include <Core/Types.h>
5+
#include <Interpreters/Context_fwd.h>
56

67
namespace DataLake
78
{
@@ -10,7 +11,7 @@ String trim(const String & str);
1011

1112
std::vector<String> splitTypeArguments(const String & type_str);
1213

13-
DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix = "");
14+
DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix = "");
1415

1516
/// Parse a string, containing at least one dot, into a two substrings:
1617
/// A.B.C.D.E -> A.B.C.D and E, where

src/Databases/DataLake/GlueCatalog.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ bool GlueCatalog::tryGetTableMetadata(
372372
column_type = "timestamptz";
373373
}
374374

375-
schema.push_back({column.GetName(), getType(column_type, can_be_nullable)});
375+
schema.push_back({column.GetName(), getType(column_type, can_be_nullable, getContext())});
376376
}
377377
result.setSchema(schema);
378378
}

src/Databases/DataLake/HiveCatalog.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ bool HiveCatalog::tryGetTableMetadata(const std::string & namespace_name, const
130130
auto columns = table.sd.cols;
131131
for (const auto & column : columns)
132132
{
133-
schema.push_back({column.name, getType(column.type, true)});
133+
schema.push_back({column.name, getType(column.type, true, getContext())});
134134
}
135135
result.setSchema(schema);
136136
}

src/Databases/DataLake/RestCatalog.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ bool RestCatalog::getTableMetadataImpl(
623623
if (result.requiresSchema())
624624
{
625625
// int format_version = metadata_object->getValue<int>("format-version");
626-
auto schema_processor = DB::Iceberg::IcebergSchemaProcessor();
626+
auto schema_processor = DB::Iceberg::IcebergSchemaProcessor(getContext());
627627
auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, log);
628628
auto schema = schema_processor.getClickhouseTableSchemaById(id);
629629
result.setSchema(*schema);

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ IcebergMetadata::IcebergMetadata(
135135
: object_storage(std::move(object_storage_))
136136
, configuration(std::move(configuration_))
137137
, persistent_components(PersistentTableComponents{
138-
.schema_processor = std::make_shared<IcebergSchemaProcessor>(),
138+
.schema_processor = std::make_shared<IcebergSchemaProcessor>(context_),
139139
.metadata_cache = cache_ptr,
140140
.format_version = format_version_,
141141
.table_location = metadata_object_->getValue<String>(f_location)

src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <base/types.h>
1818
#include <Common/SharedLockGuard.h>
1919
#include <base/scope_guard.h>
20+
#include <Core/Settings.h>
2021

2122
#include <DataTypes/DataTypeArray.h>
2223
#include <DataTypes/DataTypeDate.h>
@@ -32,6 +33,8 @@
3233
#include <DataTypes/NestedUtils.h>
3334
#include <DataTypes/DataTypesNumber.h>
3435
#include <Formats/FormatFactory.h>
36+
#include <Interpreters/Context_fwd.h>
37+
#include <Interpreters/Context.h>
3538

3639
#include <IO/ReadHelpers.h>
3740

@@ -46,6 +49,10 @@ extern const int LOGICAL_ERROR;
4649
extern const int BAD_ARGUMENTS;
4750
}
4851

52+
namespace Setting
53+
{
54+
extern const SettingsTimezone timezone_for_iceberg_timestamptz;
55+
}
4956

5057
namespace
5158
{
@@ -221,7 +228,7 @@ NamesAndTypesList IcebergSchemaProcessor::tryGetFieldsCharacteristics(Int32 sche
221228
return fields;
222229
}
223230

224-
DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name)
231+
DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name, ContextPtr context_)
225232
{
226233
if (type_name == f_boolean)
227234
return DataTypeFactory::instance().get("Bool");
@@ -240,7 +247,10 @@ DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name)
240247
if (type_name == f_timestamp)
241248
return std::make_shared<DataTypeDateTime64>(6);
242249
if (type_name == f_timestamptz)
243-
return std::make_shared<DataTypeDateTime64>(6, "UTC");
250+
{
251+
std::string timezone = context_->getSettingsRef()[Setting::timezone_for_iceberg_timestamptz];
252+
return std::make_shared<DataTypeDateTime64>(6, timezone);
253+
}
244254
if (type_name == f_string || type_name == f_binary)
245255
return std::make_shared<DataTypeString>();
246256
if (type_name == f_uuid)
@@ -323,7 +333,11 @@ IcebergSchemaProcessor::getComplexTypeFromObject(const Poco::JSON::Object::Ptr &
323333
}
324334

325335
DataTypePtr IcebergSchemaProcessor::getFieldType(
326-
const Poco::JSON::Object::Ptr & field, const String & type_key, bool required, String & current_full_name, bool is_subfield_of_root)
336+
const Poco::JSON::Object::Ptr & field,
337+
const String & type_key,
338+
bool required,
339+
String & current_full_name,
340+
bool is_subfield_of_root)
327341
{
328342
if (field->isObject(type_key))
329343
return getComplexTypeFromObject(field->getObject(type_key), current_full_name, is_subfield_of_root);
@@ -332,7 +346,7 @@ DataTypePtr IcebergSchemaProcessor::getFieldType(
332346
if (type.isString())
333347
{
334348
const String & type_name = type.extract<String>();
335-
auto data_type = getSimpleType(type_name);
349+
auto data_type = getSimpleType(type_name, getContext());
336350
return required ? data_type : makeNullable(data_type);
337351
}
338352

src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,15 @@ ColumnMapperPtr createColumnMapper(Poco::JSON::Object::Ptr schema_object);
7575
* }
7676
* }
7777
*/
78-
class IcebergSchemaProcessor
78+
class IcebergSchemaProcessor : private WithContext
7979
{
8080
static std::string default_link;
8181

8282
using Node = ActionsDAG::Node;
8383

8484
public:
85+
IcebergSchemaProcessor(ContextPtr context_) : WithContext(context_) {}
86+
8587
void addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr);
8688
std::shared_ptr<NamesAndTypesList> getClickhouseTableSchemaById(Int32 id);
8789
std::shared_ptr<const ActionsDAG> getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id);
@@ -92,7 +94,7 @@ class IcebergSchemaProcessor
9294
Poco::JSON::Object::Ptr getIcebergTableSchemaById(Int32 id) const;
9395
bool hasClickhouseTableSchemaById(Int32 id) const;
9496

95-
static DataTypePtr getSimpleType(const String & type_name);
97+
static DataTypePtr getSimpleType(const String & type_name, ContextPtr context_);
9698

9799
static std::unordered_map<String, Int64> traverseSchema(Poco::JSON::Array::Ptr schema);
98100

0 commit comments

Comments
 (0)