4040from sqlalchemy import select
4141
4242from galaxy import model
43+ from galaxy .authnz .util import provider_name_to_backend
4344from galaxy .job_execution .compute_environment import (
4445 ComputeEnvironment ,
4546 dataset_path_to_extra_path ,
@@ -1075,6 +1076,7 @@ def _populate_parameter_defaults(self, job_destination):
10751076 "default_file_action" : "json_transfer" ,
10761077}
10771078
1079+
10781080class PulsarARCJobRunner (PulsarCoexecutionJobRunner ):
10791081 runner_name = "PulsarARCJobRunner"
10801082
@@ -1090,17 +1092,63 @@ def get_client_from_state(self, job_state):
10901092
10911093 def queue_job (self , job_wrapper ):
10921094 """
1093- Inject user's own ARC endpoint and OIDC token if defined as destination parameters.
1095+ Queue a job to run it using the Pulsar ARC client.
1096+
1097+ ARC supports authentication via either x509 certificates or OIDC tokens. Since Galaxy only supports the latter
1098+ (through OIDC providers), the Pulsar ARC client implementation is designed to work with OIDC. Thus, to run jobs,
1099+ the Pulsar ARC client needs an ARC endpoint URL and an OIDC access token. Those are passed as destination
1100+ parameters.
1101+
1102+ OIDC tokens are, for obvious reasons, not meant to be part of the job configuration file nor of TPV
1103+ configuration files; they have to be obtained before the job is queued. For admins, it may also be interesting
1104+ to have a mechanism to inject an ARC endpoint URL from the user preferences, so that users can configure their
1105+ own ARC endpoint URLs.
1106+
1107+ Therefore, this method provides a framework to:
1108+ - Obtain an ARC endpoint URL from the user's preferences (if enabled).
1109+ - Obtain an OIDC access token for the user running the job.
1110+ - Decide which OIDC provider to obtain the token from if multiple are available.
1111+
1112+ To let users configure their own settings, admins have to set the destination parameter
1113+ "arc_user_preferences_key". Galaxy will then read the options "arc_url" and "arc_oidc_provider" under that key
1114+ from the user extra preferences. Both are optional; if the user does not configure any, the destination defaults
1115+ will be used. If no destination default exists and the user account is associated with exactly one OIDC
1116+ provider, then Galaxy will use that provider.
10941117 """
1118+ job = job_wrapper .get_job ()
1119+ user = job .user
1120+
1121+ extra_user_preferences_key = job_wrapper .job_destination .params .get ("arc_user_preferences_key" )
1122+ # for example, "distributed_compute_arc"
1123+
1124+ user_arc_url = (
1125+ user .extra_preferences .get (f"{ extra_user_preferences_key } |arc_url" ) if extra_user_preferences_key else None
1126+ )
1127+ user_arc_oidc_provider = (
1128+ user .extra_preferences .get (f"{ extra_user_preferences_key } |arc_oidc_provider" )
1129+ if extra_user_preferences_key
1130+ else None
1131+ )
10951132 destination_arc_url = job_wrapper .job_destination .params .get ("arc_url" )
1096- destination_oidc_token = job_wrapper .job_destination .params .get ("oidc_token" )
1097- user_arc_url = job_wrapper .get_job ().user .extra_preferences .get ("distributed_arc_compute|remote_arc_resources" )
1098- user_oidc_token = job_wrapper .get_job ().user .extra_preferences .get ("distributed_arc_compute|remote_arc_token" )
1133+ destination_oidc_provider = job_wrapper .job_destination .params .get ("arc_oidc_provider" )
1134+ arc_url = user_arc_url or destination_arc_url
1135+ arc_oidc_provider = user_arc_oidc_provider or destination_oidc_provider
1136+ if arc_oidc_provider is None :
1137+ user_oidc_providers = [auth .provider for auth in user .custos_auth + user .social_auth ]
1138+ if len (user_oidc_providers ) > 1 :
1139+ raise Exception (
1140+ f"Multiple identity providers are linked to your user account '{ user .username } ', please select one "
1141+ f"in your user preferences to launch ARC jobs."
1142+ )
1143+ elif len (user_oidc_providers ) == 0 :
1144+ raise Exception (
1145+ f"No identity provider is linked to your user account '{ user .username } ', please log in using an "
1146+ f"identity provider to launch ARC jobs." )
1147+ arc_oidc_provider = user_oidc_providers [0 ]
1148+ arc_oidc_provider_backend = provider_name_to_backend (arc_oidc_provider )
1149+ arc_oidc_token = user .get_oidc_tokens (arc_oidc_provider_backend )["access" ]
10991150
1100- job_wrapper .job_destination .params .update ({
1101- "arc_url" : user_arc_url or destination_arc_url ,
1102- "oidc_token" : user_oidc_token or destination_oidc_token ,
1103- })
1151+ job_wrapper .job_destination .params .update ({"arc_url" : arc_url , "arc_oidc_token" : arc_oidc_token })
11041152
11051153 return super ().queue_job (job_wrapper )
11061154
@@ -1109,6 +1157,7 @@ def _init_client_manager_extend_kwargs(self, **kwargs):
11091157 kwargs ["arc_enabled" ] = True
11101158 return kwargs
11111159
1160+
11121161KUBERNETES_DESTINATION_DEFAULTS : Dict [str , Any ] = {"k8s_enabled" : True , ** COEXECUTION_DESTINATION_DEFAULTS }
11131162
11141163
0 commit comments