> Utility belt to handle data on AWS.

[![Release](https://img.shields.io/badge/release-0.1.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Downloads](https://img.shields.io/pypi/dm/awswrangler.svg)](https://pypi.org/project/awswrangler/)
[![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Documentation Status](https://readthedocs.org/projects/aws-data-wrangler/badge/?version=latest)](https://aws-data-wrangler.readthedocs.io/en/latest/?badge=latest)
### Pandas
* Pandas -> Parquet (S3) (Parallel)
* Pandas -> CSV (S3) (Parallel)
* Pandas -> Glue Catalog Table
* Pandas -> Athena (Parallel)
* Pandas -> Redshift (Parallel)
* Parquet (S3) -> Pandas (Parallel) (NEW :star:)
* CSV (S3) -> Pandas (One shot or Batching)
* Glue Catalog Table -> Pandas (Parallel)
* Athena -> Pandas (One shot, Batching or Parallel (NEW :star:))
* Redshift -> Pandas (Parallel) (NEW :star:)
* Redshift -> Parquet (S3) (NEW :star:)
* CloudWatch Logs Insights -> Pandas
* Encrypt Pandas Dataframes on S3 with KMS keys

### PySpark
* PySpark -> Redshift (Parallel)
* Register Glue table from Dataframe stored on S3
* Flatten nested DataFrames

### General
* List S3 objects (Parallel)
* Get the size of S3 objects (Parallel)
* Get CloudWatch Logs Insights query results
* Load partitions on Athena/Glue table (repair table)
* Create EMR cluster (For humans)
* Terminate EMR cluster
* Get EMR cluster state
* Submit EMR step(s) (For humans)
* Get EMR step state
* Athena query to receive the result as python primitives (*Iterable[Dict[str, Any]]*)

## Installation

`pip install awswrangler`

Runs anywhere (AWS Lambda, AWS Glue Python Shell, EMR, EC2, on-premises, local, etc).

## Examples

### Pandas

#### Writing Pandas Dataframe to S3 + Glue Catalog

```py3
wrangler = awswrangler.Session()
wrangler.pandas.to_parquet(
    dataframe=dataframe,
    database="database",
    path="s3://...",
)
```

#### Encrypting Pandas Dataframes on S3 with KMS keys

```py3
extra_args = {
    "ServerSideEncryption": "aws:kms",
    "SSEKMSKeyId": "YOUR_KMS_KEY_ARN"
}
wrangler = awswrangler.Session(s3_additional_kwargs=extra_args)
wrangler.pandas.to_parquet(
    dataframe=dataframe,
    path="s3://..."
)
```

#### Reading from AWS Athena to Pandas

```py3
wrangler = awswrangler.Session()
dataframe = wrangler.pandas.read_sql_athena(
    sql="select * from table",
    database="database"
)
```

#### Reading from AWS Athena to Pandas in chunks (For memory restrictions)

```py3
wrangler = awswrangler.Session()
dataframe_iter = wrangler.pandas.read_sql_athena(
    sql="select * from table",
    database="database",
    max_result_size=512_000_000  # 512 MB
)
for dataframe in dataframe_iter:
    print(dataframe)  # Do whatever you want
```

#### Reading from AWS Athena to Pandas with the blazing fast CTAS approach

```py3
wrangler = awswrangler.Session(athena_ctas_approach=True)
dataframe = wrangler.pandas.read_sql_athena(
    sql="select * from table",
    database="database"
)
```

#### Reading from S3 (CSV) to Pandas

```py3
wrangler = awswrangler.Session()
dataframe = wrangler.pandas.read_csv(path="s3://...")
```

#### Reading from S3 (CSV) to Pandas in chunks (For memory restrictions)

```py3
wrangler = awswrangler.Session()
dataframe_iter = wrangler.pandas.read_csv(
    path="s3://...",
    max_result_size=512_000_000  # 512 MB
)
for dataframe in dataframe_iter:
    print(dataframe)  # Do whatever you want
```

#### Reading from CloudWatch Logs Insights to Pandas

```py3
wrangler = awswrangler.Session()
dataframe = wrangler.pandas.read_log_query(
    log_group_names=[LOG_GROUP_NAME],
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)
```

#### Typical Pandas ETL

```py3
import pandas
import awswrangler

df = pandas.read_...  # Read from anywhere

# Typical Pandas, Numpy or Pyarrow transformation HERE!

wrangler = awswrangler.Session()
wrangler.pandas.to_parquet(  # Storing the data and metadata to Data Lake
    dataframe=dataframe,
    database="database",
    path="s3://...",
    partition_cols=["col_name"],
)
```

#### Loading Pandas Dataframe to Redshift

```py3
wrangler = awswrangler.Session()
wrangler.pandas.to_redshift(
    dataframe=dataframe,
    path="s3://temp_path",
    schema="...",
    table="...",
    connection=con,
    iam_role="YOUR_ROLE_ARN",
    mode="overwrite",
    preserve_index=False,
)
```

#### Extract Redshift query to Pandas DataFrame

```py3
wrangler = awswrangler.Session()
dataframe = wrangler.pandas.read_sql_redshift(
    sql="SELECT ...",
    iam_role="YOUR_ROLE_ARN",
    connection=con,
    temp_s3_path="s3://temp_path")
```

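#### Reading Parquet and Glue Catalog tables to Pandas

The feature list above also mentions reading Parquet files and Glue Catalog tables straight into Pandas, but no example is shown elsewhere in this README. The snippet below is a minimal sketch assuming the method names `read_parquet` and `read_table` on `Session.pandas`; check the API reference for the exact signatures.

```py3
wrangler = awswrangler.Session()

# Parquet (S3) -> Pandas (assumed method name: read_parquet)
dataframe = wrangler.pandas.read_parquet(path="s3://...")

# Glue Catalog Table -> Pandas (assumed method name: read_table)
dataframe = wrangler.pandas.read_table(database="database", table="table")
```
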
### PySpark

#### Loading PySpark Dataframe to Redshift

```py3
wrangler = awswrangler.Session(spark_session=spark)
wrangler.spark.to_redshift(
    dataframe=df,
    path="s3://...",
    connection=conn,
    schema="...",
    table="...",
    iam_role="YOUR_ROLE_ARN",
    mode="append",
)
```

#### Register Glue table from Dataframe stored on S3

```py3
dataframe.write \
    .format("parquet") \
    .partitionBy(["year", "month"]) \
    .save(compression="gzip", path="s3://...")
wrangler = awswrangler.Session(spark_session=spark)
wrangler.spark.create_glue_table(
    dataframe=dataframe,
    file_format="parquet",
    partition_by=["year", "month"],
    path="s3://...",
    compression="gzip",
    database="my_database")
```

#### Flatten nested PySpark DataFrame

```py3
wrangler = awswrangler.Session(spark_session=spark)
dfs = wrangler.spark.flatten(dataframe=df_nested)
for name, df_flat in dfs.items():
    print(name)
    df_flat.show()
```

### General

#### Deleting a bunch of S3 objects (parallel)

```py3
wrangler = awswrangler.Session()
wrangler.s3.delete_objects(path="s3://...")
```
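
#### Listing S3 objects (parallel)

The "List S3 objects (Parallel)" feature above has no example in this README. The call below is a sketch assuming a `list_objects` method on `Session.s3`; verify the name and return type against the API reference.

```py3
wrangler = awswrangler.Session()
# Assumed method name: list_objects; expected to return the object keys under the prefix
keys = wrangler.s3.list_objects(path="s3://...")
```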

#### Get CloudWatch Logs Insights query results

```py3
wrangler = awswrangler.Session()
results = wrangler.cloudwatchlogs.query(
    log_group_names=[LOG_GROUP_NAME],
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)
```

#### Load partitions on Athena/Glue table (repair table)

```py3
wrangler = awswrangler.Session()
wrangler.athena.repair_table(database="db_name", table="tbl_name")
```

#### Create EMR cluster

```py3
wrangler = awswrangler.Session()
cluster_id = wrangler.emr.create_cluster(
    cluster_name="wrangler_cluster",
    logging_s3_path=f"s3://BUCKET_NAME/emr-logs/",
    emr_release="emr-5.27.0",
    maximize_resource_allocation=True,
    keep_cluster_alive_when_no_steps=True,
    termination_protected=False,
    spark_pyarrow=True,
    tags={
        "foo": "boo"
    }
)
print(cluster_id)
```

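#### Managing the EMR cluster and steps

The EMR helpers in the feature list (get cluster state, submit step(s), get step state, terminate cluster) have no examples in this README. The calls below are a sketch assuming the method names `get_cluster_state`, `submit_step`, `get_step_state` and `terminate_cluster` on `Session.emr`; confirm the exact signatures in the API reference.

```py3
wrangler = awswrangler.Session()

# Cluster state (e.g. STARTING, WAITING, TERMINATED) - assumed method name
cluster_state = wrangler.emr.get_cluster_state(cluster_id=cluster_id)

# Submit a step (For humans) - assumed method name and arguments
step_id = wrangler.emr.submit_step(
    cluster_id=cluster_id,
    name="step_test",
    command="spark-submit s3://...")

# Step state (e.g. PENDING, RUNNING, COMPLETED) - assumed method name
step_state = wrangler.emr.get_step_state(cluster_id=cluster_id, step_id=step_id)

# Terminate the cluster - assumed method name
wrangler.emr.terminate_cluster(cluster_id=cluster_id)
```
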
#### Athena query to receive the result as python primitives (*Iterable[Dict[str, Any]]*)

```py3
wrangler = awswrangler.Session()
for row in wrangler.athena.query(query="...", database="..."):
    print(row)
```
