Skip to content

Commit 1426f29

Browse files
committed
Merge branch 'main' into hjiang/delete-column-implementation
2 parents 477519e + 17e4351 commit 1426f29

File tree

9 files changed

+1127
-13
lines changed

9 files changed

+1127
-13
lines changed

crates/catalog/rest/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ impl RestCatalog {
320320
None => None,
321321
};
322322

323-
let file_io = match warehouse_path.or(metadata_location) {
323+
let file_io = match metadata_location.or(warehouse_path) {
324324
Some(url) => FileIO::from_path(url)?
325325
.with_props(props)
326326
.with_extensions(self.file_io_extensions.clone())

crates/iceberg/src/delete_vector.rs

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@ use roaring::RoaringTreemap;
2121
use roaring::bitmap::Iter;
2222
use roaring::treemap::BitmapIter;
2323

24+
use crate::{Error, ErrorKind, Result};
25+
2426
#[derive(Debug, Default)]
2527
pub struct DeleteVector {
2628
inner: RoaringTreemap,
2729
}
2830

2931
impl DeleteVector {
3032
#[allow(unused)]
31-
pub(crate) fn new(roaring_treemap: RoaringTreemap) -> DeleteVector {
33+
pub fn new(roaring_treemap: RoaringTreemap) -> DeleteVector {
3234
DeleteVector {
3335
inner: roaring_treemap,
3436
}
@@ -43,6 +45,25 @@ impl DeleteVector {
4345
self.inner.insert(pos)
4446
}
4547

48+
/// Marks the given `positions` as deleted and returns the number of elements appended.
49+
///
50+
/// The input slice must be strictly ordered in ascending order, and every value must be greater than all existing values already in the set.
51+
///
52+
/// # Errors
53+
///
54+
/// Returns an error if the precondition is not met.
55+
#[allow(dead_code)]
56+
pub fn insert_positions(&mut self, positions: &[u64]) -> Result<usize> {
57+
if let Err(err) = self.inner.append(positions.iter().copied()) {
58+
return Err(Error::new(
59+
ErrorKind::PreconditionFailed,
60+
"failed to marks rows as deleted".to_string(),
61+
)
62+
.with_source(err));
63+
}
64+
Ok(positions.len())
65+
}
66+
4667
#[allow(unused)]
4768
pub fn len(&self) -> u64 {
4869
self.inner.len()
@@ -120,3 +141,61 @@ impl BitOrAssign for DeleteVector {
120141
self.inner.bitor_assign(&other.inner);
121142
}
122143
}
144+
145+
#[cfg(test)]
146+
mod tests {
147+
use super::*;
148+
149+
#[test]
150+
fn test_insertion_and_iteration() {
151+
let mut dv = DeleteVector::default();
152+
assert!(dv.insert(42));
153+
assert!(dv.insert(100));
154+
assert!(!dv.insert(42));
155+
156+
let mut items: Vec<u64> = dv.iter().collect();
157+
items.sort();
158+
assert_eq!(items, vec![42, 100]);
159+
assert_eq!(dv.len(), 2);
160+
}
161+
162+
#[test]
163+
fn test_successful_insert_positions() {
164+
let mut dv = DeleteVector::default();
165+
let positions = vec![1, 2, 3, 1000, 1 << 33];
166+
assert_eq!(dv.insert_positions(&positions).unwrap(), 5);
167+
168+
let mut collected: Vec<u64> = dv.iter().collect();
169+
collected.sort();
170+
assert_eq!(collected, positions);
171+
}
172+
173+
/// Testing scenario: bulk insertion fails because input positions are not strictly increasing.
174+
#[test]
175+
fn test_failed_insertion_unsorted_elements() {
176+
let mut dv = DeleteVector::default();
177+
let positions = vec![1, 3, 5, 4];
178+
let res = dv.insert_positions(&positions);
179+
assert!(res.is_err());
180+
}
181+
182+
/// Testing scenario: bulk insertion fails because input positions have intersection with existing ones.
183+
#[test]
184+
fn test_failed_insertion_with_intersection() {
185+
let mut dv = DeleteVector::default();
186+
let positions = vec![1, 3, 5];
187+
assert_eq!(dv.insert_positions(&positions).unwrap(), 3);
188+
189+
let res = dv.insert_positions(&[2, 4]);
190+
assert!(res.is_err());
191+
}
192+
193+
/// Testing scenario: bulk insertion fails because input positions have duplicates.
194+
#[test]
195+
fn test_failed_insertion_duplicate_elements() {
196+
let mut dv = DeleteVector::default();
197+
let positions = vec![1, 3, 5, 5];
198+
let res = dv.insert_positions(&positions);
199+
assert!(res.is_err());
200+
}
201+
}

crates/iceberg/src/spec/manifest/_serde.rs

Lines changed: 168 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use serde_derive::{Deserialize, Serialize};
2121
use serde_with::serde_as;
2222

2323
use super::{Datum, ManifestEntry, Schema, Struct};
24-
use crate::spec::{Literal, RawLiteral, StructType, Type};
24+
use crate::spec::{FormatVersion, Literal, RawLiteral, StructType, Type};
2525
use crate::{Error, ErrorKind};
2626

2727
#[derive(Serialize, Deserialize)]
@@ -40,7 +40,7 @@ impl ManifestEntryV2 {
4040
snapshot_id: value.snapshot_id,
4141
sequence_number: value.sequence_number,
4242
file_sequence_number: value.file_sequence_number,
43-
data_file: DataFileSerde::try_from(value.data_file, partition_type, false)?,
43+
data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V2)?,
4444
})
4545
}
4646

@@ -74,7 +74,7 @@ impl ManifestEntryV1 {
7474
Ok(Self {
7575
status: value.status as i32,
7676
snapshot_id: value.snapshot_id.unwrap_or_default(),
77-
data_file: DataFileSerde::try_from(value.data_file, partition_type, true)?,
77+
data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V1)?,
7878
})
7979
}
8080

@@ -129,9 +129,13 @@ impl DataFileSerde {
129129
pub fn try_from(
130130
value: super::DataFile,
131131
partition_type: &StructType,
132-
is_version_1: bool,
132+
format_version: FormatVersion,
133133
) -> Result<Self, Error> {
134-
let block_size_in_bytes = if is_version_1 { Some(0) } else { None };
134+
let block_size_in_bytes = if format_version == FormatVersion::V1 {
135+
Some(0)
136+
} else {
137+
None
138+
};
135139
Ok(Self {
136140
content: value.content as i32,
137141
file_path: value.file_path,
@@ -292,16 +296,23 @@ fn parse_i64_entry(v: Vec<I64Entry>) -> Result<HashMap<i32, u64>, Error> {
292296
Ok(m)
293297
}
294298

299+
#[allow(unused_mut)]
295300
fn to_i64_entry(entries: HashMap<i32, u64>) -> Result<Vec<I64Entry>, Error> {
296-
entries
301+
let mut i64_entries = entries
297302
.iter()
298303
.map(|e| {
299304
Ok(I64Entry {
300305
key: *e.0,
301306
value: (*e.1).try_into()?,
302307
})
303308
})
304-
.collect()
309+
.collect::<Result<Vec<_>, Error>>()?;
310+
311+
// Ensure that the order is deterministic during testing
312+
#[cfg(test)]
313+
i64_entries.sort_by_key(|e| e.key);
314+
315+
Ok(i64_entries)
305316
}
306317

307318
#[cfg(test)]
@@ -432,4 +443,154 @@ mod tests {
432443

433444
assert_eq!(actual_data_file[0].content, DataContentType::Data)
434445
}
446+
447+
#[test]
448+
fn test_manifest_entry_v1_to_v2_projection() {
449+
use crate::spec::manifest::_serde::{DataFileSerde, ManifestEntryV1};
450+
use crate::spec::{Literal, RawLiteral, Struct, StructType};
451+
452+
let partition = RawLiteral::try_from(
453+
Literal::Struct(Struct::empty()),
454+
&Type::Struct(StructType::new(vec![])),
455+
)
456+
.unwrap();
457+
458+
// Create a V1 manifest entry struct (lacks V2 sequence number fields)
459+
let v1_entry = ManifestEntryV1 {
460+
status: 1, // Added
461+
snapshot_id: 12345,
462+
data_file: DataFileSerde {
463+
content: 0, // DataFileSerde is shared between V1/V2
464+
file_path: "test/path.parquet".to_string(),
465+
file_format: "PARQUET".to_string(),
466+
partition,
467+
record_count: 100,
468+
file_size_in_bytes: 1024,
469+
block_size_in_bytes: Some(0), // V1 includes this field
470+
column_sizes: None,
471+
value_counts: None,
472+
null_value_counts: None,
473+
nan_value_counts: None,
474+
lower_bounds: None,
475+
upper_bounds: None,
476+
key_metadata: None,
477+
split_offsets: None,
478+
equality_ids: None, // Will be converted to empty vec
479+
sort_order_id: None,
480+
first_row_id: None,
481+
referenced_data_file: None,
482+
content_offset: None,
483+
content_size_in_bytes: None,
484+
},
485+
};
486+
487+
// Test the explicit V1→V2 conversion logic in ManifestEntryV1::try_into()
488+
let v2_entry = v1_entry
489+
.try_into(
490+
0, // partition_spec_id
491+
&StructType::new(vec![]),
492+
&schema(),
493+
)
494+
.unwrap();
495+
496+
// Verify that V1→V2 conversion adds the missing V2 sequence number fields
497+
assert_eq!(
498+
v2_entry.sequence_number,
499+
Some(0),
500+
"ManifestEntryV1::try_into() should set sequence_number to 0"
501+
);
502+
assert_eq!(
503+
v2_entry.file_sequence_number,
504+
Some(0),
505+
"ManifestEntryV1::try_into() should set file_sequence_number to 0"
506+
);
507+
assert_eq!(
508+
v2_entry.snapshot_id,
509+
Some(12345),
510+
"snapshot_id should be preserved during conversion"
511+
);
512+
513+
// Verify that DataFileSerde conversion applies V2 defaults
514+
assert_eq!(
515+
v2_entry.data_file.content,
516+
DataContentType::Data,
517+
"DataFileSerde should convert content 0 to DataContentType::Data"
518+
);
519+
assert_eq!(
520+
v2_entry.data_file.equality_ids,
521+
Vec::<i32>::new(),
522+
"DataFileSerde should convert None equality_ids to empty vec"
523+
);
524+
525+
// Verify other fields are preserved during conversion
526+
assert_eq!(v2_entry.data_file.file_path, "test/path.parquet");
527+
assert_eq!(v2_entry.data_file.record_count, 100);
528+
assert_eq!(v2_entry.data_file.file_size_in_bytes, 1024);
529+
}
530+
531+
#[test]
532+
fn test_data_file_serde_v1_field_defaults() {
533+
use crate::spec::manifest::_serde::DataFileSerde;
534+
use crate::spec::{Literal, RawLiteral, Struct, StructType};
535+
536+
let partition = RawLiteral::try_from(
537+
Literal::Struct(Struct::empty()),
538+
&Type::Struct(StructType::new(vec![])),
539+
)
540+
.unwrap();
541+
542+
// Create a DataFileSerde that simulates V1 deserialization behavior
543+
// (missing V2 fields would be None due to #[serde(default)])
544+
let v1_style_data_file = DataFileSerde {
545+
content: 0, // V1 doesn't have this field, defaults to 0 via #[serde(default)]
546+
file_path: "test/data.parquet".to_string(),
547+
file_format: "PARQUET".to_string(),
548+
partition,
549+
record_count: 500,
550+
file_size_in_bytes: 2048,
551+
block_size_in_bytes: Some(1024), // V1 includes this field, V2 skips it
552+
column_sizes: None,
553+
value_counts: None,
554+
null_value_counts: None,
555+
nan_value_counts: None,
556+
lower_bounds: None,
557+
upper_bounds: None,
558+
key_metadata: None,
559+
split_offsets: None,
560+
equality_ids: None, // V1 doesn't have this field, defaults to None via #[serde(default)]
561+
sort_order_id: None,
562+
first_row_id: None,
563+
referenced_data_file: None,
564+
content_offset: None,
565+
content_size_in_bytes: None,
566+
};
567+
568+
// Test the DataFileSerde::try_into() conversion that handles V1 field defaults
569+
let data_file = v1_style_data_file
570+
.try_into(
571+
0, // partition_spec_id
572+
&StructType::new(vec![]),
573+
&schema(),
574+
)
575+
.unwrap();
576+
577+
// Verify that DataFileSerde::try_into() applies correct defaults for missing V2 fields
578+
assert_eq!(
579+
data_file.content,
580+
DataContentType::Data,
581+
"content 0 should convert to DataContentType::Data"
582+
);
583+
assert_eq!(
584+
data_file.equality_ids,
585+
Vec::<i32>::new(),
586+
"None equality_ids should convert to empty vec via unwrap_or_default()"
587+
);
588+
589+
// Verify other fields are handled correctly during conversion
590+
assert_eq!(data_file.file_path, "test/data.parquet");
591+
assert_eq!(data_file.file_format, DataFileFormat::Parquet);
592+
assert_eq!(data_file.record_count, 500);
593+
assert_eq!(data_file.file_size_in_bytes, 2048);
594+
assert_eq!(data_file.partition_spec_id, 0);
595+
}
435596
}

crates/iceberg/src/spec/manifest/data_file.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,12 @@ pub fn write_data_files_to_avro<W: Write>(
297297
let mut writer = AvroWriter::new(&avro_schema, writer);
298298

299299
for data_file in data_files {
300-
let value = to_value(DataFileSerde::try_from(data_file, partition_type, true)?)?
301-
.resolve(&avro_schema)?;
300+
let value = to_value(DataFileSerde::try_from(
301+
data_file,
302+
partition_type,
303+
FormatVersion::V1,
304+
)?)?
305+
.resolve(&avro_schema)?;
302306
writer.append(value)?;
303307
}
304308

@@ -333,9 +337,10 @@ pub fn read_data_files_from_avro<R: Read>(
333337

334338
/// Type of content stored by the data file: data, equality deletes, or
335339
/// position deletes (all v1 files are data files)
336-
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
340+
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Default)]
337341
pub enum DataContentType {
338342
/// value: 0
343+
#[default]
339344
Data = 0,
340345
/// value: 1
341346
PositionDeletes = 1,
@@ -399,3 +404,17 @@ impl std::fmt::Display for DataFileFormat {
399404
}
400405
}
401406
}
407+
408+
#[cfg(test)]
409+
mod test {
410+
use crate::spec::DataContentType;
411+
#[test]
412+
fn test_data_content_type_default() {
413+
assert_eq!(DataContentType::default(), DataContentType::Data);
414+
}
415+
416+
#[test]
417+
fn test_data_content_type_default_value() {
418+
assert_eq!(DataContentType::default() as i32, 0);
419+
}
420+
}

0 commit comments

Comments
 (0)