1
- from .utilities import (
2
- create_file_object ,
3
- df_generator ,
4
- logger ,
5
- classification_to_pandas ,
6
- cast_pandas ,
7
- add_level_metadata ,
8
- HDFMetadata ,
9
- )
10
-
11
- import pandas as pd
1
+ from .utilities import logger
2
+ from io import StringIO
12
3
from sqlalchemy .schema import AddConstraint , DropConstraint
13
4
from sqlalchemy .exc import SQLAlchemyError
14
5
from sqlalchemy .sql .schema import Table
15
6
from sqlalchemy .engine .base import Connection
16
7
17
8
18
9
class BaseCopy (object ):
10
+ """
11
+ Parent class for all common attibutes and methods for copy objects
12
+ """
13
+
19
14
def __init__ (
20
15
self ,
21
16
defer_sql_objs : bool = False ,
22
- conn = None ,
23
- table_obj = None ,
24
- sql_table = None ,
17
+ conn : Connection = None ,
18
+ table_obj : Table = None ,
19
+ sql_table : str = None ,
25
20
csv_chunksize : int = 10 ** 6 ,
26
21
):
22
+ """
23
+ Parameters
24
+ ----------
25
+ defer_sql_objs: multiprocessing has issue with passing SQLALchemy objects, so if
26
+ True, defer attributing these to the object until after pickled by Pool
27
+ conn: SQLAlchemy connection managed outside of the object
28
+ table_obj: SQLAlchemy object for the destination SQL Table
29
+ sql_table: string of SQL table name
30
+ csv_chunksize: max rows to keep in memory when generating CSV for COPY
31
+ """
27
32
28
33
self .rows = 0
29
34
self .columns = None
@@ -46,6 +51,10 @@ def instantiate_sql_objs(self, conn, table_obj):
46
51
self .foreign_keys = table_obj .foreign_key_constraints
47
52
48
53
def drop_pk (self ):
54
+ """
55
+ Drop primary key constraints on PostgreSQL table as well as CASCADE any other
56
+ constraints that may rely on the PK
57
+ """
49
58
logger .info (f"Dropping { self .sql_table } primary key" )
50
59
try :
51
60
with self .conn .begin_nested ():
@@ -54,10 +63,12 @@ def drop_pk(self):
54
63
logger .info (f"{ self .sql_table } primary key not found. Skipping" )
55
64
56
65
def create_pk (self ):
66
+ """Create primary key constraints on PostgreSQL table"""
57
67
logger .info (f"Creating { self .sql_table } primary key" )
58
68
self .conn .execute (AddConstraint (self .primary_key ))
59
69
60
70
def drop_fks (self ):
71
+ """Drop foreign key constraints on PostgreSQL table"""
61
72
for fk in self .foreign_keys :
62
73
logger .info (f"Dropping foreign key { fk .name } " )
63
74
try :
@@ -67,6 +78,7 @@ def drop_fks(self):
67
78
logger .warn (f"Foreign key { fk .name } not found" )
68
79
69
80
def create_fks (self ):
81
+ """Create foreign key constraints on PostgreSQL table"""
70
82
for fk in self .foreign_keys :
71
83
try :
72
84
logger .info (f"Creating foreign key { fk .name } " )
@@ -75,14 +87,17 @@ def create_fks(self):
75
87
logger .warn (f"Error creating foreign key { fk .name } " )
76
88
77
89
def truncate (self ):
90
+ """TRUNCATE PostgreSQL table"""
78
91
logger .info (f"Truncating { self .sql_table } " )
79
92
self .conn .execute (f"TRUNCATE TABLE { self .sql_table } ;" )
80
93
81
94
def analyze (self ):
95
+ """Run ANALYZE on PostgreSQL table"""
82
96
logger .info (f"Analyzing { self .sql_table } " )
83
97
self .conn .execute (f"ANALYZE { self .sql_table } ;" )
84
98
85
- def copy_from_file (self , file_object ):
99
+ def copy_from_file (self , file_object : StringIO ):
100
+ """COPY to PostgreSQL table using StringIO CSV object"""
86
101
cur = self .conn .connection .cursor ()
87
102
cols = ", " .join ([f"{ col } " for col in self .columns ])
88
103
sql = f"COPY { self .sql_table } ({ cols } ) FROM STDIN WITH CSV HEADER FREEZE"
0 commit comments