@@ -201,7 +201,6 @@ def _get_load_sql(path: str, schema_name: str, table_name: str, engine: str, reg
201201 f"'({ bucket } ,{ key } ,{ region } )')" )
202202 elif "mysql" in engine .lower ():
203203 sql = ("-- AWS DATA WRANGLER\n "
204- "SELECT aws_s3.table_import_from_s3(\n "
205204 f"LOAD DATA FROM S3 MANIFEST '{ path } '\n "
206205 "REPLACE\n "
207206 f"INTO TABLE { schema_name } .{ table_name } \n "
@@ -240,7 +239,7 @@ def _create_table(cursor,
240239 preserve_index = preserve_index ,
241240 engine = engine )
242241 cols_str : str = "" .join ([f"{ col [0 ]} { col [1 ]} ,\n " for col in schema ])[:- 2 ]
243- sql = ( f"-- AWS DATA WRANGLER\n " f"CREATE TABLE IF NOT EXISTS { schema_name } .{ table_name } (\n " f"{ cols_str } )" )
242+ sql = f"-- AWS DATA WRANGLER\n " f"CREATE TABLE IF NOT EXISTS { schema_name } .{ table_name } (\n " f"{ cols_str } )"
244243 logger .debug (f"Create table query:\n { sql } " )
245244 cursor .execute (sql )
246245
@@ -265,3 +264,34 @@ def _get_schema(dataframe,
265264 else :
266265 raise InvalidDataframeType (f"{ dataframe_type } is not a valid DataFrame type. Please use 'pandas'!" )
267266 return schema_built
267+
def to_s3(self, sql: str, path: str, connection: Any, engine: str = "mysql") -> str:
    """
    Write a query result on S3.

    Runs the given SELECT with Aurora MySQL's ``INTO OUTFILE S3`` extension,
    producing CSV data files plus a manifest object under ``path``.

    :param sql: SQL Query
    :param path: AWS S3 path to write the data (e.g. s3://...)
    :param connection: A PEP 249 compatible connection (Can be generated with Redshift.generate_connection())
    :param engine: Only "mysql" by now
    :return: Manifest S3 path
    :raises InvalidEngine: If the engine is not Aurora MySQL.
    """
    if "mysql" not in engine.lower():
        raise InvalidEngine(f"{engine} is not a valid engine. Please use 'mysql'!")
    # BUG FIX: original used path[-1], which replaced any path ending in "/"
    # with the single character "/" instead of stripping the trailing slash.
    path = path[:-1] if path[-1] == "/" else path
    # Clear the target prefix first; OVERWRITE ON below only replaces files
    # the new export writes, not stale objects from earlier runs.
    self._session.s3.delete_objects(path=path)
    sql = (f"{sql}\n"
           f"INTO OUTFILE S3 '{path}'\n"
           "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' ESCAPED BY '\\\\'\n"
           "LINES TERMINATED BY '\\n'\n"
           "MANIFEST ON\n"
           "OVERWRITE ON")
    with connection.cursor() as cursor:
        logger.debug(sql)
        cursor.execute(sql)
    connection.commit()
    # Aurora writes the manifest as "<path>.manifest" when MANIFEST ON is set.
    return path + ".manifest"
293+
def extract_manifest_paths(self, path: str) -> List[str]:
    """
    Fetch an Aurora export manifest object from S3 and list the data file URLs it names.

    :param path: AWS S3 path of the manifest (e.g. s3://bucket/key.manifest)
    :return: The "url" of every entry in the manifest
    """
    bucket, key = Aurora._parse_path(path)
    raw_body: bytes = self._client_s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    manifest = json.loads(raw_body.decode("utf-8"))
    return [entry["url"] for entry in manifest["entries"]]
0 commit comments