Commit 69b6bf5

Parquet: Handle reading data from multiple sqvers correctly
Reads that span multiple different sqvers schema versions were not handled correctly, causing such reads to return incorrect data. This patch addresses that: each sqvers folder is read individually and the resulting dataframes are then joined together.

Signed-off-by: Dinesh Dutt <[email protected]>
1 parent b6db6ef commit 69b6bf5
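
The fix reads each sqvers=<N> partition as its own dataset and concatenates the per-version dataframes, rather than handing pyarrow a single dataset spanning every schema version. A minimal standalone sketch of that read-and-concat pattern, outside suzieq (the folder layout and the read_across_sqvers helper are illustrative, not the patch's API):

from pathlib import Path

import pandas as pd
import pyarrow.dataset as ds


def read_across_sqvers(folder: str, columns=None) -> pd.DataFrame:
    '''Read every sqvers=<N> subfolder separately and join the results.'''
    final_df = pd.DataFrame()
    for elem in Path(folder).iterdir():
        if 'sqvers=' not in elem.name:
            continue
        # One dataset per schema version, so each version is read with its
        # own schema instead of one schema merged across all versions
        dataset = ds.dataset(elem, format='parquet', partitioning='hive')
        tmp_df = dataset.to_table(columns=columns).to_pandas()
        if not tmp_df.empty:
            final_df = pd.concat([final_df, tmp_df])
    return final_df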

File tree

1 file changed: +96 -78 lines changed

suzieq/db/parquet/parquetdb.py

Lines changed: 96 additions & 78 deletions
@@ -88,6 +88,9 @@ def read(self, table_name: str, data_format: str,
 
         folder = self._get_table_directory(table_name, False)
 
+        if not all(x in fields for x in key_fields):
+            raise ValueError('Key fields MUST be included in columns list')
+
         if addnl_filter:
             # This is for special cases that are specific to an object
             query_str = addnl_filter
@@ -108,7 +111,7 @@ def read(self, table_name: str, data_format: str,
 
         # If requesting a specific version of the data, handle that diff too
         sqvers = kwargs.pop('sqvers', None)
-        datasets = []
+        final_df = pd.DataFrame()
         try:
             dirs = Path(folder)
             try:
@@ -123,64 +126,43 @@ def read(self, table_name: str, data_format: str,
                         if vers > max_vers:
                             max_vers = vers
 
-                    datasets.append(ds.dataset(elem, format='parquet',
-                                               partitioning='hive'))
+                    dataset = ds.dataset(elem, format='parquet',
+                                         partitioning='hive')
+
+                    if not dataset:
+                        continue
+
+                    tmp_df = self._process_dataset(dataset, namespace, start,
+                                                   end, fields, merge_fields,
+                                                   query_str, **kwargs)
+                    if not tmp_df.empty:
+                        final_df = pd.concat([final_df, tmp_df])
             except FileNotFoundError:
                 pass
             except Exception as e:
                 raise e
 
-            # Now find the exact set of files we need to go over
+            # Now operate on the coalesced data set
            cp_dataset = self._get_cp_dataset(table_name, need_sqvers, sqvers,
                                              view, start, end)
            if cp_dataset:
-                datasets.append(cp_dataset)
-
-            if not datasets:
-                datasets = [ds.dataset(folder, format='parquet',
-                                       partitioning='hive')]
-
-            # Build the filters for predicate pushdown
-            master_schema = self._build_master_schema(datasets)
-
-            avail_fields = list(filter(lambda x: x in master_schema.names,
-                                       fields))
-            filters = self.build_ds_filters(
-                start, end, master_schema, merge_fields=merge_fields, **kwargs)
-
-            filtered_datasets = self._get_filtered_fileset(
-                datasets, namespace)
-
-            final_df = filtered_datasets \
-                .to_table(filter=filters, columns=avail_fields) \
-                .to_pandas(self_destruct=True) \
-                .query(query_str) \
-                .sort_values(by='timestamp')
-
-            if merge_fields:
-                # These are key fields that need to be set right before we do
-                # the drop duplicates to avoid missing out all the data
-                for field in merge_fields:
-                    newfld = merge_fields[field]
-                    if (field in final_df.columns and
-                            newfld in final_df.columns):
-                        final_df[newfld] = np.where(final_df[newfld],
-                                                    final_df[newfld],
-                                                    final_df[field])
-                    elif (field in final_df.columns and
-                          newfld not in final_df.columns):
-                        final_df = final_df.rename(columns={field: newfld})
+                tmp_df = self._process_dataset(cp_dataset, namespace, start,
+                                               end, fields, merge_fields,
+                                               query_str, **kwargs)
+                if not tmp_df.empty:
+                    final_df = pd.concat([final_df, tmp_df])
 
             # Because of how coalescing works, we can have multiple duplicated
             # entries with same timestamp. Remove them
-            dupts_keys = key_fields + ['timestamp']
-            final_df = final_df.set_index(dupts_keys) \
-                               .query('~index.duplicated(keep="last")') \
-                               .reset_index()
-            if (not final_df.empty and (view == 'latest') and
-                    all(x in final_df.columns for x in key_fields)):
-                final_df = final_df.set_index(key_fields) \
-                                   .query('~index.duplicated(keep="last")')
+            if not final_df.empty:
+                final_df = final_df.sort_values(by=['timestamp'])
+                dupts_keys = key_fields + ['timestamp']
+                final_df = final_df.set_index(dupts_keys) \
+                                   .query('~index.duplicated(keep="last")') \
+                                   .reset_index()
+                if not final_df.empty and (view == 'latest'):
+                    final_df = final_df.set_index(key_fields) \
+                                       .query('~index.duplicated(keep="last")')
         except (pa.lib.ArrowInvalid, OSError):
            return pd.DataFrame(columns=fields)
 
@@ -483,6 +465,46 @@ def _get_avail_sqvers(self, table_name: str, coalesced: bool) -> List[str]:
 
         return sqvers_list
 
+    def _process_dataset(self, dataset: ds.Dataset, namespace: List[str],
+                         start: str, end: str, fields: List[str],
+                         merge_fields: List[str], query_str: str,
+                         **kwargs) -> pd.DataFrame:
+        '''Process provided dataset and return a pandas DF'''
+
+        # Build the filters for predicate pushdown
+        master_schema = dataset.schema
+
+        avail_fields = [f for f in fields if f in master_schema.names]
+        filters = self.build_ds_filters(
+            start, end, master_schema, merge_fields=merge_fields,
+            **kwargs)
+
+        filtered_dataset = self._get_filtered_fileset(dataset, namespace)
+
+        if not filtered_dataset.files:
+            return pd.DataFrame()
+
+        tmp_df = filtered_dataset \
+            .to_table(filter=filters, columns=avail_fields) \
+            .to_pandas(self_destruct=True) \
+            .query(query_str)
+
+        if merge_fields and not tmp_df.empty:
+            # These are key fields that need to be set right before we do
+            # the drop duplicates to avoid missing out all the data
+            for field in merge_fields:
+                newfld = merge_fields[field]
+                if (field in tmp_df.columns and
+                        newfld in tmp_df.columns):
+                    tmp_df[newfld] = np.where(tmp_df[newfld],
+                                              tmp_df[newfld],
+                                              tmp_df[field])
+                elif (field in tmp_df.columns and
+                      newfld not in tmp_df.columns):
+                    tmp_df = tmp_df.rename(columns={field: newfld})
+
+        return tmp_df
+
     def _get_cp_dataset(self, table_name: str, need_sqvers: bool,
                         sqvers: str, view: str, start_time: float,
                         end_time: float) -> ds.dataset:
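The merge_fields handling that moved into _process_dataset keeps a renamed column's value when it is already set and otherwise falls back to the old column, via np.where. A small standalone illustration with an invented merge map and invented column names (in suzieq the mapping comes from the schema, not from this dict):

import numpy as np
import pandas as pd

# Hypothetical merge map: 'ifname' is the old column name, 'oif' the new one
merge_fields = {'ifname': 'oif'}

tmp_df = pd.DataFrame({
    'ifname': ['eth0', 'eth1'],
    'oif': ['', 'swp1'],        # the empty string is falsy, so eth0 fills it
})

for field in merge_fields:
    newfld = merge_fields[field]
    if field in tmp_df.columns and newfld in tmp_df.columns:
        # Keep the new column where it already has a value, otherwise take
        # the value from the old column
        tmp_df[newfld] = np.where(tmp_df[newfld], tmp_df[newfld],
                                  tmp_df[field])
    elif field in tmp_df.columns:
        # Only the old column exists: just rename it
        tmp_df = tmp_df.rename(columns={field: newfld})

print(tmp_df['oif'].tolist())   # ['eth0', 'swp1']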
@@ -608,7 +630,7 @@ def _build_master_schema(self, datasets: list) -> pa.lib.Schema:
 
         return msch
 
-    def _get_filtered_fileset(self, datasets: list, namespace: list) -> ds:
+    def _get_filtered_fileset(self, dataset: ds, namespace: list) -> ds:
         """Filter the dataset based on the namespace
 
         We can use this method to filter out namespaces and hostnames based
@@ -620,48 +642,44 @@ def _get_filtered_fileset(self, datasets: list, namespace: list) -> ds:
         Returns:
             ds: pyarrow dataset of only the files that match filter
         """
-        filelist = []
-        for dataset in datasets:
-            if not namespace:
-                filelist.extend(dataset.files)
-                continue
+        if not namespace:
+            return dataset
 
-            notlist = [x for x in namespace
+        # Exclude not and regexp operators as they're handled elsewhere.
+        excluded_ns = [x for x in namespace
                        if x.startswith('!') or x.startswith('~!')]
-            if notlist != namespace:
-                newns = [x for x in namespace
-                         if not x.startswith('!') and not x.startswith('~!')]
-            else:
-                newns = namespace
-            ns_filelist = []
-            chklist = dataset.files
-            for ns in newns or []:
+        if excluded_ns != namespace:
+            ns_filters = [x for x in namespace
+                          if not x.startswith('!') and not x.startswith('~!')]
+        else:
+            ns_filters = namespace
+        ns_filelist = []
+        chklist = dataset.files
+        for ns in ns_filters or []:
+            if ns.startswith('!'):
+                ns = ns[1:]
+                ns_filelist = \
+                    [x for x in chklist
+                     if not re.search(f'namespace={ns}/', x)]
+                chklist = ns_filelist
+            elif ns.startswith('~'):
+                ns = ns[1:]
                 if ns.startswith('!'):
                     ns = ns[1:]
                     ns_filelist = \
                         [x for x in chklist
                          if not re.search(f'namespace={ns}/', x)]
                     chklist = ns_filelist
-                elif ns.startswith('~'):
-                    ns = ns[1:]
-                    if ns.startswith('!'):
-                        ns = ns[1:]
-                        ns_filelist = \
-                            [x for x in chklist
-                             if not re.search(f'namespace={ns}/', x)]
-                        chklist = ns_filelist
-                    else:
-                        ns_filelist.extend(
-                            [x for x in dataset.files
-                             if re.search(f'namespace={ns}/', x)])
                 else:
                     ns_filelist.extend(
                         [x for x in dataset.files
                          if re.search(f'namespace={ns}/', x)])
+            else:
+                ns_filelist.extend(
+                    [x for x in dataset.files
+                     if re.search(f'namespace={ns}/', x)])
 
-            filelist.extend(ns_filelist)
-
-        return ds.dataset(filelist, format='parquet', partitioning='hive')
+        return ds.dataset(ns_filelist, format='parquet', partitioning='hive')
 
     def _cons_int_filter(self, keyfld: str, filter_str: str) -> ds.Expression:
         '''Construct Integer filters with arithmetic operations'''
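The rewritten _get_filtered_fileset now filters one dataset at a time by matching the namespace=<name>/ component of each hive-partitioned file path, instead of looping over a list of datasets. A simplified, standalone mimic of that path matching (the paths are invented; in suzieq they come from dataset.files, and the real method also handles the '~' regexp prefix and returns a pyarrow dataset built from the surviving files):

import re

# Illustrative paths of the kind dataset.files would return for a
# hive-partitioned table; these are not real files
files = [
    'coalesced/interfaces/namespace=dc1/hostname=leaf01/part-0.parquet',
    'coalesced/interfaces/namespace=dc2/hostname=leaf01/part-0.parquet',
]


def filter_by_namespace(filelist, namespace):
    '''Plain names select a namespace partition, a leading "!" excludes it.'''
    if not namespace:
        return filelist
    chklist = filelist
    ns_filelist = []
    for ns in namespace:
        if ns.startswith('!'):
            # Exclusion: drop every file under that namespace partition
            chklist = [x for x in chklist
                       if not re.search(f'namespace={ns[1:]}/', x)]
            ns_filelist = chklist
        else:
            # Selection: keep the files under that namespace partition
            ns_filelist.extend(x for x in filelist
                               if re.search(f'namespace={ns}/', x))
    return ns_filelist


print(filter_by_namespace(files, ['dc1']))    # only the dc1 file
print(filter_by_namespace(files, ['!dc2']))   # everything except dc2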
