Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ad87946
feat: support incremental scan between 2 snapshots (#13)
xxchan Mar 18, 2025
2d07dcd
expose data file serialized
chenzl25 Nov 11, 2025
f040f26
support set snapshot id for fast append
chenzl25 Nov 11, 2025
8c60928
feat(iceberg): introduce remove snapshot action
Li0k Sep 26, 2025
6d4339e
feat: support append delete file
Li0k Aug 18, 2025
6519c98
feat: support merge append
Li0k Nov 5, 2025
3201d8c
feat: add process delete entry in snapshot produce
Li0k Nov 5, 2025
cee1fa7
fix: cherry-pick #27
Li0k Nov 5, 2025
494ca90
chore: pick public function generate_unique_snapshot_id for exactly o…
Li0k Nov 5, 2025
a2b6cc4
feat(iceberg): rewrite files action (#47) (#86)
Li0k Nov 5, 2025
c06158b
fix: fix plan files with deletes (#61)
chenzl25 Jul 17, 2025
ef44e88
feat: optimize plan files memory consumption (#64)
chenzl25 Jul 24, 2025
fd23d8e
azblob
chenzl25 Nov 11, 2025
ccb48c3
fix(iceberg): Introduce new data sequence for RewriteFilesAction (#51)
Li0k May 23, 2025
d5efb44
feat: exposed file_size_in_bytes (#46)
xxhZs May 30, 2025
64e7e21
fix(iceberg): fix rewrite-files partition-spec-id (#54)
Li0k May 30, 2025
89808e4
Feature: Optionally configure consistent chunk sizes for multi-part u…
nagraham Jun 27, 2025
118fab3
feat: support write to branch (#62)
Li0k Jul 22, 2025
75eb48d
feat: support overwrite files action (#63)
Li0k Jul 24, 2025
80d553d
feat: manifest filter manager (#72)
Li0k Oct 14, 2025
cf4fdd0
fix(azdls): enable append mode for AZDLS write operations (#89)
Li0k Oct 31, 2025
cb4753a
feat: check file existence (#92)
Li0k Nov 10, 2025
bda9a37
enable ci
chenzl25 Nov 13, 2025
9374588
fmt
chenzl25 Nov 13, 2025
3963135
feat: support position delete writer (#95)
chenzl25 Nov 27, 2025
7026984
feat: expose task writer & support delta writer (#97)
Li0k Dec 15, 2025
20fb0cf
fix: fix record to struct deserialization (#102)
chenzl25 Dec 15, 2025
fc3867d
public upper bounds (#103)
wcy-fdu Dec 16, 2025
1560244
public schema change needed api (#104)
wcy-fdu Dec 16, 2025
54498bc
feat: add update_schema action for Transaction (#105)
wcy-fdu Dec 17, 2025
93be8e6
chore: add public access of object cache (#106)
yezizp2012 Jan 6, 2026
4bf15b9
fix(spec): Include delete file content to V3 manifest (#1979)
CTTY Jan 5, 2026
15e43f4
feat(reader): Add PartitionSpec support to FileScanTask and RecordBat…
nagraham Jan 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ on:
push:
branches:
- main
- dev_rebase_main_20251111
pull_request:
branches:
- dev_rebase_main_20251111
paths:
- '**' # Include all files and directories in the repository by default.
- '!.github/ISSUE_TEMPLATE/**' # Exclude files and directories that don't impact tests or code like templates, metadata, and documentation.
Expand Down
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ arrow-array = "56.2"
arrow-buffer = "56.2"
arrow-cast = "56.2"
arrow-ord = "56.2"
arrow-row = "56.2"
arrow-schema = "56.2"
arrow-select = "56.2"
arrow-string = "56.2"
Expand Down
5 changes: 4 additions & 1 deletion crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ repository = { workspace = true }

[features]
default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"]
storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs", "storage-azblob"]

storage-azdls = ["opendal/services-azdls"]
storage-fs = ["opendal/services-fs"]
storage-gcs = ["opendal/services-gcs"]
storage-memory = ["opendal/services-memory"]
storage-oss = ["opendal/services-oss"]
storage-s3 = ["opendal/services-s3", "reqsign"]
storage-azblob = ["opendal/services-azblob"]

smol = ["dep:smol"]
tokio = ["tokio/rt-multi-thread"]
Expand All @@ -51,6 +52,7 @@ arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-ord = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-string = { workspace = true }
Expand Down Expand Up @@ -89,6 +91,7 @@ smol = { workspace = true, optional = true }
strum = { workspace = true, features = ["derive"] }
thrift = { workspace = true }
tokio = { workspace = true, optional = false, features = ["sync"] }
tracing = { workspace = true }
typed-builder = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
Expand Down
64 changes: 55 additions & 9 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::delete_vector::DeleteVector;
use crate::expr::Predicate::AlwaysTrue;
use crate::expr::{Predicate, Reference};
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::scan::{ArrowRecordBatchStream, FileScanTask};
use crate::spec::{
DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor,
PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, Type,
Expand Down Expand Up @@ -138,7 +138,7 @@ impl CachingDeleteFileLoader {
/// ```
pub(crate) fn load_deletes(
&self,
delete_file_entries: &[FileScanTaskDeleteFile],
delete_file_entries: &[Arc<FileScanTask>],
schema: SchemaRef,
) -> Receiver<Result<DeleteFilter>> {
let (tx, rx) = channel();
Expand Down Expand Up @@ -204,32 +204,32 @@ impl CachingDeleteFileLoader {
}

async fn load_file_for_task(
task: &FileScanTaskDeleteFile,
task: &FileScanTask,
basic_delete_file_loader: BasicDeleteFileLoader,
del_filter: DeleteFilter,
schema: SchemaRef,
) -> Result<DeleteFileContext> {
match task.file_type {
match task.data_file_content {
DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
basic_delete_file_loader
.parquet_to_batch_stream(&task.file_path)
.parquet_to_batch_stream(task.data_file_path())
.await?,
)),

DataContentType::EqualityDeletes => {
let Some(notify) = del_filter.try_start_eq_del_load(&task.file_path) else {
let Some(notify) = del_filter.try_start_eq_del_load(task.data_file_path()) else {
return Ok(DeleteFileContext::ExistingEqDel);
};

let (sender, receiver) = channel();
del_filter.insert_equality_delete(&task.file_path, receiver);
del_filter.insert_equality_delete(task.data_file_path(), receiver);

// Per the Iceberg spec, evolve schema for equality deletes but only for the
// equality_ids columns, not all table columns.
let equality_ids_vec = task.equality_ids.clone().unwrap();
let evolved_stream = BasicDeleteFileLoader::evolve_schema(
basic_delete_file_loader
.parquet_to_batch_stream(&task.file_path)
.parquet_to_batch_stream(task.data_file_path())
.await?,
schema,
&equality_ids_vec,
Expand Down Expand Up @@ -552,6 +552,7 @@ mod tests {

use super::*;
use crate::arrow::delete_filter::tests::setup;
use crate::scan::FileScanTaskDeleteFile;

#[tokio::test]
async fn test_delete_file_loader_parse_equality_deletes() {
Expand Down Expand Up @@ -871,6 +872,44 @@ mod tests {
equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
};

let pos_del_scan_task = FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: pos_del.file_path,
data_file_content: DataContentType::PositionDeletes,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![],
sequence_number: 0,
equality_ids: None,
file_size_in_bytes: 0,
partition: None,
partition_spec: None,
name_mapping: None,
};

let eq_del_scan_task = FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: eq_del.file_path.clone(),
data_file_content: DataContentType::EqualityDeletes,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![],
sequence_number: 0,
equality_ids: eq_del.equality_ids.clone(),
file_size_in_bytes: 0,
partition: None,
partition_spec: None,
name_mapping: None,
};

let file_scan_task = FileScanTask {
start: 0,
length: 0,
Expand All @@ -880,7 +919,14 @@ mod tests {
schema: data_file_schema.clone(),
project_field_ids: vec![2, 3],
predicate: None,
deletes: vec![pos_del, eq_del],
deletes: vec![pos_del_scan_task.into(), eq_del_scan_task.into()],
sequence_number: 0,
data_file_content: DataContentType::Data,
equality_ids: None,
file_size_in_bytes: 0,
partition: None,
partition_spec: None,
name_mapping: None,
};

// Load the deletes - should handle both types without error
Expand Down
17 changes: 11 additions & 6 deletions crates/iceberg/src/arrow/delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use futures::{StreamExt, TryStreamExt};

use crate::arrow::ArrowReader;
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{Schema, SchemaRef};
Expand Down Expand Up @@ -82,7 +82,7 @@ impl BasicDeleteFileLoader {
equality_ids: &[i32],
) -> Result<ArrowRecordBatchStream> {
let mut record_batch_transformer =
RecordBatchTransformer::build(target_schema.clone(), equality_ids);
RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build();

let record_batch_stream = record_batch_stream.map(move |record_batch| {
record_batch.and_then(|record_batch| {
Expand Down Expand Up @@ -120,6 +120,7 @@ mod tests {

use super::*;
use crate::arrow::delete_filter::tests::setup;
use crate::spec::DataContentType;

#[tokio::test]
async fn test_basic_delete_file_loader_read_delete_file() {
Expand All @@ -134,11 +135,15 @@ mod tests {

let file_scan_tasks = setup(table_location);

let delete_task = FileScanTaskDeleteFile {
file_path: file_scan_tasks[0].deletes[0].data_file_path.clone(),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
};

let result = delete_file_loader
.read_delete_file(
&file_scan_tasks[0].deletes[0],
file_scan_tasks[0].schema_ref(),
)
.read_delete_file(&delete_task, file_scan_tasks[0].schema_ref())
.await
.unwrap();

Expand Down
63 changes: 38 additions & 25 deletions crates/iceberg/src/arrow/delete_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::sync::oneshot::Receiver;
use crate::delete_vector::DeleteVector;
use crate::expr::Predicate::AlwaysTrue;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::scan::{FileScanTask, FileScanTaskDeleteFile};
use crate::scan::FileScanTask;
use crate::spec::DataContentType;
use crate::{Error, ErrorKind, Result};

Expand Down Expand Up @@ -122,14 +122,14 @@ impl DeleteFilter {
}

let Some(predicate) = self
.get_equality_delete_predicate_for_delete_file_path(&delete.file_path)
.get_equality_delete_predicate_for_delete_file_path(delete.data_file_path())
.await
else {
return Err(Error::new(
ErrorKind::Unexpected,
format!(
"Missing predicate for equality delete file '{}'",
delete.file_path
delete.data_file_path()
),
));
};
Expand Down Expand Up @@ -192,8 +192,8 @@ impl DeleteFilter {
}
}

pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
matches!(f.file_type, DataContentType::EqualityDeletes)
pub(crate) fn is_equality_delete(f: &FileScanTask) -> bool {
matches!(f.data_file_content, DataContentType::EqualityDeletes)
}

#[cfg(test)]
Expand Down Expand Up @@ -309,25 +309,24 @@ pub(crate) mod tests {
writer.close().unwrap();
}

let pos_del_1 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
};

let pos_del_2 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
};

let pos_del_3 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
// Helper to build a positional delete task with minimal fields
let make_pos_del_task = |n: u8| FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/pos-del-{}.parquet", table_location.to_str().unwrap(), n),
data_file_content: DataContentType::PositionDeletes,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![],
sequence_number: 0,
equality_ids: None,
file_size_in_bytes: 0,
partition: None,
partition_spec: None,
name_mapping: None,
};

let file_scan_tasks = vec![
Expand All @@ -336,22 +335,36 @@ pub(crate) mod tests {
length: 0,
record_count: None,
data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
data_file_content: DataContentType::Data,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_1, pos_del_2.clone()],
deletes: vec![make_pos_del_task(1).into(), make_pos_del_task(2).into()],
sequence_number: 0,
equality_ids: None,
file_size_in_bytes: 0,
partition: None,
partition_spec: None,
name_mapping: None,
},
FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
data_file_content: DataContentType::Data,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_3],
deletes: vec![make_pos_del_task(3).into()],
sequence_number: 0,
equality_ids: None,
file_size_in_bytes: 0,
partition: None,
partition_spec: None,
name_mapping: None,
},
];

Expand Down
Loading
Loading