Skip to content

Commit cd53ac2

Browse files
authored
feat(writer): Make LocationGenerator partition-aware (#1625)
## Which issue does this PR close?

- Closes #1624

## What changes are included in this PR?

- Added `PartitionKey` to hold the partition-related information needed for path generation
- [BREAKING] Changed `LocationGenerator::generate_location` to accept an optional partition key
- [BREAKING] Changed `ParquetWriter` to take in an `Option<PartitionKey>`

## Are these changes tested?

Added unit tests.
1 parent 1397a36 commit cd53ac2

File tree

15 files changed

+226
-31
lines changed

15 files changed

+226
-31
lines changed

crates/iceberg/src/spec/partition.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,39 @@ impl PartitionSpec {
176176
}
177177
}
178178

179+
/// A partition key represents a specific partition in a table, containing the partition spec,
180+
/// schema, and the actual partition values.
181+
#[derive(Clone, Debug)]
182+
pub struct PartitionKey {
183+
/// The partition spec that contains the partition fields.
184+
spec: PartitionSpec,
185+
/// The schema to which the partition spec is bound.
186+
schema: SchemaRef,
187+
/// Partition fields' values in struct.
188+
data: Struct,
189+
}
190+
191+
impl PartitionKey {
192+
/// Creates a new partition key with the given spec, schema, and data.
193+
pub fn new(spec: PartitionSpec, schema: SchemaRef, data: Struct) -> Self {
194+
Self { spec, schema, data }
195+
}
196+
197+
/// Generates a partition path based on the partition values.
198+
pub fn to_path(&self) -> String {
199+
self.spec.partition_to_path(&self.data, self.schema.clone())
200+
}
201+
202+
/// Returns `true` if the partition key is absent (`None`)
203+
/// or represents an unpartitioned spec.
204+
pub fn is_effectively_none(partition_key: Option<&PartitionKey>) -> bool {
205+
match partition_key {
206+
None => true,
207+
Some(pk) => pk.spec.is_unpartitioned(),
208+
}
209+
}
210+
}
211+
179212
/// Reference to [`UnboundPartitionSpec`].
180213
pub type UnboundPartitionSpecRef = Arc<UnboundPartitionSpec>;
181214
/// Unbound partition field can be built without a schema and later bound to a schema.
@@ -1772,7 +1805,7 @@ mod tests {
17721805

17731806
assert_eq!(
17741807
spec.partition_to_path(&data, schema.into()),
1775-
"id=42/name=\"alice\""
1808+
"id=42/name=alice"
17761809
);
17771810
}
17781811
}

crates/iceberg/src/spec/snapshot_summary.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,7 @@ mod tests {
819819
assert_eq!(props.get(ADDED_FILE_SIZE).unwrap(), "300");
820820
assert_eq!(props.get(REMOVED_FILE_SIZE).unwrap(), "100");
821821

822-
let partition_key = format!("{}{}", CHANGED_PARTITION_PREFIX, "year=\"2025\"");
822+
let partition_key = format!("{}{}", CHANGED_PARTITION_PREFIX, "year=2025");
823823

824824
assert!(props.contains_key(&partition_key));
825825

crates/iceberg/src/spec/transform.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ impl Transform {
142142
let field_type = field_type.as_primitive_type().unwrap();
143143
let datum = Datum::new(field_type.clone(), value);
144144
match self {
145-
Self::Identity => datum.to_string(),
145+
Self::Identity => datum.to_human_string(),
146146
Self::Void => "null".to_string(),
147147
_ => {
148148
todo!()

crates/iceberg/src/spec/values.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1266,6 +1266,17 @@ impl Datum {
12661266
_ => false,
12671267
}
12681268
}
1269+
1270+
/// Returns a human-readable string representation of this literal.
1271+
///
1272+
/// For string literals, this returns the raw string value without quotes.
1273+
/// For all other literals, it falls back to [`to_string()`].
1274+
pub(crate) fn to_human_string(&self) -> String {
1275+
match self.literal() {
1276+
PrimitiveLiteral::String(s) => s.to_string(),
1277+
_ => self.to_string(),
1278+
}
1279+
}
12691280
}
12701281

12711282
/// Map is a collection of key-value pairs with a key type and a value type.

crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ mod test {
144144
let pw = ParquetWriterBuilder::new(
145145
WriterProperties::builder().build(),
146146
Arc::new(schema),
147+
None,
147148
file_io.clone(),
148149
location_gen,
149150
file_name_gen,
@@ -221,6 +222,7 @@ mod test {
221222
let parquet_writer_builder = ParquetWriterBuilder::new(
222223
WriterProperties::builder().build(),
223224
Arc::new(schema.clone()),
225+
None,
224226
file_io.clone(),
225227
location_gen,
226228
file_name_gen,

crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ mod test {
404404
let pb = ParquetWriterBuilder::new(
405405
WriterProperties::builder().build(),
406406
Arc::new(delete_schema),
407+
None,
407408
file_io.clone(),
408409
location_gen,
409410
file_name_gen,
@@ -569,6 +570,7 @@ mod test {
569570
let pb = ParquetWriterBuilder::new(
570571
WriterProperties::builder().build(),
571572
Arc::new(delete_schema),
573+
None,
572574
file_io.clone(),
573575
location_gen,
574576
file_name_gen,

crates/iceberg/src/writer/file_writer/location_generator.rs

Lines changed: 123 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,24 @@ use std::sync::Arc;
2121
use std::sync::atomic::AtomicU64;
2222

2323
use crate::Result;
24-
use crate::spec::{DataFileFormat, TableMetadata};
24+
use crate::spec::{DataFileFormat, PartitionKey, TableMetadata};
2525

2626
/// `LocationGenerator` used to generate the location of data file.
2727
pub trait LocationGenerator: Clone + Send + 'static {
28-
/// Generate an absolute path for the given file name.
29-
/// e.g.
30-
/// For file name "part-00000.parquet", the generated location maybe "/table/data/part-00000.parquet"
31-
fn generate_location(&self, file_name: &str) -> String;
28+
/// Generate an absolute path for the given file name that includes the partition path.
29+
///
30+
/// # Arguments
31+
///
32+
/// * `partition_key` - The partition key of the file. If None, generate a non-partitioned path.
33+
/// * `file_name` - The name of the file
34+
///
35+
/// # Returns
36+
///
37+
/// An absolute path that includes the partition path, e.g.,
38+
/// "/table/data/id=1/name=alice/part-00000.parquet"
39+
/// or non-partitioned path:
40+
/// "/table/data/part-00000.parquet"
41+
fn generate_location(&self, partition_key: Option<&PartitionKey>, file_name: &str) -> String;
3242
}
3343

3444
const WRITE_DATA_LOCATION: &str = "write.data.path";
@@ -39,29 +49,38 @@ const DEFAULT_DATA_DIR: &str = "/data";
3949
/// `DefaultLocationGenerator` used to generate the data dir location of data file.
4050
/// The location is generated based on the table location and the data location in table properties.
4151
pub struct DefaultLocationGenerator {
42-
dir_path: String,
52+
data_location: String,
4353
}
4454

4555
impl DefaultLocationGenerator {
4656
/// Create a new `DefaultLocationGenerator`.
4757
pub fn new(table_metadata: TableMetadata) -> Result<Self> {
4858
let table_location = table_metadata.location();
4959
let prop = table_metadata.properties();
50-
let data_location = prop
60+
let configured_data_location = prop
5161
.get(WRITE_DATA_LOCATION)
5262
.or(prop.get(WRITE_FOLDER_STORAGE_LOCATION));
53-
let dir_path = if let Some(data_location) = data_location {
63+
let data_location = if let Some(data_location) = configured_data_location {
5464
data_location.clone()
5565
} else {
5666
format!("{}{}", table_location, DEFAULT_DATA_DIR)
5767
};
58-
Ok(Self { dir_path })
68+
Ok(Self { data_location })
5969
}
6070
}
6171

6272
impl LocationGenerator for DefaultLocationGenerator {
63-
fn generate_location(&self, file_name: &str) -> String {
64-
format!("{}/{}", self.dir_path, file_name)
73+
fn generate_location(&self, partition_key: Option<&PartitionKey>, file_name: &str) -> String {
74+
if PartitionKey::is_effectively_none(partition_key) {
75+
format!("{}/{}", self.data_location, file_name)
76+
} else {
77+
format!(
78+
"{}/{}/{}",
79+
self.data_location,
80+
partition_key.unwrap().to_path(),
81+
file_name
82+
)
83+
}
6584
}
6685
}
6786

@@ -115,11 +134,15 @@ impl FileNameGenerator for DefaultFileNameGenerator {
115134
#[cfg(test)]
116135
pub(crate) mod test {
117136
use std::collections::HashMap;
137+
use std::sync::Arc;
118138

119139
use uuid::Uuid;
120140

121141
use super::LocationGenerator;
122-
use crate::spec::{FormatVersion, PartitionSpec, StructType, TableMetadata};
142+
use crate::spec::{
143+
FormatVersion, Literal, NestedField, PartitionKey, PartitionSpec, PrimitiveType, Schema,
144+
Struct, StructType, TableMetadata, Transform, Type,
145+
};
123146
use crate::writer::file_writer::location_generator::{
124147
FileNameGenerator, WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION,
125148
};
@@ -136,8 +159,17 @@ pub(crate) mod test {
136159
}
137160

138161
impl LocationGenerator for MockLocationGenerator {
139-
fn generate_location(&self, file_name: &str) -> String {
140-
format!("{}/{}", self.root, file_name)
162+
fn generate_location(&self, partition: Option<&PartitionKey>, file_name: &str) -> String {
163+
if PartitionKey::is_effectively_none(partition) {
164+
format!("{}/{}", self.root, file_name)
165+
} else {
166+
format!(
167+
"{}/{}/{}",
168+
self.root,
169+
partition.unwrap().to_path(),
170+
file_name
171+
)
172+
}
141173
}
142174
}
143175

@@ -169,7 +201,7 @@ pub(crate) mod test {
169201
encryption_keys: HashMap::new(),
170202
};
171203

172-
let file_name_genertaor = super::DefaultFileNameGenerator::new(
204+
let file_name_generator = super::DefaultFileNameGenerator::new(
173205
"part".to_string(),
174206
Some("test".to_string()),
175207
crate::spec::DataFileFormat::Parquet,
@@ -179,7 +211,7 @@ pub(crate) mod test {
179211
let location_generator =
180212
super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
181213
let location =
182-
location_generator.generate_location(&file_name_genertaor.generate_file_name());
214+
location_generator.generate_location(None, &file_name_generator.generate_file_name());
183215
assert_eq!(location, "s3://data.db/table/data/part-00000-test.parquet");
184216

185217
// test custom data location
@@ -190,7 +222,7 @@ pub(crate) mod test {
190222
let location_generator =
191223
super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
192224
let location =
193-
location_generator.generate_location(&file_name_genertaor.generate_file_name());
225+
location_generator.generate_location(None, &file_name_generator.generate_file_name());
194226
assert_eq!(
195227
location,
196228
"s3://data.db/table/data_1/part-00001-test.parquet"
@@ -203,7 +235,7 @@ pub(crate) mod test {
203235
let location_generator =
204236
super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
205237
let location =
206-
location_generator.generate_location(&file_name_genertaor.generate_file_name());
238+
location_generator.generate_location(None, &file_name_generator.generate_file_name());
207239
assert_eq!(
208240
location,
209241
"s3://data.db/table/data_2/part-00002-test.parquet"
@@ -217,7 +249,79 @@ pub(crate) mod test {
217249
let location_generator =
218250
super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
219251
let location =
220-
location_generator.generate_location(&file_name_genertaor.generate_file_name());
252+
location_generator.generate_location(None, &file_name_generator.generate_file_name());
221253
assert_eq!(location, "s3://data.db/data_3/part-00003-test.parquet");
222254
}
255+
256+
#[test]
257+
fn test_location_generate_with_partition() {
258+
// Create a schema with two fields: id (int) and name (string)
259+
let schema = Arc::new(
260+
Schema::builder()
261+
.with_schema_id(1)
262+
.with_fields(vec![
263+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
264+
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
265+
])
266+
.build()
267+
.unwrap(),
268+
);
269+
270+
// Create a partition spec with both fields
271+
let partition_spec = PartitionSpec::builder(schema.clone())
272+
.add_partition_field("id", "id", Transform::Identity)
273+
.unwrap()
274+
.add_partition_field("name", "name", Transform::Identity)
275+
.unwrap()
276+
.build()
277+
.unwrap();
278+
279+
// Create partition data with values
280+
let partition_data =
281+
Struct::from_iter([Some(Literal::int(42)), Some(Literal::string("alice"))]);
282+
283+
// Create a partition key
284+
let partition_key = PartitionKey::new(partition_spec, schema, partition_data);
285+
286+
// Test with MockLocationGenerator
287+
let mock_location_gen = MockLocationGenerator::new("/base/path".to_string());
288+
let file_name = "data-00000.parquet";
289+
let location = mock_location_gen.generate_location(Some(&partition_key), file_name);
290+
assert_eq!(location, "/base/path/id=42/name=alice/data-00000.parquet");
291+
292+
// Create a table metadata for DefaultLocationGenerator
293+
let table_metadata = TableMetadata {
294+
format_version: FormatVersion::V2,
295+
table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
296+
location: "s3://data.db/table".to_string(),
297+
last_updated_ms: 1515100955770,
298+
last_column_id: 2,
299+
schemas: HashMap::new(),
300+
current_schema_id: 1,
301+
partition_specs: HashMap::new(),
302+
default_spec: PartitionSpec::unpartition_spec().into(),
303+
default_partition_type: StructType::new(vec![]),
304+
last_partition_id: 1000,
305+
default_sort_order_id: 0,
306+
sort_orders: HashMap::from_iter(vec![]),
307+
snapshots: HashMap::default(),
308+
current_snapshot_id: None,
309+
last_sequence_number: 1,
310+
properties: HashMap::new(),
311+
snapshot_log: Vec::new(),
312+
metadata_log: vec![],
313+
refs: HashMap::new(),
314+
statistics: HashMap::new(),
315+
partition_statistics: HashMap::new(),
316+
encryption_keys: HashMap::new(),
317+
};
318+
319+
// Test with DefaultLocationGenerator
320+
let default_location_gen = super::DefaultLocationGenerator::new(table_metadata).unwrap();
321+
let location = default_location_gen.generate_location(Some(&partition_key), file_name);
322+
assert_eq!(
323+
location,
324+
"s3://data.db/table/data/id=42/name=alice/data-00000.parquet"
325+
);
326+
}
223327
}

0 commit comments

Comments
 (0)