Skip to content

Commit 583a405

Browse files
committed
Improve configurability and OIDC support for the ARC Pulsar job runner
Rewrite the method `PulsarARCJobRunner.queue_job()` so that it: - Obtains the ARC endpoint URL either from the user's preferences or the destination parameters. - Requests an OIDC access token for the user running the job. - Decides which OIDC provider to get the token from if multiple are available, based on the user's preferences and the destination parameters. To let users configure their own settings, admins have to set the destination parameter "arc_user_preferences_key". Galaxy will then read the options "arc_url" and "arc_oidc_provider" under that key from the user extra preferences. Both are optional; if the user does not configure a value, the destination default will be used. If no destination default exists and the user account is associated with exactly one OIDC provider, then Galaxy will use that provider.
1 parent 6b65775 commit 583a405

File tree

1 file changed

+56
-10
lines changed

1 file changed

+56
-10
lines changed

lib/galaxy/jobs/runners/pulsar.py

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from sqlalchemy import select
4141

4242
from galaxy import model
43+
from galaxy.authnz.util import provider_name_to_backend
4344
from galaxy.job_execution.compute_environment import (
4445
ComputeEnvironment,
4546
dataset_path_to_extra_path,
@@ -1090,19 +1091,64 @@ def get_client_from_state(self, job_state):
10901091

10911092
def queue_job(self, job_wrapper):
10921093
"""
1093-
Inject user's own ARC endpoint and OIDC token (if defined) as destination parameters.
1094+
Queue a job to run it using the Pulsar ARC client.
1095+
1096+
ARC supports authentication via either x509 certificates or OIDC tokens. Since Galaxy only supports the latter
1097+
(through OIDC providers), the Pulsar ARC client implementation is designed to work with OIDC. Thus, to run jobs,
1098+
the Pulsar ARC client needs an ARC endpoint URL and an OIDC access token. Those are passed as destination
1099+
parameters.
1100+
1101+
OIDC tokens are, for obvious reasons, not meant to be part of the job configuration file nor of TPV
1102+
configuration files; they have to be obtained before the job is queued. For admins, it may also be interesting
1103+
to have a mechanism to inject an ARC endpoint URL from the user preferences, so that users can configure their
1104+
own ARC endpoint URLs.
1105+
1106+
Therefore, this method provides a framework to:
1107+
- Obtain an ARC endpoint URL from the user's preferences (if enabled).
1108+
- Obtain an OIDC access token for the user running the job.
1109+
- Decide which OIDC provider to obtain the token from if multiple are available.
1110+
1111+
To let users configure their own settings, admins have to set the destination parameter
1112+
"arc_user_preferences_key". Galaxy will then read the options "arc_url" and "arc_oidc_provider" under that key
1113+
from the user extra preferences. Both are optional; if the user does not configure any, the destination defaults
1114+
will be used. If no destination default exists and the user account is associated with exactly one OIDC
1115+
provider, then Galaxy will use that provider.
10941116
"""
1095-
destination_arc_url = job_wrapper.job_destination.params.get("arc_url")
1096-
destination_oidc_token = job_wrapper.job_destination.params.get("oidc_token")
1097-
user_arc_url = job_wrapper.get_job().user.extra_preferences.get("distributed_arc_compute|remote_arc_resources")
1098-
user_oidc_token = job_wrapper.get_job().user.extra_preferences.get("distributed_arc_compute|remote_arc_token")
1117+
job = job_wrapper.get_job()
1118+
user = job.user
10991119

1100-
job_wrapper.job_destination.params.update(
1101-
{
1102-
"arc_url": user_arc_url or destination_arc_url,
1103-
"oidc_token": user_oidc_token or destination_oidc_token,
1104-
}
1120+
extra_user_preferences_key = job_wrapper.job_destination.params.get("arc_user_preferences_key")
1121+
# for example, "distributed_compute_arc"
1122+
1123+
user_arc_url = (
1124+
user.extra_preferences.get(f"{extra_user_preferences_key}|arc_url") if extra_user_preferences_key else None
11051125
)
1126+
user_arc_oidc_provider = (
1127+
user.extra_preferences.get(f"{extra_user_preferences_key}|arc_oidc_provider")
1128+
if extra_user_preferences_key
1129+
else None
1130+
)
1131+
destination_arc_url = job_wrapper.job_destination.params.get("arc_url")
1132+
destination_oidc_provider = job_wrapper.job_destination.params.get("arc_oidc_provider")
1133+
arc_url = user_arc_url or destination_arc_url
1134+
arc_oidc_provider = user_arc_oidc_provider or destination_oidc_provider
1135+
if arc_oidc_provider is None:
1136+
user_oidc_providers = [auth.provider for auth in user.custos_auth + user.social_auth]
1137+
if len(user_oidc_providers) > 1:
1138+
raise Exception(
1139+
f"Multiple identity providers are linked to your user account '{user.username}', please select one "
1140+
f"in your user preferences to launch ARC jobs."
1141+
)
1142+
elif len(user_oidc_providers) == 0:
1143+
raise Exception(
1144+
f"No identity provider is linked to your user account '{user.username}', please log in using an "
1145+
f"identity provider to launch ARC jobs."
1146+
)
1147+
arc_oidc_provider = user_oidc_providers[0]
1148+
arc_oidc_provider_backend = provider_name_to_backend(arc_oidc_provider)
1149+
arc_oidc_token = user.get_oidc_tokens(arc_oidc_provider_backend)["access"]
1150+
1151+
job_wrapper.job_destination.params.update({"arc_url": arc_url, "arc_oidc_token": arc_oidc_token})
11061152

11071153
return super().queue_job(job_wrapper)
11081154

0 commit comments

Comments
 (0)