
Commit 3c3ca64

Splitting up the ecr_credentials into an individual function #193
1 parent 9611a0a commit 3c3ca64

3 files changed: +230 additions, -104 deletions

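This commit removes the spark_docker, spark_docker_image, hive_docker and ecr_credentials_step arguments from wr.emr.create_cluster() and moves that behaviour into two standalone helpers, submit_ecr_credentials_refresh() and build_spark_step(). A minimal migration sketch, pieced together from the signatures and docstring examples in the diff below (the subnet, bucket and image values are placeholders):

import awswrangler as wr

# Create the cluster with plain Docker support; the removed spark_docker*,
# hive_docker and ecr_credentials_step arguments are no longer accepted.
cluster_id = wr.emr.create_cluster(subnet_id="SUBNET_ID", docker=True)

# Refresh ECR credentials as an explicit EMR step. The helper stages
# ecr_credentials_refresh.py under the given S3 path and submits it.
wr.emr.submit_ecr_credentials_refresh(cluster_id, path="s3://bucket/emr/")

# Run a Spark application from a Docker image; build_spark_step() injects the
# YARN_CONTAINER_RUNTIME_* settings that create_cluster() used to set globally.
wr.emr.submit_steps(
    cluster_id=cluster_id,
    steps=[
        wr.emr.build_spark_step(
            path="s3://bucket/app.py",
            docker_image="{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}",
        )
    ],
)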

awswrangler/emr.py

Lines changed: 105 additions & 79 deletions
@@ -61,13 +61,6 @@ def _get_default_logging_path(
     return f"s3://aws-logs-{_account_id}-{_region}/elasticmapreduce/"


-def _get_ecr_credentials_command() -> str:
-    return (
-        "sudo -s eval $(aws ecr get-login --region us-east-1 --no-include-email) && "
-        "sudo hdfs dfs -put -f /root/.docker/config.json /user/hadoop/"
-    )
-
-
 def _build_cluster_args(**pars):  # pylint: disable=too-many-branches,too-many-statements
     account_id: str = _utils.get_account_id(boto3_session=pars["boto3_session"])
     region: str = _utils.get_region_from_subnet(subnet_id=pars["subnet_id"], boto3_session=pars["boto3_session"])
@@ -139,7 +132,7 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s
     args["Configurations"] = [
         {"Classification": "spark-log4j", "Properties": {"log4j.rootCategory": f"{pars['spark_log_level']}, console"}}
     ]
-    if (pars["docker"] is True) or (pars["spark_docker"] is True) or (pars["hive_docker"] is True):
+    if pars["docker"] is True:
         if pars.get("extra_registries") is None:
             extra_registries: List[str] = []
         else:  # pragma: no cover
@@ -162,26 +155,6 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s
                 ],
             }
         )
-    if pars["spark_docker"] is True:
-        if pars.get("spark_docker_image") is None:  # pragma: no cover
-            raise exceptions.InvalidArgumentCombination("You must pass a spark_docker_image if spark_docker is True.")
-        pars["spark_defaults"] = {} if pars["spark_defaults"] is None else pars["spark_defaults"]
-        pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE"] = "docker"
-        pars["spark_defaults"][
-            "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG"
-        ] = "hdfs:///user/hadoop/config.json"
-        pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE"] = pars["spark_docker_image"]
-        pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS"] = "/etc/passwd:/etc/passwd:ro"
-        pars["spark_defaults"]["spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE"] = "docker"
-        pars["spark_defaults"][
-            "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG"
-        ] = "hdfs:///user/hadoop/config.json"
-        pars["spark_defaults"]["spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE"] = pars[
-            "spark_docker_image"
-        ]
-        pars["spark_defaults"][
-            "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS"
-        ] = "/etc/passwd:/etc/passwd:ro"
     if spark_env is not None:
         args["Configurations"].append(
             {
@@ -216,21 +189,12 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s
                 "Configurations": [],
             }
         )
-
-    hive_conf: Optional[Dict[str, Any]] = None
-    if (pars["hive_glue_catalog"] is True) or (pars["hive_docker"] is True):
-        hive_conf: Optional[Dict[str, Any]] = {"Classification": "hive-site", "Properties": {}, "Configurations": []}
-
     if pars["hive_glue_catalog"] is True:
+        hive_conf: Optional[Dict[str, Any]] = {"Classification": "hive-site", "Properties": {}, "Configurations": []}
         hive_conf["Properties"][
             "hive.metastore.client.factory.class"
         ] = "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
-    if pars["hive_docker"] is True:
-        hive_conf["Properties"]["hive.execution.mode"] = "container"
-
-    if hive_conf is not None:
         args["Configurations"].append(hive_conf)
-
     if pars["presto_glue_catalog"] is True:
         args["Configurations"].append(
             {
@@ -282,17 +246,6 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s
                 "HadoopJarStep": {"Jar": "command-runner.jar", "Args": ["state-pusher-script"]},
             }
         )
-    if pars["ecr_credentials_step"] is True:
-        args["Steps"].append(
-            build_step(
-                name="ECR Credentials Setup",
-                command=_get_ecr_credentials_command(),
-                action_on_failure="TERMINATE_CLUSTER",
-                script=False,
-                region=region,
-                boto3_session=pars["boto3_session"],
-            )
-        )
     if pars["steps"] is not None:
         args["Steps"] += pars["steps"]

@@ -462,15 +415,11 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused
     security_groups_slave_additional: Optional[List[str]] = None,
     security_group_service_access: Optional[str] = None,
     docker: bool = False,
+    extra_public_registries: Optional[List[str]] = None,
     spark_log_level: str = "WARN",
     spark_jars_path: Optional[List[str]] = None,
     spark_defaults: Optional[Dict[str, str]] = None,
     spark_pyarrow: bool = False,
-    spark_docker: bool = False,
-    spark_docker_image: str = None,
-    hive_docker: bool = False,
-    ecr_credentials_step: bool = False,
-    extra_public_registries: Optional[List[str]] = None,
     custom_classifications: Optional[List[Dict[str, Any]]] = None,
     maximize_resource_allocation: bool = False,
     steps: Optional[List[Dict[str, Any]]] = None,
@@ -600,6 +549,8 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused
         service to access clusters in VPC private subnets.
     docker : bool
         Enable Docker Hub and ECR registries access.
+    extra_public_registries: List[str], optional
+        Additional docker registries.
     spark_log_level : str
         log4j.rootCategory log level (ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF, TRACE).
     spark_jars_path : List[str], optional
@@ -610,16 +561,6 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused
     spark_pyarrow : bool
         Enable PySpark to use PyArrow behind the scenes.
        P.S. You must install pyarrow by your self via bootstrap
-    spark_docker : bool = False
-        Add necessary Spark Defaults to run on Docker
-    spark_docker_image : str, optional
-        E.g. {ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}
-    hive_docker : bool
-        Add necessary configurations to run on Docker
-    ecr_credentials_step : bool
-        Add a extra step during the Cluster launch to retrieve ECR auth files.
-    extra_public_registries: List[str], optional
-        Additional registries.
     custom_classifications: List[Dict[str, Any]], optional
         Extra classifications.
     maximize_resource_allocation : bool
@@ -669,16 +610,6 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused
     >>> ],
     >>> )

-    Minimal Example on Docker
-
-    >>> import awswrangler as wr
-    >>> cluster_id = wr.emr.create_cluster(
-    >>>     subnet_id="SUBNET_ID",
-    >>>     spark_docker=True,
-    >>>     spark_docker_image="{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}",
-    >>>     ecr_credentials_step=True
-    >>> )
-
     Full Example

     >>> import awswrangler as wr
@@ -971,15 +902,17 @@ def get_step_state(cluster_id: str, step_id: str, boto3.
     return response["Step"]["Status"]["State"]


-def update_ecr_credentials(
-    cluster_id: str, action_on_failure: str = "CONTINUE", boto3_session: Optional[boto3.Session] = None
+def submit_ecr_credentials_refresh(
+    cluster_id: str, path: str, action_on_failure: str = "CONTINUE", boto3_session: Optional[boto3.Session] = None
 ) -> str:
     """Update internal ECR credentials.

     Parameters
     ----------
     cluster_id : str
         Cluster ID.
+    path : str
+        Amazon S3 path where Wrangler will stage the script ecr_credentials_refresh.py (e.g. s3://bucket/emr/)
     action_on_failure : str
         'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
     boto3_session : boto3.Session(), optional
@@ -993,16 +926,109 @@ def update_ecr_credentials(
     Examples
     --------
     >>> import awswrangler as wr
-    >>> step_id = wr.emr.update_ecr_credentials("cluster_id")
+    >>> step_id = wr.emr.submit_ecr_credentials_refresh("cluster_id", "s3://bucket/emr/")

     """
-    name: str = "Update ECR Credentials"
-    command: str = _get_ecr_credentials_command()
+    path = path[:-1] if path.endswith("/") else path
+    path_script: str = f"{path}/ecr_credentials_refresh.py"
     session: boto3.Session = _utils.ensure_session(session=boto3_session)
+    client_s3: boto3.client = _utils.client(service_name="s3", session=session)
+    bucket, key = _utils.parse_path(path=path_script)
+    client_s3.put_object(Body=_get_ecr_credentials_refresh_content().encode(encoding="utf-8"), Bucket=bucket, Key=key)
+    command: str = f"spark-submit --deploy-mode cluster {path_script}"
+    name: str = "ECR Credentials Refresh"
     step: Dict[str, Any] = build_step(
         name=name, command=command, action_on_failure=action_on_failure, script=False, boto3_session=session
     )
     client_emr: boto3.client = _utils.client(service_name="emr", session=session)
     response: Dict[str, Any] = client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
     _logger.debug(f"response: \n{json.dumps(response, default=str, indent=4)}")
     return response["StepIds"][0]
+
+
+def _get_ecr_credentials_refresh_content() -> str:
+    return """
+import subprocess
+from pyspark.sql import SparkSession
+spark = SparkSession.builder.appName("ECR Setup Job").getOrCreate()
+
+COMMANDS = [
+    "sudo -s eval $(aws ecr get-login --region us-east-1 --no-include-email)",
+    "sudo hdfs dfs -put -f /root/.docker/config.json /user/hadoop/"
+]
+
+for command in COMMANDS:
+    subprocess.run(command.split(" "), timeout=6.0, check=True)
+
+print("done!")
+"""
+
+
+def build_spark_step(
+    path: str,
+    deploy_mode: str = "cluster",
+    docker_image: Optional[str] = None,
+    name: str = "my-step",
+    action_on_failure: str = "CONTINUE",
+    region: Optional[str] = None,
+    boto3_session: Optional[boto3.Session] = None,
+) -> Dict[str, Any]:
+    """Build the Step structure (dictionary).
+
+    Parameters
+    ----------
+    path : str
+        Script path. (e.g. s3://bucket/app.py)
+    deploy_mode : str
+        "cluster" | "client"
+    docker_image : str, optional
+        e.g. "{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}"
+    name : str, optional
+        Step name.
+    action_on_failure : str
+        'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
+    region: str, optional
+        Region name to not get it from boto3.Session. (e.g. `us-east-1`)
+    boto3_session : boto3.Session(), optional
+        Boto3 Session. The default boto3 session will be used if boto3_session receive None.
+
+    Returns
+    -------
+    Dict[str, Any]
+        Step structure.
+
+    Examples
+    --------
+    >>> import awswrangler as wr
+    >>> step_id = wr.emr.submit_steps(
+    >>>     cluster_id="cluster-id",
+    >>>     steps=[
+    >>>         wr.emr.build_spark_step(path="s3://bucket/app.py")
+    >>>     ]
+    >>> )
+
+    """
+    if docker_image is None:  # pragma: no cover
+        cmd: str = f"spark-submit --deploy-mode {deploy_mode} {path}"
+    else:
+        config: str = "hdfs:///user/hadoop/config.json"
+        cmd = (
+            f"spark-submit --deploy-mode cluster "
+            f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker "
+            f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={docker_image} "
+            f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG={config} "
+            f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro "
+            f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker "
+            f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={docker_image} "
+            f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG={config} "
+            f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro "
+            f"{path}"
+        )
+    return build_step(
+        command=cmd,
+        name=name,
+        action_on_failure=action_on_failure,
+        script=False,
+        region=region,
+        boto3_session=boto3_session,
+    )
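After this change, submit_ecr_credentials_refresh() works by uploading the generated ecr_credentials_refresh.py to the given S3 path and submitting it as a spark-submit step, and the returned step id can be polled like any other step. A rough sketch of that flow, not taken from the commit itself; the bucket and cluster id are placeholders, and the boto3 calls are standard S3 APIs used only to peek at the staged script:

import boto3
import awswrangler as wr

cluster_id = "j-XXXXXXXXXXXXX"  # placeholder EMR cluster id
step_id = wr.emr.submit_ecr_credentials_refresh(cluster_id, path="s3://my-bucket/emr/")

# The helper stages the script before adding the step, so it can be read back from S3.
s3 = boto3.client("s3")
body = s3.get_object(Bucket="my-bucket", Key="emr/ecr_credentials_refresh.py")["Body"].read()
print(body.decode("utf-8"))

# Poll the step with the existing helper until EMR reports a terminal state.
print(wr.emr.get_step_state(cluster_id=cluster_id, step_id=step_id))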

testing/test_awswrangler/test_emr.py

Lines changed: 11 additions & 7 deletions
@@ -156,14 +156,10 @@ def test_default_logging_path(cloudformation_outputs):
        wr.emr._get_default_logging_path()


-def test_docker(cloudformation_outputs):
+def test_docker(bucket, cloudformation_outputs):
     cluster_id = wr.emr.create_cluster(
         subnet_id=cloudformation_outputs["SubnetId"],
         docker=True,
-        spark_docker=True,
-        spark_docker_image="123456789123.dkr.ecr.us-east-1.amazonaws.com/docker-emr:docker-emr",
-        hive_docker=True,
-        ecr_credentials_step=True,
         custom_classifications=[
             {
                 "Classification": "livy-conf",
@@ -176,6 +172,14 @@ def test_docker(cloudformation_outputs):
         ],
         steps=[wr.emr.build_step("spark-submit --deploy-mode cluster s3://bucket/emr.py")],
     )
-    wr.emr.submit_step(cluster_id=cluster_id, command="spark-submit --deploy-mode cluster s3://bucket/emr.py")
-    wr.emr.update_ecr_credentials(cluster_id=cluster_id)
+    wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f"s3://{bucket}/emr/")
+    wr.emr.submit_steps(
+        cluster_id=cluster_id,
+        steps=[
+            wr.emr.build_spark_step(
+                path=f"s3://{bucket}/emr/test_docker.py",
+                docker_image="123456789123.dkr.ecr.us-east-1.amazonaws.com/docker-emr:docker-emr",
+            )
+        ],
+    )
     wr.emr.terminate_cluster(cluster_id=cluster_id)
