@@ -339,6 +339,54 @@ def _resolve_query_without_cache_ctas(
     )
 
 
+def _resolve_query_without_cache_unload(
+    sql: str,
+    file_format: str,
+    compression: Optional[str],
+    field_delimiter: Optional[str],
+    partitioned_by: Optional[List[str]],
+    database: Optional[str],
+    data_source: Optional[str],
+    s3_output: str,
+    keep_files: bool,
+    chunksize: Union[int, bool, None],
+    categories: Optional[List[str]],
+    encryption: Optional[str],
+    kms_key: Optional[str],
+    wg_config: _WorkGroupConfig,
+    use_threads: Union[bool, int],
+    s3_additional_kwargs: Optional[Dict[str, Any]],
+    boto3_session: boto3.Session,
+    pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
+) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
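+    # Execute the UNLOAD statement, then read back the result files it wrote to s3_output.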
+    query_metadata = _unload(
+        sql,
+        s3_output,
+        file_format,
+        compression,
+        field_delimiter,
+        partitioned_by,
+        wg_config,
+        database,
+        encryption,
+        kms_key,
+        boto3_session,
+        data_source,
+    )
+    if file_format == "PARQUET":
+        return _fetch_parquet_result(
+            query_metadata=query_metadata,
+            keep_files=keep_files,
+            categories=categories,
+            chunksize=chunksize,
+            use_threads=use_threads,
+            s3_additional_kwargs=s3_additional_kwargs,
+            boto3_session=boto3_session,
+            pyarrow_additional_kwargs=pyarrow_additional_kwargs,
+        )
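+    # Reading the results back into a DataFrame is only implemented for Parquet output.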
+    raise exceptions.InvalidArgumentValue("Only PARQUET file format is supported when unload_approach=True.")
+
+
 def _resolve_query_without_cache_regular(
     sql: str,
     database: Optional[str],
@@ -390,6 +438,8 @@ def _resolve_query_without_cache(
     database: str,
     data_source: Optional[str],
     ctas_approach: bool,
+    unload_approach: bool,
+    unload_parameters: Optional[Dict[str, Any]],
     categories: Optional[List[str]],
     chunksize: Union[int, bool, None],
     s3_output: Optional[str],
@@ -443,6 +493,29 @@ def _resolve_query_without_cache(
             catalog.delete_table_if_exists(
                 database=ctas_database_name or database, table=name, boto3_session=boto3_session
             )
+    elif unload_approach is True:
+        if unload_parameters is None:
+            unload_parameters = {}
+        return _resolve_query_without_cache_unload(
+            sql=sql,
+            file_format=unload_parameters.get("file_format") or "PARQUET",
+            compression=unload_parameters.get("compression"),
+            field_delimiter=unload_parameters.get("field_delimiter"),
+            partitioned_by=unload_parameters.get("partitioned_by"),
+            database=database,
+            data_source=data_source,
+            s3_output=_s3_output,
+            keep_files=keep_files,
+            chunksize=chunksize,
+            categories=categories,
+            encryption=encryption,
+            kms_key=kms_key,
+            wg_config=wg_config,
+            use_threads=use_threads,
+            s3_additional_kwargs=s3_additional_kwargs,
+            boto3_session=boto3_session,
+            pyarrow_additional_kwargs=pyarrow_additional_kwargs,
+        )
     return _resolve_query_without_cache_regular(
         sql=sql,
         database=database,
@@ -461,11 +534,81 @@ def _resolve_query_without_cache(
     )
 
 
+def _unload(
+    sql: str,
+    path: str,
+    file_format: str,
+    compression: Optional[str],
+    field_delimiter: Optional[str],
+    partitioned_by: Optional[List[str]],
+    wg_config: _WorkGroupConfig,
+    database: Optional[str],
+    encryption: Optional[str],
+    kms_key: Optional[str],
+    boto3_session: boto3.Session,
+    data_source: Optional[str],
+) -> _QueryMetadata:
+    # Set UNLOAD parameters
+    unload_parameters = f" format='{file_format}'"
+    if compression:
+        unload_parameters += f", compression='{compression}'"
+    if field_delimiter:
+        unload_parameters += f", field_delimiter='{field_delimiter}'"
+    if partitioned_by:
+        unload_parameters += f", partitioned_by=ARRAY{partitioned_by}"
+
+    sql = f"UNLOAD ({sql}) TO '{path}' WITH ({unload_parameters})"
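+    # e.g. "UNLOAD (SELECT ...) TO 's3://bucket/prefix/' WITH ( format='PARQUET', compression='snappy')"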
+    _logger.debug("sql: %s", sql)
+    try:
+        query_id: str = _start_query_execution(
+            sql=sql,
+            wg_config=wg_config,
+            database=database,
+            data_source=data_source,
+            s3_output=path,
+            encryption=encryption,
+            kms_key=kms_key,
+            boto3_session=boto3_session,
+        )
+    except botocore.exceptions.ClientError as ex:
+        msg: str = str(ex)
+        error: Dict[str, Any] = ex.response["Error"]
+        if error["Code"] == "InvalidRequestException":
+            raise exceptions.InvalidArgumentValue(f"Exception parsing query. Root error message: {msg}")
+        raise ex
+    _logger.debug("query_id: %s", query_id)
+    try:
+        query_metadata: _QueryMetadata = _get_query_metadata(
+            query_execution_id=query_id,
+            boto3_session=boto3_session,
+            metadata_cache_manager=_cache_manager,
+        )
+    except exceptions.QueryFailed as ex:
+        msg = str(ex)
+        if "Column name" in msg and "specified more than once" in msg:
+            raise exceptions.InvalidArgumentValue(
+                f"Please, define distinct names for your columns. Root error message: {msg}"
+            )
+        if "Column name not specified" in msg:
+            raise exceptions.InvalidArgumentValue(
+                "Please, define all column names in your query. (E.g. 'SELECT MAX(col1) AS max_col1, ...')"
+            )
+        if "Column type is unknown" in msg:
+            raise exceptions.InvalidArgumentValue(
+                "Please, don't leave undefined column types in your query. You can cast to ensure it. "
+                "(E.g. 'SELECT CAST(NULL AS INTEGER) AS MY_COL, ...')"
+            )
+        raise ex
+    return query_metadata
+
+
 @apply_configs
 def read_sql_query(
     sql: str,
     database: str,
     ctas_approach: bool = True,
+    unload_approach: bool = False,
+    unload_parameters: Optional[Dict[str, Any]] = None,
     categories: Optional[List[str]] = None,
     chunksize: Optional[Union[int, bool]] = None,
     s3_output: Optional[str] = None,
@@ -498,7 +641,7 @@ def read_sql_query(
     - `Global Configurations <https://aws-data-wrangler.readthedocs.io/en/2.13.0/
       tutorials/021%20-%20Global%20Configurations.html>`_
 
-    **There are two approaches to be defined through ctas_approach parameter:**
+    **There are three approaches available through ctas_approach and unload_approach parameters:**
 
     **1** - ctas_approach=True (Default):
 
@@ -518,7 +661,25 @@ def read_sql_query(
     - A temporary table will be created and then deleted immediately.
     - Does not support custom data_source/catalog_id.
 
-    **2** - ctas_approach=False:
+    **2** - unload_approach=True and ctas_approach=False:
+
+    Does an UNLOAD query on Athena and parses the Parquet result on S3.
+
+    PROS:
+
+    - Faster for medium and large result sizes.
+    - Can handle some level of nested types.
+    - Does not modify Glue Data Catalog.
+
+    CONS:
+
+    - Output S3 path must be empty.
+    - Does not support timestamp with time zone.
+    - Does not support columns with repeated names.
+    - Does not support columns with undefined data types.
+    - Does not support custom data_source/catalog_id.
+
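+    A minimal sketch of this approach (table, database, and bucket names are placeholders;
+    the target prefix must be empty):
+
+    >>> import awswrangler as wr
+    >>> df = wr.athena.read_sql_query(
+    ...     sql="SELECT * FROM my_table",
+    ...     database="my_database",
+    ...     ctas_approach=False,
+    ...     unload_approach=True,
+    ...     s3_output="s3://bucket/unload_prefix/",
+    ... )
+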
+    **3** - ctas_approach=False:
 
     Does a regular query on Athena and parses the regular CSV result on S3.
 
@@ -534,7 +695,6 @@ def read_sql_query(
     - Slower for big results (but still faster than other libraries that use the regular Athena API)
     - Does not handle nested types at all.
 
-
     Note
     ----
     The resulting DataFrame (or every DataFrame in the returned Iterator for chunked queries) has a
@@ -574,7 +734,7 @@ def read_sql_query(
     `P.S.` `chunksize=True` is faster and uses less memory while `chunksize=INTEGER` is more precise
     in number of rows for each DataFrame.
 
-    `P.P.S.` If `ctas_approach=False` and `chunksize=True`, you will always receive an interador with a
+    `P.P.S.` If `ctas_approach=False` and `chunksize=True`, you will always receive an iterator with a
     single DataFrame because regular Athena queries only produce a single output file.
 
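+    A minimal sketch of chunked reading (names are placeholders):
+
+    >>> dfs = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_database", chunksize=True)
+    >>> for df in dfs:
+    ...     print(len(df.index))
+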
580740 Note
@@ -593,6 +753,11 @@ def read_sql_query(
     ctas_approach: bool
         Wraps the query using a CTAS, and reads the resulting parquet data on S3.
         If false, reads the regular CSV on S3.
+    unload_approach: bool
+        Wraps the query using UNLOAD, and reads the results from S3.
+        Only PARQUET format is supported.
+    unload_parameters: Optional[Dict[str, Any]]
+        Parameters of the UNLOAD such as file_format, compression, field_delimiter, and partitioned_by.
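+        e.g. unload_parameters={"compression": "snappy", "partitioned_by": ["year"]} (illustrative values).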
     categories: List[str], optional
         List of column names that should be returned as pandas.Categorical.
         Recommended for memory restricted environments.
@@ -691,6 +856,10 @@ def read_sql_query(
691856 "(https://github.com/awslabs/aws-data-wrangler/blob/main/"
692857 "tutorials/006%20-%20Amazon%20Athena.ipynb)"
693858 )
859+ if ctas_approach and unload_approach :
860+ raise exceptions .InvalidArgumentCombination ("Only one of ctas_approach=True or unload_approach=True is allowed" )
861+ if unload_parameters and unload_parameters .get ("file_format" ) not in (None , "PARQUET" ):
862+ raise exceptions .InvalidArgumentCombination ("Only PARQUET file format is supported if unload_approach=True" )
694863 chunksize = sys .maxsize if ctas_approach is False and chunksize is True else chunksize
695864 session : boto3 .Session = _utils .ensure_session (session = boto3_session )
696865 if params is None :
@@ -730,6 +899,8 @@ def read_sql_query(
         database=database,
         data_source=data_source,
         ctas_approach=ctas_approach,
+        unload_approach=unload_approach,
+        unload_parameters=unload_parameters,
         categories=categories,
         chunksize=chunksize,
         s3_output=s3_output,
@@ -979,3 +1150,95 @@ def read_sql_table(
         s3_additional_kwargs=s3_additional_kwargs,
         pyarrow_additional_kwargs=pyarrow_additional_kwargs,
     )
+
+
+@apply_configs
+def unload(
+    sql: str,
+    path: str,
+    database: str,
+    file_format: str = "PARQUET",
+    compression: Optional[str] = None,
+    field_delimiter: Optional[str] = None,
+    partitioned_by: Optional[List[str]] = None,
+    workgroup: Optional[str] = None,
+    encryption: Optional[str] = None,
+    kms_key: Optional[str] = None,
+    boto3_session: Optional[boto3.Session] = None,
+    data_source: Optional[str] = None,
+    params: Optional[Dict[str, Any]] = None,
+) -> _QueryMetadata:
1171+ """Write query results from a SELECT statement to the specified data format using UNLOAD.
1172+
1173+ https://docs.aws.amazon.com/athena/latest/ug/unload.html
1174+
1175+ Parameters
1176+ ----------
1177+ sql : str
1178+ SQL query.
1179+ path : str, optional
1180+ Amazon S3 path.
1181+ database : str
1182+ AWS Glue/Athena database name - It is only the origin database from where the query will be launched.
1183+ You can still using and mixing several databases writing the full table name within the sql
1184+ (e.g. `database.table`).
+    file_format : str
+        File format of the output. Possible values are ORC, PARQUET, AVRO, JSON, or TEXTFILE.
+    compression : Optional[str]
+        This option is specific to the ORC and Parquet formats. For ORC, possible values are lz4, snappy, zlib, or zstd.
+        For Parquet, possible values are gzip or snappy. For ORC, the default is zlib, and for Parquet,
+        the default is gzip.
+    field_delimiter : str
+        A single-character field delimiter for files in CSV, TSV, and other text formats.
+    partitioned_by : Optional[List[str]]
+        A list of columns by which the output is partitioned.
+    workgroup : str, optional
+        Athena workgroup.
+    encryption : str, optional
+        Valid values: [None, 'SSE_S3', 'SSE_KMS']. Notice: 'CSE_KMS' is not supported.
+    kms_key : str, optional
+        For SSE-KMS, this is the KMS key ARN or ID.
+    boto3_session : boto3.Session(), optional
+        Boto3 Session. The default boto3 session will be used if boto3_session receives None.
+    data_source : str, optional
+        Data Source / Catalog name. If None, 'AwsDataCatalog' will be used by default.
+    params : Dict[str, Any], optional
+        Dict of parameters that will be used for constructing the SQL query. Only named parameters are supported.
+        The dict needs to contain the information in the form {'name': 'value'} and the SQL query needs to contain
+        `:name;`. Note that for varchar columns and similar, you must surround the value in single quotes.
+
+    Returns
+    -------
+    _QueryMetadata
+        Query metadata including query execution id, dtypes, manifest & output location.
+
+    Examples
+    --------
+    >>> import awswrangler as wr
+    >>> res = wr.athena.unload(
+    ...     sql="SELECT * FROM my_table WHERE name=:name; AND city=:city;",
+    ...     path="s3://bucket/prefix/",
+    ...     database="my_database",
+    ...     params={"name": "'filtered_name'", "city": "'filtered_city'"},
+    ... )
+
+    """
+    session: boto3.Session = _utils.ensure_session(session=boto3_session)
+    wg_config: _WorkGroupConfig = _get_workgroup_config(session=session, workgroup=workgroup)
+    # Substitute query parameters
+    if params is None:
+        params = {}
+    for key, value in params.items():
+        sql = sql.replace(f":{key};", str(value))
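+    # e.g. params={"name": "'foo'"} turns ":name;" in the SQL into "'foo'"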
+    return _unload(
+        sql,
+        path,
+        file_format,
+        compression,
+        field_delimiter,
+        partitioned_by,
+        wg_config,
+        database,
+        encryption,
+        kms_key,
+        session,
+        data_source,
+    )