@@ -88,6 +88,11 @@ def read(self, table_name: str, data_format: str,
 
         folder = self._get_table_directory(table_name, False)
 
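+        # The dedup logic below indexes on the key fields, so they must be
+        # part of the requested column list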
+        if not all(x in fields for x in key_fields):
+            raise ValueError('Key fields MUST be included in columns list')
+
         if addnl_filter:
             # This is for special cases that are specific to an object
             query_str = addnl_filter
@@ -108,7 +111,8 @@ def read(self, table_name: str, data_format: str,
 
         # If requesting a specific version of the data, handle that diff too
         sqvers = kwargs.pop('sqvers', None)
-        datasets = []
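+        # Accumulate the per-dataset results into a single dataframe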
+        final_df = pd.DataFrame()
         try:
             dirs = Path(folder)
             try:
@@ -123,64 +126,45 @@ def read(self, table_name: str, data_format: str,
                         if vers > max_vers:
                             max_vers = vers
 
-                    datasets.append(ds.dataset(elem, format='parquet',
-                                               partitioning='hive'))
+                    dataset = ds.dataset(elem, format='parquet',
+                                         partitioning='hive')
+
+                    if not dataset:
+                        continue
+
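+                    # Turn this version's dataset into a filtered dataframe
+                    # and fold it into the running result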
+                    tmp_df = self._process_dataset(dataset, namespace, start,
+                                                   end, fields, merge_fields,
+                                                   query_str, **kwargs)
+                    if not tmp_df.empty:
+                        final_df = pd.concat([final_df, tmp_df])
             except FileNotFoundError:
                 pass
             except Exception as e:
                 raise e
 
-        # Now find the exact set of files we need to go over
+        # Now operate on the coalesced data set
         cp_dataset = self._get_cp_dataset(table_name, need_sqvers, sqvers,
                                           view, start, end)
         if cp_dataset:
-            datasets.append(cp_dataset)
-
-        if not datasets:
-            datasets = [ds.dataset(folder, format='parquet',
-                                   partitioning='hive')]
-
-        # Build the filters for predicate pushdown
-        master_schema = self._build_master_schema(datasets)
-
-        avail_fields = list(filter(lambda x: x in master_schema.names,
-                                   fields))
-        filters = self.build_ds_filters(
-            start, end, master_schema, merge_fields=merge_fields, **kwargs)
-
-        filtered_datasets = self._get_filtered_fileset(
-            datasets, namespace)
-
-        final_df = filtered_datasets \
-            .to_table(filter=filters, columns=avail_fields) \
-            .to_pandas(self_destruct=True) \
-            .query(query_str) \
-            .sort_values(by='timestamp')
-
-        if merge_fields:
-            # These are key fields that need to be set right before we do
-            # the drop duplicates to avoid missing out all the data
-            for field in merge_fields:
-                newfld = merge_fields[field]
-                if (field in final_df.columns and
-                        newfld in final_df.columns):
-                    final_df[newfld] = np.where(final_df[newfld],
-                                                final_df[newfld],
-                                                final_df[field])
-                elif (field in final_df.columns and
-                      newfld not in final_df.columns):
-                    final_df = final_df.rename(columns={field: newfld})
+            tmp_df = self._process_dataset(cp_dataset, namespace, start,
+                                           end, fields, merge_fields,
+                                           query_str, **kwargs)
+            if not tmp_df.empty:
+                final_df = pd.concat([final_df, tmp_df])
 
         # Because of how coalescing works, we can have multiple duplicated
         # entries with same timestamp. Remove them
-        dupts_keys = key_fields + ['timestamp']
-        final_df = final_df.set_index(dupts_keys) \
-            .query('~index.duplicated(keep="last")') \
-            .reset_index()
-        if (not final_df.empty and (view == 'latest') and
-                all(x in final_df.columns for x in key_fields)):
-            final_df = final_df.set_index(key_fields) \
-                .query('~index.duplicated(keep="last")')
+        if not final_df.empty:
+            final_df = final_df.sort_values(by=['timestamp'])
+            dupts_keys = key_fields + ['timestamp']
+            final_df = final_df.set_index(dupts_keys) \
+                .query('~index.duplicated(keep="last")') \
+                .reset_index()
+            if not final_df.empty and (view == 'latest'):
+                final_df = final_df.set_index(key_fields) \
+                    .query('~index.duplicated(keep="last")')
         except (pa.lib.ArrowInvalid, OSError):
             return pd.DataFrame(columns=fields)
 
@@ -483,6 +465,49 @@ def _get_avail_sqvers(self, table_name: str, coalesced: bool) -> List[str]:
 
         return sqvers_list
 
+    def _process_dataset(self, dataset: ds.Dataset, namespace: List[str],
+                         start: str, end: str, fields: List[str],
+                         merge_fields: dict, query_str: str,
+                         **kwargs) -> pd.DataFrame:
+        '''Process provided dataset and return a pandas DF'''
+
+        # Build the filters for predicate pushdown
+        master_schema = dataset.schema
+
+        avail_fields = [f for f in fields if f in master_schema.names]
+        filters = self.build_ds_filters(
+            start, end, master_schema, merge_fields=merge_fields,
+            **kwargs)
+
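+        # Prune the file list to the requested namespaces before reading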
+        filtered_dataset = self._get_filtered_fileset(dataset, namespace)
+
+        if not filtered_dataset.files:
+            return pd.DataFrame()
+
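+        # Push the filters and column selection down into the pyarrow scan,
+        # then convert to pandas and apply the residual query string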
+        tmp_df = filtered_dataset \
+            .to_table(filter=filters, columns=avail_fields) \
+            .to_pandas(self_destruct=True) \
+            .query(query_str)
+
+        if merge_fields and not tmp_df.empty:
+            # These are key fields that need to be set right before we do
+            # the drop duplicates to avoid missing out all the data
+            for field in merge_fields:
+                newfld = merge_fields[field]
+                if (field in tmp_df.columns and
+                        newfld in tmp_df.columns):
+                    tmp_df[newfld] = np.where(tmp_df[newfld],
+                                              tmp_df[newfld],
+                                              tmp_df[field])
+                elif (field in tmp_df.columns and
+                      newfld not in tmp_df.columns):
+                    tmp_df = tmp_df.rename(columns={field: newfld})
+
+        return tmp_df
+
     def _get_cp_dataset(self, table_name: str, need_sqvers: bool,
                         sqvers: str, view: str, start_time: float,
                         end_time: float) -> ds.dataset:
@@ -608,7 +630,7 @@ def _build_master_schema(self, datasets: list) -> pa.lib.Schema:
 
         return msch
 
-    def _get_filtered_fileset(self, datasets: list, namespace: list) -> ds:
+    def _get_filtered_fileset(self, dataset: ds, namespace: list) -> ds:
         """Filter the dataset based on the namespace
 
         We can use this method to filter out namespaces and hostnames based
@@ -620,48 +642,46 @@ def _get_filtered_fileset(self, datasets: list, namespace: list) -> ds:
         Returns:
             ds: pyarrow dataset of only the files that match filter
         """
-        filelist = []
-        for dataset in datasets:
-            if not namespace:
-                filelist.extend(dataset.files)
-                continue
+        if not namespace:
+            return dataset
 
-            notlist = [x for x in namespace
+        # Exclude not and regexp operators as they're handled elsewhere.
+        excluded_ns = [x for x in namespace
                        if x.startswith('!') or x.startswith('~!')]
-            if notlist != namespace:
-                newns = [x for x in namespace
-                         if not x.startswith('!') and not x.startswith('~!')]
-            else:
-                newns = namespace
-            ns_filelist = []
-            chklist = dataset.files
-            for ns in newns or []:
+        if excluded_ns != namespace:
+            ns_filters = [x for x in namespace
+                          if not x.startswith('!') and not x.startswith('~!')]
+        else:
+            ns_filters = namespace
+        ns_filelist = []
+        chklist = dataset.files
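+        # '!ns' and '~!ns' filter out files whose namespace matches ns;
+        # any other entry selects the files whose namespace matches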
+        for ns in ns_filters or []:
+            if ns.startswith('!'):
+                ns = ns[1:]
+                ns_filelist = \
+                    [x for x in chklist
+                     if not re.search(f'namespace={ns}/', x)]
+                chklist = ns_filelist
+            elif ns.startswith('~'):
+                ns = ns[1:]
                 if ns.startswith('!'):
                     ns = ns[1:]
                     ns_filelist = \
                         [x for x in chklist
                          if not re.search(f'namespace={ns}/', x)]
                     chklist = ns_filelist
-                elif ns.startswith('~'):
-                    ns = ns[1:]
-                    if ns.startswith('!'):
-                        ns = ns[1:]
-                        ns_filelist = \
-                            [x for x in chklist
-                             if not re.search(f'namespace={ns}/', x)]
-                        chklist = ns_filelist
-                    else:
-                        ns_filelist.extend(
-                            [x for x in dataset.files
-                             if re.search(f'namespace={ns}/', x)])
                 else:
                     ns_filelist.extend(
                         [x for x in dataset.files
                          if re.search(f'namespace={ns}/', x)])
+            else:
+                ns_filelist.extend(
+                    [x for x in dataset.files
+                     if re.search(f'namespace={ns}/', x)])
 
-            filelist.extend(ns_filelist)
-
-        return ds.dataset(filelist, format='parquet', partitioning='hive')
+        return ds.dataset(ns_filelist, format='parquet', partitioning='hive')
 
     def _cons_int_filter(self, keyfld: str, filter_str: str) -> ds.Expression:
         '''Construct Integer filters with arithmetic operations'''