Skip to content

Commit fbcc4bf

Browse files
committed
Let Pulsar job runners choose a client manager
Add method to inject kwargs into the `build_client_manager` function.
1 parent e907e8e commit fbcc4bf

File tree

1 file changed

+11
-0
lines changed

1 file changed

+11
-0
lines changed

lib/galaxy/jobs/runners/pulsar.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,15 @@ def _init_client_manager(self):
247247
for kwd in self.runner_params.keys():
248248
if kwd.startswith("amqp_") or kwd.startswith("transport_"):
249249
client_manager_kwargs[kwd] = self.runner_params[kwd]
250+
251+
client_manager_kwargs.update(self._init_client_manager_extend_kwargs(**client_manager_kwargs))
252+
250253
self.client_manager = build_client_manager(**client_manager_kwargs)
251254

255+
def _init_client_manager_extend_kwargs(self, **kwargs):
256+
"""Override this method to pass additional keyword arguments to the client manager or alter existing ones."""
257+
return kwargs
258+
252259
def __init_pulsar_app(self, conf, pulsar_conf_path):
253260
if conf is None and pulsar_conf_path is None and not self.default_build_pulsar_app:
254261
self.pulsar_app = None
@@ -1097,6 +1104,10 @@ def queue_job(self, job_wrapper):
10971104

10981105
return super().queue_job(job_wrapper)
10991106

1107+
def _init_client_manager_extend_kwargs(self, **kwargs):
1108+
kwargs = super()._init_client_manager_extend_kwargs(**kwargs)
1109+
kwargs["arc_enabled"] = True
1110+
return kwargs
11001111

11011112
KUBERNETES_DESTINATION_DEFAULTS: Dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTINATION_DEFAULTS}
11021113

0 commit comments

Comments
 (0)