77 RedshiftLoadError ,
88 UnsupportedType ,
99 InvalidDataframeType ,
10+ InvalidRedshiftDiststyle ,
11+ InvalidRedshiftDistkey ,
12+ InvalidRedshiftSortstyle ,
13+ InvalidRedshiftSortkey ,
1014)
1115
1216logger = logging .getLogger (__name__ )
1317
18+ DISTSTYLES = [
19+ "AUTO" ,
20+ "EVEN" ,
21+ "ALL" ,
22+ "KEY" ,
23+ ]
24+
25+ SORTSTYLES = [
26+ "COMPOUND" ,
27+ "INTERLEAVED" ,
28+ ]
29+
1430
1531class Redshift :
1632 def __init__ (self , session ):
@@ -87,24 +103,48 @@ def load_table(
87103 redshift_conn ,
88104 num_files ,
89105 iam_role ,
106+ diststyle = "AUTO" ,
107+ distkey = None ,
108+ sortstyle = "COMPOUND" ,
109+ sortkey = None ,
90110 mode = "append" ,
91111 preserve_index = False ,
92112 ):
113+ """
114+ Load Parquet files into a Redshift table using a manifest file.
115+ Creates the table if necessary.
116+ :param dataframe: Pandas or Spark Dataframe
117+ :param dataframe_type: "pandas" or "spark"
118+ :param manifest_path: S3 path for manifest file (E.g. S3://...)
119+ :param schema_name: Redshift schema
120+ :param table_name: Redshift table name
121+ :param redshift_conn: A PEP 249 compatible connection (Can be generated with Redshift.generate_connection())
122+ :param num_files: Number of files to be loaded
123+ :param iam_role: AWS IAM role with the related permissions
124+ :param diststyle: Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"]
125+ https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html
126+ :param distkey: Specifies a column name or positional number for the distribution key
127+ :param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED"
128+ https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html
129+ :param sortkey: List of columns to be sorted
130+ :param mode: append or overwrite
131+ :param preserve_index: Should we preserve the Dataframe index? (ONLY for Pandas Dataframe)
132+ :return: None
133+ """
93134 cursor = redshift_conn .cursor ()
94135 if mode == "overwrite" :
95- cursor .execute ("-- AWS DATA WRANGLER\n "
96- f"DROP TABLE IF EXISTS { schema_name } .{ table_name } " )
97- schema = Redshift ._get_redshift_schema (
136+ Redshift ._create_table (
137+ cursor = cursor ,
98138 dataframe = dataframe ,
99139 dataframe_type = dataframe_type ,
140+ schema_name = schema_name ,
141+ table_name = table_name ,
142+ diststyle = diststyle ,
143+ distkey = distkey ,
144+ sortstyle = sortstyle ,
145+ sortkey = sortkey ,
100146 preserve_index = preserve_index ,
101147 )
102- cols_str = "" .join ([f"{ col [0 ]} { col [1 ]} ,\n " for col in schema ])[:- 2 ]
103- sql = (
104- "-- AWS DATA WRANGLER\n "
105- f"CREATE TABLE IF NOT EXISTS { schema_name } .{ table_name } (\n { cols_str } "
106- ") DISTSTYLE AUTO" )
107- cursor .execute (sql )
108148 sql = ("-- AWS DATA WRANGLER\n "
109149 f"COPY { schema_name } .{ table_name } FROM '{ manifest_path } '\n "
110150 f"IAM_ROLE '{ iam_role } '\n "
@@ -129,6 +169,111 @@ def load_table(
129169 redshift_conn .commit ()
130170 cursor .close ()
131171
172+ @staticmethod
173+ def _create_table (
174+ cursor ,
175+ dataframe ,
176+ dataframe_type ,
177+ schema_name ,
178+ table_name ,
179+ diststyle = "AUTO" ,
180+ distkey = None ,
181+ sortstyle = "COMPOUND" ,
182+ sortkey = None ,
183+ preserve_index = False ,
184+ ):
185+ """
186+ Creates Redshift table.
187+ :param cursor: A PEP 249 compatible cursor
188+ :param dataframe: Pandas or Spark Dataframe
189+ :param dataframe_type: "pandas" or "spark"
190+ :param schema_name: Redshift schema
191+ :param table_name: Redshift table name
192+ :param diststyle: Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"]
193+ https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html
194+ :param distkey: Specifies a column name or positional number for the distribution key
195+ :param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED"
196+ https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html
197+ :param sortkey: List of columns to be sorted
198+ :param preserve_index: Should we preserve the Dataframe index? (ONLY for Pandas Dataframe)
199+ :return: None
200+ """
201+ sql = f"-- AWS DATA WRANGLER\n " \
202+ f"DROP TABLE IF EXISTS { schema_name } .{ table_name } "
203+ logger .debug (f"Drop table query:\n { sql } " )
204+ cursor .execute (sql )
205+ schema = Redshift ._get_redshift_schema (
206+ dataframe = dataframe ,
207+ dataframe_type = dataframe_type ,
208+ preserve_index = preserve_index ,
209+ )
210+ if diststyle :
211+ diststyle = diststyle .upper ()
212+ else :
213+ diststyle = "AUTO"
214+ if sortstyle :
215+ sortstyle = sortstyle .upper ()
216+ else :
217+ sortstyle = "COMPOUND"
218+ Redshift ._validate_parameters (schema = schema ,
219+ diststyle = diststyle ,
220+ distkey = distkey ,
221+ sortstyle = sortstyle ,
222+ sortkey = sortkey )
223+ cols_str = "" .join ([f"{ col [0 ]} { col [1 ]} ,\n " for col in schema ])[:- 2 ]
224+ distkey_str = ""
225+ if distkey and diststyle == "KEY" :
226+ distkey_str = f"\n DISTKEY({ distkey } )"
227+ sortkey_str = ""
228+ if sortkey :
229+ sortkey_str = f"\n { sortstyle } SORTKEY({ ',' .join (sortkey )} )"
230+ sql = (f"-- AWS DATA WRANGLER\n "
231+ f"CREATE TABLE IF NOT EXISTS { schema_name } .{ table_name } (\n "
232+ f"{ cols_str } "
233+ f")\n DISTSTYLE { diststyle } "
234+ f"{ distkey_str } "
235+ f"{ sortkey_str } " )
236+ logger .debug (f"Create table query:\n { sql } " )
237+ cursor .execute (sql )
238+
239+ @staticmethod
240+ def _validate_parameters (schema , diststyle , distkey , sortstyle , sortkey ):
241+ """
242+ Validates the sanity of Redshift's parameters
243+ :param schema: List of tuples (column name, column type)
244+ :param diststyle: Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"]
245+ https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html
246+ :param distkey: Specifies a column name or positional number for the distribution key
247+ :param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED"
248+ https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html
249+ :param sortkey: List of columns to be sorted
250+ :return: None
251+ """
252+ if diststyle not in DISTSTYLES :
253+ raise InvalidRedshiftDiststyle (
254+ f"diststyle must be in { DISTSTYLES } " )
255+ cols = [x [0 ] for x in schema ]
256+ logger .debug (f"Redshift columns: { cols } " )
257+ if (diststyle == "KEY" ) and (not distkey ):
258+ raise InvalidRedshiftDistkey (
259+ "You must pass a distkey if you intend to use KEY diststyle" )
260+ if distkey and distkey not in cols :
261+ raise InvalidRedshiftDistkey (
262+ f"distkey ({ distkey } ) must be in the columns list: { cols } )" )
263+ if sortstyle and sortstyle not in SORTSTYLES :
264+ raise InvalidRedshiftSortstyle (
265+ f"sortstyle must be in { SORTSTYLES } " )
266+ if sortkey :
267+ if type (sortkey ) != list :
268+ raise InvalidRedshiftSortkey (
269+ f"sortkey must be a List of items in the columns list: { cols } . "
270+ f"Currently value: { sortkey } " )
271+ for key in sortkey :
272+ if key not in cols :
273+ raise InvalidRedshiftSortkey (
274+ f"sortkey must be a List of items in the columns list: { cols } . "
275+ f"Currently value: { key } " )
276+
132277 @staticmethod
133278 def _get_redshift_schema (dataframe , dataframe_type , preserve_index = False ):
134279 schema_built = []
0 commit comments