Skip to content

Commit 6b65775

Browse files
committed
Let Pulsar job runners choose a client manager
Add method to inject kwargs into the `build_client_manager()` function.
1 parent f0e5816 commit 6b65775

File tree

1 file changed

+12
-0
lines changed

1 file changed

+12
-0
lines changed

lib/galaxy/jobs/runners/pulsar.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,15 @@ def _init_client_manager(self):
248248
for kwd in self.runner_params.keys():
249249
if kwd.startswith("amqp_") or kwd.startswith("transport_"):
250250
client_manager_kwargs[kwd] = self.runner_params[kwd]
251+
252+
client_manager_kwargs.update(self._init_client_manager_extend_kwargs(**client_manager_kwargs))
253+
251254
self.client_manager = build_client_manager(**client_manager_kwargs)
252255

256+
def _init_client_manager_extend_kwargs(self, **kwargs):
257+
"""Override this method to consider additional (or alter) keyword arguments when building the client manager."""
258+
return kwargs
259+
253260
def __init_pulsar_app(self, conf, pulsar_conf_path):
254261
if conf is None and pulsar_conf_path is None and not self.default_build_pulsar_app:
255262
self.pulsar_app = None
@@ -1099,6 +1106,11 @@ def queue_job(self, job_wrapper):
10991106

11001107
return super().queue_job(job_wrapper)
11011108

1109+
def _init_client_manager_extend_kwargs(self, **kwargs):
1110+
kwargs = super()._init_client_manager_extend_kwargs(**kwargs)
1111+
kwargs["arc_enabled"] = True
1112+
return kwargs
1113+
11021114

11031115
KUBERNETES_DESTINATION_DEFAULTS: Dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTINATION_DEFAULTS}
11041116

0 commit comments

Comments
 (0)