@@ -1061,6 +1061,43 @@ def _populate_parameter_defaults(self, job_destination):
10611061 pulsar_app_config ["staging_directory" ] = params .get ("jobs_directory" )
10621062
10631063
1064+ ARC_DESTINATION_DEFAULTS : Dict [str , Any ] = {
1065+ "arc_enabled" : True ,
1066+ "arc_url" : PARAMETER_SPECIFICATION_REQUIRED ,
1067+ ** COEXECUTION_DESTINATION_DEFAULTS ,
1068+ "default_file_action" : "json_transfer" ,
1069+ }
1070+
1071+ class PulsarARCJobRunner (PulsarCoexecutionJobRunner ):
1072+ runner_name = "PulsarARCJobRunner"
1073+
1074+ destination_defaults = ARC_DESTINATION_DEFAULTS
1075+
1076+ use_mq = False
1077+ poll = True
1078+
1079+ def get_client_from_state (self , job_state ):
1080+ client = super ().get_client_from_state (job_state )
1081+ client ._arc_job_id = job_state .job_id # used by the client to get the job state
1082+ return client
1083+
1084+ def queue_job (self , job_wrapper ):
1085+ """
1086+ Inject user's own ARC endpoint and OIDC token if defined as destination parameters.
1087+ """
1088+ destination_arc_url = job_wrapper .job_destination .params .get ("arc_url" )
1089+ destination_oidc_token = job_wrapper .job_destination .params .get ("oidc_token" )
1090+ user_arc_url = job_wrapper .get_job ().user .extra_preferences .get ("distributed_arc_compute|remote_arc_resources" )
1091+ user_oidc_token = job_wrapper .get_job ().user .extra_preferences .get ("distributed_arc_compute|remote_arc_token" )
1092+
1093+ job_wrapper .job_destination .params .update ({
1094+ "arc_url" : user_arc_url or destination_arc_url ,
1095+ "oidc_token" : user_oidc_token or destination_oidc_token ,
1096+ })
1097+
1098+ return super ().queue_job (job_wrapper )
1099+
1100+
10641101KUBERNETES_DESTINATION_DEFAULTS : Dict [str , Any ] = {"k8s_enabled" : True , ** COEXECUTION_DESTINATION_DEFAULTS }
10651102
10661103
0 commit comments