-
Notifications
You must be signed in to change notification settings - Fork 17
Feature: Support TRUNCATE TABLE for Iceberg engine #1529
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: antalya-26.1
Are you sure you want to change the base?
Changes from 14 commits
de68832
601250f
27a8467
84d76f1
c81fc7f
876577b
cc1c7db
5a7e014
2148724
4681e4e
12657bb
bd1d16e
11810d8
2b66954
cf85627
e70e425
4cfc411
47c8acc
2155e32
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,9 +9,9 @@ | |
| #include <Core/UUID.h> | ||
| #include <DataTypes/DataTypeSet.h> | ||
| #include <Formats/FormatFilterInfo.h> | ||
| #include <Formats/FormatParserSharedResources.h> | ||
| #include <Formats/ReadSchemaUtils.h> | ||
| #include <Functions/FunctionFactory.h> | ||
| #include <Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h> | ||
| #include <Functions/IFunctionAdaptors.h> | ||
| #include <Functions/tuple.h> | ||
| #include <Processors/Formats/ISchemaReader.h> | ||
|
|
@@ -100,6 +100,7 @@ extern const int NOT_IMPLEMENTED; | |
| extern const int ICEBERG_SPECIFICATION_VIOLATION; | ||
| extern const int TABLE_ALREADY_EXISTS; | ||
| extern const int SUPPORT_IS_DISABLED; | ||
| extern const int INCORRECT_DATA; | ||
| } | ||
|
|
||
| namespace Setting | ||
|
|
@@ -531,6 +532,97 @@ void IcebergMetadata::mutate( | |
| ); | ||
| } | ||
|
|
||
| void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id) | ||
| { | ||
| if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value) | ||
| throw Exception( | ||
| ErrorCodes::SUPPORT_IS_DISABLED, | ||
| "Iceberg truncate is experimental. " | ||
| "To allow its usage, enable setting allow_experimental_insert_into_iceberg"); | ||
|
|
||
| // Bug 1 fix: REMOVE the isTransactional() guard entirely. | ||
| // REST/transactional catalogs are the primary target of this feature. | ||
|
|
||
| auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context); | ||
| auto metadata_object = getMetadataJSONObject( | ||
| actual_table_state_snapshot.metadata_file_path, | ||
| object_storage, | ||
| persistent_components.metadata_cache, | ||
| context, | ||
| log, | ||
| persistent_components.metadata_compression_method, | ||
| persistent_components.table_uuid); | ||
|
|
||
| // Bug 4 fix: use -1 as the Iceberg "no parent" sentinel | ||
| Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(-1); | ||
|
|
||
| auto config_path = persistent_components.table_path; | ||
| if (!config_path.starts_with('/')) config_path = '/' + config_path; | ||
| if (!config_path.ends_with('/')) config_path += "/"; | ||
|
|
||
| // Bug 3 fix: restore isTransactional flag in FileNamesGenerator | ||
| bool is_transactional = (catalog != nullptr && catalog->isTransactional()); | ||
| FileNamesGenerator filename_generator; | ||
| // Transactional catalogs (REST) require full S3 URIs — force location-based path. | ||
| // Non-transactional respects the write_full_path_in_iceberg_metadata setting. | ||
| if (is_transactional || context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata]) | ||
| { | ||
| String location = metadata_object->getValue<String>(Iceberg::f_location); | ||
| if (!location.ends_with("/")) location += "/"; | ||
| filename_generator = FileNamesGenerator( | ||
| location, config_path, is_transactional, | ||
| persistent_components.metadata_compression_method, write_format); | ||
| } | ||
| else | ||
| { | ||
| filename_generator = FileNamesGenerator( | ||
| config_path, config_path, false, | ||
| persistent_components.metadata_compression_method, write_format); | ||
| } | ||
|
|
||
| Int32 new_metadata_version = actual_table_state_snapshot.metadata_version + 1; | ||
| filename_generator.setVersion(new_metadata_version); | ||
|
|
||
| auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName(); | ||
|
|
||
| auto [new_snapshot, manifest_list_name, storage_manifest_list_name] = MetadataGenerator(metadata_object).generateNextMetadata( | ||
| filename_generator, metadata_name, parent_snapshot_id, | ||
| 0, 0, 0, 0, 0, 0, std::nullopt, std::nullopt, /*is_truncate=*/true); | ||
|
||
|
|
||
| auto write_settings = context->getWriteSettings(); | ||
| auto buf = object_storage->writeObject( | ||
| StoredObject(storage_manifest_list_name), | ||
| WriteMode::Rewrite, std::nullopt, | ||
| DBMS_DEFAULT_BUFFER_SIZE, write_settings); | ||
|
|
||
| generateManifestList(filename_generator, metadata_object, object_storage, context, | ||
| {}, new_snapshot, 0, *buf, Iceberg::FileContentType::DATA, /*use_previous_snapshots=*/false); | ||
| buf->finalize(); | ||
|
|
||
| String metadata_content = dumpMetadataObjectToString(metadata_object); | ||
| writeMessageToFile(metadata_content, storage_metadata_name, object_storage, | ||
| context, "*", "", persistent_components.metadata_compression_method); | ||
|
|
||
| // Bug 2 fix: restore the catalog commit, matching the pattern from IcebergWrites.cpp | ||
| if (catalog) | ||
| { | ||
| // Build the catalog-visible path (blob URI for transactional, bare path otherwise) | ||
| String catalog_filename = metadata_name; | ||
| if (is_transactional) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see in IcebergWrites the check is different, but leads to the same result. To a reader like me, it is not very clear why a transactional catalog needs a different URI. I suggest one of the below:
|
||
| { | ||
| const String blob_storage_type_name = Poco::toLower(String(magic_enum::enum_name(object_storage->getType()))); | ||
| const auto blob_storage_namespace_name = persistent_components.table_path; | ||
| catalog_filename = blob_storage_type_name + "://" + blob_storage_namespace_name + "/" + metadata_name; | ||
| } | ||
|
|
||
| const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName()); | ||
| // Pass metadata_object (not new_snapshot) — matches the fix already applied in | ||
| // IcebergWrites.cpp and Mutations.cpp | ||
| if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot)) | ||
| throw Exception(ErrorCodes::INCORRECT_DATA, "Failed to commit Iceberg truncate update to catalog."); | ||
| } | ||
| } | ||
|
|
||
| void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands) | ||
| { | ||
| for (const auto & command : commands) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -451,6 +451,53 @@ void generateManifestList( | |
| else | ||
| throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown iceberg version {}", version); | ||
|
|
||
| // For empty manifest list (e.g. TRUNCATE), write a valid Avro container | ||
| // file manually so we can embed the full schema JSON with field-ids intact, | ||
| // without triggering the DataFileWriter constructor's eager writeHeader() | ||
| // which commits encoder state before we can override avro.schema. | ||
| if (manifest_entry_names.empty() && !use_previous_snapshots) | ||
| { | ||
| auto write_avro_long = [](WriteBuffer & out, int64_t val) | ||
| { | ||
| uint64_t n = (static_cast<uint64_t>(val) << 1) ^ static_cast<uint64_t>(val >> 63); | ||
|
||
| while (n & ~0x7fULL) | ||
| { | ||
| char c = static_cast<char>((n & 0x7f) | 0x80); | ||
| out.write(&c, 1); | ||
| n >>= 7; | ||
| } | ||
| char c = static_cast<char>(n); | ||
| out.write(&c, 1); | ||
| }; | ||
|
|
||
| auto write_avro_bytes = [&](WriteBuffer & out, const String & s) | ||
| { | ||
| write_avro_long(out, static_cast<int64_t>(s.size())); | ||
| out.write(s.data(), s.size()); | ||
| }; | ||
|
|
||
| // Avro Object Container File header | ||
| buf.write("Obj\x01", 4); | ||
|
|
||
| // Metadata map: 2 entries (codec + schema) | ||
| write_avro_long(buf, 2); | ||
| write_avro_bytes(buf, "avro.codec"); | ||
| write_avro_bytes(buf, "null"); | ||
| write_avro_bytes(buf, "avro.schema"); | ||
| write_avro_bytes(buf, schema_representation); // full JSON, field-ids intact | ||
|
|
||
| // End of metadata map | ||
| write_avro_long(buf, 0); | ||
|
|
||
| // Sync marker (16 zero bytes — valid, no data blocks follow) | ||
| static const char sync_marker[16] = {}; | ||
| buf.write(sync_marker, 16); | ||
|
|
||
| // No data blocks for empty manifest list | ||
| buf.finalize(); | ||
| return; | ||
| } | ||
|
|
||
| auto schema = avro::compileJsonSchemaFromString(schema_representation); // NOLINT | ||
|
|
||
| auto adapter = std::make_unique<OutputStreamWriteBufferAdapter>(buf); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -113,7 +113,8 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata( | |
| Int32 added_delete_files, | ||
| Int32 num_deleted_rows, | ||
| std::optional<Int64> user_defined_snapshot_id, | ||
| std::optional<Int64> user_defined_timestamp) | ||
| std::optional<Int64> user_defined_timestamp, | ||
| bool is_truncate) | ||
| { | ||
| int format_version = metadata_object->getValue<Int32>(Iceberg::f_format_version); | ||
| Poco::JSON::Object::Ptr new_snapshot = new Poco::JSON::Object; | ||
|
|
@@ -137,7 +138,16 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata( | |
|
|
||
| auto parent_snapshot = getParentSnapshot(parent_snapshot_id); | ||
| Poco::JSON::Object::Ptr summary = new Poco::JSON::Object; | ||
| if (num_deleted_rows == 0) | ||
| if (is_truncate) | ||
| { | ||
| summary->set(Iceberg::f_operation, Iceberg::f_overwrite); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. On abstraction: I did try avro::DataFileWriter first, and it is the natural choice. Unfortunately, in the vendored Avro C++ version, writeHeader() is called eagerly in the constructor, committing the binary encoder state. Calling setMetadata("avro.schema", ...) post-construction corrupts StreamWriter::next_, causing a NULL dereference on close(). I have a confirmed server segfault stack trace from this exact path. The manual OCF serialization exists because the abstraction is broken for this specific use case in this library version. On sum_with_parent_snapshot: the is_truncate guard inside sum_with_parent_snapshot already correctly zeroes the cumulative totals (total_records, total_data_files, etc.) — you're correct that we could lean on it there. However, deleted_records and deleted_data_files are different: they need to be populated from the parent snapshot's values (not zero), since they represent what was removed. That's why they're set separately. Glad to add a comment making this distinction explicit. |
||
| Int32 prev_total_records = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_records) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_records)) : 0; | ||
| Int32 prev_total_data_files = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_data_files) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_data_files)) : 0; | ||
|
|
||
| summary->set(Iceberg::f_deleted_records, std::to_string(prev_total_records)); | ||
| summary->set(Iceberg::f_deleted_data_files, std::to_string(prev_total_data_files)); | ||
| } | ||
| else if (num_deleted_rows == 0) | ||
| { | ||
| summary->set(Iceberg::f_operation, Iceberg::f_append); | ||
| summary->set(Iceberg::f_added_data_files, std::to_string(added_files)); | ||
|
|
@@ -157,7 +167,12 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata( | |
|
|
||
| auto sum_with_parent_snapshot = [&](const char * field_name, Int32 snapshot_value) | ||
| { | ||
| Int32 prev_value = parent_snapshot ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(field_name)) : 0; | ||
| if (is_truncate) | ||
| { | ||
| summary->set(field_name, std::to_string(0)); | ||
| return; | ||
| } | ||
| Int32 prev_value = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(field_name) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(field_name)) : 0; | ||
| summary->set(field_name, std::to_string(prev_value + snapshot_value)); | ||
| }; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| #!/usr/bin/env python3 | ||
|
|
||
| from pyiceberg.catalog import load_catalog | ||
| from helpers.config_cluster import minio_secret_key, minio_access_key | ||
| import uuid | ||
| import pyarrow as pa | ||
| from pyiceberg.schema import Schema, NestedField | ||
| from pyiceberg.types import LongType, StringType | ||
| from pyiceberg.partitioning import PartitionSpec | ||
|
|
||
| BASE_URL_LOCAL_RAW = "http://localhost:8182" | ||
| CATALOG_NAME = "demo" | ||
|
|
||
| def load_catalog_impl(started_cluster): | ||
| return load_catalog( | ||
| CATALOG_NAME, | ||
| **{ | ||
| "uri": BASE_URL_LOCAL_RAW, | ||
| "type": "rest", | ||
| "s3.endpoint": f"http://{started_cluster.get_instance_ip('minio')}:9000", | ||
| "s3.access-key-id": minio_access_key, | ||
| "s3.secret-access-key": minio_secret_key, | ||
| }, | ||
| ) | ||
|
|
||
|
|
||
| def test_iceberg_truncate(started_cluster_iceberg_no_spark): | ||
| instance = started_cluster_iceberg_no_spark.instances["node1"] | ||
| catalog = load_catalog_impl(started_cluster_iceberg_no_spark) | ||
|
|
||
| # 1. Setup PyIceberg Namespace and Table | ||
| namespace = f"clickhouse_truncate_{uuid.uuid4().hex}" | ||
| catalog.create_namespace(namespace) | ||
|
|
||
| schema = Schema( | ||
| NestedField(field_id=1, name="id", field_type=LongType(), required=False), | ||
| NestedField(field_id=2, name="val", field_type=StringType(), required=False), | ||
| ) | ||
|
|
||
| table_name = "test_truncate" | ||
| table = catalog.create_table( | ||
| identifier=f"{namespace}.{table_name}", | ||
| schema=schema, | ||
| location=f"s3://warehouse-rest/{namespace}.{table_name}", | ||
| partition_spec=PartitionSpec(), | ||
| ) | ||
|
|
||
| # 2. Populate Data | ||
| df = pa.Table.from_pylist([ | ||
| {"id": 1, "val": "A"}, | ||
| {"id": 2, "val": "B"}, | ||
| {"id": 3, "val": "C"}, | ||
| ]) | ||
| table.append(df) | ||
|
|
||
| # Validate data is in iceberg | ||
| assert len(table.scan().to_arrow()) == 3 | ||
|
|
||
| # 3. Setup ClickHouse Database | ||
| instance.query(f"DROP DATABASE IF EXISTS {namespace}") | ||
| instance.query( | ||
| f""" | ||
| CREATE DATABASE {namespace} ENGINE = DataLakeCatalog('http://rest:8181/v1', 'minio', '{minio_secret_key}') | ||
| SETTINGS | ||
| catalog_type='rest', | ||
| warehouse='demo', | ||
| storage_endpoint='http://minio:9000/warehouse-rest'; | ||
| """, | ||
| settings={"allow_database_iceberg": 1} | ||
| ) | ||
|
|
||
| # 4. Formulate the ClickHouse Table Identifier | ||
| # MUST wrap the inner table name in backticks so ClickHouse parses the Iceberg namespace correctly | ||
| ch_table_identifier = f"`{namespace}.{table_name}`" | ||
|
|
||
| # Assert data from ClickHouse | ||
| assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 3 | ||
|
|
||
| # 5. Truncate Table via ClickHouse | ||
| instance.query( | ||
| f"TRUNCATE TABLE {namespace}.{ch_table_identifier}", | ||
| settings={"allow_experimental_insert_into_iceberg": 1} | ||
| ) | ||
|
|
||
| # Assert truncated from ClickHouse | ||
| assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 0 | ||
|
|
||
| # 6. Cross-Engine Validation using PyIceberg | ||
| # Refresh table state to grab the new v<N>.metadata.json you generated | ||
| table.refresh() | ||
|
|
||
| # Assert PyIceberg reads the empty snapshot successfully | ||
| assert len(table.scan().to_arrow()) == 0 | ||
|
|
||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Perhaps it is a good idea to insert data again and check it can be read, just to make sure we haven't broken anything?
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good call. Added in the latest commit — after asserting count() == 0, the test now appends a new row via PyIceberg and verifies that ClickHouse reads it back as count() == 1. This confirms the table remains fully writable after truncation and that the metadata state is consistent. |
||
| # 7. Verify Writable State | ||
| # Append a new row to ensure truncation didn't break table state | ||
| new_df = pa.Table.from_pylist([ | ||
| {"id": 4, "val": "D"} | ||
| ]) | ||
| table.append(new_df) | ||
|
|
||
| # Assert new row count via ClickHouse | ||
| assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 1 | ||
|
|
||
| # Cleanup | ||
| instance.query(f"DROP DATABASE {namespace}") | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: this type of documentation seems a bit weird to me. If there is a thing you want to explain, I would say just write down some text explaining it instead of loose references like "bug 1 fix".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure — I will remove those debug-style references and replace them with plain explanatory prose.