Skip to content

Commit bb6139a

Browse files
authored
Merge pull request #206 from lanl/issue198
allow users to overwrite existing schema, update er diagram writer and summary()
2 parents 193be79 + b25b21f commit bb6139a

File tree

12 files changed

+281
-534
lines changed

12 files changed

+281
-534
lines changed

docs/index.rst

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ The Data Science Infrastructure Project (DSI)
88

99
.. toctree::
1010
:maxdepth: 2
11-
:caption: Contents:
1211

1312
introduction
1413
installation
@@ -17,9 +16,15 @@ The Data Science Infrastructure Project (DSI)
1716
examples
1817
contributors
1918

20-
Indices and tables
21-
==================
19+
Indices
20+
=======
2221

2322
* :ref:`genindex`
2423
* :ref:`modindex`
25-
* :ref:`search`
24+
25+
Contact Us
26+
==========
27+
28+
For general inquiries or help, reach us at dsi-help (at) lanl.gov
29+
30+
Bugs/Feature Requests: `DSI GitHub Requests <https://github.com/lanl/dsi/issues>`_

docs/introduction.rst

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,11 @@ DSI Core
111111

112112
DSI basic functionality is contained within the middleware known as the *core*.
113113
Users will leverage Core to employ Readers, Writers, and Backends to interact with their data.
114-
The two primary methods to achieve this are with the :ref:`python_api_label` or the :ref:`cli_api_label`
114+
The two primary methods to achieve this are with the :ref:`python_api_label` or the :ref:`cli_api_label`
115+
116+
Contact Us
117+
----------
118+
119+
For general inquiries or help, reach us at dsi-help (at) lanl.gov
120+
121+
Bugs/Feature Requests: `DSI GitHub Requests <https://github.com/lanl/dsi/issues>`_

dsi/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.1.3"
1+
__version__ = "1.1.4"

dsi/backends/duckdb.py

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,56 @@ def ingest_artifacts(self, collection, isVerbose=False):
175175
"""
176176
artifacts = collection
177177

178+
self.cur.execute("BEGIN TRANSACTION")
179+
180+
if self.list() is not None and list(artifacts.keys()) == ["dsi_relations"]:
181+
pk_list = artifacts["dsi_relations"]["primary_key"]
182+
fk_list = artifacts["dsi_relations"]["foreign_key"]
183+
pk_tables = set(t[0] for t in pk_list)
184+
fk_tables = set(t[0] for t in fk_list if t[0] != None)
185+
all_schema_tables = pk_tables.union(fk_tables)
186+
db_tables = [t[0] for t in self.list() if t[0] != "dsi_units"]
187+
188+
# check if tables from dsi_relations are all in the db
189+
if all_schema_tables.issubset(set(db_tables)):
190+
circ, _ = self.check_table_relations(all_schema_tables, artifacts["dsi_relations"])
191+
if circ:
192+
return (ValueError, f"A complex schema with a circular dependency cannot be ingested into a DuckDB backend.")
193+
194+
drop_order = all_schema_tables
195+
collect = self.process_artifacts()
196+
if "dsi_relations" in collect.keys():
197+
curr_pk_tables = set(t[0] for t in collect["dsi_relations"]["primary_key"])
198+
curr_fk_tables = set(t[0] for t in collect["dsi_relations"]["foreign_key"] if t[0] != None)
199+
curr_schema_tables = curr_pk_tables.union(curr_fk_tables)
200+
201+
# need to drop and reingest all tables in old schema and new schema
202+
all_schema_tables = all_schema_tables.union(curr_schema_tables)
203+
204+
_, ord_tables1 = self.check_table_relations(all_schema_tables, collect["dsi_relations"])
205+
drop_order = ord_tables1
206+
207+
for table in drop_order:
208+
self.cur.execute(f'DROP TABLE IF EXISTS "{table}";')
209+
try:
210+
self.con.commit()
211+
except Exception as e:
212+
self.cur.execute("ROLLBACK")
213+
self.cur.execute("CHECKPOINT")
214+
return (duckdb.Error, e)
215+
216+
#do not reingest tables not in old or new schema as they will be the same
217+
non_schema_tables = set(db_tables) - all_schema_tables
218+
for t in non_schema_tables:
219+
del collect[t]
220+
221+
collect["dsi_relations"] = artifacts["dsi_relations"]
222+
artifacts = collect
223+
224+
else:
225+
print("WARNING: Complex schemas can only be ingested if all referenced data tables are loaded into DSI.")
226+
227+
178228
table_order = artifacts.keys()
179229
if "dsi_relations" in artifacts.keys():
180230
circular, ordered_tables = self.check_table_relations(artifacts.keys(), artifacts["dsi_relations"])
@@ -184,10 +234,8 @@ def ingest_artifacts(self, collection, isVerbose=False):
184234
else:
185235
table_order = list(reversed(ordered_tables)) # ingest primary key tables first then children
186236

187-
self.cur.execute("BEGIN TRANSACTION")
188237
if self.runTable:
189-
runTable_create = "CREATE TABLE IF NOT EXISTS runTable " \
190-
"(run_id INTEGER PRIMARY KEY, run_timestamp TEXT UNIQUE);"
238+
runTable_create = "CREATE TABLE IF NOT EXISTS runTable (run_id INTEGER PRIMARY KEY, run_timestamp TEXT UNIQUE);"
191239
self.cur.execute(runTable_create)
192240

193241
sequence_run_id = "CREATE SEQUENCE IF NOT EXISTS seq_run_id START 1;"
@@ -387,13 +435,16 @@ def notebook(self, interactive=False):
387435
def read_to_artifact(self):
388436
return self.process_artifacts()
389437

390-
def process_artifacts(self):
438+
def process_artifacts(self, only_units_relations = False):
391439
"""
392440
Reads data from the DuckDB database into a nested OrderedDict.
393441
Keys are table names, and values are OrderedDicts containing table data.
394442
395443
If the database contains PK/FK relationships, they are stored in a special `dsi_relations` table.
396444
445+
`only_units_relations` : bool, default=False
446+
**USERS SHOULD IGNORE THIS FLAG.** Used internally by duckdb.py. When True, the loop that reads every table's rows into the returned OrderedDict is skipped.
447+
397448
`return` : OrderedDict
398449
A nested OrderedDict containing all data from the DuckDB database.
399450
"""
@@ -404,20 +455,22 @@ def process_artifacts(self):
404455
SELECT table_name FROM information_schema.tables
405456
WHERE table_schema = 'main' AND table_type = 'BASE TABLE'
406457
""").fetchall()
407-
for item in tableList:
408-
tableName = self.duckdb_compatible_name(item[0])
409458

410-
tableInfo = self.cur.execute(f"PRAGMA table_info({tableName});").fetchdf()
411-
colDict = OrderedDict((self.duckdb_compatible_name(col), []) for col in tableInfo['name'])
459+
if only_units_relations == False:
460+
for item in tableList:
461+
tableName = self.duckdb_compatible_name(item[0])
462+
463+
tableInfo = self.cur.execute(f"PRAGMA table_info({tableName});").fetchdf()
464+
colDict = OrderedDict((self.duckdb_compatible_name(col), []) for col in tableInfo['name'])
412465

413-
data = self.cur.execute(f"SELECT * FROM {tableName};").fetchall()
414-
for row in data:
415-
for colName, val in zip(colDict.keys(), row):
416-
if val == "NULL":
417-
colDict[colName].append(None)
418-
else:
419-
colDict[colName].append(val)
420-
artifact[tableName] = colDict
466+
data = self.cur.execute(f"SELECT * FROM {tableName};").fetchall()
467+
for row in data:
468+
for colName, val in zip(colDict.keys(), row):
469+
if val == "NULL":
470+
colDict[colName].append(None)
471+
else:
472+
colDict[colName].append(val)
473+
artifact[tableName] = colDict
421474

422475
pk_list = []
423476
fkData = self.cur.execute(f"""
@@ -743,6 +796,8 @@ def list(self):
743796
SELECT table_name FROM information_schema.tables
744797
WHERE table_schema = 'main' AND table_type = 'BASE TABLE'
745798
""").fetchall()
799+
if not tableList:
800+
return None
746801
tableList = [self.duckdb_compatible_name(table[0]) for table in tableList]
747802

748803
info_list = []
@@ -839,12 +894,13 @@ def summary_helper(self, table_name):
839894
col_info = self.cur.execute(f"PRAGMA table_info({table_name})").fetchall()
840895

841896
numeric_types = {'INTEGER', 'REAL', 'FLOAT', 'NUMERIC', 'DECIMAL', 'DOUBLE', 'BIGINT'}
842-
headers = ['column', 'type', 'min', 'max', 'avg', 'std_dev']
897+
headers = ['column', 'type', 'unique', 'min', 'max', 'avg', 'std_dev']
843898
rows = []
844899

845900
for col in col_info:
846901
col_name = col[1]
847902
col_type = col[2].upper()
903+
unique_vals = self.cur.execute(f"SELECT COUNT(DISTINCT {col_name}) FROM {table_name};").fetchone()[0]
848904
is_primary = col[5] > 0
849905
display_name = f"{col_name}*" if is_primary else col_name
850906

@@ -863,7 +919,7 @@ def summary_helper(self, table_name):
863919

864920
if avg_val != None and std_dev == None:
865921
std_dev = 0
866-
rows.append([display_name, col_type, min_val, max_val, avg_val, std_dev])
922+
rows.append([display_name, col_type, unique_vals, min_val, max_val, avg_val, std_dev])
867923

868924
return headers, rows
869925

@@ -1007,7 +1063,7 @@ def visit(node):
10071063
if any(visit(node) for node in list(graph.keys())):
10081064
return True, None # Circular dependency detected
10091065

1010-
# Step 3: Order tables from least dependencies to most (if no circular dependencies)
1066+
# Order tables from least dependencies to most (if no circular dependencies)
10111067
in_degree = {table: 0 for table in tables}
10121068
for child in graph:
10131069
for parent in graph[child]:

dsi/backends/sqlite.py

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,80 @@ def ingest_artifacts(self, collection, isVerbose=False):
183183
# self.cur.execute("PRAGMA FOREIGN KEYS = ON;")
184184
# self.con.commit()
185185

186+
#there are tables in db, only dsi_relations to be ingested
187+
if self.list() is not None and list(artifacts.keys()) == ["dsi_relations"]:
188+
pk_list = artifacts["dsi_relations"]["primary_key"]
189+
fk_list = artifacts["dsi_relations"]["foreign_key"]
190+
pk_tables = set(t[0] for t in pk_list)
191+
fk_tables = set(t[0] for t in fk_list if t[0] != None)
192+
all_schema_tables = pk_tables.union(fk_tables)
193+
db_tables = [t[0] for t in self.list() if t[0] != "dsi_units"]
194+
# check if tables from dsi_relations are all in the db
195+
if all_schema_tables.issubset(set(db_tables)):
196+
for table in all_schema_tables:
197+
pk_cols = list({v for k, v in pk_list if k == table})
198+
fk_dict = {fks[1]: pks for pks, fks, in zip(pk_list, fk_list) if fks[0] == table}
199+
200+
#1. create new table using existing table's columns but with new relations
201+
# use consistent naming scheme. ex: table1 --> table1_dsi_temp
202+
create_stmt = f"CREATE TABLE {table}_dsi_temp ("
203+
table_info = self.cur.execute(f"PRAGMA table_info({table});").fetchall()
204+
for col in table_info:
205+
if col[1] in pk_cols:
206+
create_stmt += f"{col[1]} {col[2]} PRIMARY KEY, "
207+
else:
208+
create_stmt += f"{col[1]} {col[2]}, "
209+
210+
if fk_dict:
211+
fk_stmt = ""
212+
for k, v in fk_dict.items():
213+
if k not in create_stmt:
214+
msg = f"Input schema references a nonexistent column, {k}, in the foreign_key section of {table}"
215+
raise ValueError(msg)
216+
fk_stmt += f"FOREIGN KEY ({k}) REFERENCES {v[0]}({v[1]}), "
217+
create_stmt += fk_stmt
218+
create_stmt = create_stmt[:-2] + ");"
219+
220+
#2. move all data from existing to new table
221+
try:
222+
self.cur.execute(create_stmt)
223+
self.cur.execute(f"INSERT INTO {table}_dsi_temp SELECT * FROM {table};")
224+
except sqlite3.Error as e:
225+
self.con.rollback()
226+
return (sqlite3.Error, e)
227+
228+
# check whether any non-schema tables have PK/FK constraints -- if so, create a constraint-free temp copy, move the data into it, and add the table to the set that is dropped and renamed together with the schema tables
229+
other_tables = list(set(db_tables) - all_schema_tables)
230+
for table in other_tables:
231+
table_info = self.cur.execute(f"PRAGMA table_info({table});").fetchall()
232+
if any(col[5] > 0 for col in table_info) or len(self.cur.execute(f"PRAGMA foreign_key_list({table});").fetchall()) > 0:
233+
create_stmt = f"CREATE TABLE {table}_dsi_temp ("
234+
create_stmt += ", ".join(f"{col[1]} {col[2]}" for col in table_info)
235+
create_stmt += ");"
236+
try:
237+
self.cur.execute(create_stmt)
238+
self.cur.execute(f"INSERT INTO {table}_dsi_temp SELECT * FROM {table};")
239+
except sqlite3.Error as e:
240+
self.con.rollback()
241+
return (sqlite3.Error, e)
242+
all_schema_tables.add(table)
243+
244+
# reaching here means there were no errors with the new schema --- SQLite doesn't care about the order in which tables are dropped
245+
# drop all tables in schema and rename temp tables to original names
246+
for table in all_schema_tables:
247+
self.cur.execute(f'DROP TABLE IF EXISTS "{table}";')
248+
self.cur.execute(f'ALTER TABLE {table}_dsi_temp RENAME TO {table};')
249+
250+
try:
251+
self.con.commit()
252+
except Exception as e:
253+
self.con.rollback()
254+
return (sqlite3.Error, e)
255+
256+
return #early return so dont make any other changes to db
257+
else:
258+
print("WARNING: Complex schemas can only be ingested if all referenced data tables are loaded into DSI.")
259+
186260
if self.runTable:
187261
runTable_create = "CREATE TABLE IF NOT EXISTS runTable (run_id INTEGER PRIMARY KEY AUTOINCREMENT, run_timestamp TEXT UNIQUE);"
188262
self.cur.execute(runTable_create)
@@ -831,6 +905,8 @@ def list(self):
831905
Return a list of all tables and their dimensions from this SQLite backend
832906
"""
833907
tableList = self.cur.execute("SELECT name FROM sqlite_master WHERE type ='table';").fetchall()
908+
if not tableList:
909+
return None
834910
tableList = [self.sqlite_compatible_name(table[0]) for table in tableList if table[0] != "sqlite_sequence"]
835911

836912
info_list = []
@@ -916,12 +992,13 @@ def summary_helper(self, table_name):
916992
col_info = self.cur.execute(f"PRAGMA table_info({table_name})").fetchall()
917993

918994
numeric_types = {'INTEGER', 'REAL', 'FLOAT', 'NUMERIC', 'DECIMAL', 'DOUBLE'}
919-
headers = ['column', 'type', 'min', 'max', 'avg', 'std_dev']
995+
headers = ['column', 'type', 'unique', 'min', 'max', 'avg', 'std_dev']
920996
rows = []
921997

922998
for col in col_info:
923999
col_name = col[1]
9241000
col_type = col[2].upper()
1001+
unique_vals = self.cur.execute(f"SELECT COUNT(DISTINCT {col_name}) FROM {table_name};").fetchone()[0]
9251002
is_primary = col[5] > 0
9261003
display_name = f"{col_name}*" if is_primary else col_name
9271004

@@ -955,7 +1032,7 @@ def summary_helper(self, table_name):
9551032

9561033
if avg_val != None and std_dev == None:
9571034
std_dev = 0
958-
rows.append([display_name, col_type, min_val, max_val, avg_val, std_dev])
1035+
rows.append([display_name, col_type, unique_vals, min_val, max_val, avg_val, std_dev])
9591036

9601037
return headers, rows
9611038

dsi/core.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,8 @@ def artifact_handler(self, interaction_type, query = None, **kwargs):
485485
if self.debug_level != 0:
486486
self.logger.error(f"Error ingesting data in {original_file} @ line {return_line_number} due to {errorMessage[1]}")
487487
if self.user_wrapper:
488+
if errorMessage[1].startswith("A complex schema"):
489+
raise errorMessage[0](errorMessage[1])
488490
raise errorMessage[0](f"Error ingesting data due to {errorMessage[1]}")
489491
else:
490492
raise errorMessage[0](f"Error ingesting data in {original_file} @ line {return_line_number} due to {errorMessage[1]}")

0 commit comments

Comments
 (0)