Skip to content

Commit e25f888

Browse files
committed
do not expose serde
1 parent 41a75bd commit e25f888

File tree

4 files changed

+60
-44
lines changed

4 files changed

+60
-44
lines changed

crates/iceberg/src/spec/manifest/_serde.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,9 @@ impl ManifestEntryV1 {
9696
}
9797
}
9898

99-
/// todo doc
10099
#[serde_as]
101100
#[derive(Serialize, Deserialize)]
102-
pub struct DataFileSerde {
101+
pub(super) struct DataFileSerde {
103102
#[serde(default)]
104103
content: i32,
105104
file_path: String,
@@ -127,7 +126,6 @@ pub struct DataFileSerde {
127126
}
128127

129128
impl DataFileSerde {
130-
/// todo doc
131129
pub fn try_from(
132130
value: super::DataFile,
133131
partition_type: &StructType,
@@ -162,7 +160,6 @@ impl DataFileSerde {
162160
})
163161
}
164162

165-
/// todo doc
166163
pub fn try_into(
167164
self,
168165
partition_spec_id: i32,

crates/iceberg/src/spec/manifest/mod.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
// todo fix encapsulation
1918
mod _serde;
20-
pub use _serde::*;
2119

2220
mod data_file;
2321
pub use data_file::*;
@@ -35,7 +33,7 @@ use super::{
3533
Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, Schema, Struct,
3634
UNASSIGNED_SEQUENCE_NUMBER,
3735
};
38-
use crate::error::Result;
36+
use crate::error::{Error, ErrorKind, Result};
3937

4038
/// A manifest contains metadata and a list of entries.
4139
#[derive(Debug, PartialEq, Eq, Clone)]
@@ -121,6 +119,38 @@ impl Manifest {
121119
}
122120
}
123121

122+
/// Serialize a DataFile to a JSON string.
123+
pub fn serialize_data_file_to_json(
124+
data_file: DataFile,
125+
partition_type: &super::StructType,
126+
is_version_1: bool,
127+
) -> Result<String> {
128+
let serde = _serde::DataFileSerde::try_from(data_file, partition_type, is_version_1)?;
129+
serde_json::to_string(&serde).map_err(|e| {
130+
Error::new(
131+
ErrorKind::DataInvalid,
132+
format!("Failed to serialize DataFile to JSON: {}", e),
133+
)
134+
})
135+
}
136+
137+
/// Deserialize a DataFile from a JSON string.
138+
pub fn deserialize_data_file_from_json(
139+
json: &str,
140+
partition_spec_id: i32,
141+
partition_type: &super::StructType,
142+
schema: &Schema,
143+
) -> Result<DataFile> {
144+
let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| {
145+
Error::new(
146+
ErrorKind::DataInvalid,
147+
format!("Failed to deserialize JSON to DataFile: {}", e),
148+
)
149+
})?;
150+
151+
serde.try_into(partition_spec_id, partition_type, schema)
152+
}
153+
124154
#[cfg(test)]
125155
mod tests {
126156
use std::collections::HashMap;

crates/integrations/datafusion/src/physical_plan/commit.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use datafusion::physical_plan::{
3333
};
3434
use futures::StreamExt;
3535
use iceberg::Catalog;
36-
use iceberg::spec::{DataFile, DataFileSerde};
36+
use iceberg::spec::{DataFile, deserialize_data_file_from_json};
3737
use iceberg::table::Table;
3838
use iceberg::transaction::Transaction;
3939

@@ -234,18 +234,18 @@ impl ExecutionPlan for IcebergCommitExec {
234234
total_count += count_array.iter().flatten().sum::<u64>();
235235

236236
// Deserialize all data files from the StringArray
237-
let batch_files: Vec<DataFile> = (0..files_array.len())
238-
.map(|i| -> DFResult<DataFile> {
237+
let batch_files: Vec<DataFile> = files_array
238+
.into_iter()
239+
.flatten()
240+
.map(|f| -> DFResult<DataFile> {
239241
// Parse JSON to DataFileSerde and convert to DataFile
240-
serde_json::from_str::<DataFileSerde>(files_array.value(i))
241-
.map_err(|e| {
242-
DataFusionError::Internal(format!(
243-
"Failed to deserialize data files: {}",
244-
e
245-
))
246-
})?
247-
.try_into(spec_id, &partition_type, &current_schema)
248-
.map_err(to_datafusion_error)
242+
deserialize_data_file_from_json(
243+
f,
244+
spec_id,
245+
&partition_type,
246+
&current_schema,
247+
)
248+
.map_err(to_datafusion_error)
249249
})
250250
.collect::<datafusion::common::Result<_>>()?;
251251

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion::physical_plan::{
3434
};
3535
use futures::StreamExt;
3636
use iceberg::arrow::schema_to_arrow_schema;
37-
use iceberg::spec::{DataFileFormat, DataFileSerde, FormatVersion};
37+
use iceberg::spec::{DataFileFormat, FormatVersion, serialize_data_file_to_json};
3838
use iceberg::table::Table;
3939
use iceberg::writer::CurrentFileStatus;
4040
use iceberg::writer::file_writer::location_generator::{
@@ -208,19 +208,10 @@ impl ExecutionPlan for IcebergWriteExec {
208208
DataFusionError::Execution(format!("Failed to build data file: {}", e))
209209
})?;
210210

211-
// Convert to DataFileSerde
212-
let serde = DataFileSerde::try_from(data_file, &partition_type, is_version_1)
213-
.map_err(|e| {
214-
DataFusionError::Execution(format!(
215-
"Failed to convert to DataFileSerde: {}",
216-
e
217-
))
218-
})?;
219-
220211
// Serialize to JSON
221-
let json = serde_json::to_string(&serde).map_err(|e| {
222-
DataFusionError::Execution(format!("Failed to serialize to JSON: {}", e))
223-
})?;
212+
let json =
213+
serialize_data_file_to_json(data_file, &partition_type, is_version_1)
214+
.map_err(to_datafusion_error)?;
224215

225216
println!("Serialized data file: {}", json); // todo remove log
226217
Ok(json)
@@ -244,8 +235,8 @@ mod tests {
244235

245236
use datafusion::arrow::array::StringArray;
246237
use iceberg::spec::{
247-
DataFile, DataFileBuilder, DataFileFormat, DataFileSerde, PartitionSpec, PrimitiveType,
248-
Schema, Struct, Type,
238+
DataFile, DataFileBuilder, DataFileFormat, PartitionSpec, PrimitiveType, Schema, Struct,
239+
Type, deserialize_data_file_from_json, serialize_data_file_to_json,
249240
};
250241

251242
// todo move this to DataFileSerde?
@@ -316,8 +307,7 @@ mod tests {
316307
let serialized_files = data_files
317308
.into_iter()
318309
.map(|f| {
319-
let serde = DataFileSerde::try_from(f, &partition_type, is_version_1).unwrap();
320-
let json = serde_json::to_string(&serde).unwrap();
310+
let json = serialize_data_file_to_json(f, &partition_type, is_version_1).unwrap();
321311
println!("Test serialized data file: {}", json);
322312
json
323313
})
@@ -343,14 +333,13 @@ mod tests {
343333
let deserialized_files: Vec<DataFile> = serialized_files
344334
.into_iter()
345335
.map(|json| {
346-
// First deserialize to DataFileSerde
347-
let data_file_serde: DataFileSerde =
348-
serde_json::from_str(&json).expect("Failed to deserialize to DataFileSerde");
349-
350-
// Then convert to DataFile
351-
let data_file = data_file_serde
352-
.try_into(partition_spec.spec_id(), &partition_type, &schema)
353-
.expect("Failed to convert DataFileSerde to DataFile");
336+
let data_file = deserialize_data_file_from_json(
337+
&json,
338+
partition_spec.spec_id(),
339+
&partition_type,
340+
&schema,
341+
)
342+
.unwrap();
354343

355344
println!("Deserialized DataFile: {:?}", data_file);
356345
data_file

0 commit comments

Comments
 (0)