
Commit 4bc9bcb

plug-and-play functions to copy HDF files to PostgreSQL
Parent commit: 4c41d08

3 files changed: +99 −1 lines

pandas_to_postgres/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -1,5 +1,10 @@
 from .copy_df import DataFrameCopy
 from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
+from .hdf_to_postgres import (
+    hdf_to_postgres,
+    multiprocess_hdf_to_postgres,
+    create_hdf_table_objects,
+)
 from .utilities import (
     logger,
     HDFMetadata,

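With these re-exports, the new functions become available directly from the package namespace, for example:

    from pandas_to_postgres import hdf_to_postgres, multiprocess_hdf_to_postgres, create_hdf_table_objects
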
pandas_to_postgres/hdf_to_postgres.py

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+from typing import List
+from functools import partial
+from multiprocessing import Pool
+from flask_sqlalchemy import SQLAlchemy  # assumption: db arguments are Flask-SQLAlchemy objects exposing .engine and .metadata
+from .copy_hdf import HDFTableCopy
+from .utilities import HDFMetadata
+
+
+def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
+    """Build one HDFTableCopy object per destination SQL table described by hdf_meta."""
+    tables = []
+
+    for sql_table, hdf_tables in hdf_meta.sql_to_hdf.items():
+        tables.append(
+            HDFTableCopy(
+                hdf_tables,
+                hdf_meta,
+                defer_sql_objs=True,
+                sql_table=sql_table,
+                csv_chunksize=csv_chunksize,
+            )
+        )
+
+    return tables
+
+
+def copy_worker(db: SQLAlchemy, copy_obj: HDFTableCopy, defer_sql_objs: bool = True):
+    # Throw away any pooled connections inherited from the parent process before
+    # opening a fresh connection in this worker
+    db.engine.dispose()
+    with db.engine.connect() as conn:
+        conn.execution_options(autocommit=True)
+        conn.execute("SET maintenance_work_mem TO 1000000;")
+
+        if defer_sql_objs:
+            # Resolve the SQLAlchemy Table object that was deferred at construction time
+            table_obj = db.metadata.tables[copy_obj.sql_table]
+            copy_obj.instantiate_sql_objs(conn, table_obj)
+
+        copy_obj.copy()
+
+
+def hdf_to_postgres(
+    file_name: str, db: SQLAlchemy, keys: List[str] = [], csv_chunksize: int = 10 ** 6
+):
+    """
+    Copy tables in an HDF file to a PostgreSQL database
+
+    Parameters
+    ----------
+    file_name: name of, or path to, the HDF file to copy from
+    db: SQLAlchemy object of destination database
+    keys: list of HDF keys to copy
+    csv_chunksize: maximum number of StringIO CSV rows to keep in memory at a time
+    """
+    hdf = HDFMetadata(
+        file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
+    )
+
+    tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
+
+    for table in tables:
+        copy_worker(db, table, defer_sql_objs=True)
+
+
+def multiprocess_hdf_to_postgres(
+    file_name: str,
+    db: SQLAlchemy,
+    keys: List[str] = [],
+    processes: int = 4,
+    csv_chunksize: int = 10 ** 6,
+):
+    """
+    Copy tables in an HDF file to a PostgreSQL database using a multiprocessing Pool
+
+    Parameters
+    ----------
+    file_name: name of, or path to, the HDF file to copy from
+    db: SQLAlchemy object of destination database
+    keys: list of HDF keys to copy
+    processes: number of processes in the Pool
+    csv_chunksize: maximum number of StringIO CSV rows to keep in memory at a time
+    """
+
+    hdf = HDFMetadata(
+        file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
+    )
+
+    tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
+
+    p = Pool(processes)
+    try:
+        # Bind db so each worker call receives (db, copy_obj); this assumes db and its
+        # engine can be pickled when tasks are dispatched to the pool processes
+        p.map(partial(copy_worker, db), tables, chunksize=1)
+    finally:
+        del tables
+        del hdf
+        p.close()
+        p.join()

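A minimal end-to-end sketch of how these entry points are meant to be driven, assuming a Flask-SQLAlchemy db object bound to the destination PostgreSQL database; the connection URI, file name, and keys below are placeholders, and the destination tables are assumed to already exist in the database:

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    from pandas_to_postgres import hdf_to_postgres, multiprocess_hdf_to_postgres

    app = Flask(__name__)
    app.config["SQLALCHEMY_DATABASE_URI"] = "postgresql://localhost/example_db"  # placeholder URI
    db = SQLAlchemy(app)

    with app.app_context():
        db.reflect()  # populate db.metadata.tables with the existing destination tables

        # Copy each HDF key into its matching SQL table sequentially
        hdf_to_postgres("example.h5", db, keys=["/table_a", "/table_b"])

        # Or spread the per-table copies across a pool of worker processes
        multiprocess_hdf_to_postgres(
            "example.h5", db, keys=["/table_a", "/table_b"], processes=4
        )

Note that the multiprocessing variant depends on the db handle being usable from the pool workers, and that copy_worker disposes of any inherited engine connections before opening its own.
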
pandas_to_postgres/utilities.py

Lines changed: 2 additions & 1 deletion
@@ -34,7 +34,8 @@ def __init__(
         ----------
         file_name: path to hdf file to copy from
         keys: list of hdf keys to copy data from
-        chunksize: maximum rows read from an hdf file into a pandas dataframe
+        chunksize: maximum rows read from an hdf file into a pandas dataframe when using
+            the BigTable protocol
         metadata_attr: location of relevant metadata in store.get_storer().attrs
         metadata_keys: list of keys to get from metadata store
         """

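The parameter being documented here is the read-side chunk size on HDFMetadata, distinct from the csv_chunksize used when building the COPY buffers. A small sketch using the constructor parameters listed in the docstring above; the file name and key are placeholders:

    from pandas_to_postgres import HDFMetadata

    # chunksize caps how many rows are read from the HDF file into a DataFrame at a
    # time when the BigTable path is used
    hdf = HDFMetadata(
        "example.h5",
        keys=["/big_table"],
        chunksize=10 ** 7,
        metadata_attr="atlas_metadata",
        metadata_keys=["levels"],
    )
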