Skip to content

Commit 0a34ac3

Browse files
committed
Refactor to BaseCopy class, other refactoring to make multiprocessing work with HDFTableCopy objects
1 parent b3e2d45 commit 0a34ac3

File tree

5 files changed

+193
-212
lines changed

5 files changed

+193
-212
lines changed

pandas_to_postgres/_base_copy.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
from .utilities import (
2+
create_file_object,
3+
df_generator,
4+
logger,
5+
classification_to_pandas,
6+
cast_pandas,
7+
add_level_metadata,
8+
HDFMetadata,
9+
)
10+
11+
import pandas as pd
12+
from sqlalchemy.schema import AddConstraint, DropConstraint
13+
from sqlalchemy.exc import SQLAlchemyError
14+
from sqlalchemy.sql.schema import Table
15+
from sqlalchemy.engine.base import Connection
16+
17+
18+
class BaseCopy(object):
    """
    Shared machinery for COPY-based bulk loads into a Postgres table: holds
    the SQLAlchemy connection/table objects and provides constraint
    management (PK/FK drop and recreate), TRUNCATE/ANALYZE, and the raw
    COPY call.
    """

    def __init__(
        self,
        defer_sql_objs: bool = False,
        conn=None,
        table_obj=None,
        sql_table=None,
        csv_chunksize: int = 10 ** 6,
    ):
        """
        Parameters
        ----------
        defer_sql_objs: if True, skip binding SQLAlchemy objects now (they
            don't pickle cleanly for multiprocessing); call
            instantiate_sql_objs() in the worker process instead
        conn: SQLAlchemy Connection (required when defer_sql_objs is False)
        table_obj: SQLAlchemy Table (required when defer_sql_objs is False)
        sql_table: destination table name, kept while SQL objs are deferred
        csv_chunksize: rows per CSV chunk written to the COPY buffer
        """
        self.rows = 0
        self.columns = None
        self.csv_chunksize = csv_chunksize

        if not defer_sql_objs:
            self.instantiate_sql_objs(conn, table_obj)
        else:
            self.sql_table = sql_table

    def instantiate_sql_objs(self, conn, table_obj):
        """
        When using multiprocessing, pickling of SQLAlchemy objects in __init__ causes
        issues, so allow for deferring until after the pickling to fetch SQLAlchemy objs
        """
        self.conn = conn
        self.table_obj = table_obj
        self.sql_table = table_obj.name
        self.primary_key = table_obj.primary_key
        self.foreign_keys = table_obj.foreign_key_constraints

    def drop_pk(self):
        """Drop the table's primary key; a missing PK is logged and skipped."""
        logger.info(f"Dropping {self.sql_table} primary key")
        try:
            # Nested transaction so a failed DROP doesn't poison the outer txn
            with self.conn.begin_nested():
                self.conn.execute(DropConstraint(self.primary_key, cascade=True))
        except SQLAlchemyError:
            logger.info(f"{self.sql_table} primary key not found. Skipping")

    def create_pk(self):
        """Recreate the primary key previously dropped by drop_pk()."""
        logger.info(f"Creating {self.sql_table} primary key")
        self.conn.execute(AddConstraint(self.primary_key))

    def drop_fks(self):
        """Drop all foreign keys on the table; missing FKs are logged only."""
        for fk in self.foreign_keys:
            logger.info(f"Dropping foreign key {fk.name}")
            try:
                with self.conn.begin_nested():
                    self.conn.execute(DropConstraint(fk))
            except SQLAlchemyError:
                # FIX: logger.warn is a deprecated alias of logger.warning
                logger.warning(f"Foreign key {fk.name} not found")

    def create_fks(self):
        """Recreate the foreign keys previously dropped by drop_fks()."""
        for fk in self.foreign_keys:
            try:
                logger.info(f"Creating foreign key {fk.name}")
                self.conn.execute(AddConstraint(fk))
            except SQLAlchemyError:
                logger.warning(f"Error creating foreign key {fk.name}")

    def truncate(self):
        """TRUNCATE the destination table.

        NOTE(review): sql_table is interpolated into raw SQL — acceptable
        only because it originates from SQLAlchemy table metadata, not
        user input.
        """
        logger.info(f"Truncating {self.sql_table}")
        self.conn.execute(f"TRUNCATE TABLE {self.sql_table};")

    def analyze(self):
        """Run ANALYZE so the planner has fresh stats after the bulk load."""
        logger.info(f"Analyzing {self.sql_table}")
        self.conn.execute(f"ANALYZE {self.sql_table};")

    def copy_from_file(self, file_object):
        """COPY a header-bearing CSV file-like object into the table via
        psycopg2's copy_expert. FREEZE requires that the TRUNCATE and the
        COPY share one transaction (see callers)."""
        cur = self.conn.connection.cursor()
        cols = ", ".join([f"{col}" for col in self.columns])
        sql = f"COPY {self.sql_table} ({cols}) FROM STDIN WITH CSV HEADER FREEZE"
        cur.copy_expert(sql=sql, file=file_object)

pandas_to_postgres/copy_df.py

Lines changed: 9 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -6,79 +6,29 @@
66
add_level_metadata,
77
)
88

9+
from ._base_copy import BaseCopy
10+
911
import pandas as pd
1012
from sqlalchemy.sql.schema import Table
1113
from sqlalchemy.engine.base import Connection
12-
from sqlalchemy.schema import AddConstraint, DropConstraint
13-
from sqlalchemy.exc import SQLAlchemyError
1414

1515

16-
class DataFrameCopy(object):
16+
class DataFrameCopy(BaseCopy):
1717
def __init__(
    self,
    df: pd.DataFrame,
    defer_sql_objs: bool = False,
    conn: Connection = None,
    table_obj: Table = None,
    csv_chunksize: int = 10 ** 6,
    levels: dict = None,
):
    """
    Copy a pandas DataFrame into a Postgres table.

    df: data to load
    defer_sql_objs: postpone binding SQLAlchemy objects (multiprocessing)
    conn: SQLAlchemy Connection
    table_obj: SQLAlchemy Table for the destination
    csv_chunksize: rows per CSV chunk for the COPY buffer
    levels: optional level metadata to stamp onto the frame
    """
    # BUG FIX: the refactor called `BaseCopy(defer_sql_objs, conn,
    # table_obj, csv_chunksize)`, which (a) built a throw-away BaseCopy
    # instance instead of initializing *self*, and (b) passed
    # csv_chunksize in the sql_table positional slot. Initialize the
    # base class on self with arguments in their correct positions.
    super().__init__(
        defer_sql_objs, conn, table_obj, csv_chunksize=csv_chunksize
    )

    self.df = df
    self.levels = levels
    self.columns = self.df.columns
    self.rows = self.df.shape[0]
32-
self.csv_chunksize = csv_chunksize
33-
self.primary_key = self.table_obj.primary_key
34-
self.foreign_keys = self.table_obj.foreign_key_constraints
35-
36-
def close_conn(self):
37-
self.conn.close()
38-
del self.conn
39-
40-
def drop_pk(self):
41-
logger.info(f"Dropping {self.sql_table} primary key")
42-
try:
43-
with self.conn.begin_nested():
44-
self.conn.execute(DropConstraint(self.primary_key, cascade=True))
45-
except SQLAlchemyError:
46-
logger.info(f"{self.sql_table} primary key not found. Skipping")
47-
48-
def create_pk(self):
49-
logger.info(f"Creating {self.sql_table} primary key")
50-
self.conn.execute(AddConstraint(self.primary_key))
51-
52-
def drop_fks(self):
53-
for fk in self.foreign_keys:
54-
logger.info(f"Dropping foreign key {fk.name}")
55-
try:
56-
with self.conn.begin_nested():
57-
self.conn.execute(DropConstraint(fk))
58-
except SQLAlchemyError:
59-
logger.warn(f"Foreign key {fk.name} not found")
60-
61-
def create_fks(self):
62-
for fk in self.foreign_keys:
63-
try:
64-
logger.info(f"Creating foreign key {fk.name}")
65-
self.conn.execute(AddConstraint(fk))
66-
except SQLAlchemyError:
67-
logger.warn(f"Error creating foreign key {fk.name}")
68-
69-
def truncate(self):
70-
logger.info(f"Truncating {self.sql_table}")
71-
self.conn.execute(f"TRUNCATE TABLE {self.sql_table};")
72-
73-
def analyze(self):
74-
logger.info(f"Analyzing {self.sql_table}")
75-
self.conn.execute(f"ANALYZE {self.sql_table};")
76-
77-
def copy_from_file(self, file_object):
78-
cur = self.conn.connection.cursor()
79-
cols = ", ".join([f"{col}" for col in self.columns])
80-
sql = f"COPY {self.sql_table} ({cols}) FROM STDIN WITH CSV HEADER FREEZE"
81-
cur.copy_expert(sql=sql, file=file_object)
8232

8333
def format_df(self):
8434
# Handle NaN --> None type casting

pandas_to_postgres/copy_hdf.py

Lines changed: 66 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -5,91 +5,49 @@
55
classification_to_pandas,
66
cast_pandas,
77
add_level_metadata,
8+
HDFMetadata,
89
)
910

10-
import pandas as pd
11-
from sqlalchemy.schema import AddConstraint, DropConstraint
12-
from sqlalchemy.exc import SQLAlchemyError
13-
11+
from ._base_copy import BaseCopy
1412

15-
class HDFTableCopy(object):
16-
17-
rows = 0
18-
columns = None
13+
import pandas as pd
14+
from sqlalchemy.sql.schema import Table
15+
from sqlalchemy.engine.base import Connection
16+
17+
18+
class HDFTableCopy(BaseCopy):
19+
def __init__(
    self,
    hdf_tables: list,
    hdf_meta: HDFMetadata,
    defer_sql_objs: bool = False,
    conn=None,
    table_obj=None,
    sql_table=None,
    csv_chunksize: int = 10 ** 6,
):
    """
    Copy one or more HDF tables into a single Postgres table.

    hdf_tables: keys of the HDF store to read for this SQL table
    hdf_meta: HDFMetadata describing the store (levels, file name, chunking)
    defer_sql_objs: postpone binding SQLAlchemy objects (multiprocessing)
    conn: SQLAlchemy Connection
    table_obj: SQLAlchemy Table for the destination
    sql_table: destination table name while SQL objects are deferred
    csv_chunksize: rows per CSV chunk for the COPY buffer
    """
    super().__init__(defer_sql_objs, conn, table_obj, sql_table, csv_chunksize)

    self.hdf_tables = hdf_tables

    # Carry over the relevant fields from the HDFMetadata object
    self.levels = hdf_meta.levels
    self.file_name = hdf_meta.file_name
    self.hdf_chunksize = hdf_meta.chunksize
2939

30-
def table_metadata(self):
31-
self.table_obj = db.metadata.tables[self.sql_table]
32-
self.primary_key = self.table_obj.primary_key
33-
self.foreign_keys = self.table_obj.foreign_key_constraints
34-
35-
def set_conn(self, conn):
36-
self.conn = conn
37-
38-
def delete_conn(self):
39-
del self.conn
40-
41-
def drop_pk(self):
42-
logger.info(f"Dropping {self.sql_table} primary key")
43-
try:
44-
with self.conn.begin_nested():
45-
self.conn.execute(DropConstraint(self.primary_key, cascade=True))
46-
except SQLAlchemyError:
47-
logger.info(f"{self.sql_table} primary key not found. Skipping")
48-
49-
def create_pk(self):
50-
logger.info(f"Creating {self.sql_table} primary key")
51-
self.conn.execute(AddConstraint(self.primary_key))
52-
53-
def drop_fks(self):
54-
for fk in self.foreign_keys:
55-
logger.info(f"Dropping foreign key {fk.name}")
56-
try:
57-
with self.conn.begin_nested():
58-
self.conn.execute(DropConstraint(fk))
59-
except SQLAlchemyError:
60-
logger.warn(f"Foreign key {fk.name} not found")
61-
62-
def create_fks(self):
63-
for fk in self.foreign_keys:
64-
try:
65-
logger.info(f"Creating foreign key {fk.name}")
66-
self.conn.execute(AddConstraint(fk))
67-
except SQLAlchemyError:
68-
logger.warn(f"Error creating foreign key {fk.name}")
69-
70-
def truncate(self):
71-
logger.info(f"Truncating {self.sql_table}")
72-
self.conn.execute(f"TRUNCATE TABLE {self.sql_table};")
73-
74-
def analyze(self):
75-
logger.info(f"Analyzing {self.sql_table}")
76-
self.conn.execute(f"ANALYZE {self.sql_table};")
77-
78-
def copy_from_file(self, file_object):
79-
cur = self.conn.connection.cursor()
80-
cols = ", ".join([f"{col}" for col in self.columns])
81-
sql = f"COPY {self.sql_table} ({cols}) FROM STDIN WITH CSV HEADER FREEZE"
82-
cur.copy_expert(sql=sql, file=file_object)
83-
8440
def copy_table(self):
    """
    Full load sequence: drop constraints, TRUNCATE + COPY the HDF data in
    a single transaction, then restore constraints and ANALYZE.
    """
    # Constraints must be gone before the bulk load for speed and FREEZE
    self.drop_fks()
    self.drop_pk()

    # These need to be one transaction to use COPY FREEZE
    with self.conn.begin():
        self.truncate()
        self.hdf_to_pg()

    # Constraints are restored only after the load transaction completes
    self.create_pk()
    self.create_fks()
    self.analyze()
9452

9553
def hdf_to_pg(self):
@@ -126,8 +84,26 @@ def hdf_to_pg(self):
12684

12785

12886
class ClassificationHDFTableCopy(HDFTableCopy):
129-
def __init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize=10 ** 6):
130-
HDFTableCopy.__init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize)
87+
def __init__(
    self,
    hdf_tables: list,
    hdf_meta: HDFMetadata,
    defer_sql_objs: bool = False,
    conn=None,
    table_obj=None,
    sql_table: str = None,
    csv_chunksize: int = 10 ** 6,
):
    """Classification-table variant of HDFTableCopy.

    Construction is entirely delegated to the parent; this subclass only
    overrides hdf_to_pg to apply classification-specific formatting.
    """
    super().__init__(
        hdf_tables,
        hdf_meta,
        defer_sql_objs,
        conn,
        table_obj,
        sql_table,
        csv_chunksize,
    )
131107

132108
def hdf_to_pg(self):
133109
if self.hdf_tables is None:
@@ -158,8 +134,26 @@ def hdf_to_pg(self):
158134

159135

160136
class BigHDFTableCopy(HDFTableCopy):
161-
def __init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize=10 ** 6):
162-
HDFTableCopy.__init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize)
137+
def __init__(
    self,
    hdf_tables: list,
    hdf_meta: HDFMetadata,
    defer_sql_objs: bool = False,
    conn=None,
    table_obj=None,
    sql_table=None,
    csv_chunksize: int = 10 ** 6,
):
    """Large-table variant of HDFTableCopy.

    Construction is entirely delegated to the parent; this subclass only
    overrides hdf_to_pg to stream the data in smaller chunks.
    """
    super().__init__(
        hdf_tables,
        hdf_meta,
        defer_sql_objs,
        conn,
        table_obj,
        sql_table,
        csv_chunksize,
    )
163157

164158
def hdf_to_pg(self):
165159
if self.hdf_tables is None:

0 commit comments

Comments
 (0)