Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
de68832
[WIP] (DONOTMERGE) Feature: Support TRUNCATE TABLE for Iceberg engine
Mar 16, 2026
601250f
Fix: Address code review feedback and drop stateless test
Mar 17, 2026
27a8467
Fix: Properly escape Iceberg namespace in DataLakeCatalog queries
Mar 18, 2026
84d76f1
Fix: Add missing S3 credentials to ClickHouse DataLakeCatalog settings
Mar 18, 2026
c81fc7f
Fix: Remove invalid DataLakeCatalog S3 settings
Mar 19, 2026
876577b
Fix: Restore minio_secret_key variable for DataLakeCatalog auth
Mar 19, 2026
cc1c7db
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 19, 2026
5a7e014
Fix: Dynamically determine Iceberg format version to prevent Avro seg…
Mar 19, 2026
2148724
feat(iceberg): Implement TRUNCATE TABLE for Iceberg Engine (REST cata…
Mar 20, 2026
4681e4e
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 20, 2026
12657bb
fix(iceberg): pass new_snapshot to updateMetadata in IcebergStorageSink
Mar 21, 2026
bd1d16e
fix(iceberg): restore return false in RestCatalog::updateMetadata
Mar 24, 2026
11810d8
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 25, 2026
2b66954
fix(iceberg): revert Mutations.cpp updateMetadata to pass new_snapshot
Mar 25, 2026
cf85627
refactor(iceberg): add comment explaining Avro zigzag encoding in gen…
Mar 26, 2026
e70e425
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 26, 2026
4cfc411
refactor(iceberg): address code review feedback on TRUNCATE implement…
Mar 27, 2026
47c8acc
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 27, 2026
2155e32
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ class IDataLakeMetadata : boost::noncopyable

virtual void modifyFormatSettings(FormatSettings &, const Context &) const {}

    /// Whether this metadata implementation can service TRUNCATE TABLE.
    /// Defaults to false; implementations overriding truncate() must also override this.
    virtual bool supportsTruncate() const { return false; }

    /// Truncate the table's data. Overridden by lake formats that support it
    /// (see IcebergMetadata); the base implementation always throws.
    /// `catalog` may be nullptr when the table is not attached to a catalog;
    /// `storage_id` identifies the table for catalog-side commits.
    virtual void truncate(ContextPtr /*context*/, std::shared_ptr<DataLake::ICatalog> /*catalog*/, const StorageID & /*storage_id*/)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncation is not supported by {} metadata", getName());
    }

static constexpr bool supportsTotalRows() { return false; }
virtual std::optional<size_t> totalRows(ContextPtr) const { return {}; }
static constexpr bool supportsTotalBytes() { return false; }
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ DEFINE_ICEBERG_FIELD_ALIAS(partition_spec, partition-spec);
DEFINE_ICEBERG_FIELD_ALIAS(partition_specs, partition-specs);
DEFINE_ICEBERG_FIELD_ALIAS(spec_id, spec-id);
DEFINE_ICEBERG_FIELD_ALIAS(added_records, added-records);
DEFINE_ICEBERG_FIELD_ALIAS(deleted_records, deleted-records);
DEFINE_ICEBERG_FIELD_ALIAS(added_data_files, added-data-files);
DEFINE_ICEBERG_FIELD_ALIAS(deleted_data_files, deleted-data-files);
DEFINE_ICEBERG_FIELD_ALIAS(added_delete_files, added-delete-files);
DEFINE_ICEBERG_FIELD_ALIAS(added_position_delete_files, added-position-delete-files);
DEFINE_ICEBERG_FIELD_ALIAS(added_position_deletes, added-position-deletes);
Expand Down
97 changes: 96 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
#include <Core/UUID.h>
#include <DataTypes/DataTypeSet.h>
#include <Formats/FormatFilterInfo.h>
#include <Formats/FormatParserSharedResources.h>
#include <Formats/ReadSchemaUtils.h>
#include <Functions/FunctionFactory.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/tuple.h>
#include <Processors/Formats/ISchemaReader.h>
Expand Down Expand Up @@ -104,6 +104,7 @@ extern const int NOT_IMPLEMENTED;
extern const int ICEBERG_SPECIFICATION_VIOLATION;
extern const int TABLE_ALREADY_EXISTS;
extern const int SUPPORT_IS_DISABLED;
extern const int INCORRECT_DATA;
}

namespace Setting
Expand Down Expand Up @@ -610,6 +611,100 @@ void IcebergMetadata::mutate(
);
}

/// Implements TRUNCATE TABLE for the Iceberg engine: commits a brand-new snapshot
/// that references an empty manifest list, so the table reads as empty while the
/// prior snapshot history remains in the metadata file.
/// `catalog` may be nullptr; when present, the new metadata location is also
/// committed to the catalog (with a fully-qualified URI for transactional ones).
void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id)
{
    /// Truncate reuses the experimental Iceberg write path, so it is gated by
    /// the same setting as INSERT.
    if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value)
        throw Exception(
            ErrorCodes::SUPPORT_IS_DISABLED,
            "Iceberg truncate is experimental. "
            "To allow its usage, enable setting allow_experimental_insert_into_iceberg");

    /// Load the current table state and parse its metadata JSON.
    auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context);
    auto metadata_object = getMetadataJSONObject(
        actual_table_state_snapshot.metadata_file_path,
        object_storage,
        persistent_components.metadata_cache,
        context,
        log,
        persistent_components.metadata_compression_method,
        persistent_components.table_uuid);

    // Use -1 as the Iceberg spec sentinel for "no parent snapshot"
    // (distinct from snapshot ID 0 which is a valid snapshot).
    Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(-1);

    /// Normalize the configured table path to "/path/" form for FileNamesGenerator.
    auto config_path = persistent_components.table_path;
    if (!config_path.starts_with('/')) config_path = '/' + config_path;
    if (!config_path.ends_with('/')) config_path += "/";

    bool is_transactional = (catalog != nullptr && catalog->isTransactional());

    // Transactional catalogs (e.g. REST) require a fully-qualified blob URI
    // (scheme://bucket/path) so the catalog can resolve the metadata location
    // independently of any local path configuration. Non-transactional catalogs
    // use bare paths relative to the object storage root.
    FileNamesGenerator filename_generator;
    if (is_transactional || context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata])
    {
        String location = metadata_object->getValue<String>(Iceberg::f_location);
        if (!location.ends_with("/")) location += "/";
        filename_generator = FileNamesGenerator(
            location, config_path, is_transactional,
            persistent_components.metadata_compression_method, write_format);
    }
    else
    {
        filename_generator = FileNamesGenerator(
            config_path, config_path, false,
            persistent_components.metadata_compression_method, write_format);
    }

    /// The new metadata file takes the next sequential version number.
    Int32 new_metadata_version = actual_table_state_snapshot.metadata_version + 1;
    filename_generator.setVersion(new_metadata_version);

    auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName();

    /// Generate the truncate snapshot: zero added files/records/bytes, and
    /// is_truncate=true so the snapshot summary totals are reset rather than
    /// accumulated from the parent snapshot.
    auto [new_snapshot, manifest_list_name, storage_manifest_list_name] = MetadataGenerator(metadata_object).generateNextMetadata(
        filename_generator, metadata_name, parent_snapshot_id,
        /* added_files */ 0, /* added_records */ 0, /* added_files_size */ 0,
        /* num_partitions */ 0, /* added_delete_files */ 0, /* num_deleted_rows */ 0,
        std::nullopt, std::nullopt, /*is_truncate=*/true);

    /// Write the (empty) manifest list object for the new snapshot.
    auto write_settings = context->getWriteSettings();
    auto buf = object_storage->writeObject(
        StoredObject(storage_manifest_list_name),
        WriteMode::Rewrite, std::nullopt,
        DBMS_DEFAULT_BUFFER_SIZE, write_settings);

    /// Empty entry list + use_previous_snapshots=false produces a manifest list
    /// that references no data files.
    generateManifestList(filename_generator, metadata_object, object_storage,
        context, {}, new_snapshot, 0, *buf, Iceberg::FileContentType::DATA, /*use_previous_snapshots=*/false);
    buf->finalize();

    /// Persist the updated metadata JSON alongside the previous versions.
    String metadata_content = dumpMetadataObjectToString(metadata_object);
    writeMessageToFile(metadata_content, storage_metadata_name, object_storage,
        context, "*", "", persistent_components.metadata_compression_method);

    if (catalog)
    {
        // Transactional catalogs require a fully-qualified blob URI so the catalog
        // can resolve the metadata location independently of local path configuration.
        String catalog_filename = metadata_name;
        if (is_transactional)
        {
            // Build full URI from the table's location field (e.g. "s3://bucket/namespace.table")
            // combined with the relative metadata name.
            String location = metadata_object->getValue<String>(Iceberg::f_location);
            if (!location.ends_with("/")) location += "/";
            catalog_filename = location + metadata_name;
        }

        const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName());
        if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot))
            throw Exception(ErrorCodes::INCORRECT_DATA,
                "Failed to commit Iceberg truncate update to catalog.");
    }
}

void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands)
{
for (const auto & command : commands)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class IcebergMetadata : public IDataLakeMetadata
    bool supportsUpdate() const override { return true; }
    bool supportsWrites() const override { return true; }
    bool supportsParallelInsert() const override { return true; }
    /// TRUNCATE TABLE is implemented by committing a new snapshot with an
    /// empty manifest list (see IcebergMetadata::truncate).
    bool supportsTruncate() const override { return true; }

    /// Commit an empty snapshot so the table reads as empty; overrides
    /// IDataLakeMetadata::truncate. Gated by allow_experimental_insert_into_iceberg.
    void truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id) override;

IcebergHistory getHistory(ContextPtr local_context) const;

Expand Down
57 changes: 57 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,31 @@ void generateManifestFile(
writer.close();
}

// Serialize a signed 64-bit value in Avro's "long" wire format: zigzag-encode
// the sign (positive n -> 2n, negative n -> 2(-n)-1) so small magnitudes stay
// small regardless of sign, then emit the result as a little-endian base-128
// varint where the top bit of each byte marks a continuation.
// See: https://avro.apache.org/docs/1.11.1/specification/#binary-encoding
static void writeAvroLong(WriteBuffer & out, int64_t val)
{
    // Zigzag step: arithmetic shift of `val` by 63 yields an all-ones/all-zeros
    // mask that flips the low bits for negative inputs.
    uint64_t encoded = (static_cast<uint64_t>(val) << 1) ^ static_cast<uint64_t>(val >> 63);
    do
    {
        uint8_t byte = encoded & 0x7f;
        encoded >>= 7;
        if (encoded != 0)
            byte |= 0x80; // more bytes follow
        char raw = static_cast<char>(byte);
        out.write(&raw, 1);
    } while (encoded != 0);
}

// Serialize an Avro "bytes"/"string" value: a varint length prefix followed by
// the raw payload bytes.
static void writeAvroBytes(WriteBuffer & out, const String & s)
{
    const auto payload_size = static_cast<int64_t>(s.size());
    writeAvroLong(out, payload_size);
    out.write(s.data(), s.size());
}

void generateManifestList(
const FileNamesGenerator & filename_generator,
Poco::JSON::Object::Ptr metadata,
Expand All @@ -451,6 +476,38 @@ void generateManifestList(
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown iceberg version {}", version);

// For empty manifest list (e.g. TRUNCATE), write a valid Avro container
// file manually so we can embed the full schema JSON with field-ids intact,
// without triggering the DataFileWriter constructor's eager writeHeader()
// which commits encoder state before we can override avro.schema.
if (manifest_entry_names.empty() && !use_previous_snapshots)
{
// For an empty manifest list (e.g. after TRUNCATE), we write a minimal valid
// Avro Object Container File manually rather than using avro::DataFileWriter.
// The reason: DataFileWriter calls writeHeader() eagerly in its constructor,
// committing the binary encoder state. Post-construction setMetadata() calls
// corrupt StreamWriter::next_ causing a NULL dereference on close(). Writing
// the OCF header directly ensures the full schema JSON (with Iceberg field-ids)
// is embedded intact — the Avro C++ library strips unknown field properties
// like field-id during schema node serialization.
// Avro OCF format: [magic(4)] [metadata_map] [sync_marker(16)] [no data blocks]
buf.write("Obj\x01", 4);

writeAvroLong(buf, 2); // 2 metadata entries
writeAvroBytes(buf, "avro.codec");
writeAvroBytes(buf, "null");
writeAvroBytes(buf, "avro.schema");
writeAvroBytes(buf, schema_representation); // full JSON with field-ids intact

writeAvroLong(buf, 0); // end of metadata map

static const char sync_marker[16] = {};
buf.write(sync_marker, 16);

buf.finalize();
return;
}

auto schema = avro::compileJsonSchemaFromString(schema_representation); // NOLINT

auto adapter = std::make_unique<OutputStreamWriteBufferAdapter>(buf);
Expand Down
21 changes: 18 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(
Int32 added_delete_files,
Int32 num_deleted_rows,
std::optional<Int64> user_defined_snapshot_id,
std::optional<Int64> user_defined_timestamp)
std::optional<Int64> user_defined_timestamp,
bool is_truncate)
{
int format_version = metadata_object->getValue<Int32>(Iceberg::f_format_version);
Poco::JSON::Object::Ptr new_snapshot = new Poco::JSON::Object;
Expand All @@ -137,7 +138,16 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(

auto parent_snapshot = getParentSnapshot(parent_snapshot_id);
Poco::JSON::Object::Ptr summary = new Poco::JSON::Object;
if (num_deleted_rows == 0)
if (is_truncate)
{
summary->set(Iceberg::f_operation, Iceberg::f_overwrite);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't sum_with_parent_snapshot be used here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On Abstraction

I DID try avro::DataFileWriter first and it's the organic one. But, unfortunately in the vendored Avro C++ version, writeHeader() is called eagerly in the constructor committing the binary encoder state. Calling setMetadata("avro.schema", ...) post-construction corrupts StreamWriter::next_ causing a NULL dereference on close(). I have a confirmed server segfault stack trace from this exact path. The manual OCF serialization exists because the abstraction is broken for this specific use case in this library version.

On sum_with_parent_snapshot

The is_truncate guard inside sum_with_parent_snapshot already correctly zeroes cumulative totals (total_records, total_data_files, etc.) — you're correct that we could lean on it there. However deleted_records and deleted_data_files are different: they need to be populated from the parent snapshot's values (not zero), since they represent what was removed. That's why they're set separately. Glad to add a comment making this distinction explicit.

Int32 prev_total_records = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_records) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_records)) : 0;
Int32 prev_total_data_files = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_data_files) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_data_files)) : 0;

summary->set(Iceberg::f_deleted_records, std::to_string(prev_total_records));
summary->set(Iceberg::f_deleted_data_files, std::to_string(prev_total_data_files));
}
else if (num_deleted_rows == 0)
{
summary->set(Iceberg::f_operation, Iceberg::f_append);
summary->set(Iceberg::f_added_data_files, std::to_string(added_files));
Expand All @@ -157,7 +167,12 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(

auto sum_with_parent_snapshot = [&](const char * field_name, Int32 snapshot_value)
{
Int32 prev_value = parent_snapshot ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(field_name)) : 0;
if (is_truncate)
{
summary->set(field_name, std::to_string(0));
return;
}
Int32 prev_value = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(field_name) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(field_name)) : 0;
summary->set(field_name, std::to_string(prev_value + snapshot_value));
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class MetadataGenerator
Int32 added_delete_files,
Int32 num_deleted_rows,
std::optional<Int64> user_defined_snapshot_id = std::nullopt,
std::optional<Int64> user_defined_timestamp = std::nullopt);
std::optional<Int64> user_defined_timestamp = std::nullopt,
bool is_truncate = false);

void generateAddColumnMetadata(const String & column_name, DataTypePtr type);
void generateDropColumnMetadata(const String & column_name);
Expand Down
10 changes: 7 additions & 3 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ void StorageObjectStorage::commitExportPartitionTransaction(const String & trans
void StorageObjectStorage::truncate(
const ASTPtr & /* query */,
const StorageMetadataPtr & /* metadata_snapshot */,
ContextPtr /* context */,
ContextPtr context,
TableExclusiveLockHolder & /* table_holder */)
{
const auto path = configuration->getRawPath();
Expand All @@ -639,8 +639,12 @@ void StorageObjectStorage::truncate(

if (configuration->isDataLakeConfiguration())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Truncate is not supported for data lake engine");
auto * data_lake_metadata = getExternalMetadata(context);
if (!data_lake_metadata || !data_lake_metadata->supportsTruncate())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported for this data lake engine");

data_lake_metadata->truncate(context, catalog, getStorageID());
return;
}

if (path.hasGlobs())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/env python3

from pyiceberg.catalog import load_catalog
from helpers.config_cluster import minio_secret_key, minio_access_key
import uuid
import pyarrow as pa
from pyiceberg.schema import Schema, NestedField
from pyiceberg.types import LongType, StringType
from pyiceberg.partitioning import PartitionSpec

BASE_URL_LOCAL_RAW = "http://localhost:8182"
CATALOG_NAME = "demo"

def load_catalog_impl(started_cluster):
    """Build a pyiceberg REST-catalog client wired to the test cluster's MinIO."""
    minio_ip = started_cluster.get_instance_ip("minio")
    catalog_options = {
        "uri": BASE_URL_LOCAL_RAW,
        "type": "rest",
        "s3.endpoint": f"http://{minio_ip}:9000",
        "s3.access-key-id": minio_access_key,
        "s3.secret-access-key": minio_secret_key,
    }
    return load_catalog(CATALOG_NAME, **catalog_options)


def test_iceberg_truncate_restart(started_cluster_iceberg_no_spark):
    """End-to-end check that TRUNCATE TABLE on an Iceberg table behind a REST
    catalog produces a valid empty snapshot that survives a ClickHouse restart
    and still accepts new data afterwards.
    """
    instance = started_cluster_iceberg_no_spark.instances["node1"]
    catalog = load_catalog_impl(started_cluster_iceberg_no_spark)

    # Unique namespace per run so repeated/concurrent runs don't collide.
    namespace = f"clickhouse_truncate_restart_{uuid.uuid4().hex}"
    catalog.create_namespace(namespace)

    schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
        NestedField(field_id=2, name="val", field_type=StringType(), required=False),
    )
    table_name = "test_truncate_restart"
    # Create the table via pyiceberg (unpartitioned) directly in the REST catalog.
    catalog.create_table(
        identifier=f"{namespace}.{table_name}",
        schema=schema,
        location=f"s3://warehouse-rest/{namespace}.{table_name}",
        partition_spec=PartitionSpec(),
    )

    # ClickHouse sees catalog tables as `namespace.table` — one backtick-quoted name.
    ch_table_identifier = f"`{namespace}.{table_name}`"

    instance.query(f"DROP DATABASE IF EXISTS {namespace}")
    instance.query(
        f"""
CREATE DATABASE {namespace} ENGINE = DataLakeCatalog('http://rest:8181/v1', 'minio', '{minio_secret_key}')
SETTINGS
    catalog_type='rest',
    warehouse='demo',
    storage_endpoint='http://minio:9000/warehouse-rest';
""",
        settings={"allow_database_iceberg": 1}
    )

    # 1. Insert initial data and truncate
    df = pa.Table.from_pylist([{"id": 1, "val": "A"}, {"id": 2, "val": "B"}])
    catalog.load_table(f"{namespace}.{table_name}").append(df)

    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 2

    # TRUNCATE is gated by the experimental Iceberg write setting.
    instance.query(
        f"TRUNCATE TABLE {namespace}.{ch_table_identifier}",
        settings={"allow_experimental_insert_into_iceberg": 1}
    )
    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 0

    # 2. Restart ClickHouse and verify table is still readable (count = 0)
    instance.restart_clickhouse()
    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 0

    # 3. Insert new data after restart and verify it's readable
    new_df = pa.Table.from_pylist([{"id": 3, "val": "C"}])
    catalog.load_table(f"{namespace}.{table_name}").append(new_df)
    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 1

    # Cleanup so subsequent tests don't see leftover databases.
    instance.query(f"DROP DATABASE {namespace}")
Loading