Commit b2cf315

scanhex12 authored and Enmk committed
Merge pull request ClickHouse#83132 from scanhex12/timestamps_glue
Support `TimestampTZ` in Glue catalog
1 parent d4f9aa1 commit b2cf315

5 files changed: +258 −29 lines changed

src/Databases/DataLake/DatabaseDataLake.cpp

Lines changed: 3 additions & 4 deletions
@@ -153,11 +153,10 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
         case DB::DatabaseDataLakeCatalogType::GLUE:
         {
             catalog_impl = std::make_shared<DataLake::GlueCatalog>(
-                settings[DatabaseDataLakeSetting::aws_access_key_id].value,
-                settings[DatabaseDataLakeSetting::aws_secret_access_key].value,
-                settings[DatabaseDataLakeSetting::region].value,
                 url,
-                Context::getGlobalContextInstance());
+                Context::getGlobalContextInstance(),
+                settings,
+                table_engine_definition);
             break;
         }
         case DB::DatabaseDataLakeCatalogType::ICEBERG_HIVE:

src/Databases/DataLake/GlueCatalog.cpp

Lines changed: 142 additions & 21 deletions
@@ -1,4 +1,6 @@
 #include <Databases/DataLake/GlueCatalog.h>
+#include <Poco/JSON/Object.h>
+#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
 
 #if USE_AWS_S3 && USE_AVRO
 
@@ -8,6 +10,7 @@
 #include <aws/glue/model/GetDatabasesRequest.h>
 
 #include <Common/Exception.h>
+#include <Common/CurrentMetrics.h>
 #include <Core/Settings.h>
 #include <Interpreters/Context.h>
 
@@ -29,12 +32,19 @@
 #include <IO/S3/Credentials.h>
 #include <IO/S3/Client.h>
 #include <IO/S3Settings.h>
+#include <IO/ReadBufferFromString.h>
+#include <IO/ReadHelpers.h>
 #include <Common/ProxyConfigurationResolverProvider.h>
 #include <Databases/DataLake/Common.h>
 #include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
+#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
+#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
+#include <Parsers/ASTCreateQuery.h>
+#include <Parsers/ASTFunction.h>
 
 namespace DB::ErrorCodes
 {
+    extern const int BAD_ARGUMENTS;
     extern const int DATALAKE_DATABASE_ERROR;
 }
 
@@ -54,20 +64,36 @@ namespace DB::StorageObjectStorageSetting
     extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
 }
 
+namespace DB::DatabaseDataLakeSetting
+{
+    extern const DatabaseDataLakeSettingsString storage_endpoint;
+    extern const DatabaseDataLakeSettingsString aws_access_key_id;
+    extern const DatabaseDataLakeSettingsString aws_secret_access_key;
+    extern const DatabaseDataLakeSettingsString region;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric MarkCacheBytes;
+    extern const Metric MarkCacheFiles;
+}
+
 namespace DataLake
 {
 
 GlueCatalog::GlueCatalog(
-    const String & access_key_id,
-    const String & secret_access_key,
-    const String & region_,
     const String & endpoint,
-    DB::ContextPtr context_)
+    DB::ContextPtr context_,
+    const DB::DatabaseDataLakeSettings & settings_,
+    DB::ASTPtr table_engine_definition_)
     : ICatalog("")
     , DB::WithContext(context_)
-    , log(getLogger("GlueCatalog(" + region_ + ")"))
-    , credentials(access_key_id, secret_access_key)
-    , region(region_)
+    , log(getLogger("GlueCatalog(" + settings_[DB::DatabaseDataLakeSetting::region].value + ")"))
+    , credentials(settings_[DB::DatabaseDataLakeSetting::aws_access_key_id].value, settings_[DB::DatabaseDataLakeSetting::aws_secret_access_key].value)
+    , region(settings_[DB::DatabaseDataLakeSetting::region].value)
+    , settings(settings_)
+    , table_engine_definition(table_engine_definition_)
+    , metadata_objects(CurrentMetrics::MarkCacheBytes, CurrentMetrics::MarkCacheFiles, 1024)
 {
     DB::S3::CredentialsConfiguration creds_config;
     creds_config.use_environment_credentials = true;
@@ -280,23 +306,10 @@ void GlueCatalog::getTableMetadata(
                 database_name + "." + table_name, message_part, "ICEBERG"));
         }
 
-        if (result.requiresSchema())
-        {
-            DB::NamesAndTypesList schema;
-            auto columns = table_outcome.GetStorageDescriptor().GetColumns();
-            for (const auto & column : columns)
-            {
-                const auto column_params = column.GetParameters();
-                bool can_be_nullable = column_params.contains("iceberg.field.optional") && column_params.at("iceberg.field.optional") == "true";
-                schema.push_back({column.GetName(), getType(column.GetType(), can_be_nullable)});
-            }
-            result.setSchema(schema);
-        }
-
         if (result.requiresCredentials())
             setCredentials(result);
 
-        if (result.requiresDataLakeSpecificProperties())
+        auto setup_specific_properties = [&]
         {
             const auto & table_params = table_outcome.GetParameters();
             if (table_params.contains("metadata_location"))
@@ -309,6 +322,38 @@ void GlueCatalog::getTableMetadata(
                     "It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
                     database_name + "." + table_name));
             }
+        };
+
+        if (result.requiresDataLakeSpecificProperties())
+            setup_specific_properties();
+
+        if (result.requiresSchema())
+        {
+            DB::NamesAndTypesList schema;
+            auto columns = table_outcome.GetStorageDescriptor().GetColumns();
+            for (const auto & column : columns)
+            {
+                const auto column_params = column.GetParameters();
+                bool can_be_nullable = column_params.contains("iceberg.field.optional") && column_params.at("iceberg.field.optional") == "true";
+
+                /// Skip the field if it's not "current" (for example, after a rename). It's unclear how "non-current"
+                /// fields can be used, but the Glue API returns them anyway: after "RENAME COLUMN a TO new_a"
+                /// Glue returns two fields, a and new_a, with a marked as "non-current".
+                if (column_params.contains("iceberg.field.current") && column_params.at("iceberg.field.current") == "false")
+                    continue;
+
+                String column_type = column.GetType();
+                if (column_type == "timestamp")
+                {
+                    if (!result.requiresDataLakeSpecificProperties())
+                        setup_specific_properties();
+                    if (classifyTimestampTZ(column.GetName(), result))
+                        column_type = "timestamptz";
+                }
+
+                schema.push_back({column.GetName(), getType(column_type, can_be_nullable)});
+            }
+            result.setSchema(schema);
         }
     }
     else
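Two things happen in this hunk: the metadata_location extraction is wrapped in a setup_specific_properties lambda so the schema loop can run it on demand (the metadata file location is needed as soon as a timestamp column shows up, even when the caller never requested data-lake-specific properties), and every column Glue reports as "timestamp" is re-classified against the Iceberg metadata, because Glue reports timestamp and timestamptz columns identically. For a nullable column, the ClickHouse types this is expected to yield — a minimal sketch assuming getType follows the usual Iceberg mapping (getType itself is not part of this diff):

    #include <DataTypes/DataTypeDateTime64.h>
    #include <DataTypes/DataTypeNullable.h>

    /// Iceberg timestamps carry microsecond precision; timestamptz is UTC-adjusted.
    DB::DataTypePtr timestamp_type =      /// Nullable(DateTime64(6))
        std::make_shared<DB::DataTypeNullable>(std::make_shared<DB::DataTypeDateTime64>(6));
    DB::DataTypePtr timestamptz_type =    /// Nullable(DateTime64(6, 'UTC'))
        std::make_shared<DB::DataTypeNullable>(std::make_shared<DB::DataTypeDateTime64>(6, "UTC"));

These are exactly the two column types the integration test below asserts in its SHOW CREATE TABLE check.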
@@ -347,6 +392,82 @@ bool GlueCatalog::empty() const
     return true;
 }
 
+bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
+{
+    String metadata_path;
+    if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties();
+        table_specific_properties.has_value())
+    {
+        metadata_path = table_specific_properties->iceberg_metadata_file_location;
+        if (metadata_path.starts_with("s3:/"))
+            metadata_path = metadata_path.substr(5);
+
+        // Delete bucket
+        std::size_t pos = metadata_path.find('/');
+        if (pos != std::string::npos)
+            metadata_path = metadata_path.substr(pos + 1);
+    }
+    else
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Metadata specific properties should be defined");
+
+    if (!metadata_objects.get(metadata_path))
+    {
+        DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
+        DB::ASTs args = storage->engine->arguments->children;
+
+        auto table_endpoint = settings[DB::DatabaseDataLakeSetting::storage_endpoint].value;
+        if (args.empty())
+            args.emplace_back(std::make_shared<DB::ASTLiteral>(table_endpoint));
+        else
+            args[0] = std::make_shared<DB::ASTLiteral>(table_endpoint);
+
+        if (args.size() == 1 && table_metadata.hasStorageCredentials())
+        {
+            auto storage_credentials = table_metadata.getStorageCredentials();
+            if (storage_credentials)
+                storage_credentials->addCredentialsToEngineArgs(args);
+        }
+
+        auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
+        storage_settings->loadFromSettingsChanges(settings.allChanged());
+        auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
+        DB::StorageObjectStorage::Configuration::initialize(*configuration, args, getContext(), false);
+
+        auto object_storage = configuration->createObjectStorage(getContext(), true);
+        const auto & read_settings = getContext()->getReadSettings();
+
+        DB::StoredObject metadata_stored_object(metadata_path);
+        auto read_buf = object_storage->readObject(metadata_stored_object, read_settings);
+        String metadata_file;
+        readString(metadata_file, *read_buf);
+
+        Poco::JSON::Parser parser;
+        Poco::Dynamic::Var result = parser.parse(metadata_file);
+        auto metadata_object = result.extract<Poco::JSON::Object::Ptr>();
+        metadata_objects.set(metadata_path, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
+    }
+    auto metadata_object = *metadata_objects.get(metadata_path);
+    auto current_schema_id = metadata_object->getValue<Int64>("current-schema-id");
+    auto schemas = metadata_object->getArray(Iceberg::f_schemas);
+    for (size_t i = 0; i < schemas->size(); ++i)
+    {
+        auto schema = schemas->getObject(static_cast<UInt32>(i));
+        if (schema->getValue<Int64>("schema-id") == current_schema_id)
+        {
+            auto fields = schema->getArray(Iceberg::f_fields);
+            for (size_t j = 0; j < fields->size(); ++j)
+            {
+                auto field = fields->getObject(static_cast<UInt32>(j));
+                if (field->getValue<String>(Iceberg::f_name) == column_name)
+                    return field->getValue<String>(Iceberg::f_type) == Iceberg::f_timestamptz;
+            }
+        }
+    }
+
+    return false;
+}
+
+
 }
 
 #endif
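classifyTimestampTZ depends on the layout of the Iceberg metadata file: a top-level current-schema-id, a schemas array, and name/type pairs inside each schema's fields (the Iceberg::f_* constants used above are those literal field names). A self-contained sketch of the same traversal with Poco::JSON, fed a hand-written metadata snippet instead of an object read from S3:

    #include <iostream>
    #include <string>
    #include <Poco/JSON/Array.h>
    #include <Poco/JSON/Object.h>
    #include <Poco/JSON/Parser.h>

    int main()
    {
        /// Trimmed-down Iceberg table metadata: one schema with two timestamp columns.
        const std::string metadata = R"({
            "current-schema-id": 0,
            "schemas": [{
                "schema-id": 0,
                "fields": [
                    {"id": 1, "name": "ts",   "type": "timestamp"},
                    {"id": 2, "name": "tstz", "type": "timestamptz"}
                ]
            }]
        })";

        Poco::JSON::Parser parser;
        auto object = parser.parse(metadata).extract<Poco::JSON::Object::Ptr>();

        const auto current_schema_id = object->getValue<int>("current-schema-id");
        auto schemas = object->getArray("schemas");
        for (unsigned i = 0; i < schemas->size(); ++i)
        {
            auto schema = schemas->getObject(i);
            /// Only the current schema decides the live column types.
            if (schema->getValue<int>("schema-id") != current_schema_id)
                continue;
            auto fields = schema->getArray("fields");
            for (unsigned j = 0; j < fields->size(); ++j)
            {
                auto field = fields->getObject(j);
                bool is_tz = field->getValue<std::string>("type") == "timestamptz";
                std::cout << field->getValue<std::string>("name")
                          << " -> " << (is_tz ? "timestamptz" : "timestamp") << '\n';
            }
        }
    }

The real method returns true for a single column name and memoizes the parsed object per metadata path; this sketch just prints the classification of every field.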

src/Databases/DataLake/GlueCatalog.h

Lines changed: 15 additions & 4 deletions
@@ -7,6 +7,10 @@
 #include <Databases/DataLake/ICatalog.h>
 #include <Interpreters/Context_fwd.h>
 #include <Poco/JSON/Object.h>
+#include <Poco/LRUCache.h>
+
+#include <Common/CacheBase.h>
+#include <Databases/DataLake/DatabaseDataLakeSettings.h>
 
 namespace Aws::Glue
 {
@@ -20,11 +24,10 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
 {
 public:
     GlueCatalog(
-        const String & access_key_id,
-        const String & secret_access_key,
-        const String & region,
         const String & endpoint,
-        DB::ContextPtr context_);
+        DB::ContextPtr context_,
+        const DB::DatabaseDataLakeSettings & settings_,
+        DB::ASTPtr table_engine_definition_);
 
     ~GlueCatalog() override;
 
@@ -60,10 +63,18 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
     const LoggerPtr log;
     Aws::Auth::AWSCredentials credentials;
     std::string region;
+    DB::DatabaseDataLakeSettings settings;
+    DB::ASTPtr table_engine_definition;
 
     DataLake::ICatalog::Namespaces getDatabases(const std::string & prefix, size_t limit = 0) const;
     DB::Names getTablesForDatabase(const std::string & db_name, size_t limit = 0) const;
     void setCredentials(TableMetadata & metadata) const;
+
+    /// The Glue catalog does not store detailed type information for timestamp columns, i.e. whether a column is timestamp or timestamptz.
+    /// This method determines the actual type of a timestamp column from the Iceberg metadata file.
+    bool classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const;
+
+    mutable DB::CacheBase<String, Poco::JSON::Object::Ptr> metadata_objects;
 };
 
 }
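A note on metadata_objects: parsing the metadata file once per timestamp column would be wasteful, so the parsed JSON is memoized per metadata path in an LRU cache capped at 1024 entries (the constructor in the .cpp borrows the MarkCacheBytes/MarkCacheFiles metrics for accounting). DB::CacheBase is ClickHouse-internal; the same pattern expressed with Poco::LRUCache, whose header this file also pulls in — a sketch of the idea, not the actual implementation:

    #include <string>
    #include <Poco/JSON/Object.h>
    #include <Poco/JSON/Parser.h>
    #include <Poco/LRUCache.h>

    /// Memoize parsed metadata JSON by storage path; least-recently-used entries are evicted.
    Poco::LRUCache<std::string, Poco::JSON::Object::Ptr> metadata_cache(1024);

    Poco::JSON::Object::Ptr getMetadataObject(const std::string & path, const std::string & raw_json)
    {
        if (auto cached = metadata_cache.get(path); !cached.isNull())
            return *cached;

        Poco::JSON::Parser parser;
        auto object = parser.parse(raw_json).extract<Poco::JSON::Object::Ptr>();
        metadata_cache.add(path, object);
        return object;
    }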

tests/integration/test_database_glue/test.py

Lines changed: 49 additions & 0 deletions
@@ -7,6 +7,7 @@
 import pyarrow as pa
 import pytest
 import urllib3
+import pytz
 from datetime import datetime, timedelta
 from minio import Minio
 from pyiceberg.catalog import load_catalog
@@ -22,6 +23,7 @@
     StringType,
     StructType,
     TimestampType,
+    TimestamptzType,
     MapType,
     DecimalType,
 )
@@ -345,3 +347,50 @@ def test_hide_sensitive_info(started_cluster):
     )
     assert "SECRET_1" not in node.query(f"SHOW CREATE DATABASE {CATALOG_NAME}")
     assert "SECRET_2" not in node.query(f"SHOW CREATE DATABASE {CATALOG_NAME}")
+
+
+def test_timestamps(started_cluster):
+    node = started_cluster.instances["node1"]
+
+    test_ref = f"test_list_tables_{uuid.uuid4()}"
+    table_name = f"{test_ref}_table"
+    root_namespace = f"{test_ref}_namespace"
+
+    catalog = load_catalog_impl(started_cluster)
+    catalog.create_namespace(root_namespace)
+
+    schema = Schema(
+        NestedField(
+            field_id=1, name="timestamp", field_type=TimestampType(), required=False
+        ),
+        NestedField(
+            field_id=2,
+            name="timestamptz",
+            field_type=TimestamptzType(),
+            required=False,
+        ),
+    )
+    table = create_table(catalog, root_namespace, table_name, schema)
+
+    create_clickhouse_glue_database(started_cluster, node, CATALOG_NAME)
+
+    data = [
+        {
+            "timestamp": datetime(2024, 1, 1, hour=12, minute=0, second=0, microsecond=0),
+            "timestamptz": datetime(
+                2024,
+                1,
+                1,
+                hour=12,
+                minute=0,
+                second=0,
+                microsecond=0,
+                tzinfo=pytz.timezone("UTC"),
+            )
+        }
+    ]
+    df = pa.Table.from_pylist(data)
+    table.append(df)
+
+    assert node.query(f"SHOW CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`") == f"CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`\\n(\\n    `timestamp` Nullable(DateTime64(6)),\\n    `timestamptz` Nullable(DateTime64(6, \\'UTC\\'))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-glue/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n"
+    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`") == "2024-01-01 12:00:00.000000\t2024-01-01 12:00:00.000000\n"