Commit b3e2d45

Initial commit! Mostly copy from WIP cid-harvard/atlas_core

1 parent 5217cfc

File tree

6 files changed: +575 -0 lines changed


pandas_to_postgres/__init__.py

Whitespace-only changes.

pandas_to_postgres/copy_df.py

Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
from .utilities import (
    create_file_object,
    df_generator,
    logger,
    cast_pandas,
    add_level_metadata,
)

import pandas as pd
from sqlalchemy.sql.schema import Table
from sqlalchemy.engine.base import Connection
from sqlalchemy.schema import AddConstraint, DropConstraint
from sqlalchemy.exc import SQLAlchemyError


class DataFrameCopy(object):
    def __init__(
        self,
        conn: Connection,
        table_obj: Table,
        df: pd.DataFrame,
        levels: dict = None,
        csv_chunksize: int = 10 ** 6,
    ):
        self.conn = conn
        self.table_obj = table_obj
        self.sql_table = self.table_obj.name
        self.df = df
        self.levels = levels
        self.columns = self.df.columns
        self.rows = self.df.shape[0]
        self.csv_chunksize = csv_chunksize
        self.primary_key = self.table_obj.primary_key
        self.foreign_keys = self.table_obj.foreign_key_constraints

    def close_conn(self):
        self.conn.close()
        del self.conn

    def drop_pk(self):
        logger.info(f"Dropping {self.sql_table} primary key")
        try:
            with self.conn.begin_nested():
                self.conn.execute(DropConstraint(self.primary_key, cascade=True))
        except SQLAlchemyError:
            logger.info(f"{self.sql_table} primary key not found. Skipping")

    def create_pk(self):
        logger.info(f"Creating {self.sql_table} primary key")
        self.conn.execute(AddConstraint(self.primary_key))

    def drop_fks(self):
        for fk in self.foreign_keys:
            logger.info(f"Dropping foreign key {fk.name}")
            try:
                with self.conn.begin_nested():
                    self.conn.execute(DropConstraint(fk))
            except SQLAlchemyError:
                logger.warning(f"Foreign key {fk.name} not found")

    def create_fks(self):
        for fk in self.foreign_keys:
            try:
                logger.info(f"Creating foreign key {fk.name}")
                self.conn.execute(AddConstraint(fk))
            except SQLAlchemyError:
                logger.warning(f"Error creating foreign key {fk.name}")

    def truncate(self):
        logger.info(f"Truncating {self.sql_table}")
        self.conn.execute(f"TRUNCATE TABLE {self.sql_table};")

    def analyze(self):
        logger.info(f"Analyzing {self.sql_table}")
        self.conn.execute(f"ANALYZE {self.sql_table};")

    def copy_from_file(self, file_object):
        cur = self.conn.connection.cursor()
        cols = ", ".join([f"{col}" for col in self.columns])
        sql = f"COPY {self.sql_table} ({cols}) FROM STDIN WITH CSV HEADER FREEZE"
        cur.copy_expert(sql=sql, file=file_object)

    def format_df(self):
        # Handle NaN --> None type casting
        self.df = cast_pandas(self.df, self.table_obj)

        # Add level (constant) data to frames from dict
        if self.levels:
            self.df = add_level_metadata(self.df, self.levels)

    def copy(self):
        self.drop_fks()
        self.drop_pk()
        self.format_df()
        with self.conn.begin():
            self.truncate()

            logger.info("Creating generator for chunking dataframe")
            for chunk in df_generator(self.df, self.csv_chunksize):

                logger.info("Creating CSV in memory")
                fo = create_file_object(chunk)

                logger.info("Copying chunk to database")
                self.copy_from_file(fo)
                del fo

            logger.info(f"All chunks copied ({self.rows} rows)")

        self.create_pk()
        self.create_fks()
        self.analyze()
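
For orientation, here is a minimal usage sketch of DataFrameCopy. It is not part of this commit: the connection URL, table name, and DataFrame are hypothetical, the target table is assumed to already exist in Postgres and to be reflected via SQLAlchemy, and the cast_pandas helper it calls lives in utilities.py, which is in this commit but not shown in this view.

import pandas as pd
from sqlalchemy import create_engine, MetaData

from pandas_to_postgres.copy_df import DataFrameCopy

# Hypothetical database and table; the table must already exist with
# matching columns, since copy() truncates and COPYs into it rather
# than creating it.
engine = create_engine("postgresql://user:password@localhost:5432/mydb")
meta = MetaData()
meta.reflect(bind=engine, only=["my_table"])

df = pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})

conn = engine.connect()
copier = DataFrameCopy(conn, meta.tables["my_table"], df)
copier.copy()
copier.close_conn()

Note that copy() runs TRUNCATE and all of the COPY statements inside a single conn.begin() transaction. That boundary is load-bearing: Postgres only honors COPY ... FREEZE when the table was created or truncated in the same transaction.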

pandas_to_postgres/copy_hdf.py

Lines changed: 208 additions & 0 deletions
@@ -0,0 +1,208 @@
from .utilities import (
    create_file_object,
    df_generator,
    logger,
    classification_to_pandas,
    cast_pandas,
    add_level_metadata,
)

import pandas as pd
from sqlalchemy.schema import AddConstraint, DropConstraint
from sqlalchemy.exc import SQLAlchemyError


class HDFTableCopy(object):

    # Class-level defaults; rebound per instance during hdf_to_pg()
    rows = 0
    columns = None

    def __init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize=10 ** 6):
        self.sql_table = sql_table
        self.hdf_tables = hdf_tables
        self.csv_chunksize = csv_chunksize

        # Info from the HDFMetadata object
        self.levels = hdf_meta.levels
        self.file_name = hdf_meta.file_name
        self.hdf_chunksize = hdf_meta.chunksize

    def table_metadata(self):
        # Relies on a module-level `db` (an object exposing SQLAlchemy
        # metadata) that is not defined or imported in this module
        self.table_obj = db.metadata.tables[self.sql_table]
        self.primary_key = self.table_obj.primary_key
        self.foreign_keys = self.table_obj.foreign_key_constraints

    def set_conn(self, conn):
        self.conn = conn

    def delete_conn(self):
        del self.conn

    def drop_pk(self):
        logger.info(f"Dropping {self.sql_table} primary key")
        try:
            with self.conn.begin_nested():
                self.conn.execute(DropConstraint(self.primary_key, cascade=True))
        except SQLAlchemyError:
            logger.info(f"{self.sql_table} primary key not found. Skipping")

    def create_pk(self):
        logger.info(f"Creating {self.sql_table} primary key")
        self.conn.execute(AddConstraint(self.primary_key))

    def drop_fks(self):
        for fk in self.foreign_keys:
            logger.info(f"Dropping foreign key {fk.name}")
            try:
                with self.conn.begin_nested():
                    self.conn.execute(DropConstraint(fk))
            except SQLAlchemyError:
                logger.warning(f"Foreign key {fk.name} not found")

    def create_fks(self):
        for fk in self.foreign_keys:
            try:
                logger.info(f"Creating foreign key {fk.name}")
                self.conn.execute(AddConstraint(fk))
            except SQLAlchemyError:
                logger.warning(f"Error creating foreign key {fk.name}")

    def truncate(self):
        logger.info(f"Truncating {self.sql_table}")
        self.conn.execute(f"TRUNCATE TABLE {self.sql_table};")

    def analyze(self):
        logger.info(f"Analyzing {self.sql_table}")
        self.conn.execute(f"ANALYZE {self.sql_table};")

    def copy_from_file(self, file_object):
        cur = self.conn.connection.cursor()
        cols = ", ".join([f"{col}" for col in self.columns])
        sql = f"COPY {self.sql_table} ({cols}) FROM STDIN WITH CSV HEADER FREEZE"
        cur.copy_expert(sql=sql, file=file_object)

    def copy_table(self):
        self.table_metadata()
        self.drop_fks()
        self.drop_pk()
        with self.conn.begin():
            self.truncate()
            self.hdf_to_pg()
        self.create_pk()
        self.create_fks()
        self.analyze()

    def hdf_to_pg(self):
        if self.hdf_tables is None:
            logger.warning(f"No HDF table found for SQL table {self.sql_table}")
            return

        for hdf_table in self.hdf_tables:
            logger.info(f"*** {hdf_table} ***")
            hdf_levels = self.levels.get(hdf_table)

            logger.info("Reading HDF table")
            df = pd.read_hdf(self.file_name, key=hdf_table)
            self.rows += len(df)

            # Handle NaN --> None type casting and adding const level data
            df = cast_pandas(df, self.table_obj)
            df = add_level_metadata(df, hdf_levels)

            if self.columns is None:
                self.columns = df.columns

            logger.info("Creating generator for chunking dataframe")
            for chunk in df_generator(df, self.csv_chunksize):

                logger.info("Creating CSV in memory")
                fo = create_file_object(chunk)

                logger.info("Copying chunk to database")
                self.copy_from_file(fo)
                del fo
            del df
        logger.info(f"All chunks copied ({self.rows} rows)")


class ClassificationHDFTableCopy(HDFTableCopy):
    def __init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize=10 ** 6):
        HDFTableCopy.__init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize)

    def hdf_to_pg(self):
        if self.hdf_tables is None:
            logger.warning(f"No HDF table found for SQL table {self.sql_table}")
            return

        for hdf_table in self.hdf_tables:
            logger.info(f"*** {hdf_table} ***")
            logger.info("Reading HDF table")
            df = pd.read_hdf(self.file_name, key=hdf_table)
            self.rows += len(df)

            logger.info("Formatting classification")
            df = classification_to_pandas(df)
            df = cast_pandas(df, self.table_obj)

            if self.columns is None:
                self.columns = df.columns

            logger.info("Creating CSV in memory")
            fo = create_file_object(df)

            logger.info("Copying table to database")
            self.copy_from_file(fo)
            del df
            del fo
        logger.info(f"All chunks copied ({self.rows} rows)")


class BigHDFTableCopy(HDFTableCopy):
    def __init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize=10 ** 6):
        HDFTableCopy.__init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize)

    def hdf_to_pg(self):
        if self.hdf_tables is None:
            logger.warning(f"No HDF table found for SQL table {self.sql_table}")
            return

        for hdf_table in self.hdf_tables:
            logger.info(f"*** {hdf_table} ***")
            hdf_levels = self.levels.get(hdf_table)

            with pd.HDFStore(self.file_name) as store:
                nrows = store.get_storer(hdf_table).nrows

            self.rows += nrows
            if nrows % self.hdf_chunksize:
                n_chunks = (nrows // self.hdf_chunksize) + 1
            else:
                n_chunks = nrows // self.hdf_chunksize

            start = 0

            for i in range(n_chunks):
                logger.info(f"*** HDF chunk {i + 1} of {n_chunks} ***")
                logger.info("Reading HDF table")
                stop = min(start + self.hdf_chunksize, nrows)
                df = pd.read_hdf(self.file_name, key=hdf_table, start=start, stop=stop)

                start += self.hdf_chunksize

                # Handle NaN --> None type casting and adding const level data
                df = cast_pandas(df, self.table_obj)
                df = add_level_metadata(df, hdf_levels)

                if self.columns is None:
                    self.columns = df.columns

                logger.info("Creating generator for chunking dataframe")
                for chunk in df_generator(df, self.csv_chunksize):
                    logger.info("Creating CSV in memory")
                    fo = create_file_object(chunk)

                    logger.info("Copying chunk to database")
                    self.copy_from_file(fo)
                    del fo
                del df
        logger.info(f"All chunks copied ({self.rows} rows)")
