|
63 | 63 | log = logging.getLogger(__name__) |
64 | 64 |
|
65 | 65 | __all__ = ( |
| 66 | + "PulsarARCJobRunner", |
66 | 67 | "PulsarLegacyJobRunner", |
67 | 68 | "PulsarRESTJobRunner", |
68 | 69 | "PulsarMQJobRunner", |
@@ -1061,6 +1062,44 @@ def _populate_parameter_defaults(self, job_destination): |
1061 | 1062 | pulsar_app_config["staging_directory"] = params.get("jobs_directory") |
1062 | 1063 |
|
1063 | 1064 |
|
| 1065 | +ARC_DESTINATION_DEFAULTS: Dict[str, Any] = { |
| 1066 | + **COEXECUTION_DESTINATION_DEFAULTS, |
| 1067 | + "default_file_action": "json_transfer", |
| 1068 | +} |
| 1069 | + |
| 1070 | + |
| 1071 | +class PulsarARCJobRunner(PulsarCoexecutionJobRunner): |
| 1072 | + runner_name = "PulsarARCJobRunner" |
| 1073 | + |
| 1074 | + destination_defaults = ARC_DESTINATION_DEFAULTS |
| 1075 | + |
| 1076 | + use_mq = False |
| 1077 | + poll = True |
| 1078 | + |
| 1079 | + def get_client_from_state(self, job_state): |
| 1080 | + client = super().get_client_from_state(job_state) |
| 1081 | + client._arc_job_id = job_state.job_id # used by the client to get the job state |
| 1082 | + return client |
| 1083 | + |
| 1084 | + def queue_job(self, job_wrapper): |
| 1085 | + """ |
| 1086 | + Inject user's own ARC endpoint and OIDC token (if defined) as destination parameters. |
| 1087 | + """ |
| 1088 | + destination_arc_url = job_wrapper.job_destination.params.get("arc_url") |
| 1089 | + destination_oidc_token = job_wrapper.job_destination.params.get("oidc_token") |
| 1090 | + user_arc_url = job_wrapper.get_job().user.extra_preferences.get("distributed_arc_compute|remote_arc_resources") |
| 1091 | + user_oidc_token = job_wrapper.get_job().user.extra_preferences.get("distributed_arc_compute|remote_arc_token") |
| 1092 | + |
| 1093 | + job_wrapper.job_destination.params.update( |
| 1094 | + { |
| 1095 | + "arc_url": user_arc_url or destination_arc_url, |
| 1096 | + "oidc_token": user_oidc_token or destination_oidc_token, |
| 1097 | + } |
| 1098 | + ) |
| 1099 | + |
| 1100 | + return super().queue_job(job_wrapper) |
| 1101 | + |
| 1102 | + |
1064 | 1103 | KUBERNETES_DESTINATION_DEFAULTS: Dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTINATION_DEFAULTS} |
1065 | 1104 |
|
1066 | 1105 |
|
|
0 commit comments