|
21 | 21 | use crate::ParquetFileMetrics; |
22 | 22 | use bytes::Bytes; |
23 | 23 | use datafusion_datasource::file_meta::FileMeta; |
| 24 | +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; |
24 | 25 | use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; |
25 | 26 | use futures::future::BoxFuture; |
| 27 | +use futures::FutureExt; |
26 | 28 | use object_store::ObjectStore; |
27 | 29 | use parquet::arrow::arrow_reader::ArrowReaderOptions; |
28 | 30 | use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; |
29 | | -use parquet::file::metadata::ParquetMetaData; |
| 31 | +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; |
| 32 | +use std::any::Any; |
30 | 33 | use std::fmt::Debug; |
31 | 34 | use std::ops::Range; |
32 | 35 | use std::sync::Arc; |
@@ -150,3 +153,139 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { |
150 | 153 | })) |
151 | 154 | } |
152 | 155 | } |
| 156 | + |
| 157 | +/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page |
| 158 | +/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data. |
| 159 | +/// This reader always loads the entire metadata (including page index, unless the file is |
| 160 | +/// encrypted), even if not required by the current query, to ensure it is always available for |
| 161 | +/// those that need it. |
| 162 | +#[derive(Debug)] |
| 163 | +pub struct CachedParquetFileReaderFactory { |
| 164 | + store: Arc<dyn ObjectStore>, |
| 165 | + metadata_cache: Arc<dyn FileMetadataCache>, |
| 166 | +} |
| 167 | + |
| 168 | +impl CachedParquetFileReaderFactory { |
| 169 | + pub fn new( |
| 170 | + store: Arc<dyn ObjectStore>, |
| 171 | + metadata_cache: Arc<dyn FileMetadataCache>, |
| 172 | + ) -> Self { |
| 173 | + Self { |
| 174 | + store, |
| 175 | + metadata_cache, |
| 176 | + } |
| 177 | + } |
| 178 | +} |
| 179 | + |
| 180 | +impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { |
| 181 | + fn create_reader( |
| 182 | + &self, |
| 183 | + partition_index: usize, |
| 184 | + file_meta: FileMeta, |
| 185 | + metadata_size_hint: Option<usize>, |
| 186 | + metrics: &ExecutionPlanMetricsSet, |
| 187 | + ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> { |
| 188 | + let file_metrics = ParquetFileMetrics::new( |
| 189 | + partition_index, |
| 190 | + file_meta.location().as_ref(), |
| 191 | + metrics, |
| 192 | + ); |
| 193 | + let store = Arc::clone(&self.store); |
| 194 | + |
| 195 | + let mut inner = |
| 196 | + ParquetObjectReader::new(store, file_meta.object_meta.location.clone()) |
| 197 | + .with_file_size(file_meta.object_meta.size); |
| 198 | + |
| 199 | + if let Some(hint) = metadata_size_hint { |
| 200 | + inner = inner.with_footer_size_hint(hint) |
| 201 | + }; |
| 202 | + |
| 203 | + Ok(Box::new(CachedParquetFileReader { |
| 204 | + inner, |
| 205 | + file_metrics, |
| 206 | + file_meta, |
| 207 | + metadata_cache: Arc::clone(&self.metadata_cache), |
| 208 | + })) |
| 209 | + } |
| 210 | +} |
| 211 | + |
| 212 | +/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata |
| 213 | +/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then |
| 214 | +/// updates the cache. |
| 215 | +pub(crate) struct CachedParquetFileReader { |
| 216 | + pub file_metrics: ParquetFileMetrics, |
| 217 | + pub inner: ParquetObjectReader, |
| 218 | + file_meta: FileMeta, |
| 219 | + metadata_cache: Arc<dyn FileMetadataCache>, |
| 220 | +} |
| 221 | + |
| 222 | +impl AsyncFileReader for CachedParquetFileReader { |
| 223 | + fn get_bytes( |
| 224 | + &mut self, |
| 225 | + range: Range<u64>, |
| 226 | + ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> { |
| 227 | + let bytes_scanned = range.end - range.start; |
| 228 | + self.file_metrics.bytes_scanned.add(bytes_scanned as usize); |
| 229 | + self.inner.get_bytes(range) |
| 230 | + } |
| 231 | + |
| 232 | + fn get_byte_ranges( |
| 233 | + &mut self, |
| 234 | + ranges: Vec<Range<u64>>, |
| 235 | + ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> |
| 236 | + where |
| 237 | + Self: Send, |
| 238 | + { |
| 239 | + let total: u64 = ranges.iter().map(|r| r.end - r.start).sum(); |
| 240 | + self.file_metrics.bytes_scanned.add(total as usize); |
| 241 | + self.inner.get_byte_ranges(ranges) |
| 242 | + } |
| 243 | + |
| 244 | + fn get_metadata<'a>( |
| 245 | + &'a mut self, |
| 246 | + options: Option<&'a ArrowReaderOptions>, |
| 247 | + ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> { |
| 248 | + let file_meta = self.file_meta.clone(); |
| 249 | + let metadata_cache = Arc::clone(&self.metadata_cache); |
| 250 | + |
| 251 | + async move { |
| 252 | + let object_meta = &file_meta.object_meta; |
| 253 | + |
| 254 | + // lookup if the metadata is already cached |
| 255 | + if let Some(metadata) = metadata_cache.get(object_meta) { |
| 256 | + if let Some(parquet_metadata) = |
| 257 | + metadata.as_any().downcast_ref::<CachedParquetMetaData>() |
| 258 | + { |
| 259 | + return Ok(Arc::clone(&parquet_metadata.0)); |
| 260 | + } |
| 261 | + } |
| 262 | + |
| 263 | + let mut reader = ParquetMetaDataReader::new(); |
| 264 | + // the page index can only be loaded with unencrypted files |
| 265 | + if let Some(file_decryption_properties) = |
| 266 | + options.and_then(|o| o.file_decryption_properties()) |
| 267 | + { |
| 268 | + reader = |
| 269 | + reader.with_decryption_properties(Some(file_decryption_properties)); |
| 270 | + } else { |
| 271 | + reader = reader.with_page_indexes(true); |
| 272 | + } |
| 273 | + reader.try_load(&mut self.inner, object_meta.size).await?; |
| 274 | + let metadata = Arc::new(reader.finish()?); |
| 275 | + let cached_metadata = Arc::new(CachedParquetMetaData(Arc::clone(&metadata))); |
| 276 | + |
| 277 | + metadata_cache.put(object_meta, cached_metadata); |
| 278 | + Ok(metadata) |
| 279 | + } |
| 280 | + .boxed() |
| 281 | + } |
| 282 | +} |
| 283 | + |
| 284 | +/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`]. |
| 285 | +struct CachedParquetMetaData(Arc<ParquetMetaData>); |
| 286 | + |
| 287 | +impl FileMetadata for CachedParquetMetaData { |
| 288 | + fn as_any(&self) -> &dyn Any { |
| 289 | + self |
| 290 | + } |
| 291 | +} |
0 commit comments