Skip to content

Commit 267f840

Browse files
authored
Merge pull request ClickHouse#78475 from ClickHouse/divanik/add_some_more_convenient_metadata_resolving_settings
Add some more convenient Iceberg metadata resolving settings
2 parents 1ec0277 + 48e2958 commit 267f840

File tree

25 files changed

+661
-39
lines changed

25 files changed

+661
-39
lines changed

docs/en/engines/table-engines/integrations/iceberg.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,43 @@ The second one is that while doing time travel you can't get state of table befo
247247

248248
In Clickhouse the behavior is consistent with Spark. You can mentally replace Spark Select queries with Clickhouse Select queries and it will work the same way.
249249

250+
## Metadata File Resolution {#metadata-file-resolution}
251+
When using the `Iceberg` table engine in ClickHouse, the system needs to locate the correct metadata.json file that describes the Iceberg table structure. Here's how this resolution process works:
252+
253+
### Candidates search (in Priority Order) {#candidate-search}
254+
255+
1. **Direct Path Specification**:
256+
* If you set `iceberg_metadata_file_path`, the system will use this exact path by combining it with the Iceberg table directory path.
257+
* When this setting is provided, all other resolution settings are ignored.
258+
259+
2. **Table UUID Matching**:
260+
* If `iceberg_metadata_table_uuid` is specified, the system will:
261+
* Look only at `.metadata.json` files in the `metadata` directory
262+
* Filter for files containing a `table-uuid` field matching your specified UUID (case-insensitive)
263+
264+
3. **Default Search**:
265+
* If neither of the above settings are provided, all `.metadata.json` files in the `metadata` directory become candidates
266+
267+
### Selecting the Most Recent File {#most-recent-file}
268+
269+
After identifying candidate files using the above rules, the system determines which one is the most recent:
270+
271+
* If `iceberg_recent_metadata_file_by_last_updated_ms_field` is enabled:
272+
* The file with the largest `last-updated-ms` value is selected
273+
274+
* Otherwise:
275+
* The file with the highest version number is selected
276+
* (Version appears as `V` in filenames formatted as `V.metadata.json` or `V-uuid.metadata.json`)
277+
278+
**Note**: All mentioned settings are engine-level settings and must be specified during table creation as shown below:
279+
280+
```sql
281+
CREATE TABLE example_table ENGINE = Iceberg(
282+
's3://bucket/path/to/iceberg_table'
283+
) SETTINGS iceberg_metadata_table_uuid = '6f6f6407-c6a5-465f-a808-ea8900e35a38';
284+
```
285+
286+
**Note**: While Iceberg Catalogs typically handle metadata resolution, the `Iceberg` table engine in ClickHouse directly interprets files stored in S3 as Iceberg tables, which is why understanding these resolution rules is important.
250287

251288
## Data cache {#data-cache}
252289

docs/en/sql-reference/table-functions/iceberg.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,44 @@ The second one is that while doing time travel you can't get state of table befo
239239

240240
In Clickhouse the behavior is consistent with Spark. You can mentally replace Spark Select queries with Clickhouse Select queries and it will work the same way.
241241

242+
## Metadata File Resolution {#metadata-file-resolution}
243+
244+
When using the `iceberg` table function in ClickHouse, the system needs to locate the correct metadata.json file that describes the Iceberg table structure. Here's how this resolution process works:
245+
246+
### Candidate Search (in Priority Order) {#candidate-search}
247+
248+
1. **Direct Path Specification**:
249+
* If you set `iceberg_metadata_file_path`, the system will use this exact path by combining it with the Iceberg table directory path.
250+
* When this setting is provided, all other resolution settings are ignored.
251+
252+
2. **Table UUID Matching**:
253+
* If `iceberg_metadata_table_uuid` is specified, the system will:
254+
* Look only at `.metadata.json` files in the `metadata` directory
255+
* Filter for files containing a `table-uuid` field matching your specified UUID (case-insensitive)
256+
257+
3. **Default Search**:
258+
* If neither of the above settings are provided, all `.metadata.json` files in the `metadata` directory become candidates
259+
260+
### Selecting the Most Recent File {#most-recent-file}
261+
262+
After identifying candidate files using the above rules, the system determines which one is the most recent:
263+
264+
* If `iceberg_recent_metadata_file_by_last_updated_ms_field` is enabled:
265+
* The file with the largest `last-updated-ms` value is selected
266+
267+
* Otherwise:
268+
* The file with the highest version number is selected
269+
* (Version appears as `V` in filenames formatted as `V.metadata.json` or `V-uuid.metadata.json`)
270+
271+
**Note**: All mentioned settings are table function settings (not global or query-level settings) and must be specified as shown below:
272+
273+
```sql
274+
SELECT * FROM iceberg('s3://bucket/path/to/iceberg_table',
275+
SETTINGS iceberg_metadata_table_uuid = 'a90eed4c-f74b-4e5b-b630-096fb9d09021');
276+
```
277+
278+
**Note**: While Iceberg Catalogs typically handle metadata resolution, the `iceberg` table function in ClickHouse directly interprets files stored in S3 as Iceberg tables, which is why understanding these resolution rules is important.
279+
242280
## Metadata cache {#metadata-cache}
243281

244282
`Iceberg` table engine and table function support metadata cache storing the information of manifest files, manifest list and metadata json. The cache is stored in memory. This feature is controlled by setting `use_iceberg_metadata_files_cache`, which is enabled by default.

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 133 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ namespace DB
3535
namespace StorageObjectStorageSetting
3636
{
3737
extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
38+
extern const StorageObjectStorageSettingsString iceberg_metadata_table_uuid;
39+
extern const StorageObjectStorageSettingsBool iceberg_recent_metadata_file_by_last_updated_ms_field;
3840
}
3941

4042
namespace ErrorCodes
@@ -67,7 +69,10 @@ constexpr const char * SNAPSHOT_LOG_FIELD = "snapshot-log";
6769
constexpr const char * TIMESTAMP_FIELD_INSIDE_SNAPSHOT = "timestamp-ms";
6870
constexpr const char * TABLE_LOCATION_FIELD = "location";
6971
constexpr const char * SNAPSHOTS_FIELD = "snapshots";
72+
constexpr const char * LAST_UPDATED_MS_FIELD = "last-updated-ms";
7073

74+
namespace
75+
{
7176

7277
std::pair<Int32, Poco::JSON::Object::Ptr>
7378
parseTableSchemaFromManifestFile(const AvroForIcebergDeserializer & deserializer, const String & manifest_file_name)
@@ -86,6 +91,38 @@ parseTableSchemaFromManifestFile(const AvroForIcebergDeserializer & deserializer
8691
}
8792

8893

94+
std::string normalizeUuid(const std::string & uuid)
95+
{
96+
std::string result;
97+
result.reserve(uuid.size());
98+
for (char c : uuid)
99+
{
100+
if (std::isalnum(c))
101+
{
102+
result.push_back(std::tolower(c));
103+
}
104+
}
105+
return result;
106+
}
107+
108+
Poco::JSON::Object::Ptr
109+
readJSON(const String & metadata_file_path, ObjectStoragePtr object_storage, const ContextPtr & local_context, LoggerPtr log)
110+
{
111+
ObjectInfo object_info(metadata_file_path);
112+
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
113+
114+
String json_str;
115+
readJSONObjectPossiblyInvalid(json_str, *buf);
116+
117+
Poco::JSON::Parser parser; /// For some reason base/base/JSON.h can not parse this json file
118+
Poco::Dynamic::Var json = parser.parse(json_str);
119+
return json.extract<Poco::JSON::Object::Ptr>();
120+
}
121+
122+
123+
}
124+
125+
89126
IcebergMetadata::IcebergMetadata(
90127
ObjectStoragePtr object_storage_,
91128
ConfigurationObserverPtr configuration_,
@@ -244,38 +281,114 @@ static std::pair<Int32, String> getMetadataFileAndVersion(const std::string & pa
244281
return std::make_pair(std::stoi(version_str), path);
245282
}
246283

284+
enum class MostRecentMetadataFileSelectionWay
285+
{
286+
BY_LAST_UPDATED_MS_FIELD,
287+
BY_METADATA_FILE_VERSION
288+
};
289+
290+
struct ShortMetadataFileInfo
291+
{
292+
UInt32 version;
293+
UInt64 last_updated_ms;
294+
String path;
295+
};
296+
297+
247298
/**
248299
* Each version of table metadata is stored in a `metadata` directory and
249300
* has one of 2 formats:
250301
* 1) v<V>.metadata.json, where V - metadata version.
251302
* 2) <V>-<random-uuid>.metadata.json, where V - metadata version
252303
*/
253-
static std::pair<Int32, String>
254-
getLatestMetadataFileAndVersion(const ObjectStoragePtr & object_storage, const StorageObjectStorage::Configuration & configuration)
304+
static std::pair<Int32, String> getLatestMetadataFileAndVersion(
305+
const ObjectStoragePtr & object_storage,
306+
const StorageObjectStorage::Configuration & configuration,
307+
const ContextPtr & local_context,
308+
const std::optional<String> & table_uuid)
255309
{
310+
auto log = getLogger("IcebergMetadataFileResolver");
311+
MostRecentMetadataFileSelectionWay selection_way
312+
= configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_recent_metadata_file_by_last_updated_ms_field].value
313+
? MostRecentMetadataFileSelectionWay::BY_LAST_UPDATED_MS_FIELD
314+
: MostRecentMetadataFileSelectionWay::BY_METADATA_FILE_VERSION;
315+
bool need_all_metadata_files_parsing
316+
= (selection_way == MostRecentMetadataFileSelectionWay::BY_LAST_UPDATED_MS_FIELD) || table_uuid.has_value();
256317
const auto metadata_files = listFiles(*object_storage, configuration, "metadata", ".metadata.json");
257318
if (metadata_files.empty())
258319
{
259320
throw Exception(
260321
ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", configuration.getPath());
261322
}
262-
std::vector<std::pair<UInt32, String>> metadata_files_with_versions;
323+
std::vector<ShortMetadataFileInfo> metadata_files_with_versions;
263324
metadata_files_with_versions.reserve(metadata_files.size());
264325
for (const auto & path : metadata_files)
265326
{
266-
metadata_files_with_versions.emplace_back(getMetadataFileAndVersion(path));
327+
auto [version, metadata_file_path] = getMetadataFileAndVersion(path);
328+
if (need_all_metadata_files_parsing)
329+
{
330+
auto metadata_file_object = readJSON(metadata_file_path, object_storage, local_context, log);
331+
if (table_uuid.has_value())
332+
{
333+
if (metadata_file_object->has("table-uuid"))
334+
{
335+
auto current_table_uuid = metadata_file_object->getValue<String>("table-uuid");
336+
if (normalizeUuid(table_uuid.value()) == normalizeUuid(current_table_uuid))
337+
{
338+
metadata_files_with_versions.emplace_back(
339+
version, metadata_file_object->getValue<UInt64>(LAST_UPDATED_MS_FIELD), metadata_file_path);
340+
}
341+
}
342+
else
343+
{
344+
Int64 format_version = metadata_file_object->getValue<Int64>(FORMAT_VERSION_FIELD);
345+
throw Exception(
346+
format_version == 1 ? ErrorCodes::BAD_ARGUMENTS : ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION,
347+
"Table UUID is not specified in some metadata files for table by path {}",
348+
metadata_file_path);
349+
}
350+
}
351+
else
352+
{
353+
metadata_files_with_versions.emplace_back(version, metadata_file_object->getValue<UInt64>(LAST_UPDATED_MS_FIELD), metadata_file_path);
354+
}
355+
}
356+
else
357+
{
358+
metadata_files_with_versions.emplace_back(version, 0, metadata_file_path);
359+
}
267360
}
268361

269362
/// Get the latest version of metadata file: v<V>.metadata.json
270-
return *std::max_element(metadata_files_with_versions.begin(), metadata_files_with_versions.end());
363+
const ShortMetadataFileInfo & latest_metadata_file_info = [&]()
364+
{
365+
if (selection_way == MostRecentMetadataFileSelectionWay::BY_LAST_UPDATED_MS_FIELD)
366+
{
367+
return *std::max_element(
368+
metadata_files_with_versions.begin(),
369+
metadata_files_with_versions.end(),
370+
[](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.last_updated_ms < b.last_updated_ms; });
371+
}
372+
else
373+
{
374+
return *std::max_element(
375+
metadata_files_with_versions.begin(),
376+
metadata_files_with_versions.end(),
377+
[](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.version < b.version; });
378+
}
379+
}();
380+
return {latest_metadata_file_info.version, latest_metadata_file_info.path};
271381
}
272382

273-
static std::pair<Int32, String> getLatestOrExplicitMetadataFileAndVersion(const ObjectStoragePtr & object_storage, const StorageObjectStorage::Configuration & configuration, Poco::Logger * log)
383+
static std::pair<Int32, String> getLatestOrExplicitMetadataFileAndVersion(
384+
const ObjectStoragePtr & object_storage,
385+
const StorageObjectStorage::Configuration & configuration,
386+
const ContextPtr & local_context,
387+
Poco::Logger * log)
274388
{
275-
auto explicit_metadata_path = configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_file_path].value;
276-
std::pair<Int32, String> result;
277-
if (!explicit_metadata_path.empty())
389+
if (configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_file_path].changed)
278390
{
391+
auto explicit_metadata_path = configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_file_path].value;
279392
try
280393
{
281394
LOG_TEST(log, "Explicit metadata file path is specified {}, will read from this metadata file", explicit_metadata_path);
@@ -289,55 +402,37 @@ static std::pair<Int32, String> getLatestOrExplicitMetadataFileAndVersion(const
289402
auto prefix_storage_path = configuration.getPath();
290403
if (!explicit_metadata_path.starts_with(prefix_storage_path))
291404
explicit_metadata_path = std::filesystem::path(prefix_storage_path) / explicit_metadata_path;
292-
result = getMetadataFileAndVersion(explicit_metadata_path);
405+
return getMetadataFileAndVersion(explicit_metadata_path);
293406
}
294407
catch (const std::exception & ex)
295408
{
296409
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid path {} specified for iceberg_metadata_file_path: '{}'", explicit_metadata_path, ex.what());
297410
}
298411
}
299-
else
412+
else if (configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_table_uuid].changed)
300413
{
301-
result = getLatestMetadataFileAndVersion(object_storage, configuration);
414+
std::optional<String> table_uuid = configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_table_uuid].value;
415+
return getLatestMetadataFileAndVersion(object_storage, configuration, local_context, table_uuid);
302416
}
303-
304-
return result;
305-
}
306-
307-
308-
Poco::JSON::Object::Ptr IcebergMetadata::readJSON(const String & metadata_file_path, const ContextPtr & local_context) const
309-
{
310-
auto configuration_ptr = configuration.lock();
311-
auto create_fn = [&]()
312-
{
313-
ObjectInfo object_info(metadata_file_path);
314-
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
315-
316-
String json_str;
317-
readJSONObjectPossiblyInvalid(json_str, *buf);
318-
319-
Poco::JSON::Parser parser; /// For some reason base/base/JSON.h can not parse this json file
320-
Poco::Dynamic::Var json = parser.parse(json_str);
321-
return std::make_pair(json.extract<Poco::JSON::Object::Ptr>(), json.size());
322-
};
323-
if (manifest_cache)
417+
else
324418
{
325-
return manifest_cache->getOrSetTableMetadata(IcebergMetadataFilesCache::getKey(configuration_ptr, metadata_file_path), create_fn);
419+
return getLatestMetadataFileAndVersion(object_storage, configuration, local_context, std::nullopt);
326420
}
327-
return create_fn().first;
328421
}
329422

423+
330424
bool IcebergMetadata::update(const ContextPtr & local_context)
331425
{
332426
auto configuration_ptr = configuration.lock();
333427

334-
const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, log.get());
428+
const auto [metadata_version, metadata_file_path]
429+
= getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, local_context, log.get());
335430

336431
bool metadata_file_changed = false;
337432
if (last_metadata_version != metadata_version)
338433
{
339434
last_metadata_version = metadata_version;
340-
last_metadata_object = readJSON(metadata_file_path, local_context);
435+
last_metadata_object = ::DB::readJSON(metadata_file_path, object_storage, local_context, log);
341436
metadata_file_changed = true;
342437
}
343438

@@ -499,7 +594,7 @@ DataLakeMetadataPtr IcebergMetadata::create(
499594
else
500595
LOG_TRACE(log, "Not using in-memory cache for iceberg metadata files, because the setting use_iceberg_metadata_files_cache is false.");
501596

502-
const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, log.get());
597+
const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, local_context, log.get());
503598

504599
auto create_fn = [&]()
505600
{

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,9 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
135135

136136
std::optional<String> getRelevantManifestList(const Poco::JSON::Object::Ptr & metadata);
137137

138-
Poco::JSON::Object::Ptr readJSON(const String & metadata_file_path, const ContextPtr & local_context) const;
138+
Strings getDataFilesImpl(const ActionsDAG * filter_dag) const;
139+
140+
Iceberg::ManifestFilePtr tryGetManifestFile(const String & filename) const;
139141
};
140142
}
141143

src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ Whether delta-lake read schema is the same as table schema.
2626
DECLARE(String, iceberg_metadata_file_path, "", R"(
2727
Explicit path to desired Iceberg metadata file, should be relative to path in object storage. Make sense for table function use case only.
2828
)", 0) \
29+
DECLARE(String, iceberg_metadata_table_uuid, "", R"(
30+
Explicit table UUID to read metadata for. Ignored if iceberg_metadata_file_path is set.
31+
)", 0) \
32+
DECLARE(Bool, iceberg_recent_metadata_file_by_last_updated_ms_field, false, R"(
33+
If enabled, the engine would use the metadata file with the most recent last_updated_ms json field. Does not make sense to use with iceberg_metadata_file_path.
34+
)", 0)
2935

3036
// clang-format on
3137

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
1 test1
2+
2 test2
3+
3 test3
4+
4 test4
5+
5 test5
6+
6 test6
7+
0
8+
0
9+
0
10+
5 test5
11+
6 test6
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
-- Tags: no-fasttest
2+
-- Tag no-fasttest: Depends on AWS
3+
4+
SELECT * FROM icebergS3(s3_conn, filename='merged_several_tables_test', SETTINGS iceberg_metadata_table_uuid = 'ea8d1178-7756-4b89-b21f-00e9f31fe03e') ORDER BY id;
5+
SELECT * FROM icebergS3(s3_conn, filename='merged_several_tables_test', SETTINGS iceberg_metadata_table_uuid = 'A90EED4CF74B4E5BB630096FB9D09021') ORDER BY id;
6+
SELECT * FROM icebergS3(s3_conn, filename='merged_several_tables_test', SETTINGS iceberg_metadata_table_uuid = '6f6f6407_c6A5465f_A808ea8900_e35a38') ORDER BY id;
7+
8+
SELECT count() FROM icebergS3(s3_conn, filename='merged_several_tables_test', SETTINGS iceberg_metadata_file_path = 'metadata/00001-aec4e034-3f73-48f7-87ad-51b7b42a8db7.metadata.json');
9+
SELECT count() FROM icebergS3(s3_conn, filename='merged_several_tables_test', SETTINGS iceberg_metadata_file_path = 'metadata/00001-2aad93a8-a893-4943-8504-f6021f83ecab.metadata.json');
10+
SELECT count() FROM icebergS3(s3_conn, filename='merged_several_tables_test', SETTINGS iceberg_metadata_file_path = 'metadata/00001-aec4e034-3f73-48f7-87ad-51b7b42a8db7.metadata.json');
11+
12+
13+
SELECT * FROM icebergS3(s3_conn, filename='merged_several_tables_test', SETTINGS iceberg_recent_metadata_file_by_last_updated_ms_field = true) ORDER BY id;

0 commit comments

Comments
 (0)