Skip to content

Commit bcc8669

Browse files
author
Devdutt Shenoi
committed
refactor: partitioned_files as a method
1 parent 8a7cbf8 commit bcc8669

File tree

1 file changed

+81
-85
lines changed

1 file changed

+81
-85
lines changed

src/query/stream_schema_provider.rs

Lines changed: 81 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl StandardTableProvider {
199199
})
200200
.collect();
201201

202-
let (partitioned_files, statistics) = partitioned_files(cached, &self.schema);
202+
let (partitioned_files, statistics) = self.partitioned_files(cached);
203203
let plan = self
204204
.create_parquet_physical_plan(
205205
ObjectStoreUrl::parse("file:///").unwrap(),
@@ -249,7 +249,7 @@ impl StandardTableProvider {
249249
})
250250
.collect();
251251

252-
let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &self.schema);
252+
let (partitioned_files, statistics) = self.partitioned_files(hot_tier_files);
253253
let plan = self
254254
.create_parquet_physical_plan(
255255
ObjectStoreUrl::parse("file:///").unwrap(),
@@ -317,6 +317,84 @@ impl StandardTableProvider {
317317
};
318318
Ok(exec)
319319
}
320+
321+
fn partitioned_files(
322+
&self,
323+
manifest_files: Vec<catalog::manifest::File>,
324+
) -> (Vec<Vec<PartitionedFile>>, datafusion::common::Statistics) {
325+
let target_partition = num_cpus::get();
326+
let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
327+
let mut column_statistics =
328+
HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
329+
let mut count = 0;
330+
for (index, file) in manifest_files
331+
.into_iter()
332+
.enumerate()
333+
.map(|(x, y)| (x % target_partition, y))
334+
{
335+
#[allow(unused_mut)]
336+
let catalog::manifest::File {
337+
mut file_path,
338+
num_rows,
339+
columns,
340+
..
341+
} = file;
342+
343+
// object_store::path::Path doesn't automatically deal with Windows path separators
344+
// to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem
345+
// before sending the file path to PartitionedFile
346+
// the github issue- https://github.com/parseablehq/parseable/issues/824
347+
// For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution
348+
// TODO: figure out an elegant solution to this
349+
#[cfg(windows)]
350+
{
351+
if CONFIG.storage_name.eq("drive") {
352+
file_path = object_store::path::Path::from_absolute_path(file_path).unwrap();
353+
}
354+
}
355+
let pf = PartitionedFile::new(file_path, file.file_size);
356+
partitioned_files[index].push(pf);
357+
358+
columns.into_iter().for_each(|col| {
359+
column_statistics
360+
.entry(col.name)
361+
.and_modify(|x| {
362+
if let Some((stats, col_stats)) = x.as_ref().cloned().zip(col.stats.clone())
363+
{
364+
*x = Some(stats.update(col_stats));
365+
}
366+
})
367+
.or_insert_with(|| col.stats.as_ref().cloned());
368+
});
369+
count += num_rows;
370+
}
371+
let statistics = self
372+
.schema
373+
.fields()
374+
.iter()
375+
.map(|field| {
376+
column_statistics
377+
.get(field.name())
378+
.and_then(|stats| stats.as_ref())
379+
.and_then(|stats| stats.clone().min_max_as_scalar(field.data_type()))
380+
.map(|(min, max)| datafusion::common::ColumnStatistics {
381+
null_count: Precision::Absent,
382+
max_value: Precision::Exact(max),
383+
min_value: Precision::Exact(min),
384+
distinct_count: Precision::Absent,
385+
})
386+
.unwrap_or_default()
387+
})
388+
.collect();
389+
390+
let statistics = datafusion::common::Statistics {
391+
num_rows: Precision::Exact(count as usize),
392+
total_byte_size: Precision::Absent,
393+
column_statistics: statistics,
394+
};
395+
396+
(partitioned_files, statistics)
397+
}
320398
}
321399

322400
async fn collect_from_snapshot(
@@ -366,88 +444,6 @@ async fn collect_from_snapshot(
366444
Ok(manifest_files)
367445
}
368446

369-
fn partitioned_files(
370-
manifest_files: Vec<catalog::manifest::File>,
371-
table_schema: &Schema,
372-
) -> (Vec<Vec<PartitionedFile>>, datafusion::common::Statistics) {
373-
let target_partition = num_cpus::get();
374-
let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
375-
let mut column_statistics = HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
376-
let mut count = 0;
377-
for (index, file) in manifest_files
378-
.into_iter()
379-
.enumerate()
380-
.map(|(x, y)| (x % target_partition, y))
381-
{
382-
let catalog::manifest::File {
383-
file_path,
384-
num_rows,
385-
columns,
386-
..
387-
} = file;
388-
389-
// object_store::path::Path doesn't automatically deal with Windows path separators
390-
// to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem
391-
// before sending the file path to PartitionedFile
392-
// the github issue- https://github.com/parseablehq/parseable/issues/824
393-
// For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution
394-
// TODO: figure out an elegant solution to this
395-
let pf;
396-
397-
#[cfg(unix)]
398-
{
399-
pf = PartitionedFile::new(file_path, file.file_size);
400-
}
401-
#[cfg(windows)]
402-
{
403-
pf = if CONFIG.storage_name.eq("drive") {
404-
let file_path = object_store::path::Path::from_absolute_path(file_path).unwrap();
405-
PartitionedFile::new(file_path, file.file_size)
406-
} else {
407-
PartitionedFile::new(file_path, file.file_size)
408-
};
409-
}
410-
411-
partitioned_files[index].push(pf);
412-
columns.into_iter().for_each(|col| {
413-
column_statistics
414-
.entry(col.name)
415-
.and_modify(|x| {
416-
if let Some((stats, col_stats)) = x.as_ref().cloned().zip(col.stats.clone()) {
417-
*x = Some(stats.update(col_stats));
418-
}
419-
})
420-
.or_insert_with(|| col.stats.as_ref().cloned());
421-
});
422-
count += num_rows;
423-
}
424-
let statistics = table_schema
425-
.fields()
426-
.iter()
427-
.map(|field| {
428-
column_statistics
429-
.get(field.name())
430-
.and_then(|stats| stats.as_ref())
431-
.and_then(|stats| stats.clone().min_max_as_scalar(field.data_type()))
432-
.map(|(min, max)| datafusion::common::ColumnStatistics {
433-
null_count: Precision::Absent,
434-
max_value: Precision::Exact(max),
435-
min_value: Precision::Exact(min),
436-
distinct_count: Precision::Absent,
437-
})
438-
.unwrap_or_default()
439-
})
440-
.collect();
441-
442-
let statistics = datafusion::common::Statistics {
443-
num_rows: Precision::Exact(count as usize),
444-
total_byte_size: Precision::Absent,
445-
column_statistics: statistics,
446-
};
447-
448-
(partitioned_files, statistics)
449-
}
450-
451447
#[async_trait::async_trait]
452448
impl TableProvider for StandardTableProvider {
453449
fn as_any(&self) -> &dyn std::any::Any {
@@ -602,7 +598,7 @@ impl TableProvider for StandardTableProvider {
602598
);
603599
}
604600

605-
let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema);
601+
let (partitioned_files, statistics) = self.partitioned_files(manifest_files);
606602
let remote_exec = self
607603
.create_parquet_physical_plan(
608604
ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(),

0 commit comments

Comments
 (0)