@@ -4,7 +4,9 @@ use crate::remotefs::RemoteFs;
44use crate :: { app_metrics, CubeError } ;
55use chrono:: Utc ;
66use datafusion:: cube_ext;
7+ use datafusion:: parquet:: file:: reader:: { FileReader , SerializedFileReader } ;
78use std:: collections:: HashSet ;
9+ use std:: fs:: { DirEntry , File } ;
810use std:: path:: Path ;
911use std:: sync:: Arc ;
1012use tokio:: time:: Duration ;
@@ -219,27 +221,11 @@ impl RemoteFsCleanup {
219221 if !file_name. ends_with ( ".parquet" ) {
220222 continue ;
221223 }
222-
223- let should_deleted = if let Ok ( metadata) = entry. metadata ( ) {
224- match metadata. created ( ) {
225- Ok ( created) => {
226- if created
227- . elapsed ( )
228- . map_or ( true , |e| e < cleanup_local_files_delay)
229- {
230- false
231- } else {
232- true
233- }
234- }
235- Err ( e) => {
236- log:: error!(
237- "error while getting created time for file {:?}:{}" ,
238- entry. file_name( ) ,
239- e
240- ) ;
241- false
242- }
224+ let should_deleted = if let Some ( created) = get_timestamp ( & entry) {
225+ if created < cleanup_local_files_delay {
226+ false
227+ } else {
228+ true
243229 }
244230 } else {
245231 false
@@ -286,6 +272,38 @@ impl RemoteFsCleanup {
286272 }
287273 }
288274}
275+
276+ fn get_timestamp ( entry : & DirEntry ) -> Option < Duration > {
277+ let path = entry. path ( ) ;
278+ let file = match File :: open ( & path) {
279+ Ok ( file) => file,
280+ Err ( e) => {
281+ log:: error!( "Error opening file {:?}: {}" , path, e) ;
282+ return None ;
283+ }
284+ } ;
285+ let reader = match SerializedFileReader :: new ( file) {
286+ Ok ( reader) => reader,
287+ Err ( e) => {
288+ log:: error!( "Error reading Parquet file {:?}: {}" , path, e) ;
289+ return None ;
290+ }
291+ } ;
292+ let metadata = reader. metadata ( ) . file_metadata ( ) . key_value_metadata ( ) ;
293+ if let Some ( key_values) = metadata {
294+ let created = key_values
295+ . iter ( )
296+ . find ( |key_value| key_value. key == "created_at" ) ;
297+ if let Some ( created) = created {
298+ if let Some ( value) = created. value . as_ref ( ) {
299+ if let Ok ( timestamp) = value. parse :: < u64 > ( ) {
300+ return Some ( Duration :: from_secs ( timestamp) ) ;
301+ }
302+ }
303+ }
304+ }
305+ None
306+ }
289307#[ cfg( test) ]
290308mod test {
291309 use super :: * ;
0 commit comments