Commit f8f6c82

Refactor the hdf_to_postgres functions into one, create the engine in
the worker processes instead of passing it in.
1 parent 912c7fb commit f8f6c82

2 files changed: +57 −57 lines changed

pandas_to_postgres/__init__.py

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@
 from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
 from .hdf_to_postgres import (
     hdf_to_postgres,
-    multiprocess_hdf_to_postgres,
     create_hdf_table_objects,
 )
 from .utilities import (
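
For context, a minimal migration note (not part of this commit): code that imported the removed multiprocess entry point would now import only hdf_to_postgres and opt into a Pool through its processes argument, as in this hypothetical caller-side sketch.

# Hypothetical caller-side change implied by the export removal above.
# Before:
#     from pandas_to_postgres import multiprocess_hdf_to_postgres
# After (multiprocessing is now selected with processes=<int>):
from pandas_to_postgres import hdf_to_postgres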

pandas_to_postgres/hdf_to_postgres.py

Lines changed: 57 additions & 56 deletions
@@ -1,5 +1,7 @@
 from multiprocessing import Pool
 
+from sqlalchemy import MetaData, create_engine
+
 from .copy_hdf import HDFTableCopy
 from .utilities import HDFMetadata
 
@@ -36,91 +38,90 @@ def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
     return tables
 
 
-def _copy_worker(copy_obj, defer_sql_objs=True):
-    """
-    Handle a SQLAlchemy connection and copy using HDFTableCopy object
+def _copy_worker(copy_obj, engine_args, engine_kwargs, maintenance_work_mem="1G"):
+
+    # Since we fork()ed into a new process, the engine contains process
+    # specific stuff that shouldn't be shared - this creates a fresh Engine
+    # with the same settings but without those.
+
+    engine = create_engine(*engine_args, **engine_kwargs)
+    metadata = MetaData(bind=engine)
+    metadata.reflect()
+
+    with engine.connect() as conn:
 
-    copy_obj: HDFTableCopy or subclass
-        Object to use to run the copy() method on
-    defer_sql_objs: bool
-        If True, SQL objects were not build upon instantiation of copy_obj and should
-        be built before copying data to db (needed for multiprocessing)
-    """
-    database.engine.dispose()
-    with database.engine.connect() as conn:
         conn.execution_options(autocommit=True)
-        conn.execute("SET maintenance_work_mem TO 1000000;")
 
-        if defer_sql_objs:
-            table_obj = database.metadata.tables[copy_obj.sql_table]
-            copy_obj.instantiate_sql_objs(conn, table_obj)
+        if maintenance_work_mem is not None:
+            conn.execute("SET maintenance_work_mem TO {};".format(maintenance_work_mem))
 
+        # Get SQLAlchemy Table object
+        table_obj = metadata.tables.get(copy_obj.sql_table, None)
+        if table_obj is None:
+            raise ValueError("Table {} does not exist.".format(copy_obj.sql_table))
+
+        copy_obj.instantiate_sql_objs(conn, table_obj)
+
+        # Run the task
         copy_obj.copy()
 
 
-def hdf_to_postgres(file_name, db, keys=[], csv_chunksize=10 ** 6):
+def hdf_to_postgres(file_name, engine_args, engine_kwargs={}, keys=[],
+                    csv_chunksize=10 ** 6, processes=None,
+                    maintenance_work_mem=None):
     """
     Copy tables in a HDF file to PostgreSQL database
 
     Parameters
     ----------
     file_name: str
        name of file or path to file of HDF to use to copy
-    db: SQLAlchemy database object
-        destination database
+    engine_args: list
+        arguments to pass into create_engine()
+    engine_kwargs: dict
+        keyword arguments to pass into create_engine()
     keys: list of strings
         HDF keys to copy
     csv_chunksize: int
         Maximum number of StringIO CSV rows to keep in memory at a time
+    processes: int or None
+        If None, run single threaded. If integer, number of processes in the
+        multiprocessing Pool
+    maintenance_work_mem: str or None
+        What to set postgresql's maintenance_work_mem option to: this helps
+        when rebuilding large indexes, etc.
     """
 
-    global database
-    database = db
-
     hdf = HDFMetadata(
         file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
     )
 
     tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
 
-    for table in tables:
-        _copy_worker(table, defer_sql_objs=True)
+    if processes is None:
 
+        # Single-threaded run
+        for table in tables:
+            _copy_worker(table, engine_args, engine_kwargs, maintenance_work_mem)
 
-def multiprocess_hdf_to_postgres(
-    file_name, db, keys=[], processes=4, csv_chunksize=10 ** 6
-):
-    """
-    Copy tables in a HDF file to PostgreSQL database using a multiprocessing Pool
-
-    Parameters
-    ----------
-    file_name: str
-        Name of file or path to file of HDF to use to copy
-    db: SQLAlchemy object
-        Destination database
-    keys: list of strings
-        HDF keys to copy
-    processes: int
-        Number of processes in the Pool
-    csv_chunksize: int
-        Maximum number of StringIO CSV rows to keep in memory at a time
-    """
+    elif type(processes) is int:
 
-    global database
-    database = db
+        args = zip(
+            tables,
+            [engine_args] * len(tables),
+            [engine_kwargs] * len(tables),
+            [maintenance_work_mem] * len(tables)
+        )
 
-    hdf = HDFMetadata(
-        file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
-    )
+        try:
+            p = Pool(processes)
+            p.starmap(_copy_worker, args, chunksize=1)
 
-    tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
+        finally:
+            del tables
+            del hdf
+            p.close()
+            p.join()
 
-    try:
-        p = Pool(processes)
-        p.map(_copy_worker, tables, chunksize=1)
-    finally:
-        del tables
-        del hdf
-        p.close()
-        p.join()
+    else:
+        raise ValueError("processes should be int or None.")
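
A minimal usage sketch of the consolidated function after this change (not part of the commit; the connection string, HDF path, keys, and kwargs below are hypothetical):

from pandas_to_postgres import hdf_to_postgres

engine_args = ["postgresql://localhost:5432/mydb"]  # hypothetical DSN for create_engine()
engine_kwargs = {"echo": False}                     # optional create_engine() kwargs

# Single-process copy (processes=None is the default)
hdf_to_postgres("./data.h5", engine_args, engine_kwargs, keys=["table_a"])

# Multiprocess copy: each worker calls create_engine() itself inside
# _copy_worker instead of reusing an engine forked from the parent
hdf_to_postgres(
    "./data.h5",
    engine_args,
    engine_kwargs,
    keys=["table_a", "table_b"],
    processes=4,
    maintenance_work_mem="1000000",  # used in SET maintenance_work_mem (kB, like the old hard-coded value)
)

The keys shown assume the HDF file carries the expected atlas_metadata attribute; the only point of the sketch is the new call shape.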

0 commit comments
