Commit c082a75

Merge pull request #4 from cid-harvard/feature/hdf-to-postgres-refactor
Feature/hdf to postgres refactor
2 parents 74edfb7 + 48d735f commit c082a75

2 files changed: +64 / -57 lines

pandas_to_postgres/__init__.py

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@
 from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
 from .hdf_to_postgres import (
     hdf_to_postgres,
-    multiprocess_hdf_to_postgres,
     create_hdf_table_objects,
 )
 from .utilities import (
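With multiprocess_hdf_to_postgres removed from the package namespace, both the single-process and multiprocess paths are reached through hdf_to_postgres, via its new processes argument shown in the hdf_to_postgres.py diff below. A minimal sketch of the package-level import after this change, assuming nothing else about the installed package:

from pandas_to_postgres import hdf_to_postgres, create_hdf_table_objects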

pandas_to_postgres/hdf_to_postgres.py

Lines changed: 64 additions & 56 deletions
@@ -1,5 +1,9 @@
 from multiprocessing import Pool
-from .copy_hdf import HDFTableCopy, HDFMetadata
+
+from sqlalchemy import MetaData, create_engine
+
+from .copy_hdf import HDFTableCopy
+from .utilities import HDFMetadata
 
 
 def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
@@ -34,91 +38,95 @@ def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
     return tables
 
 
-def _copy_worker(copy_obj, defer_sql_objs=True):
-    """
-    Handle a SQLAlchemy connection and copy using HDFTableCopy object
+def _copy_worker(copy_obj, engine_args, engine_kwargs, maintenance_work_mem="1G"):
+
+    # Since we fork()ed into a new process, the engine contains process
+    # specific stuff that shouldn't be shared - this creates a fresh Engine
+    # with the same settings but without those.
+
+    engine = create_engine(*engine_args, **engine_kwargs)
+    metadata = MetaData(bind=engine)
+    metadata.reflect()
+
+    with engine.connect() as conn:
 
-    copy_obj: HDFTableCopy or subclass
-        Object to use to run the copy() method on
-    defer_sql_objs: bool
-        If True, SQL objects were not build upon instantiation of copy_obj and should
-        be built before copying data to db (needed for multiprocessing)
-    """
-    database.engine.dispose()
-    with database.engine.connect() as conn:
         conn.execution_options(autocommit=True)
-        conn.execute("SET maintenance_work_mem TO 1000000;")
 
-        if defer_sql_objs:
-            table_obj = database.metadata.tables[copy_obj.sql_table]
-            copy_obj.instantiate_sql_objs(conn, table_obj)
+        if maintenance_work_mem is not None:
+            conn.execute("SET maintenance_work_mem TO {};".format(maintenance_work_mem))
+
+        # Get SQLAlchemy Table object
+        table_obj = metadata.tables.get(copy_obj.sql_table, None)
+        if table_obj is None:
+            raise ValueError("Table {} does not exist.".format(copy_obj.sql_table))
+
+        copy_obj.instantiate_sql_objs(conn, table_obj)
 
+        # Run the task
         copy_obj.copy()
 
 
-def hdf_to_postgres(file_name, db, keys=[], csv_chunksize=10 ** 6):
+def hdf_to_postgres(file_name, engine_args, engine_kwargs={}, keys=[],
+                    csv_chunksize=10 ** 6, processes=None,
+                    maintenance_work_mem=None):
     """
     Copy tables in a HDF file to PostgreSQL database
 
     Parameters
     ----------
     file_name: str
         name of file or path to file of HDF to use to copy
-    db: SQLAlchemy database object
-        destination database
+    engine_args: list
+        arguments to pass into create_engine()
+    engine_kwargs: dict
+        keyword arguments to pass into create_engine()
     keys: list of strings
         HDF keys to copy
     csv_chunksize: int
         Maximum number of StringIO CSV rows to keep in memory at a time
+    processes: int or None
+        If None, run single threaded. If integer, number of processes in the
+        multiprocessing Pool
+    maintenance_work_mem: str or None
+        What to set postgresql's maintenance_work_mem option to: this helps
+        when rebuilding large indexes, etc.
     """
 
-    global database
-    database = db
-
     hdf = HDFMetadata(
         file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
    )
 
     tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
 
-    for table in tables:
-        _copy_worker(table, defer_sql_objs=True)
+    if processes is None:
 
+        # Single-threaded run
+        for table in tables:
+            _copy_worker(table, engine_args, engine_kwargs, maintenance_work_mem)
 
-def multiprocess_hdf_to_postgres(
-    file_name, db, keys=[], processes=4, csv_chunksize=10 ** 6
-):
-    """
-    Copy tables in a HDF file to PostgreSQL database using a multiprocessing Pool
+    elif type(processes) is int:
 
-    Parameters
-    ----------
-    file_name: str
-        Name of file or path to file of HDF to use to copy
-    db: SQLAlchemy object
-        Destination database
-    keys: list of strings
-        HDF keys to copy
-    processes: int
-        Number of processes in the Pool
-    csv_chunksize: int
-        Maximum number of StringIO CSV rows to keep in memory at a time
-    """
+        args = zip(
+            tables,
+            [engine_args] * len(tables),
+            [engine_kwargs] * len(tables),
+            [maintenance_work_mem] * len(tables)
+        )
 
-    global database
-    database = db
+        try:
+            p = Pool(processes)
+            result = p.starmap_async(_copy_worker, args, chunksize=1)
 
-    hdf = HDFMetadata(
-        file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
-    )
+        finally:
+            del tables
+            del hdf
+            p.close()
+            p.join()
 
-    tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
+        if not result.successful():
+            # If there's an exception, throw it, but we don't care about the
+            # results
+            result.get()
 
-    try:
-        p = Pool(processes)
-        p.map(_copy_worker, tables, chunksize=1)
-    finally:
-        del tables
-        del hdf
-        p.close()
-        p.join()
+    else:
+        raise ValueError("processes should be int or None.")
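For reference, a minimal sketch of how the refactored entry point might be called after this merge. The file name, connection URL, and HDF keys below are hypothetical; engine_args and engine_kwargs are passed straight through to SQLAlchemy's create_engine(), and each worker process rebuilds its own Engine from them, as _copy_worker does above.

from pandas_to_postgres import hdf_to_postgres

# Hypothetical connection settings; adjust for your environment.
engine_args = ["postgresql://user:password@localhost:5432/mydb"]
engine_kwargs = {}

# Single-process copy: processes=None runs the plain for-loop branch.
hdf_to_postgres(
    "data.h5",
    engine_args,
    engine_kwargs=engine_kwargs,
    keys=["/countries", "/products"],
    csv_chunksize=10 ** 6,
    maintenance_work_mem="1G",
)

# Multiprocess copy: four workers, each building a fresh Engine from
# engine_args/engine_kwargs inside _copy_worker.
hdf_to_postgres(
    "data.h5",
    engine_args,
    engine_kwargs=engine_kwargs,
    keys=["/countries", "/products"],
    processes=4,
    maintenance_work_mem="1G",
)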
