Skip to content

Commit 393e7ca

Browse files
committed
add create_ndjson_read_options
1 parent: b1e67df · commit: 393e7ca

File tree

1 file changed

+43
-25
lines changed

1 file changed

+43
-25
lines changed

src/context.rs

Lines changed: 43 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -858,16 +858,13 @@ impl PySessionContext {
858858

859859
// Create a future that moves owned values
860860
let result_future = async move {
861-
let mut options = NdJsonReadOptions::default()
862-
.file_compression_type(parsed_file_compression_type)
863-
.table_partition_cols(table_partition_cols.clone());
864-
options.schema_infer_max_records = schema_infer_max_records;
865-
options.file_extension = &file_extension_owned;
866-
867-
// Use owned schema if provided
868-
if let Some(s) = &schema_owned {
869-
options.schema = Some(s);
870-
}
861+
let options = create_ndjson_read_options(
862+
schema_infer_max_records,
863+
&file_extension_owned,
864+
parsed_file_compression_type,
865+
table_partition_cols.clone(),
866+
schema_owned.as_ref(),
867+
);
871868

872869
ctx.register_json(&name_owned, &path_owned, options).await
873870
};
@@ -1063,28 +1060,27 @@ impl PySessionContext {
10631060
let df = if schema_owned.is_some() {
10641061
// Create a future that moves owned values
10651062
let result_future = async move {
1066-
let mut options = NdJsonReadOptions::default()
1067-
.table_partition_cols(table_partition_cols.clone())
1068-
.file_compression_type(parsed_file_compression_type);
1069-
options.schema_infer_max_records = schema_infer_max_records;
1070-
options.file_extension = &file_extension_owned;
1071-
1072-
// Use owned schema if provided
1073-
if let Some(s) = &schema_owned {
1074-
options.schema = Some(s);
1075-
}
1063+
let options = create_ndjson_read_options(
1064+
schema_infer_max_records,
1065+
&file_extension_owned,
1066+
parsed_file_compression_type,
1067+
table_partition_cols.clone(),
1068+
schema_owned.as_ref(),
1069+
);
10761070

10771071
ctx.read_json(&path_owned, options).await
10781072
};
10791073
wait_for_future(py, result_future)?.map_err(PyDataFusionError::from)?
10801074
} else {
10811075
// Create a future that moves owned values
10821076
let result_future = async move {
1083-
let mut options = NdJsonReadOptions::default()
1084-
.table_partition_cols(table_partition_cols.clone())
1085-
.file_compression_type(parsed_file_compression_type);
1086-
options.schema_infer_max_records = schema_infer_max_records;
1087-
options.file_extension = &file_extension_owned;
1077+
let options = create_ndjson_read_options(
1078+
schema_infer_max_records,
1079+
&file_extension_owned,
1080+
parsed_file_compression_type,
1081+
table_partition_cols.clone(),
1082+
None,
1083+
);
10881084

10891085
ctx.read_json(&path_owned, options).await
10901086
};
@@ -1393,6 +1389,28 @@ pub fn convert_table_partition_cols(
13931389
.collect::<Result<Vec<_>, _>>()
13941390
}
13951391

1392+
/// Create NdJsonReadOptions with the provided parameters
1393+
fn create_ndjson_read_options<'a>(
1394+
schema_infer_max_records: usize,
1395+
file_extension: &'a str,
1396+
file_compression_type: FileCompressionType,
1397+
table_partition_cols: Vec<(String, DataType)>,
1398+
schema: Option<&'a Schema>,
1399+
) -> NdJsonReadOptions<'a> {
1400+
let mut options = NdJsonReadOptions::default()
1401+
.table_partition_cols(table_partition_cols)
1402+
.file_compression_type(file_compression_type);
1403+
options.schema_infer_max_records = schema_infer_max_records;
1404+
options.file_extension = file_extension;
1405+
1406+
// Set schema if provided
1407+
if let Some(s) = schema {
1408+
options.schema = Some(s);
1409+
}
1410+
1411+
options
1412+
}
1413+
13961414
/// Create CsvReadOptions with the provided parameters
13971415
fn create_csv_read_options<'a>(
13981416
has_header: bool,

0 commit comments

Comments
 (0)