
Commit 282d229

committed
feat: add list_files_cache udtf for cli
1 parent d8e68a4 commit 282d229


4 files changed: +375 -31 lines changed


datafusion-cli/src/functions.rs

Lines changed: 160 additions & 2 deletions
@@ -17,13 +17,18 @@
 
 //! Functions that are query-able and searchable via the `\h` command
 
+use datafusion_common::instant::Instant;
 use std::fmt;
 use std::fs::File;
 use std::str::FromStr;
 use std::sync::Arc;
 
-use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
+use arrow::array::{
+    DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray,
+    TimestampMillisecondArray, UInt64Array,
+};
+use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
 use datafusion::catalog::{Session, TableFunctionImpl};
@@ -697,3 +702,156 @@ impl TableFunctionImpl for StatisticsCacheFunc {
         Ok(Arc::new(statistics_cache))
     }
 }
+
+#[derive(Debug)]
+struct ListFilesCacheTable {
+    schema: SchemaRef,
+    batch: RecordBatch,
+}
+
+#[async_trait]
+impl TableProvider for ListFilesCacheTable {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> arrow::datatypes::SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> datafusion::logical_expr::TableType {
+        datafusion::logical_expr::TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![self.batch.clone()]],
+            TableProvider::schema(self),
+            projection.cloned(),
+        )?)
+    }
+}
+
+#[derive(Debug)]
+pub struct ListFilesCacheFunc {
+    cache_manager: Arc<CacheManager>,
+}
+
+impl ListFilesCacheFunc {
+    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
+        Self { cache_manager }
+    }
+}
+
+impl TableFunctionImpl for ListFilesCacheFunc {
+    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+        if !exprs.is_empty() {
+            return plan_err!("list_files_cache should have no arguments");
+        }
+
+        let nested_fields = Fields::from(vec![
+            Field::new("file_path", DataType::Utf8, false),
+            Field::new(
+                "file_modified",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new("file_size_bytes", DataType::UInt64, false),
+            Field::new("e_tag", DataType::Utf8, true),
+            Field::new("version", DataType::Utf8, true),
+        ]);
+
+        let metadata_field =
+            Field::new("metadata", DataType::Struct(nested_fields.clone()), true);
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("path", DataType::Utf8, false),
+            Field::new("metadata_size_bytes", DataType::UInt64, false),
+            Field::new(
+                "expires_in",
+                DataType::Duration(TimeUnit::Millisecond),
+                true,
+            ),
+            Field::new(
+                "metadata_list",
+                DataType::List(Arc::new(metadata_field.clone())),
+                true,
+            ),
+        ]));
+
+        let mut path_arr = vec![];
+        let mut metadata_size_bytes_arr = vec![];
+        let mut expires_arr = vec![];
+
+        let mut file_path_arr = vec![];
+        let mut file_modified_arr = vec![];
+        let mut file_size_bytes_arr = vec![];
+        let mut etag_arr = vec![];
+        let mut version_arr = vec![];
+        let mut offsets: Vec<i32> = vec![0];
+
+        if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() {
+            let now = Instant::now();
+            let mut current_offset: i32 = 0;
+
+            for (path, entry) in list_files_cache.list_entries() {
+                path_arr.push(path.to_string());
+                metadata_size_bytes_arr.push(entry.size_bytes as u64);
+                expires_arr.push(
+                    entry
+                        .expires
+                        .map(|t| t.duration_since(now).as_millis() as i64),
+                );
+
+                for meta in entry.metas.iter() {
+                    file_path_arr.push(meta.location.to_string());
+                    file_modified_arr.push(meta.last_modified.timestamp_millis());
+                    file_size_bytes_arr.push(meta.size);
+                    etag_arr.push(meta.e_tag.clone());
+                    version_arr.push(meta.version.clone());
+                }
+                current_offset += entry.metas.len() as i32;
+                offsets.push(current_offset);
+            }
+        }
+
+        let struct_arr = StructArray::new(
+            nested_fields,
+            vec![
+                Arc::new(StringArray::from(file_path_arr)),
+                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
+                Arc::new(UInt64Array::from(file_size_bytes_arr)),
+                Arc::new(StringArray::from(etag_arr)),
+                Arc::new(StringArray::from(version_arr)),
+            ],
+            None,
+        );
+
+        let offsets_buffer: OffsetBuffer<i32> =
+            OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(StringArray::from(path_arr)),
+                Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
+                Arc::new(DurationMillisecondArray::from(expires_arr)),
+                Arc::new(GenericListArray::new(
+                    Arc::new(metadata_field),
+                    offsets_buffer,
+                    Arc::new(struct_arr),
+                    None,
+                )),
+            ],
+        )?;
+
+        let list_files_cache = ListFilesCacheTable { schema, batch };
+        Ok(Arc::new(list_files_cache))
+    }
+}
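
Usage note (not part of the commit): once registered, the UDTF behaves like any other table function, so the cached listings can be inspected with plain SQL. Below is a minimal sketch, assuming a SessionContext whose RuntimeEnv already carries a configured list-files cache; the helper name and the printed columns are illustrative, while register_udtf, ListFilesCacheFunc, and the column names come from this commit.

    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;
    use datafusion_cli::functions::ListFilesCacheFunc;

    // Hypothetical helper: wire the UDTF to the session's cache manager
    // (mirroring the registration in main.rs below) and print the cached listings.
    async fn show_list_files_cache(ctx: &SessionContext) -> Result<()> {
        ctx.register_udtf(
            "list_files_cache",
            Arc::new(ListFilesCacheFunc::new(
                ctx.task_ctx().runtime_env().cache_manager.clone(),
            )),
        );

        // One row per cached listing; metadata_list holds the per-file metadata values.
        ctx.sql("SELECT path, metadata_size_bytes, expires_in FROM list_files_cache()")
            .await?
            .show()
            .await?;
        Ok(())
    }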

datafusion-cli/src/main.rs

Lines changed: 110 additions & 3 deletions
@@ -32,7 +32,7 @@ use datafusion::logical_expr::ExplainFormat;
 use datafusion::prelude::SessionContext;
 use datafusion_cli::catalog::DynamicObjectStoreCatalog;
 use datafusion_cli::functions::{
-    MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
+    ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
 };
 use datafusion_cli::object_storage::instrumented::{
     InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
@@ -253,6 +253,13 @@ async fn main_inner() -> Result<()> {
         )),
     );
 
+    ctx.register_udtf(
+        "list_files_cache",
+        Arc::new(ListFilesCacheFunc::new(
+            ctx.task_ctx().runtime_env().cache_manager.clone(),
+        )),
+    );
+
     let mut print_options = PrintOptions {
         format: args.format,
         quiet: args.quiet,
@@ -431,15 +438,20 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
 
 #[cfg(test)]
 mod tests {
+    use std::time::Duration;
+
     use super::*;
     use datafusion::{
         common::test_util::batches_to_string,
         execution::cache::{
-            cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
+            DefaultListFilesCache, cache_manager::CacheManagerConfig,
+            cache_unit::DefaultFileStatisticsCache,
         },
-        prelude::ParquetReadOptions,
+        prelude::{ParquetReadOptions, col, lit, split_part},
    };
    use insta::assert_snapshot;
+   use object_store::memory::InMemory;
+   use url::Url;
 
    fn assert_conversion(input: &str, expected: Result<usize, String>) {
        let result = extract_memory_pool_size(input);
@@ -741,4 +753,99 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_list_files_cache() -> Result<(), DataFusionError> {
+        let list_files_cache = Arc::new(DefaultListFilesCache::new(
+            1024,
+            Some(Duration::from_secs(1)),
+        ));
+
+        let rt = RuntimeEnvBuilder::new()
+            .with_cache_manager(
+                CacheManagerConfig::default()
+                    .with_list_files_cache(Some(list_files_cache)),
+            )
+            .build_arc()
+            .unwrap();
+
+        let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
+
+        ctx.register_object_store(
+            &Url::parse("mem://test_table").unwrap(),
+            Arc::new(InMemory::new()),
+        );
+
+        ctx.register_udtf(
+            "list_files_cache",
+            Arc::new(ListFilesCacheFunc::new(
+                ctx.task_ctx().runtime_env().cache_manager.clone(),
+            )),
+        );
+
+        ctx.sql(
+            "create external table src_table
+            stored as parquet
+            location '../parquet-testing/data/alltypes_plain.parquet'",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/0.parquet' STORED AS PARQUET").await?.collect().await?;
+
+        ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/1.parquet' STORED AS PARQUET").await?.collect().await?;
+
+        ctx.sql(
+            "create external table test_table
+            stored as parquet
+            location 'mem://test_table/'
+            ",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        let sql = "SELECT metadata_size_bytes, expires_in, metadata_list FROM list_files_cache()";
+        let df = ctx
+            .sql(sql)
+            .await?
+            .unnest_columns(&["metadata_list"])?
+            .with_column_renamed("metadata_list", "metadata")?
+            .unnest_columns(&["metadata"])?;
+
+        assert_eq!(
+            2,
+            df.clone()
+                .filter(col("expires_in").is_not_null())?
+                .count()
+                .await?
+        );
+
+        let df = df
+            .with_column_renamed(r#""metadata.file_size_bytes""#, "file_size_bytes")?
+            .with_column_renamed(r#""metadata.e_tag""#, "etag")?
+            .with_column(
+                "filename",
+                split_part(col(r#""metadata.file_path""#), lit("/"), lit(-1)),
+            )?
+            .select_columns(&[
+                "metadata_size_bytes",
+                "filename",
+                "file_size_bytes",
+                "etag",
+            ])?
+            .sort(vec![col("filename").sort(true, false)])?;
+        let rbs = df.collect().await?;
+        assert_snapshot!(batches_to_string(&rbs),@r"
+        +---------------------+-----------+-----------------+------+
+        | metadata_size_bytes | filename  | file_size_bytes | etag |
+        +---------------------+-----------+-----------------+------+
+        | 212                 | 0.parquet | 3645            | 0    |
+        | 212                 | 1.parquet | 3645            | 1    |
+        +---------------------+-----------+-----------------+------+
+        ");
+
+        Ok(())
+    }
 }

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 4 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 use crate::cache::CacheAccessor;
 use crate::cache::cache_unit::DefaultFilesMetadataCache;
+use crate::cache::list_files_cache::ListFilesEntry;
 use datafusion_common::stats::Precision;
 use datafusion_common::{Result, Statistics};
 use object_store::ObjectMeta;
@@ -93,6 +94,9 @@ pub trait ListFilesCache:
 
     /// Updates the cache with a new TTL (time-to-live).
     fn update_cache_ttl(&self, ttl: Option<Duration>);
+
+    /// Retrieves the information about the entries currently cached.
+    fn list_entries(&self) -> HashMap<Path, ListFilesEntry>;
 }
 
 /// Generic file-embedded metadata used with [`FileMetadataCache`].
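
For context (not part of the commit): the CLI consumes the new list_entries method much as the functions.rs hunk above shows, iterating the returned map and reading each entry's size_bytes, expires, and metas fields. A minimal consumer sketch under those assumptions; the import path and the helper itself are hypothetical, with the ListFilesEntry field names inferred from their use above.

    use datafusion_execution::cache::cache_manager::ListFilesCache;

    // Hypothetical helper: summarize the cache as
    // (number of cached listings, total files across them, total metadata bytes).
    fn summarize(cache: &dyn ListFilesCache) -> (usize, usize, u64) {
        let entries = cache.list_entries();
        let num_listings = entries.len();
        let num_files: usize = entries.values().map(|e| e.metas.len()).sum();
        let total_bytes: u64 = entries.values().map(|e| e.size_bytes as u64).sum();
        (num_listings, num_files, total_bytes)
    }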
