Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions docs/backends.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,3 @@ GUFI
.. automodule:: dsi.backends.gufi
:members:
:special-members: __init__

Parquet
--------

.. automodule:: dsi.backends.parquet
:members:
:special-members: __init__
1 change: 0 additions & 1 deletion docs/cr_intro.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ All DSI backends include:
- DuckDB: In-process SQL database designed for fast queries on large data files
- GUFI: the `Grand Unified File Index system <https://github.com/mar-file-system/GUFI>`_ ; developed at LANL.
GUFI is a fast, secure metadata search across a filesystem accessible to both privileged and unprivileged users.
- Parquet: a columnar storage format for `Apache Hadoop <https://hadoop.apache.org>`_.

DSI Core
~~~~~~~~
Expand Down
1 change: 0 additions & 1 deletion dsi/backends/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ class Filesystem(Backend):
# Declare store types
GUFI_STORE = "gufi"
SQLITE_STORE = "sqlite"
PARQUET_STORE = "parquet"

# Declare comparison types
GT = ">"
Expand Down
114 changes: 0 additions & 114 deletions dsi/backends/parquet.py

This file was deleted.

29 changes: 0 additions & 29 deletions dsi/backends/tests/test_parquet.py

This file was deleted.

14 changes: 3 additions & 11 deletions dsi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,7 @@ def export_table(self, table_name, filename):
elif file_extension.lower() == "csv":
self.t.load_module('plugin', "Csv_Writer", "writer", filename = filename, table_name = table_name)
elif file_extension.lower() in ['pq', 'parquet']:
table_data = self.t.active_metadata[table_name]
df = pd.DataFrame(table_data)
df.to_parquet(filename, engine='pyarrow', index=False)
self.t.load_module('plugin', "Parquet_Writer", "writer", filename = filename, table_name = table_name)
else:
success_load = False
except Exception as e:
Expand Down Expand Up @@ -481,14 +479,8 @@ def read(self, args):
self.t.load_module('plugin', "YAML1", "reader", filenames = dbfile)
elif file_extension.lower() == 'json':
self.t.load_module('plugin', "JSON", "reader", filenames = dbfile)
elif file_extension.lower() == 'pq' or file_extension.lower() == 'parquet':
self.t.load_module('backend','Parquet','back-write', filename=dbfile)
data = OrderedDict(self.t.artifact_handler(interaction_type="query")) #Parquet's query() returns a normal dict
if table_name is not None:
self.t.active_metadata[table_name] = data
else:
self.t.active_metadata["Parquet"] = data
self.t.unload_module('backend','Parquet','back-write')
elif file_extension.lower() in ['pq', 'parquet']:
self.t.load_module('plugin', "Parquet", "reader", filenames = dbfile, table_name = table_name)
except Exception as e:
print(f"read ERROR: {e}\n")
self.t.active_metadata = OrderedDict()
Expand Down
18 changes: 5 additions & 13 deletions dsi/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ class Terminal():
for more information.
"""
BACKEND_PREFIX = ['dsi.backends']
BACKEND_IMPLEMENTATIONS = ['gufi', 'sqlite', 'parquet', 'duckdb', 'hpss']
BACKEND_IMPLEMENTATIONS = ['gufi', 'sqlite', 'duckdb', 'hpss']
PLUGIN_PREFIX = ['dsi.plugins']
PLUGIN_IMPLEMENTATIONS = ['env', 'file_reader', 'file_writer', 'collection_reader']
VALID_ENV = ['Hostname', 'SystemKernel', 'GitInfo']
VALID_READERS = ['Bueno', 'Csv', 'YAML1', 'TOML1', 'Schema', 'JSON', 'MetadataReader1', 'Ensemble', 'Cloverleaf', 'Dict']
VALID_READERS = ['Bueno', 'Csv', 'YAML1', 'TOML1', 'Parquet', 'Schema', 'JSON', 'MetadataReader1', 'Ensemble', 'Cloverleaf', 'Dict']
VALID_DATACARDS = ['Oceans11Datacard', 'DublinCoreDatacard', 'SchemaOrgDatacard', 'GoogleDatacard']
VALID_WRITERS = ['ER_Diagram', 'Table_Plot', 'Csv_Writer']
VALID_WRITERS = ['ER_Diagram', 'Table_Plot', 'Csv_Writer', 'Parquet_Writer']
VALID_PLUGINS = VALID_ENV + VALID_READERS + VALID_WRITERS + VALID_DATACARDS
VALID_BACKENDS = ['Gufi', 'Sqlite', 'Parquet', 'DuckDB', 'SqlAlchemy', 'HPSS']
VALID_BACKENDS = ['Gufi', 'Sqlite', 'DuckDB', 'SqlAlchemy', 'HPSS']
VALID_MODULES = VALID_PLUGINS + VALID_BACKENDS
VALID_MODULE_FUNCTIONS = {'plugin': ['reader', 'writer'],
'backend': ['back-read', 'back-write']}
Expand Down Expand Up @@ -503,13 +503,7 @@ def artifact_handler(self, interaction_type, query = None, **kwargs):
self.logger.info(f"{first_backend.__class__.__name__} backend - {interaction_type.upper()} the data")
start = datetime.now()
if interaction_type in ['query', 'get']:
# Only used when reading data from Parquet backend in CLI API (Parquet uses query not process) -
# TODO fix this passthrough by updating Parquet to use process_artifacts()
# TODO query all backends
if len(self.loaded_backends) > 1:
if parent_backend == "Filesystem" and ".temp.db" in first_backend.filename:
first_backend = self.loaded_backends[1]
parent_backend = first_backend.__class__.__bases__[0].__name__
# TODO query all backends together
if self.valid_backend(first_backend, parent_backend):
if "query" in first_backend.query_artifacts.__code__.co_varnames:
self.logger.info(f"Query to get data: {query}")
Expand Down Expand Up @@ -1362,8 +1356,6 @@ def valid_backend(self, backend, parent_name):
valid = True
if backend.__class__.__name__ == "DuckDB" and os.path.getsize(backend.filename) > 13000:
valid = True
if backend.__class__.__name__ == "Parquet" and os.path.getsize(backend.filename) > 100:
valid = True
return valid


Expand Down
14 changes: 11 additions & 3 deletions dsi/dsi.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def list_readers(self):
print("\nValid Readers for `reader_name` in read():\n" + "-"*50)
print("Collection : Loads data from an Ordered Dict. If multiple tables, each table must be a nested OrderedDict.")
print("CSV : Loads data from CSV files (one table per call)")
print("Parquet : Loads data from Parquet - a columnar storage format for Apache Hadoop (one table per call)")
print("YAML1 : Loads data from YAML files of a certain structure")
print("TOML1 : Loads data from TOML files of a certain structure")
print("JSON : Loads single-table data from JSON files")
Expand All @@ -139,6 +140,7 @@ def read(self, filenames, reader_name, table_name = None):
The expected input type depends on the selected `reader_name`:
- "Collection" → Ordered Dictionary of table(s)
- "CSV" → .csv
- "Parquet" → .pq or .parquet
- "YAML1" → .yaml or .yml
- "TOML1" → .toml
- "JSON" → .json
Expand All @@ -163,7 +165,7 @@ def read(self, filenames, reader_name, table_name = None):

Required when using the `Collection` reader to load an Ordered Dictionary representing only one table.

Recommended when the input file contains a single table for the `CSV`, `JSON`, or `Ensemble` reader.
Recommended when the input file contains a single table for the `CSV`, `Parquet`, `JSON`, or `Ensemble` reader.
"""
if isinstance(filenames, str) and not os.path.exists(filenames):
sys.exit("read() ERROR: The input file must be a valid filepath. Please check again.")
Expand Down Expand Up @@ -234,6 +236,8 @@ def read(self, filenames, reader_name, table_name = None):
self.t.load_module('plugin', 'Bueno', 'reader', filenames=filenames)
elif reader_name.lower() == "csv":
self.t.load_module('plugin', 'Csv', 'reader', filenames=filenames, table_name=table_name)
elif reader_name.lower() == "parquet":
self.t.load_module('plugin', 'Parquet', 'reader', filenames=filenames, table_name=table_name)
elif reader_name.lower() == "yaml1":
self.t.load_module('plugin', 'YAML1', 'reader', filenames=filenames)
elif reader_name.lower() == "toml1":
Expand All @@ -257,7 +261,7 @@ def read(self, filenames, reader_name, table_name = None):

if correct_reader == False:
print("read() ERROR: Please check your spelling of the 'reader_name' argument as it does not exist in DSI\n")
elg = "Collection, CSV, YAML1, TOML1, JSON, Ensemble, Cloverleaf, Bueno, DublinCoreDatacard, SchemaOrgDatacard"
elg = "Collection, CSV, Parquet, YAML1, TOML1, JSON, Ensemble, Cloverleaf, Bueno, DublinCoreDatacard, SchemaOrgDatacard"
sys.exit(f"Eligible readers are: {elg}, GoogleDatacard, Oceans11Datacard")

table_keys = [k for k in self.t.new_tables if k not in ("dsi_relations", "dsi_units")]
Expand Down Expand Up @@ -672,6 +676,7 @@ def list_writers(self):
print("ER_Diagram : Creates a visual ER diagram image based on all tables in DSI.")
print("Table_Plot : Generates a plot of numerical data from a specified table.")
print("Csv : Exports the data of a specified table to a CSV file.")
print("Parquet : Exports the data of a specified table to a Parquet file.")
print()

def write(self, filename, writer_name, table_name = None):
Expand All @@ -685,6 +690,7 @@ def write(self, filename, writer_name, table_name = None):
- "ER_Diagram" → .png, .pdf, .jpg, .jpeg
- "Table_Plot" → .png, .jpg, .jpeg
- "Csv" → .csv
- "Parquet" → .pq or .parquet

`writer_name` : str
Name of the DSI Writer to export data.
Expand All @@ -695,7 +701,7 @@ def write(self, filename, writer_name, table_name = None):
For guidance on creating a DSI-compatible Writer, view :ref:`custom_writer`.

`table_name`: str, optional
Required when using "Table_Plot" or "Csv" to specify which table to export.
Required when using "Table_Plot", "Csv", or "Parquet" to specify which table to export.
"""
if not self.t.valid_backend(self.main_backend_obj, self.main_backend_obj.__class__.__bases__[0].__name__):
sys.exit("ERROR: Cannot write() data from an empty backend. Please ensure there is data in it.")
Expand Down Expand Up @@ -764,6 +770,8 @@ def write(self, filename, writer_name, table_name = None):
self.t.load_module('plugin', 'Table_Plot', 'writer', filename=filename, table_name = table_name)
elif writer_name.lower() in ["csv", "csv writer", "csv_writer"]:
self.t.load_module('plugin', 'Csv_Writer', 'writer', filename=filename, table_name = table_name)
elif writer_name.lower() in ["parquet", "parquet writer", "parquet_writer"]:
self.t.load_module('plugin', 'Parquet_Writer', 'writer', filename=filename, table_name = table_name)
else:
correct_writer = False
except Exception as e:
Expand Down
47 changes: 47 additions & 0 deletions dsi/plugins/file_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
try: import tomllib
except ModuleNotFoundError: import pip._vendor.tomli as tomllib
import os
from pyarrow import parquet as pq
# import ast

from dsi.plugins.metadata import StructuredMetadata
Expand Down Expand Up @@ -459,6 +460,52 @@ def add_rows(self) -> None:

self.set_schema_2(self.toml_data)

class Parquet(FileReader):
    """
    DSI Reader that loads data stored in one or more Parquet files as a single table.
    Users can choose to specify the table name upon reading too.
    """
    def __init__(self, filenames, table_name = None, **kwargs):
        """
        Initializes the Parquet Reader with user specified filenames and an optional table_name.

        `filenames` : str or list of str
            Required. One or more Parquet file paths to be loaded into DSI.
            If multiple files are provided, all data must correspond to the same table.

        `table_name` : str, optional
            Optional name to assign to the loaded table.
            If not provided, DSI will default to using "Parquet" as the table name.
        """
        super().__init__(filenames, **kwargs)
        self.parquet_data = OrderedDict()
        # Normalize to a list so add_rows() can always iterate uniformly.
        self.filenames = [filenames] if isinstance(filenames, str) else filenames
        self.table_name = table_name

    def add_rows(self) -> None:
        """
        Parses Parquet data and stores data into a table as an Ordered Dictionary.

        Reads every file in `self.filenames`, concatenates them row-wise into one
        table, converts NaN entries to None, and registers the result via
        `set_schema_2`.

        Raises a TypeError naming the offending file if a file's data cannot be
        combined with the data read so far.
        """
        total_df = DataFrame()

        for filename in self.filenames:
            table = pq.read_table(filename).to_pandas()
            try:
                total_df = concat([total_df, table], axis=0, ignore_index=True)
            except Exception as e:
                # Narrow from a bare `except:`; include the file name and chain the
                # original cause so the failure is actually debuggable.
                raise TypeError(f"Error in adding {filename} to the existing Parquet data. "
                                "Please recheck column names and data structure") from e

        table_data = OrderedDict(total_df.to_dict(orient='list'))
        for col, coldata in table_data.items():  # replace NaNs with None
            # isinstance (not an exact type check) also catches numpy float
            # subclasses such as numpy.float64, which pandas may return here.
            table_data[col] = [None if isinstance(item, float) and isnan(item) else item for item in coldata]

        if self.table_name is not None:
            self.parquet_data[self.table_name] = table_data
        else:
            self.parquet_data = table_data

        self.set_schema_2(self.parquet_data)

class Ensemble(FileReader):
"""
DSI Reader that loads ensemble simulation data stored in a CSV file.
Expand Down
Loading