Commit 147349b

Authored by bleonard33 (Brendan Leonard) and a co-author
Rewrite updates (#13)
* Allow Postgres schemas besides public
* Bump version
* SQL functionality updates
* Parquet copying updates
* remove explicit commits()
* commit execute commands
* Update requirements.txt
* Update setup.py

---------

Co-authored-by: Brendan Leonard <[email protected]>
1 parent 220bb90 commit 147349b

File tree

9 files changed: +195 −41 lines changed

9 files changed

+195
-41
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
+.DS_Store
 # Byte-compiled / optimized / DLL files
 __pycache__/
 *.py[cod]

README.md

Lines changed: 9 additions & 3 deletions
@@ -11,16 +11,17 @@ Pandas-to-postgres allows you to bulk load the contents of large dataframes into
 - Removes indexing overhead by automatically detecting and dropping indexes before load, and then re-creating them afterwards
 - Allows you to load multiple separate HDF tables in parallel using multiprocessing.Pool
 - Works around pandas null value representation issues: float pandas columns that have an integer SQL type get converted into an object column with int values where applicable and NaN elsewhere.
-- Provides hooks to modify data as it's loaded
+- Provides hooks to modify data as it's loaded
 
 Anecdotally, we use this to load approximately 640 million rows of data from a 7.1GB HDF file (zlib compressed), 75% of it spread across 3 of 23 tables, with a mean number of columns of 6. We load this into an m4.xlarge RDS instance running postgres 10.3 in 54 minutes (approximately 10-15 minutes of which is recreating indexes), using 4 threads.
 
-# Dependencies
+# Dependencies
 
 - Python 3
 - psycopg2 (for the low level COPY from stdin)
 - sqlalchemy (for reflection for indexes)
 - pandas
+- pyarrow (for copying Parquet files)
 
 # Usage Example
 
@@ -41,9 +42,14 @@ hdf_to_postgres('./data.h5', engine_args=["psycopg://..."])
 
 # Parallel HDF from file
 hdf_to_postgres('./data.h5', engine_args=["psycopg://..."], processes=4)
+
+# Parquet file
+with db.engine.connect() as c:
+    ParquetCopy("/path/to/file.parquet", conn=c, table_obj=table_model).copy()
+
 ```
 
 # Other Comparisons
-- [Odo](http://odo.pydata.org/): A much more general tool that provides some similar features across many formats and databases, but missing a lot of our specific features. Unfortunately currently buggy and unmaintained.
+- [Odo](http://odo.pydata.org/): A much more general tool that provides some similar features across many formats and databases, but missing a lot of our specific features. Unfortunately currently buggy and unmaintained.
 - [Postgres Binary Parser](https://github.com/spitz-dan-l/postgres-binary-parser): Uses `COPY WITH BINARY` to remove the pandas to csv bottleneck, but didn't provide as good an improvement for us.
 - [pg_bulkload](https://github.com/ossc-db/pg_bulkload): The industry standard, has some overlap with us. Works extremely well if you have CSV files, but not if you have any other format (you'd have to write your own chunked read/write code and pipe it through, at which point you might as well use ours). Judging by benchmarks we're in the same ballpark. Could perhaps replace psycopg2 as our backend eventually.
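The new README lines assume an existing `db.engine` and a reflected `table_model`. A more self-contained sketch of the same Parquet path, with a placeholder connection URL, schema, table name, and file path (none of these come from the commit):

```python
# Sketch only: the URL, schema, table name, and file path are placeholders.
from sqlalchemy import MetaData, Table, create_engine
from pandas_to_postgres import ParquetCopy

engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/mydb")

# Reflect the target table; non-"public" schemas work now that BaseCopy
# qualifies the table name as "<schema>.<table>".
table_model = Table("observations", MetaData(), autoload_with=engine, schema="analytics")

with engine.connect() as conn:
    ParquetCopy("/path/to/file.parquet", conn=conn, table_obj=table_model).copy()
```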

pandas_to_postgres/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -1,5 +1,6 @@
 from .copy_df import DataFrameCopy
 from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
+from .copy_parquet import ParquetCopy
 from .hdf_to_postgres import hdf_to_postgres, create_hdf_table_objects, copy_worker
 from .utilities import (
     hdf_metadata,
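With this re-export, the Parquet copier is importable from the package root alongside the existing copiers, e.g.:

```python
# ParquetCopy now sits next to the HDF and DataFrame copiers at the package root.
from pandas_to_postgres import DataFrameCopy, HDFTableCopy, ParquetCopy
```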

pandas_to_postgres/_base_copy.py

Lines changed: 39 additions & 21 deletions
@@ -1,6 +1,7 @@
 from .utilities import get_logger
 from sqlalchemy.schema import AddConstraint, DropConstraint
 from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.sql import text
 
 
 class BaseCopy(object):
@@ -14,7 +15,7 @@ def __init__(
         conn=None,
         table_obj=None,
         sql_table=None,
-        csv_chunksize=10 ** 6,
+        csv_chunksize=10**6,
     ):
         """
         Parameters
@@ -55,7 +56,10 @@ def instantiate_attrs(self, conn, table_obj):
         """
         self.conn = conn
         self.table_obj = table_obj
-        self.sql_table = table_obj.name
+        if table_obj.schema:
+            self.sql_table = f"{table_obj.schema}.{table_obj.name}"
+        else:
+            self.sql_table = table_obj.name
         self.logger = get_logger(self.sql_table)
         self.primary_key = table_obj.primary_key
         self.foreign_keys = table_obj.foreign_key_constraints
@@ -65,48 +69,63 @@ def drop_pk(self):
         Drop primary key constraints on PostgreSQL table as well as CASCADE any other
         constraints that may rely on the PK
         """
-        self.logger.info("Dropping {} primary key".format(self.sql_table))
+        self.logger.info(f"Dropping {self.sql_table} primary key")
         try:
-            with self.conn.begin_nested():
-                self.conn.execute(DropConstraint(self.primary_key, cascade=True))
+            self.conn.execute(DropConstraint(self.primary_key, cascade=True))
+            self.conn.commit()
         except SQLAlchemyError:
+            self.conn.rollback()
             self.logger.info(
-                "{} primary key not found. Skipping".format(self.sql_table)
+                f"{self.sql_table} primary key not found. Skipping"
             )
 
     def create_pk(self):
         """Create primary key constraints on PostgreSQL table"""
-        self.logger.info("Creating {} primary key".format(self.sql_table))
-        self.conn.execute(AddConstraint(self.primary_key))
+        self.logger.info(f"Creating {self.sql_table} primary key")
+        try:
+            self.conn.execute(AddConstraint(self.primary_key))
+            self.conn.commit()
+        except SQLAlchemyError:
+            self.conn.rollback()
+            self.logger.warn(
                f"Error creating foreign key {self.primary_key.name}"
+            )
 
     def drop_fks(self):
         """Drop foreign key constraints on PostgreSQL table"""
         for fk in self.foreign_keys:
-            self.logger.info("Dropping foreign key {}".format(fk.name))
+            self.logger.info(f"Dropping foreign key {fk.name}")
             try:
-                with self.conn.begin_nested():
-                    self.conn.execute(DropConstraint(fk))
+                self.conn.execute(DropConstraint(fk))
+                self.conn.commit()
             except SQLAlchemyError:
-                self.logger.warn("Foreign key {} not found".format(fk.name))
+                self.conn.rollback()
+                self.logger.warn(f"Foreign key {fk.name} not found")
 
     def create_fks(self):
         """Create foreign key constraints on PostgreSQL table"""
         for fk in self.foreign_keys:
             try:
-                self.logger.info("Creating foreign key {}".format(fk.name))
+                self.logger.info(f"Creating foreign key {fk.name}")
                 self.conn.execute(AddConstraint(fk))
+                self.conn.commit()
             except SQLAlchemyError:
-                self.logger.warn("Error creating foreign key {}".format(fk.name))
+                self.conn.rollback()
+                self.logger.warn(f"Error creating foreign key {fk.name}")
 
     def truncate(self):
         """TRUNCATE PostgreSQL table"""
-        self.logger.info("Truncating {}".format(self.sql_table))
-        self.conn.execute("TRUNCATE TABLE {};".format(self.sql_table))
+        self.logger.info(f"Truncating {self.sql_table}")
+        self.conn.execution_options(autocommit=True).execute(
+            text(f"TRUNCATE TABLE {self.sql_table};")
+        )
 
     def analyze(self):
         """Run ANALYZE on PostgreSQL table"""
-        self.logger.info("Analyzing {}".format(self.sql_table))
-        self.conn.execute("ANALYZE {};".format(self.sql_table))
+        self.logger.info(f"Analyzing {self.sql_table}")
+        self.conn.execution_options(autocommit=True).execute(
+            text(f"ANALYZE {self.sql_table};")
        )
 
     def copy_from_file(self, file_object):
         """
@@ -120,9 +139,8 @@ def copy_from_file(self, file_object):
         cur = self.conn.connection.cursor()
         file_object.seek(0)
         columns = file_object.readline()
-        sql = "COPY {table} ({columns}) FROM STDIN WITH CSV FREEZE".format(
-            table=self.sql_table, columns=columns
-        )
+
+        sql = f"COPY {self.sql_table} ({columns}) FROM STDIN WITH CSV FREEZE"
         cur.copy_expert(sql=sql, file=file_object)
 
     def data_formatting(self, df, functions=[], **kwargs):
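These helpers drop the old `begin_nested()` savepoints in favour of explicit `Connection.commit()` / `rollback()` calls and wrap raw SQL in `text()`, in line with SQLAlchemy 2.x's connection API. A minimal standalone sketch of that commit-or-rollback pattern, assuming a placeholder engine and table:

```python
# Minimal sketch of the explicit commit/rollback pattern used above.
# Engine URL and table are placeholders; assumes SQLAlchemy 2.x.
from sqlalchemy import MetaData, Table, create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.schema import DropConstraint

engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/mydb")
table = Table("observations", MetaData(), autoload_with=engine, schema="analytics")

with engine.connect() as conn:
    try:
        conn.execute(DropConstraint(table.primary_key, cascade=True))
        conn.commit()  # persist the DDL on this connection
    except SQLAlchemyError:
        conn.rollback()  # keep the connection usable if the constraint was absent
```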

pandas_to_postgres/copy_df.py

Lines changed: 4 additions & 4 deletions
@@ -9,7 +9,7 @@ class DataFrameCopy(BaseCopy):
     """
 
     def __init__(
-        self, df, defer_sql_objs=False, conn=None, table_obj=None, csv_chunksize=10 ** 6
+        self, df, defer_sql_objs=False, conn=None, table_obj=None, csv_chunksize=10**6
     ):
         """
         Parameters
@@ -35,20 +35,20 @@ def copy(self, functions=[cast_pandas]):
         self.drop_fks()
         self.drop_pk()
         self.df = self.data_formatting(self.df, functions=functions)
+
         with self.conn.begin():
             self.truncate()
 
             self.logger.info("Creating generator for chunking dataframe")
-            for chunk in df_generator(self.df, self.csv_chunksize):
-
+            for chunk in df_generator(self.df, self.csv_chunksize, logger=self.logger):
                 self.logger.info("Creating CSV in memory")
                 fo = create_file_object(chunk)
 
                 self.logger.info("Copying chunk to database")
                 self.copy_from_file(fo)
                 del fo
 
-            self.logger.info("All chunks copied ({} rows)".format(self.rows))
+            self.logger.info("All chunks copied ({} rows)".format(self.rows))
 
         self.create_pk()
         self.create_fks()
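For reference, a hedged end-to-end use of `DataFrameCopy` as it stands after this change; the connection URL, table, column names, and the extra formatting hook are illustrative, not part of the commit:

```python
# Illustrative only: table, columns, URL, and the custom hook are placeholders.
# copy() accepts formatting hooks; each receives the DataFrame plus copy_obj
# and extra kwargs, and returns the (possibly modified) frame.
import pandas as pd
from sqlalchemy import MetaData, Table, create_engine
from pandas_to_postgres import DataFrameCopy
from pandas_to_postgres.utilities import cast_pandas


def uppercase_codes(df, copy_obj=None, **kwargs):
    # Example hook: normalize a text column before COPY
    if "country_code" in df.columns:
        df["country_code"] = df["country_code"].str.upper()
    return df


engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/mydb")
table_model = Table("trade_flows", MetaData(), autoload_with=engine)
df = pd.DataFrame({"country_code": ["usa", "can"], "value": [1.5, 2.5]})

with engine.connect() as conn:
    DataFrameCopy(df, conn=conn, table_obj=table_model).copy(
        functions=[cast_pandas, uppercase_codes]
    )
```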

pandas_to_postgres/copy_parquet.py

Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
+import pyarrow.parquet as pq
+from .utilities import create_file_object, df_generator, cast_pandas
+from ._base_copy import BaseCopy
+from typing import Optional
+
+
+class ParquetCopy(BaseCopy):
+    """
+    Class for handling a standard case of reading a Parquet file into a Pandas
+    DataFrame, iterating over it in chunks, and COPYing to PostgreSQL via StringIO CSV
+    """
+
+    def __init__(
+        self,
+        file_name: str,
+        defer_sql_objs: bool = False,
+        conn=None,
+        table_obj=None,
+        sql_table: Optional[str] = None,
+        schema: Optional[str] = None,
+        csv_chunksize=10**6,
+        parquet_chunksize=10**7,
+    ):
+        super().__init__(defer_sql_objs, conn, table_obj, sql_table, csv_chunksize)
+
+        self.parquet_file = pq.ParquetFile(file_name)
+        self.parquet_chunksize = parquet_chunksize
+        self.total_rows = self.parquet_file.metadata.num_rows
+
+        self.logger.info("*** {} ***".format(file_name))
+
+        if self.total_rows > self.parquet_chunksize:
+            if self.total_rows % self.parquet_chunksize:
+                self.total_chunks = (self.total_rows // self.parquet_chunksize) + 1
+            else:
+                self.total_chunks = self.total_rows // self.parquet_chunksize
+
+            self.big_copy = True
+        else:
+            self.total_chunks = 1
+            self.big_copy = False
+
+    def copy(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
+        """
+        Go through sequence to COPY data to PostgreSQL table, including dropping Primary
+        and Foreign Keys to optimize speed, TRUNCATE table, COPY data, recreate keys,
+        and run ANALYZE.
+
+        Parameters
+        ----------
+        data_formatters: list of functions to apply to df during sequence. Note that
+            each of these functions should be able to handle kwargs for one another
+        data_formatter_kwargs: list of kwargs to pass to data_formatters functions
+        """
+        self.drop_fks()
+        self.drop_pk()
+
+        # These need to be one transaction to use COPY FREEZE
+        with self.conn.begin():
+            self.truncate()
+            if self.big_copy:
+                self.big_parquet_to_pg(
+                    data_formatters=data_formatters,
+                    data_formatter_kwargs=data_formatter_kwargs,
+                )
+            else:
+                self.parquet_to_pg(
+                    data_formatters=data_formatters,
+                    data_formatter_kwargs=data_formatter_kwargs,
+                )
+
+        self.create_pk()
+        self.create_fks()
+        self.analyze()
+
+    def parquet_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
+        self.logger.info("Reading Parquet file")
+        df = self.parquet_file.read().to_pandas()
+
+        self.logger.info("Formatting data")
+        df = self.data_formatting(
+            df, functions=data_formatters, **data_formatter_kwargs
+        )
+
+        self.logger.info("Creating generator for chunking dataframe")
+        for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
+            self.logger.info("Creating CSV in memory")
+            fo = create_file_object(chunk)
+
+            self.logger.info("Copying chunk to database")
+            self.copy_from_file(fo)
+            del fo
+        del df
+        self.logger.info(f"All chunks copied ({self.total_rows} rows)")
+
+    def big_parquet_to_pg(
+        self, data_formatters=[cast_pandas], data_formatter_kwargs={}
+    ):
+        copied_rows = 0
+        n_chunk = 0
+        for batch in self.parquet_file.iter_batches(batch_size=self.parquet_chunksize):
+            n_chunk += 1
+            self.logger.info(f"*** Parquet chunk {n_chunk} of {self.total_chunks} ***")
+            df = batch.to_pandas()
+            batch_rows = df.shape[0]
+
+            self.logger.info("Formatting data")
+            df = self.data_formatting(
+                df, functions=data_formatters, **data_formatter_kwargs
+            )
+
+            self.logger.info("Creating generator for chunking dataframe")
+            for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
+                self.logger.info("Creating CSV in memory")
+                fo = create_file_object(chunk)
+
+                self.logger.info("Copying chunk to database")
+                self.copy_from_file(fo)
+                del fo
+            del df
+
+            copied_rows += batch_rows
+
+            self.logger.info(f"Copied {copied_rows:,} of {self.total_rows:,} rows")
+
+        self.logger.info(f"All chunks copied ({self.total_rows:,} rows)")
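The constructor's chunk accounting is a ceiling division: any remainder beyond a whole multiple of `parquet_chunksize` costs one extra batch, and anything at or below one chunk size is read in a single pass. A quick standalone check of that arithmetic with made-up row counts:

```python
# Made-up row counts illustrating ParquetCopy's chunk accounting.
def total_chunks(total_rows: int, parquet_chunksize: int = 10**7) -> int:
    if total_rows <= parquet_chunksize:
        return 1  # small file: one read, big_copy stays False
    if total_rows % parquet_chunksize:
        return total_rows // parquet_chunksize + 1  # partial final batch
    return total_rows // parquet_chunksize


assert total_chunks(9_000_000) == 1    # fits in a single batch
assert total_chunks(25_000_000) == 3   # 10M + 10M + 5M
assert total_chunks(30_000_000) == 3   # exact multiple, no extra batch
```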

pandas_to_postgres/utilities.py

Lines changed: 5 additions & 4 deletions
@@ -92,7 +92,7 @@ def create_file_object(df):
     return file_object
 
 
-def df_generator(df, chunksize=10 ** 6, logger=None):
+def df_generator(df, chunksize=10**6, logger=None):
     """
     Create a generator to iterate over chunks of a dataframe
 
@@ -149,9 +149,10 @@ def cast_pandas(df, columns=None, copy_obj=None, logger=None, **kwargs):
     for col in columns:
         try:
             if str(col.type) in ["INTEGER", "BIGINT"]:
-                df[col.name] = df[col.name].apply(
-                    lambda x: None if isna(x) else int(x), convert_dtype=False
-                )
+                df[col.name] = df[col.name].astype("Int64")
+                # df[col.name] = df[col.name].apply(
+                #     lambda x: None if isna(x) else int(x), convert_dtype=False
+                # )
             elif str(col.type) == "BOOLEAN":
                 df[col.name] = df[col.name].apply(
                     lambda x: None if isna(x) else bool(x), convert_dtype=False
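The `cast_pandas` change swaps the per-value `apply` for pandas' nullable integer dtype, so an integer SQL column that arrives as float (because of NaN holes) becomes `Int64` with `<NA>` markers rather than an object column of Python ints. A small illustration with arbitrary values:

```python
# Arbitrary values showing the astype("Int64") behaviour cast_pandas now relies on.
import pandas as pd

s = pd.Series([1.0, None, 3.0])   # float64, because of the missing value
out = s.astype("Int64")           # pandas nullable integer dtype

print(out.dtype)     # Int64
print(out.tolist())  # [1, <NA>, 3]
```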

requirements.txt

Lines changed: 5 additions & 4 deletions
@@ -1,4 +1,5 @@
-pandas>=0.22.0
-psycopg2-binary==2.7.5
-SQLAlchemy==1.3.5
-tables
+pandas==2.1.2
+psycopg2-binary==2.9.9
+SQLAlchemy==2.0.32
+tables==3.10.0
+pyarrow==17.0.0

setup.py

Lines changed: 5 additions & 5 deletions
@@ -12,15 +12,15 @@ def read(fname, lines=False):
 
 setup(
     name="pandas_to_postgres",
-    version="v0.0.3",
-    author="Brendan Leonard <Harvard CID>",
+    version="v0.0.4",
+    author="Brendan Leonard <Harvard Growth Lab>",
     description=(
-        "Utility to copy Pandas DataFrames and DataFrames stored in HDF5 files "
-        "to PostgreSQL "
+        "Utility to copy Pandas DataFrames and DataFrames stored in HDF5 "
+        "or Parquet files to PostgreSQL databases."
     ),
     url="http://github.com/cid-harvard/pandas-to-postgres",
     packages=find_packages(),
-    install_requires=["SQLAlchemy", "pandas", "psycopg2", "tables"],
+    install_requires=["SQLAlchemy", "pandas", "psycopg2-binary", "tables", "pyarrow"],
     long_description=read("README.md"),
     classifiers=[
         "Topic :: Database",
