Skip to content

Commit c37dd5e

Browse files
authored
feat: Cache Parquet metadata in built in parquet reader (#16971)
* feat: Cache Parquet metadata * Convert FileMetadata and FileMetadataCache to traits * Use as_any to respect MSRV * Use ObjectMeta as the key of FileMetadataCache
1 parent f4c5a63 commit c37dd5e

File tree

18 files changed

+536
-3
lines changed

18 files changed

+536
-3
lines changed

datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,12 @@ config_namespace! {
559559
/// (reading) Use any available bloom filters when reading parquet files
560560
pub bloom_filter_on_read: bool, default = true
561561

562+
/// (reading) Whether or not to enable the caching of embedded metadata of Parquet files
563+
/// (footer and page metadata). Enabling it can offer substantial performance improvements
564+
/// for repeated queries over large files. By default, the cache is automatically
565+
/// invalidated when the underlying file is modified.
566+
pub cache_metadata: bool, default = false
567+
562568
// The following options affect writing to parquet files
563569
// and map to parquet::file::properties::WriterProperties
564570

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ impl ParquetOptions {
245245
binary_as_string: _, // not used for writer props
246246
coerce_int96: _, // not used for writer props
247247
skip_arrow_metadata: _,
248+
cache_metadata: _,
248249
} = self;
249250

250251
let mut builder = WriterProperties::builder()
@@ -522,6 +523,7 @@ mod tests {
522523
binary_as_string: defaults.binary_as_string,
523524
skip_arrow_metadata: defaults.skip_arrow_metadata,
524525
coerce_int96: None,
526+
cache_metadata: defaults.cache_metadata,
525527
}
526528
}
527529

@@ -634,6 +636,7 @@ mod tests {
634636
binary_as_string: global_options_defaults.binary_as_string,
635637
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
636638
coerce_int96: None,
639+
cache_metadata: global_options_defaults.cache_metadata,
637640
},
638641
column_specific_options,
639642
key_value_metadata,

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,11 @@ pub struct ParquetReadOptions<'a> {
254254
pub file_sort_order: Vec<Vec<SortExpr>>,
255255
/// Properties for decryption of Parquet files that use modular encryption
256256
pub file_decryption_properties: Option<ConfigFileDecryptionProperties>,
257+
/// Whether or not to enable the caching of embedded metadata of this Parquet file (footer and
258+
/// page metadata). Enabling it can offer substantial performance improvements for repeated
259+
/// queries over large files. By default, the cache is automatically invalidated when the
260+
/// underlying file is modified.
261+
pub cache_metadata: Option<bool>,
257262
}
258263

259264
impl Default for ParquetReadOptions<'_> {
@@ -266,6 +271,7 @@ impl Default for ParquetReadOptions<'_> {
266271
schema: None,
267272
file_sort_order: vec![],
268273
file_decryption_properties: None,
274+
cache_metadata: None,
269275
}
270276
}
271277
}
@@ -325,6 +331,12 @@ impl<'a> ParquetReadOptions<'a> {
325331
self.file_decryption_properties = Some(file_decryption_properties);
326332
self
327333
}
334+
335+
/// Specify whether to enable or not metadata caching
336+
pub fn cache_metadata(mut self, cache_metadata: bool) -> Self {
337+
self.cache_metadata = Some(cache_metadata);
338+
self
339+
}
328340
}
329341

330342
/// Options that control the reading of ARROW files.
@@ -590,6 +602,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
590602
if let Some(file_decryption_properties) = &self.file_decryption_properties {
591603
options.crypto.file_decryption = Some(file_decryption_properties.clone());
592604
}
605+
if let Some(cache_metadata) = self.cache_metadata {
606+
options.global.cache_metadata = cache_metadata;
607+
}
593608
let mut file_format = ParquetFormat::new().with_options(options);
594609

595610
if let Some(parquet_pruning) = self.parquet_pruning {

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ use datafusion_physical_plan::Accumulator;
6363
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
6464
use datafusion_session::Session;
6565

66+
use crate::reader::CachedParquetFileReaderFactory;
6667
use crate::source::{parse_coerce_int96_string, ParquetSource};
6768
use async_trait::async_trait;
6869
use bytes::Bytes;
@@ -435,7 +436,7 @@ impl FileFormat for ParquetFormat {
435436

436437
async fn create_physical_plan(
437438
&self,
438-
_state: &dyn Session,
439+
state: &dyn Session,
439440
conf: FileScanConfig,
440441
) -> Result<Arc<dyn ExecutionPlan>> {
441442
let mut metadata_size_hint = None;
@@ -446,6 +447,21 @@ impl FileFormat for ParquetFormat {
446447

447448
let mut source = ParquetSource::new(self.options.clone());
448449

450+
// Use the CachedParquetFileReaderFactory when metadata caching is enabled
451+
if self.options.global.cache_metadata {
452+
if let Some(metadata_cache) =
453+
state.runtime_env().cache_manager.get_file_metadata_cache()
454+
{
455+
let store = state
456+
.runtime_env()
457+
.object_store(conf.object_store_url.clone())?;
458+
let cached_parquet_read_factory =
459+
Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
460+
source =
461+
source.with_parquet_file_reader_factory(cached_parquet_read_factory);
462+
}
463+
}
464+
449465
if let Some(metadata_size_hint) = metadata_size_hint {
450466
source = source.with_metadata_size_hint(metadata_size_hint)
451467
}

datafusion/datasource-parquet/src/reader.rs

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
use crate::ParquetFileMetrics;
2222
use bytes::Bytes;
2323
use datafusion_datasource::file_meta::FileMeta;
24+
use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
2425
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
2526
use futures::future::BoxFuture;
27+
use futures::FutureExt;
2628
use object_store::ObjectStore;
2729
use parquet::arrow::arrow_reader::ArrowReaderOptions;
2830
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
29-
use parquet::file::metadata::ParquetMetaData;
31+
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
32+
use std::any::Any;
3033
use std::fmt::Debug;
3134
use std::ops::Range;
3235
use std::sync::Arc;
@@ -150,3 +153,139 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
150153
}))
151154
}
152155
}
156+
157+
/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
158+
/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
159+
/// This reader always loads the entire metadata (including page index, unless the file is
160+
/// encrypted), even if not required by the current query, to ensure it is always available for
161+
/// those that need it.
162+
#[derive(Debug)]
163+
pub struct CachedParquetFileReaderFactory {
164+
store: Arc<dyn ObjectStore>,
165+
metadata_cache: Arc<dyn FileMetadataCache>,
166+
}
167+
168+
impl CachedParquetFileReaderFactory {
169+
pub fn new(
170+
store: Arc<dyn ObjectStore>,
171+
metadata_cache: Arc<dyn FileMetadataCache>,
172+
) -> Self {
173+
Self {
174+
store,
175+
metadata_cache,
176+
}
177+
}
178+
}
179+
180+
impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
181+
fn create_reader(
182+
&self,
183+
partition_index: usize,
184+
file_meta: FileMeta,
185+
metadata_size_hint: Option<usize>,
186+
metrics: &ExecutionPlanMetricsSet,
187+
) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
188+
let file_metrics = ParquetFileMetrics::new(
189+
partition_index,
190+
file_meta.location().as_ref(),
191+
metrics,
192+
);
193+
let store = Arc::clone(&self.store);
194+
195+
let mut inner =
196+
ParquetObjectReader::new(store, file_meta.object_meta.location.clone())
197+
.with_file_size(file_meta.object_meta.size);
198+
199+
if let Some(hint) = metadata_size_hint {
200+
inner = inner.with_footer_size_hint(hint)
201+
};
202+
203+
Ok(Box::new(CachedParquetFileReader {
204+
inner,
205+
file_metrics,
206+
file_meta,
207+
metadata_cache: Arc::clone(&self.metadata_cache),
208+
}))
209+
}
210+
}
211+
212+
/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata
213+
/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
214+
/// updates the cache.
215+
pub(crate) struct CachedParquetFileReader {
216+
pub file_metrics: ParquetFileMetrics,
217+
pub inner: ParquetObjectReader,
218+
file_meta: FileMeta,
219+
metadata_cache: Arc<dyn FileMetadataCache>,
220+
}
221+
222+
impl AsyncFileReader for CachedParquetFileReader {
223+
fn get_bytes(
224+
&mut self,
225+
range: Range<u64>,
226+
) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
227+
let bytes_scanned = range.end - range.start;
228+
self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
229+
self.inner.get_bytes(range)
230+
}
231+
232+
fn get_byte_ranges(
233+
&mut self,
234+
ranges: Vec<Range<u64>>,
235+
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
236+
where
237+
Self: Send,
238+
{
239+
let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
240+
self.file_metrics.bytes_scanned.add(total as usize);
241+
self.inner.get_byte_ranges(ranges)
242+
}
243+
244+
fn get_metadata<'a>(
245+
&'a mut self,
246+
options: Option<&'a ArrowReaderOptions>,
247+
) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
248+
let file_meta = self.file_meta.clone();
249+
let metadata_cache = Arc::clone(&self.metadata_cache);
250+
251+
async move {
252+
let object_meta = &file_meta.object_meta;
253+
254+
// lookup if the metadata is already cached
255+
if let Some(metadata) = metadata_cache.get(object_meta) {
256+
if let Some(parquet_metadata) =
257+
metadata.as_any().downcast_ref::<CachedParquetMetaData>()
258+
{
259+
return Ok(Arc::clone(&parquet_metadata.0));
260+
}
261+
}
262+
263+
let mut reader = ParquetMetaDataReader::new();
264+
// the page index can only be loaded with unencrypted files
265+
if let Some(file_decryption_properties) =
266+
options.and_then(|o| o.file_decryption_properties())
267+
{
268+
reader =
269+
reader.with_decryption_properties(Some(file_decryption_properties));
270+
} else {
271+
reader = reader.with_page_indexes(true);
272+
}
273+
reader.try_load(&mut self.inner, object_meta.size).await?;
274+
let metadata = Arc::new(reader.finish()?);
275+
let cached_metadata = Arc::new(CachedParquetMetaData(Arc::clone(&metadata)));
276+
277+
metadata_cache.put(object_meta, cached_metadata);
278+
Ok(metadata)
279+
}
280+
.boxed()
281+
}
282+
}
283+
284+
/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
285+
struct CachedParquetMetaData(Arc<ParquetMetaData>);
286+
287+
impl FileMetadata for CachedParquetMetaData {
288+
fn as_any(&self) -> &dyn Any {
289+
self
290+
}
291+
}

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::cache::cache_unit::DefaultFilesMetadataCache;
1819
use crate::cache::CacheAccessor;
1920
use datafusion_common::{Result, Statistics};
2021
use object_store::path::Path;
2122
use object_store::ObjectMeta;
23+
use std::any::Any;
2224
use std::fmt::{Debug, Formatter};
2325
use std::sync::Arc;
2426

@@ -32,6 +34,19 @@ pub type FileStatisticsCache =
3234
pub type ListFilesCache =
3335
Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
3436

37+
/// Represents generic file-embedded metadata.
38+
pub trait FileMetadata: Any + Send + Sync {
39+
/// Returns the file metadata as [`Any`] so that it can be downcasted to a specific
40+
/// implementation.
41+
fn as_any(&self) -> &dyn Any;
42+
}
43+
44+
/// Cache to store file-embedded metadata.
45+
pub trait FileMetadataCache:
46+
CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
47+
{
48+
}
49+
3550
impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
3651
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
3752
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
@@ -44,10 +59,17 @@ impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
4459
}
4560
}
4661

62+
impl Debug for dyn FileMetadataCache {
63+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
64+
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
65+
}
66+
}
67+
4768
#[derive(Default, Debug)]
4869
pub struct CacheManager {
4970
file_statistic_cache: Option<FileStatisticsCache>,
5071
list_files_cache: Option<ListFilesCache>,
72+
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
5173
}
5274

5375
impl CacheManager {
@@ -59,6 +81,13 @@ impl CacheManager {
5981
if let Some(lc) = &config.list_files_cache {
6082
manager.list_files_cache = Some(Arc::clone(lc))
6183
}
84+
if let Some(mc) = &config.file_metadata_cache {
85+
manager.file_metadata_cache = Some(Arc::clone(mc));
86+
} else {
87+
manager.file_metadata_cache =
88+
Some(Arc::new(DefaultFilesMetadataCache::default()));
89+
}
90+
6291
Ok(Arc::new(manager))
6392
}
6493

@@ -71,6 +100,11 @@ impl CacheManager {
71100
pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
72101
self.list_files_cache.clone()
73102
}
103+
104+
/// Get the file embedded metadata cache.
105+
pub fn get_file_metadata_cache(&self) -> Option<Arc<dyn FileMetadataCache>> {
106+
self.file_metadata_cache.clone()
107+
}
74108
}
75109

76110
#[derive(Clone, Default)]
@@ -86,6 +120,10 @@ pub struct CacheManagerConfig {
86120
/// location.
87121
/// Default is disable.
88122
pub list_files_cache: Option<ListFilesCache>,
123+
/// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
124+
/// data file (e.g., Parquet footer and page metadata).
125+
/// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
126+
pub file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
89127
}
90128

91129
impl CacheManagerConfig {
@@ -101,4 +139,12 @@ impl CacheManagerConfig {
101139
self.list_files_cache = cache;
102140
self
103141
}
142+
143+
pub fn with_file_metadata_cache(
144+
mut self,
145+
cache: Option<Arc<dyn FileMetadataCache>>,
146+
) -> Self {
147+
self.file_metadata_cache = cache;
148+
self
149+
}
104150
}

0 commit comments

Comments
 (0)