
Commit d94dd80

Move CID-specific data corralling functions to atlas_core from pandas-to-postgres
1 parent c6f09e8 commit d94dd80

File tree: 5 files changed (+81, −113 lines)

pandas_to_postgres/__init__.py

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+from .copy_df import DataFrameCopy
+from .copy_hdf import HDFTableCopy, ClassificationHDFTableCopy, BigHDFTableCopy
+from .utilities import (
+    logger,
+    HDFMetadata,
+    create_file_object,
+    df_generator,
+    cast_pandas,
+)
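These re-exports define the package's public surface; a minimal downstream import (illustrative only, not part of the commit):

# Hypothetical usage of the names re-exported above
from pandas_to_postgres import DataFrameCopy, HDFTableCopy, HDFMetadata, cast_pandas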

pandas_to_postgres/_base_copy.py

Lines changed: 19 additions & 1 deletion
@@ -1,5 +1,7 @@
 from .utilities import logger
 from io import StringIO
+from pandas import DataFrame
+from typing import Callable, List
 from sqlalchemy.schema import AddConstraint, DropConstraint
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.sql.schema import Table
@@ -43,6 +45,11 @@ def instantiate_sql_objs(self, conn, table_obj):
         """
         When using multiprocessing, pickling of SQLAlchemy objects in __init__ causes
         issues, so allow for deferring until after the pickling to fetch SQLAlchemy objs
+
+        Parameters
+        ----------
+        conn: SQLAlchemy connection managed outside of the object
+        table_obj: SQLAlchemy object for the destination SQL Table
         """
         self.conn = conn
         self.table_obj = table_obj
@@ -97,8 +104,19 @@ def analyze(self):
         self.conn.execute(f"ANALYZE {self.sql_table};")

     def copy_from_file(self, file_object: StringIO):
-        """COPY to PostgreSQL table using StringIO CSV object"""
+        """
+        COPY to PostgreSQL table using StringIO CSV object
+
+        Parameters
+        ----------
+        file_object: CSV formatted data to COPY from DataFrame to PostgreSQL
+        """
         cur = self.conn.connection.cursor()
         cols = ", ".join([f"{col}" for col in self.columns])
         sql = f"COPY {self.sql_table} ({cols}) FROM STDIN WITH CSV HEADER FREEZE"
         cur.copy_expert(sql=sql, file=file_object)
+
+    def data_formatting(self, df: DataFrame, functions: List[Callable] = [], **kwargs):
+        for f in functions:
+            df = f(df, copy_obj=self, **kwargs)
+        return df
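The new data_formatting hook calls each supplied function as f(df, copy_obj=self, **kwargs), so any callable with that signature can participate in the formatting pipeline. A minimal sketch of a user-defined formatter (strip_whitespace and its column handling are illustrative, not part of this commit):

import pandas as pd

def strip_whitespace(df: pd.DataFrame, copy_obj=None, **kwargs) -> pd.DataFrame:
    # Trim stray whitespace in string cells before COPY; copy_obj exposes the
    # destination table via copy_obj.table_obj if a formatter needs column types
    for col in df.select_dtypes(include="object").columns:
        df[col] = df[col].map(lambda x: x.strip() if isinstance(x, str) else x)
    return df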

pandas_to_postgres/copy_df.py

Lines changed: 3 additions & 18 deletions
@@ -1,11 +1,4 @@
-from .utilities import (
-    create_file_object,
-    df_generator,
-    logger,
-    cast_pandas,
-    add_level_metadata,
-)
-
+from .utilities import create_file_object, df_generator, logger, cast_pandas
 from ._base_copy import BaseCopy

 import pandas as pd
@@ -30,18 +23,10 @@ def __init__(
         self.columns = self.df.columns
         self.rows = self.df.shape[0]

-    def format_df(self):
-        # Handle NaN --> None type casting
-        self.df = cast_pandas(self.df, self.table_obj)
-
-        # Add level (constant) data to frames from dict
-        if self.levels:
-            self.df = add_level_metadata(self.df, self.levels)
-
-    def copy(self):
+    def copy(self, functions=[cast_pandas]):
         self.drop_fks()
         self.drop_pk()
-        self.format_df()
+        self.df = self.data_formatting(self.df, functions=functions)
         with self.conn.begin():
             self.truncate()

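DataFrameCopy.copy now takes the formatter list directly, with cast_pandas as the default. A hedged usage sketch (the DSN, table name, and keyword use of the constructor are assumptions; strip_whitespace is the sketch from _base_copy.py above):

import pandas as pd
from sqlalchemy import create_engine, MetaData
from pandas_to_postgres import DataFrameCopy, cast_pandas

engine = create_engine("postgresql://localhost/example")  # hypothetical database
meta = MetaData(bind=engine)
meta.reflect(only=["my_table"])  # hypothetical destination table

df = pd.DataFrame({"id": [1.0, float("nan")], "name": [" a ", "b"]})

with engine.connect() as conn:
    copier = DataFrameCopy(df, conn=conn, table_obj=meta.tables["my_table"])
    # Formatters run in order through BaseCopy.data_formatting
    copier.copy(functions=[cast_pandas, strip_whitespace])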
pandas_to_postgres/copy_hdf.py

Lines changed: 25 additions & 23 deletions
@@ -2,9 +2,7 @@
     create_file_object,
     df_generator,
     logger,
-    classification_to_pandas,
     cast_pandas,
-    add_level_metadata,
     HDFMetadata,
 )

@@ -26,9 +24,7 @@ def __init__(
         sql_table: str = None,
         csv_chunksize: int = 10 ** 6,
     ):
-        super().__init__(
-            self, defer_sql_objs, conn, table_obj, sql_table, csv_chunksize
-        )
+        super().__init__(defer_sql_objs, conn, table_obj, sql_table, csv_chunksize)

         self.hdf_tables = hdf_tables

@@ -37,35 +33,40 @@ def __init__(
         self.file_name = hdf_meta.file_name
         self.hdf_chunksize = hdf_meta.chunksize

-    def copy_table(self):
+    def copy(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         self.drop_fks()
         self.drop_pk()

         # These need to be one transaction to use COPY FREEZE
         with self.conn.begin():
             self.truncate()
-            self.hdf_to_pg()
+            self.hdf_to_pg(
+                data_formatters=data_formatters,
+                data_formatter_kwargs=data_formatter_kwargs,
+            )

         self.create_pk()
         self.create_fks()
         self.analyze()

-    def hdf_to_pg(self):
+    def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         if self.hdf_tables is None:
             logger.warn(f"No HDF table found for SQL table {self.sql_table}")
             return

         for hdf_table in self.hdf_tables:
             logger.info(f"*** {hdf_table} ***")
-            hdf_levels = self.levels.get(hdf_table)

             logger.info("Reading HDF table")
             df = pd.read_hdf(self.file_name, key=hdf_table)
             self.rows += len(df)

-            # Handle NaN --> None type casting and adding const level data
-            df = cast_pandas(df, self.table_obj)
-            df = add_level_metadata(df, hdf_levels)
+            data_formatter_kwargs["hdf_table"] = hdf_table
+
+            logger.info("Formatting data")
+            df = self.data_formatting(
+                df, functions=data_formatters, **data_formatter_kwargs
+            )

             if self.columns is None:
                 self.columns = df.columns
@@ -95,7 +96,6 @@ def __init__(
         csv_chunksize: int = 10 ** 6,
     ):
         super().__init__(
-            self,
             hdf_tables,
             hdf_meta,
             defer_sql_objs,
@@ -105,7 +105,7 @@ def __init__(
             csv_chunksize,
         )

-    def hdf_to_pg(self):
+    def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         if self.hdf_tables is None:
             logger.warn("No HDF table found for SQL table {self.sql_table}")
             return
@@ -116,9 +116,11 @@ def hdf_to_pg(self):
             df = pd.read_hdf(self.file_name, key=hdf_table)
             self.rows += len(df)

-            logger.info("Formatting classification")
-            df = classification_to_pandas(df)
-            df = cast_pandas(df, self.table_obj)
+            data_formatter_kwargs["hdf_table"] = hdf_table
+            logger.info("Formatting data")
+            df = self.data_formatting(
+                df, functions=data_formatters, **data_formatter_kwargs
+            )

             if self.columns is None:
                 self.columns = df.columns
@@ -145,7 +147,6 @@ def __init__(
         csv_chunksize: int = 10 ** 6,
     ):
         super().__init__(
-            self,
             hdf_tables,
             hdf_meta,
             defer_sql_objs,
@@ -155,14 +156,13 @@ def __init__(
             csv_chunksize,
         )

-    def hdf_to_pg(self):
+    def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
         if self.hdf_tables is None:
             logger.warn(f"No HDF table found for SQL table {self.sql_table}")
             return

         for hdf_table in self.hdf_tables:
             logger.info(f"*** {hdf_table} ***")
-            hdf_levels = self.levels.get(hdf_table)

             with pd.HDFStore(self.file_name) as store:
                 nrows = store.get_storer(hdf_table).nrows
@@ -183,9 +183,11 @@ def hdf_to_pg(self):

                 start += self.hdf_chunksize

-                # Handle NaN --> None type casting and adding const level data
-                df = cast_pandas(df, self.table_obj)
-                df = add_level_metadata(df, hdf_levels)
+                data_formatter_kwargs["hdf_table"] = hdf_table
+                logger.info("Formatting data")
+                df = self.data_formatting(
+                    df, functions=data_formatters, **data_formatter_kwargs
+                )

                 if self.columns is None:
                     self.columns = df.columns
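With copy_table renamed to copy and the formatters parameterized, HDF loads can inject both formatter functions and keyword arguments; note that hdf_to_pg adds the current hdf_table key to data_formatter_kwargs before each call. A hedged sketch (the file path, HDF key, and the argument order inferred from the __init__ diff above are assumptions):

from pandas_to_postgres import HDFTableCopy, HDFMetadata, cast_pandas

hdf_meta = HDFMetadata(file_name="./data.h5", keys=["/my_table"])  # hypothetical key
copier = HDFTableCopy(
    ["/my_table"],        # hdf_tables holding data for this SQL table
    hdf_meta,
    defer_sql_objs=True,  # fetch SQLAlchemy objects after pickling
    sql_table="my_table",
)
copier.instantiate_sql_objs(conn, table_obj)  # conn and table_obj built elsewhere
# cast_pandas is the default; every kwarg reaches each formatter via
# data_formatting, along with the injected hdf_table key
copier.copy(data_formatters=[cast_pandas], data_formatter_kwargs={})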

pandas_to_postgres/utilities.py

Lines changed: 25 additions & 71 deletions
@@ -1,4 +1,5 @@
 import logging
+from typing import List
 import pandas as pd
 from sqlalchemy.sql.schema import Table

@@ -16,7 +17,12 @@


 class HDFMetadata(object):
-    def __init__(self, file_name="./data.h5", keys=None, chunksize=10 ** 7):
+    def __init__(
+        self,
+        file_name: str = "./data.h5",
+        keys: List[str] = None,
+        chunksize: int = 10 ** 7,
+    ):
         self.file_name = file_name
         self.chunksize = chunksize
         self.sql_to_hdf = defaultdict(set)
@@ -42,7 +48,7 @@ def __init__(self, file_name="./data.h5", keys=None, chunksize=10 ** 7):
             logger.warn(f"No SQL table name found for {key}")


-def create_file_object(df):
+def create_file_object(df: pd.DataFrame) -> StringIO:
     """
     Writes pandas dataframe to an in-memory StringIO file object. Adapted from
     https://gist.github.com/mangecoeur/1fbd63d4758c2ba0c470#gistcomment-2086007
@@ -53,16 +59,14 @@ def create_file_object(df):
     return file_object


-def df_generator(df, chunksize):
+def df_generator(df: pd.DataFrame, chunksize: int):
     """
     Create a generator to iterate over chunks of a dataframe

     Parameters
     ----------
-    df: pandas dataframe
-        dataframe to iterate over
-    chunksize: int
-        max number of rows to return in a chunk
+    df: pandas dataframe to iterate over
+    chunksize: max number of rows to return in a chunk
     """
     rows = 0
     if not df.shape[0] % chunksize:
@@ -76,29 +80,35 @@ def df_generator(df, chunksize):
         rows += chunksize


-def cast_pandas(df, sql_table):
+def cast_pandas(
+    df: pd.DataFrame, columns: list = None, copy_obj: object = None, **kwargs
+) -> pd.DataFrame:
     """
     Pandas does not handle null values in integer or boolean fields out of the
     box, so cast fields that should be these types in the database to object
     fields and change np.nan to None

     Parameters
     ----------
-    df: pandas dataframe
-        data frame with fields that are desired to be int or bool as float with
+    df: data frame with fields that are desired to be int or bool as float with
         np.nan that should correspond to None

-    sql_table: SQLAlchemy model
-        destination table object with field names corresponding to those in df
+    columns: list of SQLAlchemy Columns to iterate through to determine data types
+
+    copy_obj: instance of BaseCopy passed from the BaseCopy.data_formatting method where
+        we can access BaseCopy.table_obj.columns

     Returns
     -------
-    df: pandas dataframe
-        dataframe with fields that correspond to Postgres int, bigint, and bool
+    df: dataframe with fields that correspond to Postgres int, bigint, and bool
         fields changed to objects with None values for null
     """

-    for col in sql_table.columns:
+    if columns is None and copy_obj is None:
+        raise ValueError("One of columns or copy_obj must be supplied")
+
+    columns = columns or copy_obj.table_obj.columns
+    for col in columns:
         if str(col.type) in ["INTEGER", "BIGINT"]:
             df[col.name] = df[col.name].apply(
                 lambda x: None if pd.isna(x) else int(x), convert_dtype=False
@@ -109,59 +119,3 @@ def cast_pandas(df, sql_table):
             )

     return df
-
-
-def add_level_metadata(df, hdf_levels):
-    """
-    Updates dataframe fields for constant "_level" fields
-
-    Parameters
-    ----------
-    df: pandas DataFrame
-    hdf_levels: dict
-        dict of level:value fields that are constant for the entire dataframe
-
-    Returns
-    ------
-    df: pandas DataFrame
-    """
-
-    if hdf_levels:
-        logger.info("Adding level metadata values")
-        for entity, level_value in hdf_levels.items():
-            df[entity + "_level"] = level_value
-
-    return df
-
-
-def classification_to_pandas(
-    df,
-    optional_fields=[
-        "name_es",
-        "name_short_en",
-        "name_short_es",
-        "description_en",
-        "description_es",
-        "is_trusted",
-        "in_rankings",
-    ],
-):
-    """Convert a classification from the format it comes in the classification
-    file (which is the format from the 'classifications' github repository)
-    into the format that the flask apps use. Mostly just a thing for dropping
-    unneeded columns and renaming existing ones.
-
-    The optional_fields allows you to specify which fields should be considered
-    optional, i.e. it'll still work if this field doesn't exist in the
-    classification, like the description fields for example.
-    """
-
-    # Sort fields and change names appropriately
-    new_df = df[["index", "code", "name", "level", "parent_id"]]
-    new_df = new_df.rename(columns={"index": "id", "name": "name_en"})
-
-    for field in optional_fields:
-        if field in df:
-            new_df[field] = df[field]
-
-    return new_df
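Standalone, cast_pandas now accepts either an explicit column list or a copy_obj, raising ValueError when neither is given. A small direct-call sketch (the engine URL and table name are assumptions):

import pandas as pd
from sqlalchemy import create_engine, MetaData
from pandas_to_postgres import cast_pandas

engine = create_engine("postgresql://localhost/example")  # hypothetical database
meta = MetaData(bind=engine)
meta.reflect(only=["my_table"])  # hypothetical table with an INTEGER "id" column

df = pd.DataFrame({"id": [1.0, float("nan")]})  # float column destined for INTEGER
df = cast_pandas(df, columns=meta.tables["my_table"].columns)
# NaN becomes None, so the COPY emits a proper SQL NULL instead of "nan"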
