 Module to handle all utilities related to EMR (Elastic Map Reduce)
 https://aws.amazon.com/emr/
 """
-from typing import Optional, List, Dict
+from typing import Optional, List, Dict, Any, Union, Collection
 import logging
 import json
 
@@ -29,8 +29,8 @@ def _build_cluster_args(**pars):
2929 "JobFlowRole" : pars ["emr_ec2_role" ],
3030 "ServiceRole" : pars ["emr_role" ],
3131 "Instances" : {
32- "KeepJobFlowAliveWhenNoSteps" : True ,
33- "TerminationProtected" : False ,
32+ "KeepJobFlowAliveWhenNoSteps" : pars [ "keep_cluster_alive_when_no_steps" ] ,
33+ "TerminationProtected" : pars [ "termination_protected" ] ,
3434 "Ec2SubnetId" : pars ["subnet_id" ],
3535 "InstanceFleets" : []
3636 }
@@ -53,47 +53,68 @@ def _build_cluster_args(**pars):
         args["Instances"]["ServiceAccessSecurityGroup"] = pars["security_group_service_access"]
 
     # Configurations
-    if pars["python3"] or pars["spark_glue_catalog"] or pars["hive_glue_catalog"] or pars["presto_glue_catalog"]:
-        args["Configurations"]: List = []
-        if pars["python3"]:
-            args["Configurations"].append({
-                "Classification":
-                "spark-env",
-                "Properties": {},
-                "Configurations": [{
-                    "Classification": "export",
-                    "Properties": {
-                        "PYSPARK_PYTHON": "/usr/bin/python3"
-                    },
-                    "Configurations": []
-                }]
-            })
-        if pars["spark_glue_catalog"]:
-            args["Configurations"].append({
-                "Classification": "spark-hive-site",
-                "Properties": {
-                    "hive.metastore.client.factory.class":
-                    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
-                },
-                "Configurations": []
-            })
-        if pars["hive_glue_catalog"]:
-            args["Configurations"].append({
-                "Classification": "hive-site",
-                "Properties": {
-                    "hive.metastore.client.factory.class":
-                    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
-                },
-                "Configurations": []
-            })
-        if pars["presto_glue_catalog"]:
-            args["Configurations"].append({
-                "Classification": "presto-connector-hive",
-                "Properties": {
-                    "hive.metastore.glue.datacatalog.enabled": "true"
-                },
-                "Configurations": []
-            })
+    args["Configurations"]: List[Dict[str, Any]] = [{
+        "Classification": "spark-log4j",
+        "Properties": {
+            "log4j.rootCategory": f"{pars['spark_log_level']}, console"
+        }
+    }]
+    if pars["python3"]:
+        args["Configurations"].append({
+            "Classification":
+            "spark-env",
+            "Properties": {},
+            "Configurations": [{
+                "Classification": "export",
+                "Properties": {
+                    "PYSPARK_PYTHON": "/usr/bin/python3"
+                },
+                "Configurations": []
+            }]
+        })
+    if pars["spark_glue_catalog"]:
+        args["Configurations"].append({
+            "Classification": "spark-hive-site",
+            "Properties": {
+                "hive.metastore.client.factory.class":
+                "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
+            },
+            "Configurations": []
+        })
+    if pars["hive_glue_catalog"]:
+        args["Configurations"].append({
+            "Classification": "hive-site",
+            "Properties": {
+                "hive.metastore.client.factory.class":
+                "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
+            },
+            "Configurations": []
+        })
+    if pars["presto_glue_catalog"]:
+        args["Configurations"].append({
+            "Classification": "presto-connector-hive",
+            "Properties": {
+                "hive.metastore.glue.datacatalog.enabled": "true"
+            },
+            "Configurations": []
+        })
+    if pars["maximize_resource_allocation"]:
+        args["Configurations"].append({
+            "Classification": "spark",
+            "Properties": {
+                "maximizeResourceAllocation": "true"
+            }
+        })
+    if (pars["spark_jars_path"] is not None) or (pars["spark_defaults"] is not None):
+        spark_defaults: Dict[str, Union[str, Dict[str, str]]] = {
+            "Classification": "spark-defaults",
+            "Properties": {}
+        }
+        if pars["spark_jars_path"] is not None:
+            spark_defaults["Properties"]["spark.jars"] = pars["spark_jars_path"]
+        for k, v in (pars["spark_defaults"] or {}).items():  # guard: spark_defaults may be None here
+            spark_defaults["Properties"][k] = v
+        args["Configurations"].append(spark_defaults)
 
     # Applications
     if pars["applications"]:
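
Net effect of this hunk: "Configurations" is now always present (seeded with the spark-log4j entry) rather than only when a Glue catalog or Python 3 flag is set, and the new spark-defaults branch merges spark_jars_path with any user-supplied spark_defaults. As an illustration (values are hypothetical, not from this commit), spark_jars_path="s3://bucket/dep.jar" with spark_defaults={"spark.sql.shuffle.partitions": "200"} appends:

    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.jars": "s3://bucket/dep.jar",
            "spark.sql.shuffle.partitions": "200"
        }
    }
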
@@ -108,16 +129,20 @@ def _build_cluster_args(**pars):
         }
     } for x in pars["bootstraps_paths"]]
 
-    # Debugging
-    if pars["debugging"]:
-        args["Steps"]: List[Dict] = [{
-            "Name": "Setup Hadoop Debugging",
-            "ActionOnFailure": "TERMINATE_CLUSTER",
-            "HadoopJarStep": {
-                "Jar": "command-runner.jar",
-                "Args": ["state-pusher-script"]
-            }
-        }]
+    # Debugging and Steps
+    if (pars["debugging"] is True) or (pars["steps"] is not None):
+        args["Steps"]: List[Dict[str, Collection[str]]] = []
+        if pars["debugging"] is True:
+            args["Steps"].append({
+                "Name": "Setup Hadoop Debugging",
+                "ActionOnFailure": "TERMINATE_CLUSTER",
+                "HadoopJarStep": {
+                    "Jar": "command-runner.jar",
+                    "Args": ["state-pusher-script"]
+                }
+            })
+        if pars["steps"] is not None:
+            args["Steps"] += pars["steps"]
 
     # Master Instance Fleet
     timeout_action_master: str = "SWITCH_TO_ON_DEMAND" if pars[
@@ -161,7 +186,8 @@ def _build_cluster_args(**pars):
 
     # Core Instance Fleet
     if (pars["instance_num_spot_core"] > 0) or pars["instance_num_on_demand_core"] > 0:
-        timeout_action_core = "SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_core"] else "TERMINATE_CLUSTER"
+        timeout_action_core = "SWITCH_TO_ON_DEMAND" if pars[
+            "spot_timeout_to_on_demand_core"] else "TERMINATE_CLUSTER"
         fleet_core: Dict = {
             "Name":
             "CORE",
@@ -284,7 +310,14 @@ def create_cluster(self,
                        security_groups_master_additional: Optional[List[str]] = None,
                        security_group_slave: Optional[str] = None,
                        security_groups_slave_additional: Optional[List[str]] = None,
-                       security_group_service_access: Optional[str] = None):
+                       security_group_service_access: Optional[str] = None,
+                       spark_log_level: str = "WARN",
+                       spark_jars_path: Optional[str] = None,
+                       spark_defaults: Optional[Dict[str, str]] = None,
+                       maximize_resource_allocation: bool = False,
+                       steps: Optional[List[Dict[str, Collection[str]]]] = None,
+                       keep_cluster_alive_when_no_steps: bool = True,
+                       termination_protected: bool = False):
         """
         Create an EMR cluster with instance fleets configuration
         https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-instance-fleet.html
@@ -329,6 +362,13 @@ def create_cluster(self,
         :param security_group_slave: The identifier of the Amazon EC2 security group for the core and task nodes.
         :param security_groups_slave_additional: A list of additional Amazon EC2 security group IDs for the core and task nodes.
         :param security_group_service_access: The identifier of the Amazon EC2 security group for the Amazon EMR service to access clusters in VPC private subnets.
+        :param spark_log_level: log4j.rootCategory log level (ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF, TRACE)
+        :param spark_jars_path: spark.jars (https://spark.apache.org/docs/latest/configuration.html) (e.g. s3://...)
+        :param spark_defaults: Extra spark-defaults properties to merge in (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#spark-defaults)
+        :param maximize_resource_allocation: Configure your executors to utilize the maximum resources possible (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation)
+        :param steps: Step definitions (note: use EMR.build_step() to build them)
+        :param keep_cluster_alive_when_no_steps: Specifies whether the cluster should remain available after completing all steps
+        :param termination_protected: Specifies whether the Amazon EC2 instances in the cluster are protected from termination by API calls, user intervention, or in the event of a job-flow error.
         :return: Cluster ID (string)
         """
         args = EMR._build_cluster_args(**locals())
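
For context, a hedged usage sketch of the new parameters (not from this commit; session.emr stands for an instance of this EMR class and common_args for the required cluster arguments this hunk does not show):

    # Hypothetical usage; common_args is a placeholder for the required arguments
    # (name, subnet, roles, instance fleets) omitted from this diff.
    step = session.emr.build_step(name="job", command="s3://bucket/job.sh arg1", script=True)
    cluster_id = session.emr.create_cluster(
        **common_args,
        spark_log_level="ERROR",
        spark_jars_path="s3://bucket/jars/dep.jar",
        spark_defaults={"spark.sql.shuffle.partitions": "200"},
        maximize_resource_allocation=True,
        steps=[step],
        keep_cluster_alive_when_no_steps=False,  # terminate once all steps finish
        termination_protected=False)
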
@@ -358,28 +398,60 @@ def terminate_cluster(self, cluster_id: str) -> None:
         ])
         logger.info(f"response: \n{json.dumps(response, default=str, indent=4)}")
 
-    def submit_step(self, cluster_id: str, name: str, cmd: str, action_on_failure: str = "CONTINUE") -> str:
+    def submit_steps(self, cluster_id: str, steps: List[Dict[str, Collection[str]]]) -> List[str]:
+        """
+        Submit a list of steps
+        :param cluster_id: EMR Cluster ID
+        :param steps: Step definitions (note: use EMR.build_step() to build them)
+        :return: List of step IDs
+        """
+        response: Dict = self._client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=steps)
+        logger.info(f"response: \n{json.dumps(response, default=str, indent=4)}")
+        return response["StepIds"]
+
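
A minimal sketch of the new batch API (the cluster ID and commands are placeholders; session.emr is assumed to be an instance of this EMR class):

    steps = [
        session.emr.build_step(name=f"job-{i}", command=f'echo "job {i}"')
        for i in range(3)
    ]
    step_ids = session.emr.submit_steps(cluster_id="j-XXXXXXXXXXXX", steps=steps)
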
+    def submit_step(self,
+                    cluster_id: str,
+                    name: str,
+                    command: str,
+                    action_on_failure: str = "CONTINUE",
+                    script: bool = False) -> str:
         """
         Submit new job in the EMR Cluster
         :param cluster_id: EMR Cluster ID
         :param name: Step name
-        :param cmd: Command to be executed
+        :param command: e.g. 'echo "Hello!"' | e.g. for script 's3://.../script.sh arg1 arg2'
         :param action_on_failure: 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
+        :param script: False to run a raw command via command-runner.jar or True to run a script via script-runner.jar (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html)
         :return: Step ID
         """
-        region: str = self._session.region_name
-        logger.info(f"region: {region}")
+        step = self.build_step(name=name, command=command, action_on_failure=action_on_failure, script=script)
+        response: Dict = self._client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
+        logger.info(f"response: \n{json.dumps(response, default=str, indent=4)}")
+        return response["StepIds"][0]
+
+    def build_step(self, name: str, command: str, action_on_failure: str = "CONTINUE",
+                   script: bool = False) -> Dict[str, Collection[str]]:
+        """
+        Build the Step dictionary
+        :param name: Step name
+        :param command: e.g. 'echo "Hello!"' | e.g. for script 's3://.../script.sh arg1 arg2'
+        :param action_on_failure: 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
+        :param script: False for a raw command via command-runner.jar or True for a script via script-runner.jar (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html)
+        :return: Step Dict
+        """
+        jar: str = "command-runner.jar"
+        if script is True:
+            region: str = self._session.region_name
+            jar = f"s3://{region}.elasticmapreduce/libs/script-runner/script-runner.jar"
         step = {
             "Name": name,
             "ActionOnFailure": action_on_failure,
             "HadoopJarStep": {
-                "Jar": f"s3://{region}.elasticmapreduce/libs/script-runner/script-runner.jar",
-                "Args": cmd.split(" ")
+                "Jar": jar,
+                "Args": command.split(" ")
             }
         }
-        response: Dict = self._client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
-        logger.info(f"response: \n{json.dumps(response, default=str, indent=4)}")
-        return response["StepIds"][0]
+        return step
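
To make the script flag concrete, a hedged sketch of the two modes (bucket path and cluster ID are placeholders): script=False runs the command through command-runner.jar, while script=True downloads and runs it via the regional script-runner.jar.

    raw = session.emr.build_step(name="hello", command='echo "Hello!"')  # command-runner.jar
    scripted = session.emr.build_step(name="etl", command="s3://bucket/etl.sh 2020-01-01", script=True)  # script-runner.jar
    session.emr.submit_steps(cluster_id="j-XXXXXXXXXXXX", steps=[raw, scripted])
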
 
     def get_step_state(self, cluster_id: str, step_id: str) -> str:
         """