Skip to content

Commit 59a3428

Browse files
committed
Use the format version when serializing data files
1 parent d2168f2 commit 59a3428

File tree

4 files changed

+21
-15
lines changed

4 files changed

+21
-15
lines changed

crates/iceberg/src/spec/manifest/_serde.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use serde_derive::{Deserialize, Serialize};
2121
use serde_with::serde_as;
2222

2323
use super::{Datum, ManifestEntry, Schema, Struct};
24-
use crate::spec::{Literal, RawLiteral, StructType, Type};
24+
use crate::spec::{FormatVersion, Literal, RawLiteral, StructType, Type};
2525
use crate::{Error, ErrorKind};
2626

2727
#[derive(Serialize, Deserialize)]
@@ -40,7 +40,7 @@ impl ManifestEntryV2 {
4040
snapshot_id: value.snapshot_id,
4141
sequence_number: value.sequence_number,
4242
file_sequence_number: value.file_sequence_number,
43-
data_file: DataFileSerde::try_from(value.data_file, partition_type, false)?,
43+
data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V2)?,
4444
})
4545
}
4646

@@ -74,7 +74,7 @@ impl ManifestEntryV1 {
7474
Ok(Self {
7575
status: value.status as i32,
7676
snapshot_id: value.snapshot_id.unwrap_or_default(),
77-
data_file: DataFileSerde::try_from(value.data_file, partition_type, true)?,
77+
data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V1)?,
7878
})
7979
}
8080

@@ -129,9 +129,13 @@ impl DataFileSerde {
129129
pub fn try_from(
130130
value: super::DataFile,
131131
partition_type: &StructType,
132-
is_version_1: bool,
132+
format_version: FormatVersion,
133133
) -> Result<Self, Error> {
134-
let block_size_in_bytes = if is_version_1 { Some(0) } else { None };
134+
let block_size_in_bytes = if format_version == FormatVersion::V1 {
135+
Some(0)
136+
} else {
137+
None
138+
};
135139
Ok(Self {
136140
content: value.content as i32,
137141
file_path: value.file_path,

crates/iceberg/src/spec/manifest/data_file.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,12 @@ pub fn write_data_files_to_avro<W: Write>(
297297
let mut writer = AvroWriter::new(&avro_schema, writer);
298298

299299
for data_file in data_files {
300-
let value = to_value(DataFileSerde::try_from(data_file, partition_type, true)?)?
301-
.resolve(&avro_schema)?;
300+
let value = to_value(DataFileSerde::try_from(
301+
data_file,
302+
partition_type,
303+
FormatVersion::V1,
304+
)?)?
305+
.resolve(&avro_schema)?;
302306
writer.append(value)?;
303307
}
304308

crates/iceberg/src/spec/manifest/mod.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,9 @@ impl Manifest {
123123
pub fn serialize_data_file_to_json(
124124
data_file: DataFile,
125125
partition_type: &super::StructType,
126-
is_version_1: bool,
126+
format_version: FormatVersion,
127127
) -> Result<String> {
128-
let serde = _serde::DataFileSerde::try_from(data_file, partition_type, is_version_1)?;
128+
let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?;
129129
serde_json::to_string(&serde).map_err(|e| {
130130
Error::new(
131131
ErrorKind::DataInvalid,
@@ -1120,9 +1120,6 @@ mod tests {
11201120
// Get partition type from the partition spec
11211121
let partition_type = partition_spec.partition_type(&schema).unwrap();
11221122

1123-
// Set version flag
1124-
let is_version_1 = false;
1125-
11261123
// Create a vector of DataFile objects
11271124
let data_files = vec![
11281125
DataFileBuilder::default()
@@ -1157,7 +1154,8 @@ mod tests {
11571154
let serialized_files = data_files
11581155
.into_iter()
11591156
.map(|f| {
1160-
let json = serialize_data_file_to_json(f, &partition_type, is_version_1).unwrap();
1157+
let json =
1158+
serialize_data_file_to_json(f, &partition_type, FormatVersion::V2).unwrap();
11611159
println!("Test serialized data file: {}", json);
11621160
json
11631161
})

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ impl ExecutionPlan for IcebergWriteExec {
174174
// todo non-default partition spec?
175175
let spec_id = self.table.metadata().default_partition_spec_id();
176176
let partition_type = self.table.metadata().default_partition_type().clone();
177-
let is_version_1 = self.table.metadata().format_version() == FormatVersion::V1;
177+
let format_version = self.table.metadata().format_version();
178178

179179
// Check data file format
180180
let file_format = DataFileFormat::from_str(
@@ -245,7 +245,7 @@ impl ExecutionPlan for IcebergWriteExec {
245245
.map(|data_file| -> DFResult<String> {
246246
// Serialize to JSON
247247
let json =
248-
serialize_data_file_to_json(data_file, &partition_type, is_version_1)
248+
serialize_data_file_to_json(data_file, &partition_type, format_version)
249249
.map_err(to_datafusion_error)?;
250250

251251
println!("Serialized data file: {}", json); // todo remove log

0 commit comments

Comments
 (0)