Skip to content

Commit 693da52

Browse files
authored
Merge pull request #1070 from Altinity/zvonand-fix-glue-again
Glue: Deduce Iceberg table metadata location if `metadata_location` not specified
2 parents cf4d137 + 0402d8d commit 693da52

File tree

2 files changed

+160
-15
lines changed

2 files changed

+160
-15
lines changed

src/Databases/DataLake/GlueCatalog.cpp

Lines changed: 158 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -317,11 +317,31 @@ bool GlueCatalog::tryGetTableMetadata(
317317
{
318318
result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = table_params.at("metadata_location")});
319319
}
320+
else if (table_outcome.GetStorageDescriptor().LocationHasBeenSet())
321+
{
322+
const auto & location = table_outcome.GetStorageDescriptor().GetLocation();
323+
324+
std::string location_with_slash = location;
325+
if (!location_with_slash.ends_with('/'))
326+
location_with_slash += '/';
327+
328+
// Resolve the actual metadata file path based on table location
329+
std::string resolved_metadata_path = resolveMetadataPathFromTableLocation(location_with_slash, result);
330+
if (resolved_metadata_path.empty())
331+
{
332+
result.setTableIsNotReadable(fmt::format("Could not determine metadata_location of table `{}`. ",
333+
database_name + "." + table_name));
334+
}
335+
else
336+
{
337+
result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = resolved_metadata_path});
338+
}
339+
}
320340
else
321341
{
322-
result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
323-
"It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
324-
database_name + "." + table_name));
342+
result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
343+
"It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
344+
database_name + "." + table_name));
325345
}
326346
};
327347

@@ -415,37 +435,41 @@ bool GlueCatalog::empty() const
415435
bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
416436
{
417437
String metadata_path;
438+
String metadata_uri;
418439
if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties();
419440
table_specific_properties.has_value())
420441
{
421442
metadata_path = table_specific_properties->iceberg_metadata_file_location;
443+
metadata_uri = metadata_path;
422444
if (metadata_path.starts_with("s3:/"))
423445
metadata_path = metadata_path.substr(5);
424446

425-
// Delete bucket
447+
// Delete bucket from path
426448
std::size_t pos = metadata_path.find('/');
427449
if (pos != std::string::npos)
428450
metadata_path = metadata_path.substr(pos + 1);
429451
}
430452
else
431-
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Metadata specific properties should be defined");
453+
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to read table metadata, reason why table is unreadable: {}", table_metadata.getReasonWhyTableIsUnreadable());
432454

433-
if (!metadata_objects.get(metadata_path))
455+
if (!metadata_objects.get(metadata_uri))
434456
{
435457
DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
436458
DB::ASTs args = storage->engine->arguments->children;
437459

438-
auto table_endpoint = settings.storage_endpoint;
460+
String storage_endpoint = !settings.storage_endpoint.empty() ? settings.storage_endpoint : metadata_uri;
461+
439462
if (args.empty())
440-
args.emplace_back(std::make_shared<DB::ASTLiteral>(table_endpoint));
463+
args.emplace_back(std::make_shared<DB::ASTLiteral>(storage_endpoint));
441464
else
442-
args[0] = std::make_shared<DB::ASTLiteral>(table_endpoint);
465+
args[0] = std::make_shared<DB::ASTLiteral>(storage_endpoint);
443466

444-
if (args.size() == 1 && table_metadata.hasStorageCredentials())
467+
if (args.size() == 1)
445468
{
446-
auto storage_credentials = table_metadata.getStorageCredentials();
447-
if (storage_credentials)
448-
storage_credentials->addCredentialsToEngineArgs(args);
469+
if (table_metadata.hasStorageCredentials())
470+
table_metadata.getStorageCredentials()->addCredentialsToEngineArgs(args);
471+
else if (!credentials.IsExpiredOrEmpty())
472+
DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
449473
}
450474

451475
auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
@@ -464,9 +488,9 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
464488
Poco::JSON::Parser parser;
465489
Poco::Dynamic::Var result = parser.parse(metadata_file);
466490
auto metadata_object = result.extract<Poco::JSON::Object::Ptr>();
467-
metadata_objects.set(metadata_path, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
491+
metadata_objects.set(metadata_uri, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
468492
}
469-
auto metadata_object = *metadata_objects.get(metadata_path);
493+
auto metadata_object = *metadata_objects.get(metadata_uri);
470494
auto current_schema_id = metadata_object->getValue<Int64>("current-schema-id");
471495
auto schemas = metadata_object->getArray(DB::Iceberg::f_schemas);
472496
for (size_t i = 0; i < schemas->size(); ++i)
@@ -487,6 +511,125 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
487511
return false;
488512
}
489513

514+
/// This function tries two resolve the metadata file path by following means:
515+
/// 1. Tries to read version-hint.text to get the latest version.
516+
/// 2. Lists all *.metadata.json files in the metadata directory and takes the most recent one.
517+
String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_location, const TableMetadata & table_metadata) const
518+
{
519+
// Construct path to version-hint.text
520+
String version_hint_path = table_location + "metadata/version-hint.text";
521+
522+
DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
523+
DB::ASTs args = storage->engine->arguments->children;
524+
525+
String storage_endpoint = !settings.storage_endpoint.empty() ? settings.storage_endpoint : table_location;
526+
if (args.empty())
527+
args.emplace_back(std::make_shared<DB::ASTLiteral>(storage_endpoint));
528+
else
529+
args[0] = std::make_shared<DB::ASTLiteral>(storage_endpoint);
530+
531+
if (args.size() == 1 && table_metadata.hasStorageCredentials())
532+
{
533+
auto storage_credentials = table_metadata.getStorageCredentials();
534+
if (storage_credentials)
535+
storage_credentials->addCredentialsToEngineArgs(args);
536+
}
537+
538+
auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
539+
storage_settings->loadFromSettingsChanges(settings.allChanged());
540+
auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
541+
configuration->initialize(args, getContext(), false);
542+
543+
auto object_storage = configuration->createObjectStorage(getContext(), true);
544+
const auto & read_settings = getContext()->getReadSettings();
545+
546+
try
547+
{
548+
// Try to read version-hint.text to get the latest version
549+
String version_hint_object_path = version_hint_path;
550+
if (version_hint_object_path.starts_with("s3://"))
551+
{
552+
version_hint_object_path = version_hint_object_path.substr(5);
553+
// Remove bucket from path
554+
std::size_t pos = version_hint_object_path.find('/');
555+
if (pos != std::string::npos)
556+
version_hint_object_path = version_hint_object_path.substr(pos + 1);
557+
}
558+
559+
DB::StoredObject version_hint_stored_object(version_hint_object_path);
560+
auto version_hint_buf = object_storage->readObject(version_hint_stored_object, read_settings);
561+
String version_str;
562+
readString(version_str, *version_hint_buf);
563+
564+
boost::algorithm::trim(version_str);
565+
566+
LOG_TRACE(log, "Read version {} from version-hint.text for table location '{}'", version_str, table_location);
567+
568+
return table_location + "metadata/v" + version_str + "-metadata.json";
569+
}
570+
catch (...)
571+
{
572+
LOG_TRACE(log, "Could not read version-hint.text from '{}', trying to find latest metadata file", version_hint_path);
573+
574+
try
575+
{
576+
String bucket_with_prefix;
577+
String metadata_dir = table_location + "metadata/";
578+
String metadata_dir_path = metadata_dir;
579+
580+
if (metadata_dir_path.starts_with("s3://"))
581+
{
582+
metadata_dir_path = metadata_dir_path.substr(5);
583+
// Remove bucket from path
584+
std::size_t pos = metadata_dir_path.find('/');
585+
if (pos != std::string::npos)
586+
{
587+
metadata_dir_path = metadata_dir_path.substr(pos + 1);
588+
bucket_with_prefix = table_location.substr(0, pos + 6);
589+
}
590+
}
591+
else
592+
return "";
593+
594+
// List all files in metadata directory
595+
DB::RelativePathsWithMetadata files;
596+
object_storage->listObjects(metadata_dir_path, files, 0);
597+
598+
// Filter for .metadata.json files and find the most recent one
599+
String latest_metadata_file;
600+
std::optional<DB::ObjectMetadata> latest_metadata;
601+
602+
for (const auto & file : files)
603+
{
604+
if (file->getPath().ends_with(".metadata.json"))
605+
{
606+
// Get file metadata to check last modified time
607+
if (!latest_metadata.has_value() ||
608+
(file->metadata->last_modified > latest_metadata->last_modified))
609+
{
610+
latest_metadata_file = file->getPath();
611+
latest_metadata = file->metadata;
612+
}
613+
}
614+
}
615+
616+
if (!latest_metadata_file.empty())
617+
{
618+
LOG_TRACE(log, "Found latest metadata file: {}", latest_metadata_file);
619+
return bucket_with_prefix + latest_metadata_file;
620+
}
621+
622+
LOG_TRACE(log, "No <...>.metadata.json files found,");
623+
return "";
624+
}
625+
catch (...)
626+
{
627+
LOG_TRACE(log, "Failed to list metadata directory");
628+
return "";
629+
}
630+
}
631+
}
632+
490633
void GlueCatalog::createNamespaceIfNotExists(const String & namespace_name) const
491634
{
492635
Aws::Glue::Model::CreateDatabaseRequest create_request;

src/Databases/DataLake/GlueCatalog.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
8181
/// This method allows to clarify the actual type of the timestamp column.
8282
bool classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const;
8383

84+
String resolveMetadataPathFromTableLocation(const String & table_location, const TableMetadata & table_metadata) const;
85+
8486
mutable DB::CacheBase<String, Poco::JSON::Object::Ptr> metadata_objects;
8587
};
8688

0 commit comments

Comments
 (0)