Commit 1faac00

Merge branch 'main' into ctty/name-pos

2 parents d935957 + e7160df

9 files changed (+237 / −83 lines)

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -98,7 +98,7 @@ pretty_assertions = "1.4"
 rand = "0.8.5"
 regex = "1.10.5"
 reqwest = { version = "0.12.12", default-features = false, features = ["json"] }
-roaring = { version = "0.10.12" }
+roaring = { version = "0.11" }
 rust_decimal = "1.37.1"
 serde = { version = "1.0.210", features = ["rc"] }
 serde_bytes = "0.11.15"

crates/iceberg/src/io/file_io.rs

Lines changed: 11 additions & 0 deletions
@@ -390,6 +390,17 @@ impl FileWrite for opendal::Writer {
     }
 }
 
+#[async_trait::async_trait]
+impl FileWrite for Box<dyn FileWrite> {
+    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
+        self.as_mut().write(bs).await
+    }
+
+    async fn close(&mut self) -> crate::Result<()> {
+        self.as_mut().close().await
+    }
+}
+
 /// Output file is used for writing to files..
 #[derive(Debug)]
 pub struct OutputFile {
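
This new blanket impl makes Box<dyn FileWrite> itself satisfy the FileWrite trait by forwarding write and close to the boxed value, so generic code bounded on FileWrite can also accept boxed trait objects. A minimal sketch of the same delegation pattern, using a hypothetical Sink trait rather than the crate's real FileWrite definition:

    use async_trait::async_trait;

    // Hypothetical stand-in for FileWrite; not the crate's actual trait.
    #[async_trait]
    trait Sink: Send {
        async fn write(&mut self, bytes: Vec<u8>) -> std::io::Result<()>;
        async fn close(&mut self) -> std::io::Result<()>;
    }

    // Forwarding impl: the box itself now satisfies the `Sink` bound.
    #[async_trait]
    impl Sink for Box<dyn Sink> {
        async fn write(&mut self, bytes: Vec<u8>) -> std::io::Result<()> {
            self.as_mut().write(bytes).await
        }

        async fn close(&mut self) -> std::io::Result<()> {
            self.as_mut().close().await
        }
    }

    // Generic code bounded on `Sink` can now be handed a boxed writer too.
    async fn finish<S: Sink>(mut sink: S) -> std::io::Result<()> {
        sink.write(b"done".to_vec()).await?;
        sink.close().await
    }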

crates/iceberg/src/spec/manifest/_serde.rs

Lines changed: 18 additions & 7 deletions
@@ -21,7 +21,7 @@ use serde_derive::{Deserialize, Serialize};
 use serde_with::serde_as;
 
 use super::{Datum, ManifestEntry, Schema, Struct};
-use crate::spec::{Literal, RawLiteral, StructType, Type};
+use crate::spec::{FormatVersion, Literal, RawLiteral, StructType, Type};
 use crate::{Error, ErrorKind};
 
 #[derive(Serialize, Deserialize)]
@@ -40,7 +40,7 @@ impl ManifestEntryV2 {
             snapshot_id: value.snapshot_id,
             sequence_number: value.sequence_number,
             file_sequence_number: value.file_sequence_number,
-            data_file: DataFileSerde::try_from(value.data_file, partition_type, false)?,
+            data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V2)?,
         })
     }
 
@@ -74,7 +74,7 @@ impl ManifestEntryV1 {
         Ok(Self {
             status: value.status as i32,
             snapshot_id: value.snapshot_id.unwrap_or_default(),
-            data_file: DataFileSerde::try_from(value.data_file, partition_type, true)?,
+            data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V1)?,
         })
     }
 
@@ -129,9 +129,13 @@ impl DataFileSerde {
     pub fn try_from(
         value: super::DataFile,
         partition_type: &StructType,
-        is_version_1: bool,
+        format_version: FormatVersion,
     ) -> Result<Self, Error> {
-        let block_size_in_bytes = if is_version_1 { Some(0) } else { None };
+        let block_size_in_bytes = if format_version == FormatVersion::V1 {
+            Some(0)
+        } else {
+            None
+        };
         Ok(Self {
             content: value.content as i32,
             file_path: value.file_path,
@@ -292,16 +296,23 @@ fn parse_i64_entry(v: Vec<I64Entry>) -> Result<HashMap<i32, u64>, Error> {
     Ok(m)
 }
 
+#[allow(unused_mut)]
 fn to_i64_entry(entries: HashMap<i32, u64>) -> Result<Vec<I64Entry>, Error> {
-    entries
+    let mut i64_entries = entries
         .iter()
         .map(|e| {
            Ok(I64Entry {
                key: *e.0,
                value: (*e.1).try_into()?,
            })
        })
-        .collect()
+        .collect::<Result<Vec<_>, Error>>()?;
+
+    // Ensure that the order is deterministic during testing
+    #[cfg(test)]
+    i64_entries.sort_by_key(|e| e.key);
+
+    Ok(i64_entries)
 }
 
 #[cfg(test)]
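
The #[cfg(test)] sort above exists because HashMap iteration order is not deterministic, so without it the generated I64Entry vectors could compare unequal against a fixed expected value from one test run to the next; in non-test builds the sort is compiled out, which is why the binding needs #[allow(unused_mut)]. A small self-contained illustration of the same pattern, using a simplified entry type rather than the crate's I64Entry:

    use std::collections::HashMap;

    #[derive(Debug, PartialEq)]
    struct Entry {
        key: i32,
        value: i64,
    }

    // Order is unspecified in production builds; sorted only under cfg(test)
    // so assertions against literal vectors stay stable.
    #[allow(unused_mut)]
    fn to_entries(map: HashMap<i32, i64>) -> Vec<Entry> {
        let mut entries: Vec<Entry> = map
            .into_iter()
            .map(|(key, value)| Entry { key, value })
            .collect();

        #[cfg(test)]
        entries.sort_by_key(|e| e.key);

        entries
    }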

crates/iceberg/src/spec/manifest/data_file.rs

Lines changed: 6 additions & 2 deletions
@@ -297,8 +297,12 @@ pub fn write_data_files_to_avro<W: Write>(
     let mut writer = AvroWriter::new(&avro_schema, writer);
 
     for data_file in data_files {
-        let value = to_value(DataFileSerde::try_from(data_file, partition_type, true)?)?
-            .resolve(&avro_schema)?;
+        let value = to_value(DataFileSerde::try_from(
+            data_file,
+            partition_type,
+            FormatVersion::V1,
+        )?)?
+        .resolve(&avro_schema)?;
         writer.append(value)?;
     }
 

crates/iceberg/src/spec/manifest/mod.rs

Lines changed: 191 additions & 0 deletions
@@ -34,6 +34,7 @@ use super::{
     UNASSIGNED_SEQUENCE_NUMBER,
 };
 use crate::error::Result;
+use crate::{Error, ErrorKind};
 
 /// A manifest contains metadata and a list of entries.
 #[derive(Debug, PartialEq, Eq, Clone)]
@@ -119,12 +120,47 @@ impl Manifest {
     }
 }
 
+/// Serialize a DataFile to a JSON string.
+pub fn serialize_data_file_to_json(
+    data_file: DataFile,
+    partition_type: &super::StructType,
+    format_version: FormatVersion,
+) -> Result<String> {
+    let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?;
+    serde_json::to_string(&serde).map_err(|e| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            "Failed to serialize DataFile to JSON!".to_string(),
+        )
+        .with_source(e)
+    })
+}
+
+/// Deserialize a DataFile from a JSON string.
+pub fn deserialize_data_file_from_json(
+    json: &str,
+    partition_spec_id: i32,
+    partition_type: &super::StructType,
+    schema: &Schema,
+) -> Result<DataFile> {
+    let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            "Failed to deserialize JSON to DataFile!".to_string(),
+        )
+        .with_source(e)
+    })?;
+
+    serde.try_into(partition_spec_id, partition_type, schema)
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
     use std::fs;
     use std::sync::Arc;
 
+    use serde_json::Value;
     use tempfile::TempDir;
 
     use super::*;
@@ -1056,4 +1092,159 @@
         assert!(!partitions[2].clone().contains_null);
         assert_eq!(partitions[2].clone().contains_nan, Some(false));
     }
+
+    #[test]
+    fn test_data_file_serialization() {
+        // Create a simple schema
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_identifier_field_ids(vec![1])
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        // Create a partition spec
+        let partition_spec = PartitionSpec::builder(schema.clone())
+            .with_spec_id(1)
+            .add_partition_field("id", "id_partition", Transform::Identity)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Get partition type from the partition spec
+        let partition_type = partition_spec.partition_type(&schema).unwrap();
+
+        // Create a vector of DataFile objects
+        let data_files = vec![
+            DataFileBuilder::default()
+                .content(DataContentType::Data)
+                .file_format(DataFileFormat::Parquet)
+                .file_path("path/to/file1.parquet".to_string())
+                .file_size_in_bytes(1024)
+                .record_count(100)
+                .partition_spec_id(1)
+                .partition(Struct::empty())
+                .column_sizes(HashMap::from([(1, 512), (2, 1024)]))
+                .value_counts(HashMap::from([(1, 100), (2, 500)]))
+                .null_value_counts(HashMap::from([(1, 0), (2, 1)]))
+                .build()
+                .unwrap(),
+            DataFileBuilder::default()
+                .content(DataContentType::Data)
+                .file_format(DataFileFormat::Parquet)
+                .file_path("path/to/file2.parquet".to_string())
+                .file_size_in_bytes(2048)
+                .record_count(200)
+                .partition_spec_id(1)
+                .partition(Struct::empty())
+                .column_sizes(HashMap::from([(1, 1024), (2, 2048)]))
+                .value_counts(HashMap::from([(1, 200), (2, 600)]))
+                .null_value_counts(HashMap::from([(1, 10), (2, 999)]))
+                .build()
+                .unwrap(),
+        ];
+
+        // Serialize the DataFile objects
+        let serialized_files = data_files
+            .clone()
+            .into_iter()
+            .map(|f| serialize_data_file_to_json(f, &partition_type, FormatVersion::V2).unwrap())
+            .collect::<Vec<String>>();
+
+        // Verify we have the expected serialized files
+        assert_eq!(serialized_files.len(), 2);
+        let pretty_json1: Value = serde_json::from_str(serialized_files.first().unwrap()).unwrap();
+        let pretty_json2: Value = serde_json::from_str(serialized_files.get(1).unwrap()).unwrap();
+        let expected_serialized_file1 = serde_json::json!({
+            "content": 0,
+            "file_path": "path/to/file1.parquet",
+            "file_format": "PARQUET",
+            "partition": {},
+            "record_count": 100,
+            "file_size_in_bytes": 1024,
+            "column_sizes": [
+                { "key": 1, "value": 512 },
+                { "key": 2, "value": 1024 }
+            ],
+            "value_counts": [
+                { "key": 1, "value": 100 },
+                { "key": 2, "value": 500 }
+            ],
+            "null_value_counts": [
+                { "key": 1, "value": 0 },
+                { "key": 2, "value": 1 }
+            ],
+            "nan_value_counts": [],
+            "lower_bounds": [],
+            "upper_bounds": [],
+            "key_metadata": null,
+            "split_offsets": [],
+            "equality_ids": [],
+            "sort_order_id": null,
+            "first_row_id": null,
+            "referenced_data_file": null,
+            "content_offset": null,
+            "content_size_in_bytes": null
+        });
+        let expected_serialized_file2 = serde_json::json!({
+            "content": 0,
+            "file_path": "path/to/file2.parquet",
+            "file_format": "PARQUET",
+            "partition": {},
+            "record_count": 200,
+            "file_size_in_bytes": 2048,
+            "column_sizes": [
+                { "key": 1, "value": 1024 },
+                { "key": 2, "value": 2048 }
+            ],
+            "value_counts": [
+                { "key": 1, "value": 200 },
+                { "key": 2, "value": 600 }
+            ],
+            "null_value_counts": [
+                { "key": 1, "value": 10 },
+                { "key": 2, "value": 999 }
+            ],
+            "nan_value_counts": [],
+            "lower_bounds": [],
+            "upper_bounds": [],
+            "key_metadata": null,
+            "split_offsets": [],
+            "equality_ids": [],
+            "sort_order_id": null,
+            "first_row_id": null,
+            "referenced_data_file": null,
+            "content_offset": null,
+            "content_size_in_bytes": null
+        });
+        assert_eq!(pretty_json1, expected_serialized_file1);
+        assert_eq!(pretty_json2, expected_serialized_file2);
+
+        // Now deserialize the JSON strings back into DataFile objects
+        let deserialized_files: Vec<DataFile> = serialized_files
+            .into_iter()
+            .map(|json| {
+                deserialize_data_file_from_json(
+                    &json,
+                    partition_spec.spec_id(),
+                    &partition_type,
+                    &schema,
+                )
+                .unwrap()
+            })
+            .collect();
+
+        // Verify we have the expected number of deserialized files
+        assert_eq!(deserialized_files.len(), 2);
+        let deserialized_data_file1 = deserialized_files.first().unwrap();
+        let deserialized_data_file2 = deserialized_files.get(1).unwrap();
+        let original_data_file1 = data_files.first().unwrap();
+        let original_data_file2 = data_files.get(1).unwrap();
+
+        assert_eq!(deserialized_data_file1, original_data_file1);
+        assert_eq!(deserialized_data_file2, original_data_file2);
+    }
 }
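
Taken together, the two new public helpers give callers a JSON round trip for a DataFile outside the Avro manifest path. A condensed sketch of that round trip, assuming a DataFile, its PartitionSpec, the partition StructType, and the Schema are already constructed (for example as in the test above):

    // Sketch only; the arguments are assumed to be built as in the test above.
    fn roundtrip(
        data_file: DataFile,
        partition_spec: &PartitionSpec,
        partition_type: &StructType,
        schema: &Schema,
    ) -> Result<DataFile> {
        // Serialize with the V2 layout, then parse the JSON back into a DataFile.
        let json = serialize_data_file_to_json(data_file, partition_type, FormatVersion::V2)?;
        deserialize_data_file_from_json(&json, partition_spec.spec_id(), partition_type, schema)
    }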

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -26,7 +26,6 @@ use crate::spec::DataFileBuilder;
 
 mod parquet_writer;
 pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
-mod track_writer;
 
 pub mod location_generator;
 /// Module providing writers that can automatically roll over to new files based on size thresholds.
