Skip to content

Commit 336266d

Browse files
committed
feat: add list_files_cache udtf for cli
1 parent 1ce4b51 commit 336266d

File tree

8 files changed

+449
-33
lines changed

8 files changed

+449
-33
lines changed

datafusion-cli/src/functions.rs

Lines changed: 162 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717

1818
//! Functions that are query-able and searchable via the `\h` command
1919
20+
use datafusion_common::instant::Instant;
2021
use std::fmt;
2122
use std::fs::File;
2223
use std::str::FromStr;
2324
use std::sync::Arc;
2425

25-
use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
26-
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
26+
use arrow::array::{
27+
DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray,
28+
TimestampMillisecondArray, UInt64Array,
29+
};
30+
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
31+
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
2732
use arrow::record_batch::RecordBatch;
2833
use arrow::util::pretty::pretty_format_batches;
2934
use datafusion::catalog::{Session, TableFunctionImpl};
@@ -697,3 +702,158 @@ impl TableFunctionImpl for StatisticsCacheFunc {
697702
Ok(Arc::new(statistics_cache))
698703
}
699704
}
705+
706+
#[derive(Debug)]
707+
struct ListFilesCacheTable {
708+
schema: SchemaRef,
709+
batch: RecordBatch,
710+
}
711+
712+
#[async_trait]
713+
impl TableProvider for ListFilesCacheTable {
714+
fn as_any(&self) -> &dyn std::any::Any {
715+
self
716+
}
717+
718+
fn schema(&self) -> arrow::datatypes::SchemaRef {
719+
self.schema.clone()
720+
}
721+
722+
fn table_type(&self) -> datafusion::logical_expr::TableType {
723+
datafusion::logical_expr::TableType::Base
724+
}
725+
726+
async fn scan(
727+
&self,
728+
_state: &dyn Session,
729+
projection: Option<&Vec<usize>>,
730+
_filters: &[Expr],
731+
_limit: Option<usize>,
732+
) -> Result<Arc<dyn ExecutionPlan>> {
733+
Ok(MemorySourceConfig::try_new_exec(
734+
&[vec![self.batch.clone()]],
735+
TableProvider::schema(self),
736+
projection.cloned(),
737+
)?)
738+
}
739+
}
740+
741+
#[derive(Debug)]
742+
pub struct ListFilesCacheFunc {
743+
cache_manager: Arc<CacheManager>,
744+
}
745+
746+
impl ListFilesCacheFunc {
747+
pub fn new(cache_manager: Arc<CacheManager>) -> Self {
748+
Self { cache_manager }
749+
}
750+
}
751+
752+
impl TableFunctionImpl for ListFilesCacheFunc {
753+
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
754+
if !exprs.is_empty() {
755+
return plan_err!("list_files_cache should have no arguments");
756+
}
757+
758+
let nested_fields = Fields::from(vec![
759+
Field::new("file_path", DataType::Utf8, false),
760+
Field::new(
761+
"file_modified",
762+
DataType::Timestamp(TimeUnit::Millisecond, None),
763+
false,
764+
),
765+
Field::new("file_size_bytes", DataType::UInt64, false),
766+
Field::new("e_tag", DataType::Utf8, true),
767+
Field::new("version", DataType::Utf8, true),
768+
]);
769+
770+
let metadata_field =
771+
Field::new("metadata", DataType::Struct(nested_fields.clone()), true);
772+
773+
let schema = Arc::new(Schema::new(vec![
774+
Field::new("path", DataType::Utf8, false),
775+
Field::new("metadata_size_bytes", DataType::UInt64, false),
776+
// expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type.
777+
Field::new(
778+
"expires_in",
779+
DataType::Duration(TimeUnit::Millisecond),
780+
true,
781+
),
782+
Field::new(
783+
"metadata_list",
784+
DataType::List(Arc::new(metadata_field.clone())),
785+
true,
786+
),
787+
]));
788+
789+
let mut path_arr = vec![];
790+
let mut metadata_size_bytes_arr = vec![];
791+
let mut expires_arr = vec![];
792+
793+
let mut file_path_arr = vec![];
794+
let mut file_modified_arr = vec![];
795+
let mut file_size_bytes_arr = vec![];
796+
let mut etag_arr = vec![];
797+
let mut version_arr = vec![];
798+
let mut offsets: Vec<i32> = vec![0];
799+
800+
if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() {
801+
let now = Instant::now();
802+
let mut current_offset: i32 = 0;
803+
804+
for (path, entry) in list_files_cache.list_entries() {
805+
path_arr.push(path.to_string());
806+
metadata_size_bytes_arr.push(entry.size_bytes as u64);
807+
// calculates time left before entry expires
808+
expires_arr.push(
809+
entry
810+
.expires
811+
.map(|t| t.duration_since(now).as_millis() as i64),
812+
);
813+
814+
for meta in entry.metas.iter() {
815+
file_path_arr.push(meta.location.to_string());
816+
file_modified_arr.push(meta.last_modified.timestamp_millis());
817+
file_size_bytes_arr.push(meta.size);
818+
etag_arr.push(meta.e_tag.clone());
819+
version_arr.push(meta.version.clone());
820+
}
821+
current_offset += entry.metas.len() as i32;
822+
offsets.push(current_offset);
823+
}
824+
}
825+
826+
let struct_arr = StructArray::new(
827+
nested_fields,
828+
vec![
829+
Arc::new(StringArray::from(file_path_arr)),
830+
Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
831+
Arc::new(UInt64Array::from(file_size_bytes_arr)),
832+
Arc::new(StringArray::from(etag_arr)),
833+
Arc::new(StringArray::from(version_arr)),
834+
],
835+
None,
836+
);
837+
838+
let offsets_buffer: OffsetBuffer<i32> =
839+
OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));
840+
841+
let batch = RecordBatch::try_new(
842+
schema.clone(),
843+
vec![
844+
Arc::new(StringArray::from(path_arr)),
845+
Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
846+
Arc::new(DurationMillisecondArray::from(expires_arr)),
847+
Arc::new(GenericListArray::new(
848+
Arc::new(metadata_field),
849+
offsets_buffer,
850+
Arc::new(struct_arr),
851+
None,
852+
)),
853+
],
854+
)?;
855+
856+
let list_files_cache = ListFilesCacheTable { schema, batch };
857+
Ok(Arc::new(list_files_cache))
858+
}
859+
}

datafusion-cli/src/main.rs

Lines changed: 141 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use datafusion::logical_expr::ExplainFormat;
3232
use datafusion::prelude::SessionContext;
3333
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
3434
use datafusion_cli::functions::{
35-
MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
35+
ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
3636
};
3737
use datafusion_cli::object_storage::instrumented::{
3838
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
@@ -253,6 +253,13 @@ async fn main_inner() -> Result<()> {
253253
)),
254254
);
255255

256+
ctx.register_udtf(
257+
"list_files_cache",
258+
Arc::new(ListFilesCacheFunc::new(
259+
ctx.task_ctx().runtime_env().cache_manager.clone(),
260+
)),
261+
);
262+
256263
let mut print_options = PrintOptions {
257264
format: args.format,
258265
quiet: args.quiet,
@@ -431,15 +438,20 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
431438

432439
#[cfg(test)]
433440
mod tests {
441+
use std::time::Duration;
442+
434443
use super::*;
435444
use datafusion::{
436445
common::test_util::batches_to_string,
437446
execution::cache::{
438-
cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
447+
DefaultListFilesCache, cache_manager::CacheManagerConfig,
448+
cache_unit::DefaultFileStatisticsCache,
439449
},
440-
prelude::ParquetReadOptions,
450+
prelude::{ParquetReadOptions, col, lit, split_part},
441451
};
442452
use insta::assert_snapshot;
453+
use object_store::memory::InMemory;
454+
use url::Url;
443455

444456
fn assert_conversion(input: &str, expected: Result<usize, String>) {
445457
let result = extract_memory_pool_size(input);
@@ -741,4 +753,130 @@ mod tests {
741753

742754
Ok(())
743755
}
756+
757+
#[tokio::test]
758+
async fn test_list_files_cache() -> Result<(), DataFusionError> {
759+
let list_files_cache = Arc::new(DefaultListFilesCache::new(
760+
1024,
761+
Some(Duration::from_secs(1)),
762+
));
763+
764+
let rt = RuntimeEnvBuilder::new()
765+
.with_cache_manager(
766+
CacheManagerConfig::default()
767+
.with_list_files_cache(Some(list_files_cache)),
768+
)
769+
.build_arc()
770+
.unwrap();
771+
772+
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
773+
774+
ctx.register_object_store(
775+
&Url::parse("mem://test_table").unwrap(),
776+
Arc::new(InMemory::new()),
777+
);
778+
779+
ctx.register_udtf(
780+
"list_files_cache",
781+
Arc::new(ListFilesCacheFunc::new(
782+
ctx.task_ctx().runtime_env().cache_manager.clone(),
783+
)),
784+
);
785+
786+
ctx.sql(
787+
"CREATE EXTERNAL TABLE src_table
788+
STORED AS PARQUET
789+
LOCATION '../parquet-testing/data/alltypes_plain.parquet'",
790+
)
791+
.await?
792+
.collect()
793+
.await?;
794+
795+
ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/0.parquet' STORED AS PARQUET").await?.collect().await?;
796+
797+
ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/1.parquet' STORED AS PARQUET").await?.collect().await?;
798+
799+
ctx.sql(
800+
"CREATE EXTERNAL TABLE test_table
801+
STORED AS PARQUET
802+
LOCATION 'mem://test_table/'
803+
",
804+
)
805+
.await?
806+
.collect()
807+
.await?;
808+
809+
let sql = "SELECT metadata_size_bytes, expires_in, metadata_list FROM list_files_cache()";
810+
let df = ctx
811+
.sql(sql)
812+
.await?
813+
.unnest_columns(&["metadata_list"])?
814+
.with_column_renamed("metadata_list", "metadata")?
815+
.unnest_columns(&["metadata"])?;
816+
817+
assert_eq!(
818+
2,
819+
df.clone()
820+
.filter(col("expires_in").is_not_null())?
821+
.count()
822+
.await?
823+
);
824+
825+
let df = df
826+
.with_column_renamed(r#""metadata.file_size_bytes""#, "file_size_bytes")?
827+
.with_column_renamed(r#""metadata.e_tag""#, "etag")?
828+
.with_column(
829+
"filename",
830+
split_part(col(r#""metadata.file_path""#), lit("/"), lit(-1)),
831+
)?
832+
.select_columns(&[
833+
"metadata_size_bytes",
834+
"filename",
835+
"file_size_bytes",
836+
"etag",
837+
])?
838+
.sort(vec![col("filename").sort(true, false)])?;
839+
let rbs = df.collect().await?;
840+
assert_snapshot!(batches_to_string(&rbs),@r"
841+
+---------------------+-----------+-----------------+------+
842+
| metadata_size_bytes | filename | file_size_bytes | etag |
843+
+---------------------+-----------+-----------------+------+
844+
| 212 | 0.parquet | 3645 | 0 |
845+
| 212 | 1.parquet | 3645 | 1 |
846+
+---------------------+-----------+-----------------+------+
847+
");
848+
849+
Ok(())
850+
}
851+
852+
#[tokio::test]
853+
async fn test_list_files_cache_not_set() -> Result<(), DataFusionError> {
854+
let rt = RuntimeEnvBuilder::new()
855+
.with_cache_manager(CacheManagerConfig::default().with_list_files_cache(None))
856+
.build_arc()
857+
.unwrap();
858+
859+
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
860+
861+
ctx.register_udtf(
862+
"list_files_cache",
863+
Arc::new(ListFilesCacheFunc::new(
864+
ctx.task_ctx().runtime_env().cache_manager.clone(),
865+
)),
866+
);
867+
868+
let rbs = ctx
869+
.sql("SELECT * FROM list_files_cache()")
870+
.await?
871+
.collect()
872+
.await?;
873+
assert_snapshot!(batches_to_string(&rbs),@r"
874+
+------+---------------------+------------+---------------+
875+
| path | metadata_size_bytes | expires_in | metadata_list |
876+
+------+---------------------+------------+---------------+
877+
+------+---------------------+------------+---------------+
878+
");
879+
880+
Ok(())
881+
}
744882
}

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use crate::cache::cache_unit::DefaultFilesMetadataCache;
19+
use crate::cache::list_files_cache::ListFilesEntry;
1920
use crate::cache::{CacheAccessor, DefaultListFilesCache};
2021
use datafusion_common::stats::Precision;
2122
use datafusion_common::{Result, Statistics};
@@ -93,6 +94,9 @@ pub trait ListFilesCache:
9394

9495
/// Updates the cache with a new TTL (time-to-live).
9596
fn update_cache_ttl(&self, ttl: Option<Duration>);
97+
98+
/// Retrieves the information about the entries currently cached.
99+
fn list_entries(&self) -> HashMap<Path, ListFilesEntry>;
96100
}
97101

98102
/// Generic file-embedded metadata used with [`FileMetadataCache`].

0 commit comments

Comments
 (0)