185 changes: 183 additions & 2 deletions datafusion-cli/src/functions.rs
@@ -17,13 +17,18 @@

//! Functions that are query-able and searchable via the `\h` command

use datafusion_common::instant::Instant;
use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::array::{
DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray,
TimestampMillisecondArray, UInt64Array,
};
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
@@ -697,3 +702,179 @@ impl TableFunctionImpl for StatisticsCacheFunc {
Ok(Arc::new(statistics_cache))
}
}

/// Implementation of the `list_files_cache` table function in datafusion-cli.
///
/// This function returns the cached results of running a LIST command on a
/// particular object store path for a table. The object metadata is returned as
/// a List of Structs, with one Struct for each object.
/// DataFusion uses these cached results to plan queries against external tables.
/// # Schema
/// ```sql
/// > describe select * from list_files_cache();
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// | column_name | data_type | is_nullable |
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// | table | Utf8 | NO |
/// | path | Utf8 | NO |
/// | metadata_size_bytes | UInt64 | NO |
/// | expires_in | Duration(ms) | YES |
/// | metadata_list | List(Struct("file_path": non-null Utf8, "file_modified": non-null Timestamp(ms), "file_size_bytes": non-null UInt64, "e_tag": Utf8, "version": Utf8), field: 'metadata') | YES |
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// ```
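/// # Example
/// A minimal invocation looks like the following (the rows returned depend on
/// which external tables have had their listings cached):
/// ```sql
/// > SELECT "table", path, metadata_size_bytes FROM list_files_cache();
/// ```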
#[derive(Debug)]
struct ListFilesCacheTable {
schema: SchemaRef,
batch: RecordBatch,
}

#[async_trait]
impl TableProvider for ListFilesCacheTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> arrow::datatypes::SchemaRef {
self.schema.clone()
}

fn table_type(&self) -> datafusion::logical_expr::TableType {
datafusion::logical_expr::TableType::Base
}

async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
TableProvider::schema(self),
projection.cloned(),
)?)
}
}

#[derive(Debug)]
pub struct ListFilesCacheFunc {
cache_manager: Arc<CacheManager>,
}

impl ListFilesCacheFunc {
pub fn new(cache_manager: Arc<CacheManager>) -> Self {
Self { cache_manager }
}
}

impl TableFunctionImpl for ListFilesCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
if !exprs.is_empty() {
return plan_err!("list_files_cache should have no arguments");
}

let nested_fields = Fields::from(vec![
Field::new("file_path", DataType::Utf8, false),
Field::new(
"file_modified",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("file_size_bytes", DataType::UInt64, false),
Field::new("e_tag", DataType::Utf8, true),
Field::new("version", DataType::Utf8, true),
]);

let metadata_field =
Field::new("metadata", DataType::Struct(nested_fields.clone()), true);

let schema = Arc::new(Schema::new(vec![
Field::new("table", DataType::Utf8, false),
Field::new("path", DataType::Utf8, false),
Field::new("metadata_size_bytes", DataType::UInt64, false),
// The `expires` field in `ListFilesEntry` is an `Instant` when set, which has no
// absolute wall-clock value, so the remaining lifetime is exposed as a Duration
// rather than a Timestamp.
Field::new(
"expires_in",
DataType::Duration(TimeUnit::Millisecond),
true,
),
Field::new(
"metadata_list",
DataType::List(Arc::new(metadata_field.clone())),
true,
),
]));

let mut table_arr = vec![];
let mut path_arr = vec![];
let mut metadata_size_bytes_arr = vec![];
let mut expires_arr = vec![];

let mut file_path_arr = vec![];
let mut file_modified_arr = vec![];
let mut file_size_bytes_arr = vec![];
let mut etag_arr = vec![];
let mut version_arr = vec![];
let mut offsets: Vec<i32> = vec![0];

if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() {
let now = Instant::now();
let mut current_offset: i32 = 0;

for (path, entry) in list_files_cache.list_entries() {
table_arr.push(path.table.map_or("NULL".to_string(), |t| t.to_string()));
path_arr.push(path.path.to_string());
metadata_size_bytes_arr.push(entry.size_bytes as u64);
// Remaining time before this entry expires, measured from `now`.
expires_arr.push(
entry
.expires
.map(|t| t.duration_since(now).as_millis() as i64),
);

for meta in entry.metas.iter() {
file_path_arr.push(meta.location.to_string());
file_modified_arr.push(meta.last_modified.timestamp_millis());
file_size_bytes_arr.push(meta.size);
etag_arr.push(meta.e_tag.clone());
version_arr.push(meta.version.clone());
}
current_offset += entry.metas.len() as i32;
offsets.push(current_offset);
}
}

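// The flat child arrays above hold file metadata for every cached entry back to
// back; `offsets` records where each entry's slice starts and ends, so entry i
// maps to child rows offsets[i]..offsets[i+1] in the list array built below.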
let struct_arr = StructArray::new(
nested_fields,
vec![
Arc::new(StringArray::from(file_path_arr)),
Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
Arc::new(UInt64Array::from(file_size_bytes_arr)),
Arc::new(StringArray::from(etag_arr)),
Arc::new(StringArray::from(version_arr)),
],
None,
);

let offsets_buffer: OffsetBuffer<i32> =
OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(table_arr)),
Arc::new(StringArray::from(path_arr)),
Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
Arc::new(DurationMillisecondArray::from(expires_arr)),
Arc::new(GenericListArray::new(
Arc::new(metadata_field),
offsets_buffer,
Arc::new(struct_arr),
None,
)),
],
)?;

let list_files_cache = ListFilesCacheTable { schema, batch };
Ok(Arc::new(list_files_cache))
}
}
113 changes: 110 additions & 3 deletions datafusion-cli/src/main.rs
@@ -32,7 +32,7 @@ use datafusion::logical_expr::ExplainFormat;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::{
MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
};
use datafusion_cli::object_storage::instrumented::{
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
@@ -253,6 +253,13 @@ async fn main_inner() -> Result<()> {
)),
);

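// Expose the runtime's list-files cache as the `list_files_cache()` table function.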
ctx.register_udtf(
"list_files_cache",
Arc::new(ListFilesCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

let mut print_options = PrintOptions {
format: args.format,
quiet: args.quiet,
@@ -431,15 +438,20 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use datafusion::{
common::test_util::batches_to_string,
execution::cache::{
cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
DefaultListFilesCache, cache_manager::CacheManagerConfig,
cache_unit::DefaultFileStatisticsCache,
},
prelude::ParquetReadOptions,
prelude::{ParquetReadOptions, col, lit, split_part},
};
use insta::assert_snapshot;
use object_store::memory::InMemory;
use url::Url;

fn assert_conversion(input: &str, expected: Result<usize, String>) {
let result = extract_memory_pool_size(input);
@@ -741,4 +753,99 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_list_files_cache() -> Result<(), DataFusionError> {
let list_files_cache = Arc::new(DefaultListFilesCache::new(
1024,
Some(Duration::from_secs(1)),
));

let rt = RuntimeEnvBuilder::new()
.with_cache_manager(
CacheManagerConfig::default()
.with_list_files_cache(Some(list_files_cache)),
)
.build_arc()
.unwrap();

let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);

ctx.register_object_store(
&Url::parse("mem://test_table").unwrap(),
Arc::new(InMemory::new()),
);

ctx.register_udtf(
"list_files_cache",
Arc::new(ListFilesCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

ctx.sql(
"CREATE EXTERNAL TABLE src_table
STORED AS PARQUET
LOCATION '../parquet-testing/data/alltypes_plain.parquet'",
)
.await?
.collect()
.await?;

ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/0.parquet' STORED AS PARQUET").await?.collect().await?;

ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/1.parquet' STORED AS PARQUET").await?.collect().await?;

ctx.sql(
"CREATE EXTERNAL TABLE test_table
STORED AS PARQUET
LOCATION 'mem://test_table/'
",
)
.await?
.collect()
.await?;

let sql = "SELECT metadata_size_bytes, expires_in, metadata_list FROM list_files_cache()";
let df = ctx
.sql(sql)
.await?
.unnest_columns(&["metadata_list"])?
.with_column_renamed("metadata_list", "metadata")?
.unnest_columns(&["metadata"])?;

assert_eq!(
2,
df.clone()
.filter(col("expires_in").is_not_null())?
.count()
.await?
);

let df = df
.with_column_renamed(r#""metadata.file_size_bytes""#, "file_size_bytes")?
.with_column_renamed(r#""metadata.e_tag""#, "etag")?
.with_column(
"filename",
split_part(col(r#""metadata.file_path""#), lit("/"), lit(-1)),
)?
.select_columns(&[
"metadata_size_bytes",
"filename",
"file_size_bytes",
"etag",
])?
.sort(vec![col("filename").sort(true, false)])?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+---------------------+-----------+-----------------+------+
| metadata_size_bytes | filename | file_size_bytes | etag |
+---------------------+-----------+-----------------+------+
| 212 | 0.parquet | 3645 | 0 |
| 212 | 1.parquet | 3645 | 1 |
+---------------------+-----------+-----------------+------+
");

Ok(())
}
}
7 changes: 6 additions & 1 deletion datafusion/catalog-listing/src/table.rs
@@ -34,6 +34,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::dml::InsertOp;
@@ -565,7 +566,11 @@ impl TableProvider for ListingTable {

// Invalidate cache entries for this table if they exist
if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
let _ = lfc.remove(table_path.prefix());
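// Cache entries are keyed by (table reference, path), so build the table-scoped
// key before removing this table's cached listing.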
let key = TableScopedPath {
table: table_path.get_table_ref().clone(),
path: table_path.prefix().clone(),
};
let _ = lfc.remove(&key);
}

// Sink related option, apart from format
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -63,7 +63,8 @@ impl TableProviderFactory for ListingTableFactory {
))?
.create(session_state, &cmd.options)?;

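// Attach the table name to the parsed URL so list-files cache entries are scoped
// to this table.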
let mut table_path = ListingTableUrl::parse(&cmd.location)?;
let mut table_path =
ListingTableUrl::parse(&cmd.location)?.with_table_ref(cmd.name.clone());
let file_extension = match table_path.is_collection() {
// Setting the extension to be empty instead of allowing the default extension seems
// odd, but was done to ensure existing behavior isn't modified. It seems like this
@@ -160,7 +161,9 @@ impl TableProviderFactory for ListingTableFactory {
}
None => format!("*.{}", cmd.file_type.to_lowercase()),
};
table_path = table_path.with_glob(glob.as_ref())?;
table_path = table_path
.with_glob(glob.as_ref())?
.with_table_ref(cmd.name.clone());
}
let schema = options.infer_schema(session_state, &table_path).await?;
let df_schema = Arc::clone(&schema).to_dfschema()?;