
Commit 5975d07

Merge pull request #992 from Altinity/backports/25.6/83132_timestampTZ_in_glue

Antalya 25.6: Backport of ClickHouse#83132 - Support TimestampTZ in Glue catalog

2 parents 55a4f67 + 18dd22d
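
In short: AWS Glue reports both Iceberg timestamp and timestamptz columns as plain "timestamp", so the catalog now consults the table's Iceberg metadata JSON (located via the Glue metadata_location table parameter) to recover the timezone-aware type when building the ClickHouse schema.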

File tree

5 files changed: +258 -34 lines changed

src/Databases/DataLake/DatabaseDataLake.cpp

Lines changed: 3 additions & 4 deletions
@@ -154,11 +154,10 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
         case DB::DatabaseDataLakeCatalogType::GLUE:
         {
             catalog_impl = std::make_shared<DataLake::GlueCatalog>(
-                settings[DatabaseDataLakeSetting::aws_access_key_id].value,
-                settings[DatabaseDataLakeSetting::aws_secret_access_key].value,
-                settings[DatabaseDataLakeSetting::region].value,
                 url,
-                Context::getGlobalContextInstance());
+                Context::getGlobalContextInstance(),
+                settings,
+                table_engine_definition);
             break;
         }
         case DB::DatabaseDataLakeCatalogType::ICEBERG_HIVE:
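
The catalog constructor now receives the whole settings object and the table engine definition instead of three individual credential strings; GlueCatalog needs both later to assemble a storage configuration of its own when it has to read Iceberg metadata files (see classifyTimestampTZ in GlueCatalog.cpp below).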

src/Databases/DataLake/GlueCatalog.cpp

Lines changed: 140 additions & 26 deletions
@@ -1,4 +1,6 @@
 #include <Databases/DataLake/GlueCatalog.h>
+#include <Poco/JSON/Object.h>
+#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
 
 #if USE_AWS_S3 && USE_AVRO
 
@@ -8,6 +10,7 @@
 #include <aws/glue/model/GetDatabasesRequest.h>
 
 #include <Common/Exception.h>
+#include <Common/CurrentMetrics.h>
 #include <Core/Settings.h>
 #include <Interpreters/Context.h>
 
@@ -29,12 +32,19 @@
 #include <IO/S3/Credentials.h>
 #include <IO/S3/Client.h>
 #include <IO/S3Settings.h>
+#include <IO/ReadBufferFromString.h>
+#include <IO/ReadHelpers.h>
 #include <Common/ProxyConfigurationResolverProvider.h>
 #include <Databases/DataLake/Common.h>
 #include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
+#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
+#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
+#include <Parsers/ASTCreateQuery.h>
+#include <Parsers/ASTFunction.h>
 
 namespace DB::ErrorCodes
 {
+    extern const int BAD_ARGUMENTS;
     extern const int DATALAKE_DATABASE_ERROR;
 }
 
@@ -54,20 +64,36 @@ namespace DB::StorageObjectStorageSetting
     extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
 }
 
+namespace DB::DatabaseDataLakeSetting
+{
+    extern const DatabaseDataLakeSettingsString storage_endpoint;
+    extern const DatabaseDataLakeSettingsString aws_access_key_id;
+    extern const DatabaseDataLakeSettingsString aws_secret_access_key;
+    extern const DatabaseDataLakeSettingsString region;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric MarkCacheBytes;
+    extern const Metric MarkCacheFiles;
+}
+
 namespace DataLake
 {
 
 GlueCatalog::GlueCatalog(
-    const String & access_key_id,
-    const String & secret_access_key,
-    const String & region_,
     const String & endpoint,
-    DB::ContextPtr context_)
+    DB::ContextPtr context_,
+    const DB::DatabaseDataLakeSettings & settings_,
+    DB::ASTPtr table_engine_definition_)
     : ICatalog("")
     , DB::WithContext(context_)
-    , log(getLogger("GlueCatalog(" + region_ + ")"))
-    , credentials(access_key_id, secret_access_key)
-    , region(region_)
+    , log(getLogger("GlueCatalog(" + settings_[DB::DatabaseDataLakeSetting::region].value + ")"))
+    , credentials(settings_[DB::DatabaseDataLakeSetting::aws_access_key_id].value, settings_[DB::DatabaseDataLakeSetting::aws_secret_access_key].value)
+    , region(settings_[DB::DatabaseDataLakeSetting::region].value)
+    , settings(settings_)
+    , table_engine_definition(table_engine_definition_)
+    , metadata_objects(CurrentMetrics::MarkCacheBytes, CurrentMetrics::MarkCacheFiles, 1024)
 {
     DB::S3::CredentialsConfiguration creds_config;
     creds_config.use_environment_credentials = true;
@@ -271,6 +297,27 @@ bool GlueCatalog::tryGetTableMetadata(
             database_name + "." + table_name, message_part, "ICEBERG"));
     }
 
+    if (result.requiresCredentials())
+        setCredentials(result);
+
+    auto setup_specific_properties = [&]
+    {
+        const auto & table_params = table_outcome.GetParameters();
+        if (table_params.contains("metadata_location"))
+        {
+            result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = table_params.at("metadata_location")});
+        }
+        else
+        {
+            result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
+                "It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
+                database_name + "." + table_name));
+        }
+    };
+
+    if (result.requiresDataLakeSpecificProperties())
+        setup_specific_properties();
+
     if (result.requiresSchema())
     {
         DB::NamesAndTypesList schema;
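
For context, table_outcome.GetParameters() above returns the Glue table's key/value parameters; for Iceberg tables, Glue stores the location of the current metadata JSON under the metadata_location key. A minimal Python sketch of the same lookup through boto3 (region, database, and table names are placeholders, not values from this commit):

    import boto3

    # Hypothetical names for illustration only.
    glue = boto3.client("glue", region_name="us-east-1")
    table = glue.get_table(DatabaseName="mydb", Name="mytable")["Table"]
    params = table.get("Parameters", {})

    # Readable Iceberg tables carry the current metadata JSON location here;
    # without it, the hunk above marks the table as not readable.
    print(params.get("metadata_location"))
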
@@ -286,27 +333,18 @@ bool GlueCatalog::tryGetTableMetadata(
             if (column_params.contains("iceberg.field.current") && column_params.at("iceberg.field.current") == "false")
                 continue;
 
-            schema.push_back({column.GetName(), getType(column.GetType(), can_be_nullable)});
-        }
-        result.setSchema(schema);
-    }
+            String column_type = column.GetType();
+            if (column_type == "timestamp")
+            {
+                if (!result.requiresDataLakeSpecificProperties())
+                    setup_specific_properties();
+                if (classifyTimestampTZ(column.GetName(), result))
+                    column_type = "timestamptz";
+            }
 
-    if (result.requiresCredentials())
-        setCredentials(result);
-
-    if (result.requiresDataLakeSpecificProperties())
-    {
-        const auto & table_params = table_outcome.GetParameters();
-        if (table_params.contains("metadata_location"))
-        {
-            result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = table_params.at("metadata_location")});
-        }
-        else
-        {
-            result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
-                "It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
-                database_name + "." + table_name));
+            schema.push_back({column.GetName(), getType(column_type, can_be_nullable)});
         }
+        result.setSchema(schema);
     }
 }
 else
@@ -364,6 +402,82 @@ bool GlueCatalog::empty() const
     return true;
 }
 
+bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
+{
+    String metadata_path;
+    if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties();
+        table_specific_properties.has_value())
+    {
+        metadata_path = table_specific_properties->iceberg_metadata_file_location;
+        if (metadata_path.starts_with("s3:/"))
+            metadata_path = metadata_path.substr(5);
+
+        // Delete bucket
+        std::size_t pos = metadata_path.find('/');
+        if (pos != std::string::npos)
+            metadata_path = metadata_path.substr(pos + 1);
+    }
+    else
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Metadata specific properties should be defined");
+
+    if (!metadata_objects.get(metadata_path))
+    {
+        DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
+        DB::ASTs args = storage->engine->arguments->children;
+
+        auto table_endpoint = settings[DB::DatabaseDataLakeSetting::storage_endpoint].value;
+        if (args.empty())
+            args.emplace_back(std::make_shared<DB::ASTLiteral>(table_endpoint));
+        else
+            args[0] = std::make_shared<DB::ASTLiteral>(table_endpoint);
+
+        if (args.size() == 1 && table_metadata.hasStorageCredentials())
+        {
+            auto storage_credentials = table_metadata.getStorageCredentials();
+            if (storage_credentials)
+                storage_credentials->addCredentialsToEngineArgs(args);
+        }
+
+        auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
+        storage_settings->loadFromSettingsChanges(settings.allChanged());
+        auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
+        DB::StorageObjectStorage::Configuration::initialize(*configuration, args, getContext(), false);
+
+        auto object_storage = configuration->createObjectStorage(getContext(), true);
+        const auto & read_settings = getContext()->getReadSettings();
+
+        DB::StoredObject metadata_stored_object(metadata_path);
+        auto read_buf = object_storage->readObject(metadata_stored_object, read_settings);
+        String metadata_file;
+        readString(metadata_file, *read_buf);
+
+        Poco::JSON::Parser parser;
+        Poco::Dynamic::Var result = parser.parse(metadata_file);
+        auto metadata_object = result.extract<Poco::JSON::Object::Ptr>();
+        metadata_objects.set(metadata_path, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
+    }
+    auto metadata_object = *metadata_objects.get(metadata_path);
+    auto current_schema_id = metadata_object->getValue<Int64>("current-schema-id");
+    auto schemas = metadata_object->getArray(Iceberg::f_schemas);
+    for (size_t i = 0; i < schemas->size(); ++i)
+    {
+        auto schema = schemas->getObject(static_cast<UInt32>(i));
+        if (schema->getValue<Int64>("schema-id") == current_schema_id)
+        {
+            auto fields = schema->getArray(Iceberg::f_fields);
+            for (size_t j = 0; j < fields->size(); ++j)
+            {
+                auto field = fields->getObject(static_cast<UInt32>(j));
+                if (field->getValue<String>(Iceberg::f_name) == column_name)
+                    return field->getValue<String>(Iceberg::f_type) == Iceberg::f_timestamptz;
+            }
+        }
+    }
+
+    return false;
+}
+
+
 }
 
 #endif
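
The classification itself is a walk over the (cached) Iceberg metadata JSON: pick the schema whose schema-id equals current-schema-id, then compare the named field's type against timestamptz. A rough Python equivalent of that walk, assuming the metadata file has already been fetched; the function name is ours:

    import json

    def is_timestamptz(metadata_file: str, column_name: str) -> bool:
        # Mirrors the JSON lookup in GlueCatalog::classifyTimestampTZ.
        metadata = json.loads(metadata_file)
        current_schema_id = metadata["current-schema-id"]
        for schema in metadata.get("schemas", []):
            if schema["schema-id"] == current_schema_id:
                for field in schema["fields"]:
                    if field["name"] == column_name:
                        # Iceberg distinguishes "timestamp" from "timestamptz";
                        # Glue flattens both to "timestamp", hence this lookup.
                        return field["type"] == "timestamptz"
        return False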

src/Databases/DataLake/GlueCatalog.h

Lines changed: 15 additions & 4 deletions
@@ -7,6 +7,10 @@
 #include <Databases/DataLake/ICatalog.h>
 #include <Interpreters/Context_fwd.h>
 #include <Poco/JSON/Object.h>
+#include <Poco/LRUCache.h>
+
+#include <Common/CacheBase.h>
+#include <Databases/DataLake/DatabaseDataLakeSettings.h>
 
 namespace Aws::Glue
 {
@@ -20,11 +24,10 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
 {
 public:
     GlueCatalog(
-        const String & access_key_id,
-        const String & secret_access_key,
-        const String & region,
         const String & endpoint,
-        DB::ContextPtr context_);
+        DB::ContextPtr context_,
+        const DB::DatabaseDataLakeSettings & settings_,
+        DB::ASTPtr table_engine_definition_);
 
     ~GlueCatalog() override;
 
@@ -60,10 +63,18 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
     const LoggerPtr log;
     Aws::Auth::AWSCredentials credentials;
     std::string region;
+    DB::DatabaseDataLakeSettings settings;
+    DB::ASTPtr table_engine_definition;
 
     DataLake::ICatalog::Namespaces getDatabases(const std::string & prefix, size_t limit = 0) const;
     DB::Names getTablesForDatabase(const std::string & db_name, size_t limit = 0) const;
     void setCredentials(TableMetadata & metadata) const;
+
+    /// The Glue catalog does not store detailed information about the types of timestamp columns, such as whether the column is timestamp or timestamptz.
+    /// This method allows to clarify the actual type of the timestamp column.
+    bool classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const;
+
+    mutable DB::CacheBase<String, Poco::JSON::Object::Ptr> metadata_objects;
 };
 
 }
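
metadata_objects is an LRU cache keyed by metadata path, so the metadata JSON is fetched and parsed at most once per table rather than once per timestamp column (note that it reuses the MarkCacheBytes/MarkCacheFiles metrics for accounting). A rough Python analogue of the caching behavior, with a local file read standing in for the object-storage fetch:

    from functools import lru_cache

    @lru_cache(maxsize=1024)  # the C++ cache is likewise capped at 1024 entries
    def load_metadata(metadata_path: str) -> str:
        # Stand-in for object_storage->readObject(...) in GlueCatalog.cpp.
        with open(metadata_path) as f:
            return f.read()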

tests/integration/test_database_glue/test.py

Lines changed: 49 additions & 0 deletions
@@ -7,6 +7,7 @@
 import pyarrow as pa
 import pytest
 import urllib3
+import pytz
 from datetime import datetime, timedelta
 from minio import Minio
 from pyiceberg.catalog import load_catalog
@@ -22,6 +23,7 @@
     StringType,
     StructType,
     TimestampType,
+    TimestamptzType,
     MapType,
     DecimalType,
 )
@@ -429,3 +431,50 @@ def test_non_existing_tables(started_cluster):
     except Exception as e:
         assert "DB::Exception: Table" in str(e)
         assert "doesn't exist" in str(e)
+
+
+def test_timestamps(started_cluster):
+    node = started_cluster.instances["node1"]
+
+    test_ref = f"test_list_tables_{uuid.uuid4()}"
+    table_name = f"{test_ref}_table"
+    root_namespace = f"{test_ref}_namespace"
+
+    catalog = load_catalog_impl(started_cluster)
+    catalog.create_namespace(root_namespace)
+
+    schema = Schema(
+        NestedField(
+            field_id=1, name="timestamp", field_type=TimestampType(), required=False
+        ),
+        NestedField(
+            field_id=2,
+            name="timestamptz",
+            field_type=TimestamptzType(),
+            required=False,
+        ),
+    )
+    table = create_table(catalog, root_namespace, table_name, schema)
+
+    create_clickhouse_glue_database(started_cluster, node, CATALOG_NAME)
+
+    data = [
+        {
+            "timestamp": datetime(2024, 1, 1, hour=12, minute=0, second=0, microsecond=0),
+            "timestamptz": datetime(
+                2024,
+                1,
+                1,
+                hour=12,
+                minute=0,
+                second=0,
+                microsecond=0,
+                tzinfo=pytz.timezone("UTC"),
+            ),
+        }
+    ]
+    df = pa.Table.from_pylist(data)
+    table.append(df)
+
+    assert node.query(f"SHOW CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`") == f"CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`\\n(\\n    `timestamp` Nullable(DateTime64(6)),\\n    `timestamptz` Nullable(DateTime64(6, \\'UTC\\'))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-glue/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n"
+    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`") == "2024-01-01 12:00:00.000000\t2024-01-01 12:00:00.000000\n"
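
The two assertions pin down the resulting type mapping: an Iceberg timestamp column surfaces in ClickHouse as Nullable(DateTime64(6)), while timestamptz surfaces as Nullable(DateTime64(6, 'UTC')); both rows print the same wall-clock value because the sample timestamptz is already in UTC.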
