diff --git a/aztk/spark/models/models.py b/aztk/spark/models/models.py index 746fec10..4eea98b7 100644 --- a/aztk/spark/models/models.py +++ b/aztk/spark/models/models.py @@ -184,6 +184,7 @@ def __init__( max_dedicated_nodes=0, max_low_pri_nodes=0, subnet_id=None, + plugins=None, scheduling_target: SchedulingTarget = None, worker_on_master=None, ): @@ -199,6 +200,7 @@ def __init__( self.max_dedicated_nodes = max_dedicated_nodes self.max_low_pri_nodes = max_low_pri_nodes self.subnet_id = subnet_id + self.plugins = plugins self.worker_on_master = worker_on_master self.scheduling_target = scheduling_target @@ -210,6 +212,7 @@ def to_cluster_config(self): size=self.max_dedicated_nodes, size_low_priority=self.max_low_pri_nodes, subnet_id=self.subnet_id, + plugins=self.plugins, worker_on_master=self.worker_on_master, spark_configuration=self.spark_configuration, scheduling_target=self.scheduling_target, diff --git a/aztk_cli/config.py b/aztk_cli/config.py index 10fe3d4c..d8d9b7a7 100644 --- a/aztk_cli/config.py +++ b/aztk_cli/config.py @@ -178,6 +178,7 @@ def __init__(self): self.spark_env_sh = None self.core_site_xml = None self.subnet_id = None + self.plugins = [] self.worker_on_master = None self.scheduling_target = None self.jars = [] @@ -197,6 +198,10 @@ def _merge_dict(self, config): if cluster_configuration.get("size_low_priority") is not None: self.max_low_pri_nodes = cluster_configuration.get("size_low_priority") self.subnet_id = cluster_configuration.get("subnet_id") + if cluster_configuration.get("plugins") is not None: + for plugin in cluster_configuration.get("plugins"): + ref = PluginReference.from_dict(plugin) + self.plugins.append(ref.get_plugin()) self.worker_on_master = cluster_configuration.get("worker_on_master") scheduling_target = cluster_configuration.get("scheduling_target") if scheduling_target: diff --git a/aztk_cli/spark/endpoints/job/submit.py b/aztk_cli/spark/endpoints/job/submit.py index 8f236a32..efadc9fa 100644 --- a/aztk_cli/spark/endpoints/job/submit.py +++ b/aztk_cli/spark/endpoints/job/submit.py @@ -47,6 +47,7 @@ def execute(args: typing.NamedTuple): max_dedicated_nodes=job_conf.max_dedicated_nodes, max_low_pri_nodes=job_conf.max_low_pri_nodes, subnet_id=job_conf.subnet_id, + plugins=job_conf.plugins, worker_on_master=job_conf.worker_on_master, scheduling_target=job_conf.scheduling_target, )