Skip to content

Commit 62179a0

Browse files
authored
Merge pull request #189 from lanl/issue188
new parquet reader/writer and removed old parquet backend
2 parents d9bc417 + 55f1e15 commit 62179a0

File tree

15 files changed

+159
-189
lines changed

15 files changed

+159
-189
lines changed

docs/backends.rst

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,3 @@ GUFI
5151
.. automodule:: dsi.backends.gufi
5252
:members:
5353
:special-members: __init__
54-
55-
Parquet
56-
--------
57-
58-
.. automodule:: dsi.backends.parquet
59-
:members:
60-
:special-members: __init__

docs/cr_intro.rst

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ All DSI backends include:
4242
- DuckDB: In-process SQL database designed for fast queries on large data files
4343
- GUFI: the `Grand Unified File Index system <https://github.com/mar-file-system/GUFI>`_ ; developed at LANL.
4444
GUFI is a fast, secure metadata search across a filesystem accessible to both privileged and unprivileged users.
45-
- Parquet: a columnar storage format for `Apache Hadoop <https://hadoop.apache.org>`_.
4645

4746
DSI Core
4847
~~~~~~~~

dsi/backends/filesystem.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ class Filesystem(Backend):
7878
# Declare store types
7979
GUFI_STORE = "gufi"
8080
SQLITE_STORE = "sqlite"
81-
PARQUET_STORE = "parquet"
8281

8382
# Declare comparison types
8483
GT = ">"

dsi/backends/parquet.py

Lines changed: 0 additions & 114 deletions
This file was deleted.

dsi/backends/tests/test_parquet.py

Lines changed: 0 additions & 29 deletions
This file was deleted.

dsi/cli.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,7 @@ def export_table(self, table_name, filename):
219219
elif file_extension.lower() == "csv":
220220
self.t.load_module('plugin', "Csv_Writer", "writer", filename = filename, table_name = table_name)
221221
elif file_extension.lower() in ['pq', 'parquet']:
222-
table_data = self.t.active_metadata[table_name]
223-
df = pd.DataFrame(table_data)
224-
df.to_parquet(filename, engine='pyarrow', index=False)
222+
self.t.load_module('plugin', "Parquet_Writer", "writer", filename = filename, table_name = table_name)
225223
else:
226224
success_load = False
227225
except Exception as e:
@@ -481,14 +479,8 @@ def read(self, args):
481479
self.t.load_module('plugin', "YAML1", "reader", filenames = dbfile)
482480
elif file_extension.lower() == 'json':
483481
self.t.load_module('plugin', "JSON", "reader", filenames = dbfile)
484-
elif file_extension.lower() == 'pq' or file_extension.lower() == 'parquet':
485-
self.t.load_module('backend','Parquet','back-write', filename=dbfile)
486-
data = OrderedDict(self.t.artifact_handler(interaction_type="query")) #Parquet's query() returns a normal dict
487-
if table_name is not None:
488-
self.t.active_metadata[table_name] = data
489-
else:
490-
self.t.active_metadata["Parquet"] = data
491-
self.t.unload_module('backend','Parquet','back-write')
482+
elif file_extension.lower() in ['pq', 'parquet']:
483+
self.t.load_module('plugin', "Parquet", "reader", filenames = dbfile, table_name = table_name)
492484
except Exception as e:
493485
print(f"read ERROR: {e}\n")
494486
self.t.active_metadata = OrderedDict()

dsi/core.py

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ class Terminal():
2424
for more information.
2525
"""
2626
BACKEND_PREFIX = ['dsi.backends']
27-
BACKEND_IMPLEMENTATIONS = ['gufi', 'sqlite', 'parquet', 'duckdb', 'hpss']
27+
BACKEND_IMPLEMENTATIONS = ['gufi', 'sqlite', 'duckdb', 'hpss']
2828
PLUGIN_PREFIX = ['dsi.plugins']
2929
PLUGIN_IMPLEMENTATIONS = ['env', 'file_reader', 'file_writer', 'collection_reader']
3030
VALID_ENV = ['Hostname', 'SystemKernel', 'GitInfo']
31-
VALID_READERS = ['Bueno', 'Csv', 'YAML1', 'TOML1', 'Schema', 'JSON', 'MetadataReader1', 'Ensemble', 'Cloverleaf', 'Dict']
31+
VALID_READERS = ['Bueno', 'Csv', 'YAML1', 'TOML1', 'Parquet', 'Schema', 'JSON', 'MetadataReader1', 'Ensemble', 'Cloverleaf', 'Dict']
3232
VALID_DATACARDS = ['Oceans11Datacard', 'DublinCoreDatacard', 'SchemaOrgDatacard', 'GoogleDatacard']
33-
VALID_WRITERS = ['ER_Diagram', 'Table_Plot', 'Csv_Writer']
33+
VALID_WRITERS = ['ER_Diagram', 'Table_Plot', 'Csv_Writer', 'Parquet_Writer']
3434
VALID_PLUGINS = VALID_ENV + VALID_READERS + VALID_WRITERS + VALID_DATACARDS
35-
VALID_BACKENDS = ['Gufi', 'Sqlite', 'Parquet', 'DuckDB', 'SqlAlchemy', 'HPSS']
35+
VALID_BACKENDS = ['Gufi', 'Sqlite', 'DuckDB', 'SqlAlchemy', 'HPSS']
3636
VALID_MODULES = VALID_PLUGINS + VALID_BACKENDS
3737
VALID_MODULE_FUNCTIONS = {'plugin': ['reader', 'writer'],
3838
'backend': ['back-read', 'back-write']}
@@ -503,13 +503,7 @@ def artifact_handler(self, interaction_type, query = None, **kwargs):
503503
self.logger.info(f"{first_backend.__class__.__name__} backend - {interaction_type.upper()} the data")
504504
start = datetime.now()
505505
if interaction_type in ['query', 'get']:
506-
# Only used when reading data from Parquet backend in CLI API (Parquet uses query not process) -
507-
# TODO fix this passthrough by updating Parquet to use process_artifacts()
508-
# TODO query all backends
509-
if len(self.loaded_backends) > 1:
510-
if parent_backend == "Filesystem" and ".temp.db" in first_backend.filename:
511-
first_backend = self.loaded_backends[1]
512-
parent_backend = first_backend.__class__.__bases__[0].__name__
506+
# TODO query all backends together
513507
if self.valid_backend(first_backend, parent_backend):
514508
if "query" in first_backend.query_artifacts.__code__.co_varnames:
515509
self.logger.info(f"Query to get data: {query}")
@@ -1362,8 +1356,6 @@ def valid_backend(self, backend, parent_name):
13621356
valid = True
13631357
if backend.__class__.__name__ == "DuckDB" and os.path.getsize(backend.filename) > 13000:
13641358
valid = True
1365-
if backend.__class__.__name__ == "Parquet" and os.path.getsize(backend.filename) > 100:
1366-
valid = True
13671359
return valid
13681360

13691361

dsi/dsi.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ def list_readers(self):
117117
print("\nValid Readers for `reader_name` in read():\n" + "-"*50)
118118
print("Collection : Loads data from an Ordered Dict. If multiple tables, each table must be a nested OrderedDict.")
119119
print("CSV : Loads data from CSV files (one table per call)")
120+
print("Parquet : Loads data from Parquet - a columnar storage format for Apache Hadoop (one table per call)")
120121
print("YAML1 : Loads data from YAML files of a certain structure")
121122
print("TOML1 : Loads data from TOML files of a certain structure")
122123
print("JSON : Loads single-table data from JSON files")
@@ -139,6 +140,7 @@ def read(self, filenames, reader_name, table_name = None):
139140
The expected input type depends on the selected `reader_name`:
140141
- "Collection" → Ordered Dictionary of table(s)
141142
- "CSV" → .csv
143+
- "Parquet" → .pq
142144
- "YAML1" → .yaml or .yml
143145
- "TOML1" → .toml
144146
- "JSON" → .json
@@ -163,7 +165,7 @@ def read(self, filenames, reader_name, table_name = None):
163165
164166
Required when using the `Collection` reader to load an Ordered Dictionary representing only one table.
165167
166-
Recommended when the input file contains a single table for the `CSV`, `JSON`, or `Ensemble` reader.
168+
Recommended when the input file contains a single table for the `CSV`, `Parquet`, `JSON`, or `Ensemble` reader.
167169
"""
168170
if isinstance(filenames, str) and not os.path.exists(filenames):
169171
sys.exit("read() ERROR: The input file must be a valid filepath. Please check again.")
@@ -234,6 +236,8 @@ def read(self, filenames, reader_name, table_name = None):
234236
self.t.load_module('plugin', 'Bueno', 'reader', filenames=filenames)
235237
elif reader_name.lower() == "csv":
236238
self.t.load_module('plugin', 'Csv', 'reader', filenames=filenames, table_name=table_name)
239+
elif reader_name.lower() == "parquet":
240+
self.t.load_module('plugin', 'Parquet', 'reader', filenames=filenames, table_name=table_name)
237241
elif reader_name.lower() == "yaml1":
238242
self.t.load_module('plugin', 'YAML1', 'reader', filenames=filenames)
239243
elif reader_name.lower() == "toml1":
@@ -257,7 +261,7 @@ def read(self, filenames, reader_name, table_name = None):
257261

258262
if correct_reader == False:
259263
print("read() ERROR: Please check your spelling of the 'reader_name' argument as it does not exist in DSI\n")
260-
elg = "Collection, CSV, YAML1, TOML1, JSON, Ensemble, Cloverleaf, Bueno, DublinCoreDatacard, SchemaOrgDatacard"
264+
elg = "Collection, CSV, Parquet, YAML1, TOML1, JSON, Ensemble, Cloverleaf, Bueno, DublinCoreDatacard, SchemaOrgDatacard"
261265
sys.exit(f"Eligible readers are: {elg}, GoogleDatacard, Oceans11Datacard")
262266

263267
table_keys = [k for k in self.t.new_tables if k not in ("dsi_relations", "dsi_units")]
@@ -672,6 +676,7 @@ def list_writers(self):
672676
print("ER_Diagram : Creates a visual ER diagram image based on all tables in DSI.")
673677
print("Table_Plot : Generates a plot of numerical data from a specified table.")
674678
print("Csv : Exports the data of a specified table to a CSV file.")
679+
print("Parquet : Exports the data of a specified table to a Parquet file.")
675680
print()
676681

677682
def write(self, filename, writer_name, table_name = None):
@@ -685,6 +690,7 @@ def write(self, filename, writer_name, table_name = None):
685690
- "ER_Diagram" → .png, .pdf, .jpg, .jpeg
686691
- "Table_Plot" → .png, .jpg, .jpeg
687692
- "Csv" → .csv
693+
- "Parquet" → .pq
688694
689695
`writer_name` : str
690696
Name of the DSI Writer to export data.
@@ -695,7 +701,7 @@ def write(self, filename, writer_name, table_name = None):
695701
For guidance on creating a DSI-compatible Writer, view :ref:`custom_writer`.
696702
697703
`table_name`: str, optional
698-
Required when using "Table_Plot" or "Csv" to specify which table to export.
704+
Required when using "Table_Plot", "Csv" or "Parquet" to specify which table to export.
699705
"""
700706
if not self.t.valid_backend(self.main_backend_obj, self.main_backend_obj.__class__.__bases__[0].__name__):
701707
sys.exit("ERROR: Cannot write() data from an empty backend. Please ensure there is data in it.")
@@ -764,6 +770,8 @@ def write(self, filename, writer_name, table_name = None):
764770
self.t.load_module('plugin', 'Table_Plot', 'writer', filename=filename, table_name = table_name)
765771
elif writer_name.lower() in ["csv", "csv writer", "csv_writer"]:
766772
self.t.load_module('plugin', 'Csv_Writer', 'writer', filename=filename, table_name = table_name)
773+
elif writer_name.lower() in ["parquet", "parquet writer", "parquet_writer"]:
774+
self.t.load_module('plugin', 'Parquet_Writer', 'writer', filename=filename, table_name = table_name)
767775
else:
768776
correct_writer = False
769777
except Exception as e:

dsi/plugins/file_reader.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
try: import tomllib
1010
except ModuleNotFoundError: import pip._vendor.tomli as tomllib
1111
import os
12+
from pyarrow import parquet as pq
1213
# import ast
1314

1415
from dsi.plugins.metadata import StructuredMetadata
@@ -459,6 +460,52 @@ def add_rows(self) -> None:
459460

460461
self.set_schema_2(self.toml_data)
461462

463+
class Parquet(FileReader):
    """
    DSI Reader that loads data stored in a Parquet file as a table. Users can choose to specify the table name upon reading too.
    """
    def __init__(self, filenames, table_name = None, **kwargs):
        """
        Initializes the Parquet Reader with user specified filenames and an optional table_name.

        `filenames` : str or list of str
            Required. One or more Parquet file paths to be loaded into DSI.
            If multiple files are provided, all data must correspond to the same table.

        `table_name` : str, optional
            Optional name to assign to the loaded table.
            If not provided, DSI will default to using "Parquet" as the table name.
        """
        super().__init__(filenames, **kwargs)
        # Accumulates the loaded table(s); passed to set_schema_2() in add_rows()
        self.parquet_data = OrderedDict()
        # Normalize to a list so add_rows() can always iterate
        if isinstance(filenames, str):
            self.filenames = [filenames]
        else:
            self.filenames = filenames
        self.table_name = table_name

    def add_rows(self) -> None:
        """
        Parses Parquet data and stores data into a table as an Ordered Dictionary.

        Reads every file in `self.filenames`, concatenates them row-wise into one
        table (all files must share the same schema), converts NaN values to None,
        and registers the result via `set_schema_2`.

        Raises a TypeError (chained to the underlying pandas error and naming the
        offending file) if a file cannot be appended to the accumulated data.
        """
        total_df = DataFrame()

        for filename in self.filenames:
            table = pq.read_table(filename).to_pandas()
            try:
                total_df = concat([total_df, table], axis=0, ignore_index=True)
            except Exception as e:
                # Name the file that failed and preserve the original cause,
                # instead of a bare except with an anonymous message.
                raise TypeError(f"Error in adding {filename} to the existing Parquet data. "
                                "Please recheck column names and data structure") from e

        table_data = OrderedDict(total_df.to_dict(orient='list'))
        for col, coldata in table_data.items():  # replace NaNs with None
            table_data[col] = [None if isinstance(item, float) and isnan(item) else item for item in coldata]

        if self.table_name is not None:
            self.parquet_data[self.table_name] = table_data
        else:
            # No table name given: store the single table's columns directly;
            # downstream defaults the table name to "Parquet".
            self.parquet_data = table_data

        self.set_schema_2(self.parquet_data)
508+
462509
class Ensemble(FileReader):
463510
"""
464511
DSI Reader that loads ensemble simulation data stored in a CSV file.

0 commit comments

Comments
 (0)