Skip to content

Commit f0f154b

Browse files
committed
Add wr.emr.submit_spark_step
1 parent 2eefb3a commit f0f154b

File tree

4 files changed

+84
-26
lines changed

4 files changed

+84
-26
lines changed

awswrangler/emr.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,3 +1032,61 @@ def build_spark_step(
10321032
region=region,
10331033
boto3_session=boto3_session,
10341034
)
1035+
1036+
1037+
def submit_spark_step(
1038+
cluster_id: str,
1039+
path: str,
1040+
deploy_mode: str = "cluster",
1041+
docker_image: Optional[str] = None,
1042+
name: str = "my-step",
1043+
action_on_failure: str = "CONTINUE",
1044+
region: Optional[str] = None,
1045+
boto3_session: Optional[boto3.Session] = None,
1046+
) -> str:
1047+
"""Submit Spark Step.
1048+
1049+
Parameters
1050+
----------
1051+
cluster_id : str
1052+
Cluster ID.
1053+
path : str
1054+
Script path. (e.g. s3://bucket/app.py)
1055+
deploy_mode : str
1056+
"cluster" | "client"
1057+
docker_image : str, optional
1058+
e.g. "{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}"
1059+
name : str, optional
1060+
Step name.
1061+
action_on_failure : str
1062+
'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
1063+
region: str, optional
1064+
Region name to not get it from boto3.Session. (e.g. `us-east-1`)
1065+
boto3_session : boto3.Session(), optional
1066+
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
1067+
1068+
Returns
1069+
-------
1070+
str
1071+
Step ID.
1072+
1073+
Examples
1074+
--------
1075+
>>> import awswrangler as wr
1076+
>>> step_id = wr.emr.submit_spark_step(
1077+
>>> cluster_id="cluster-id",
1078+
>>> path="s3://bucket/emr/app.py"
1079+
>>> )
1080+
1081+
"""
1082+
session: boto3.Session = _utils.ensure_session(session=boto3_session)
1083+
step = build_spark_step(
1084+
path=path,
1085+
deploy_mode=deploy_mode,
1086+
docker_image=docker_image,
1087+
name=name,
1088+
action_on_failure=action_on_failure,
1089+
region=region,
1090+
boto3_session=session,
1091+
)
1092+
return submit_steps(cluster_id=cluster_id, steps=[step], boto3_session=session)[0]

docs/source/api.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,12 @@ EMR
113113
get_cluster_state
114114
terminate_cluster
115115
submit_step
116+
submit_spark_step
117+
submit_ecr_credentials_refresh
116118
submit_steps
117119
build_step
120+
build_spark_step
118121
get_step_state
119-
update_ecr_credentials
120122

121123
CloudWatch Logs
122124
---------------

testing/test_awswrangler/test_emr.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,4 +182,5 @@ def test_docker(bucket, cloudformation_outputs):
182182
)
183183
],
184184
)
185+
wr.emr.submit_spark_step(cluster_id=cluster_id, path=f"s3://{bucket}/emr/test_docker.py")
185186
wr.emr.terminate_cluster(cluster_id=cluster_id)

tutorials/16 - EMR & Docker.ipynb

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111
},
1212
{
1313
"cell_type": "code",
14-
"execution_count": 1,
14+
"execution_count": null,
1515
"metadata": {},
1616
"outputs": [],
1717
"source": [
1818
"import awswrangler as wr\n",
19-
"import boto3"
19+
"import boto3\n",
20+
"import getpass"
2021
]
2122
},
2223
{
@@ -40,7 +41,6 @@
4041
}
4142
],
4243
"source": [
43-
"import getpass\n",
4444
"bucket = getpass.getpass()"
4545
]
4646
},
@@ -164,7 +164,7 @@
164164
{
165165
"data": {
166166
"text/plain": [
167-
"'s-3OPMPDCYGEGOT'"
167+
"'s-1B0O45RWJL8CL'"
168168
]
169169
},
170170
"execution_count": 5,
@@ -173,7 +173,7 @@
173173
}
174174
],
175175
"source": [
176-
"wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/emr/\")"
176+
"wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/\")"
177177
]
178178
},
179179
{
@@ -185,7 +185,7 @@
185185
},
186186
{
187187
"cell_type": "code",
188-
"execution_count": 6,
188+
"execution_count": 7,
189189
"metadata": {},
190190
"outputs": [],
191191
"source": [
@@ -201,11 +201,7 @@
201201
"print(f\"Wrangler version: {wr.__version__}\")\n",
202202
"\"\"\"\n",
203203
"\n",
204-
"_ = boto3.client(\"s3\").put_object(\n",
205-
" Body=script,\n",
206-
" Bucket=bucket,\n",
207-
" Key=\"emr/test_docker.py\"\n",
208-
")"
204+
"boto3.client(\"s3\").put_object(Body=script, Bucket=bucket, Key=\"test_docker.py\");"
209205
]
210206
},
211207
{
@@ -217,15 +213,17 @@
217213
},
218214
{
219215
"cell_type": "code",
220-
"execution_count": 7,
216+
"execution_count": 8,
221217
"metadata": {},
222218
"outputs": [],
223219
"source": [
224220
"DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n",
225221
"\n",
226-
"step = wr.emr.build_spark_step(f\"s3://{bucket}/emr/test_docker.py\", docker_image=DOCKER_IMAGE)\n",
227-
"\n",
228-
"steps_ids = wr.emr.submit_steps(cluster_id, steps=[step])"
222+
"step_id = wr.emr.submit_spark_step(\n",
223+
" cluster_id,\n",
224+
" f\"s3://{bucket}/test_docker.py\",\n",
225+
" docker_image=DOCKER_IMAGE\n",
226+
")"
229227
]
230228
},
231229
{
@@ -237,11 +235,11 @@
237235
},
238236
{
239237
"cell_type": "code",
240-
"execution_count": 8,
238+
"execution_count": null,
241239
"metadata": {},
242240
"outputs": [],
243241
"source": [
244-
"while wr.emr.get_step_state(cluster_id, steps_ids[0]) != \"COMPLETED\":\n",
242+
"while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n",
245243
" pass"
246244
]
247245
},
@@ -254,7 +252,7 @@
254252
},
255253
{
256254
"cell_type": "code",
257-
"execution_count": 9,
255+
"execution_count": null,
258256
"metadata": {},
259257
"outputs": [],
260258
"source": [
@@ -270,7 +268,7 @@
270268
},
271269
{
272270
"cell_type": "code",
273-
"execution_count": 12,
271+
"execution_count": 9,
274272
"metadata": {},
275273
"outputs": [],
276274
"source": [
@@ -313,11 +311,10 @@
313311
"\n",
314312
"DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n",
315313
"\n",
316-
"steps_ids = wr.emr.submit_steps(\n",
317-
" cluster_id=cluster_id,\n",
318-
" steps=[\n",
319-
" wr.emr.build_spark_step(f\"s3://{bucket}/emr/test_docker.py\", docker_image=DOCKER_IMAGE)\n",
320-
" ]\n",
314+
"step_id = wr.emr.submit_spark_step(\n",
315+
" cluster_id,\n",
316+
" f\"s3://{bucket}/test_docker.py\",\n",
317+
" docker_image=DOCKER_IMAGE\n",
321318
")"
322319
]
323320
},
@@ -327,7 +324,7 @@
327324
"metadata": {},
328325
"outputs": [],
329326
"source": [
330-
"while wr.emr.get_step_state(cluster_id, steps_ids[0]) != \"COMPLETED\":\n",
327+
"while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n",
331328
" pass\n",
332329
"\n",
333330
"wr.emr.terminate_cluster(cluster_id)"

0 commit comments

Comments
 (0)