@@ -374,6 +374,90 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo
374374 launch_params ["setup_params" ] = setup_params
375375 return launch_params
376376
377+ def get_pulsar_app_config (
378+ self ,
379+ pulsar_app_config ,
380+ container ,
381+ wait_after_submission ,
382+ manager_name ,
383+ manager_type ,
384+ dependencies_description ,
385+ ):
386+
387+ pulsar_app_config = pulsar_app_config or {}
388+ manager_config = self ._ensure_manager_config (
389+ pulsar_app_config ,
390+ manager_name ,
391+ manager_type ,
392+ )
393+
394+ if (
395+ "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config
396+ ):
397+ pulsar_app_config ["staging_directory" ] = CONTAINER_STAGING_DIRECTORY
398+
399+ if self .amqp_key_prefix :
400+ pulsar_app_config ["amqp_key_prefix" ] = self .amqp_key_prefix
401+
402+ if "monitor" not in manager_config :
403+ manager_config ["monitor" ] = (
404+ MonitorStyle .BACKGROUND .value
405+ if wait_after_submission
406+ else MonitorStyle .NONE .value
407+ )
408+ if "persistence_directory" not in pulsar_app_config :
409+ pulsar_app_config ["persistence_directory" ] = os .path .join (
410+ CONTAINER_STAGING_DIRECTORY , "persisted_data"
411+ )
412+ elif "manager" in pulsar_app_config and manager_name != "_default_" :
413+ log .warning (
414+ "'manager' set in app config but client has non-default manager '%s', this will cause communication"
415+ " failures, remove `manager` from app or client config to fix" ,
416+ manager_name ,
417+ )
418+
419+ using_dependencies = container is None and dependencies_description is not None
420+ if using_dependencies and "dependency_resolution" not in pulsar_app_config :
421+ # Setup default dependency resolution for container above...
422+ dependency_resolution = {
423+ "cache" : False ,
424+ "use" : True ,
425+ "default_base_path" : "/pulsar_dependencies" ,
426+ "cache_dir" : "/pulsar_dependencies/_cache" ,
427+ "resolvers" : [
428+ { # TODO: add CVMFS resolution...
429+ "type" : "conda" ,
430+ "auto_init" : True ,
431+ "auto_install" : True ,
432+ "prefix" : "/pulsar_dependencies/conda" ,
433+ },
434+ {
435+ "type" : "conda" ,
436+ "auto_init" : True ,
437+ "auto_install" : True ,
438+ "prefix" : "/pulsar_dependencies/conda" ,
439+ "versionless" : True ,
440+ },
441+ ],
442+ }
443+ pulsar_app_config ["dependency_resolution" ] = dependency_resolution
444+ return pulsar_app_config
445+
446+ def _ensure_manager_config (self , pulsar_app_config , manager_name , manager_type ):
447+ if "manager" in pulsar_app_config :
448+ manager_config = pulsar_app_config ["manager" ]
449+ elif "managers" in pulsar_app_config :
450+ managers_config = pulsar_app_config ["managers" ]
451+ if manager_name not in managers_config :
452+ managers_config [manager_name ] = {}
453+ manager_config = managers_config [manager_name ]
454+ else :
455+ manager_config = {}
456+ pulsar_app_config ["manager" ] = manager_config
457+ if "type" not in manager_config :
458+ manager_config ["type" ] = manager_type
459+ return manager_config
460+
377461
378462class MessagingClientManagerProtocol (ClientManagerProtocol ):
379463 status_cache : Dict [str , Dict [str , Any ]]
@@ -513,48 +597,15 @@ def launch(
513597
514598 manager_name = self .client_manager .manager_name
515599 manager_type = "coexecution" if container is not None else "unqueued"
516- pulsar_app_config = pulsar_app_config or {}
517- manager_config = self ._ensure_manager_config (
518- pulsar_app_config , manager_name , manager_type ,
600+ pulsar_app_config = self .get_pulsar_app_config (
601+ pulsar_app_config = pulsar_app_config ,
602+ container = container ,
603+ wait_after_submission = wait_after_submission ,
604+ manager_name = manager_name ,
605+ manager_type = manager_type ,
606+ dependencies_description = dependencies_description ,
519607 )
520608
521- if "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config :
522- pulsar_app_config ["staging_directory" ] = CONTAINER_STAGING_DIRECTORY
523-
524- if self .amqp_key_prefix :
525- pulsar_app_config ["amqp_key_prefix" ] = self .amqp_key_prefix
526-
527- if "monitor" not in manager_config :
528- manager_config ["monitor" ] = MonitorStyle .BACKGROUND .value if wait_after_submission else MonitorStyle .NONE .value
529- if "persistence_directory" not in pulsar_app_config :
530- pulsar_app_config ["persistence_directory" ] = os .path .join (CONTAINER_STAGING_DIRECTORY , "persisted_data" )
531- elif "manager" in pulsar_app_config and manager_name != '_default_' :
532- log .warning (
533- "'manager' set in app config but client has non-default manager '%s', this will cause communication"
534- " failures, remove `manager` from app or client config to fix" , manager_name )
535-
536- using_dependencies = container is None and dependencies_description is not None
537- if using_dependencies and "dependency_resolution" not in pulsar_app_config :
538- # Setup default dependency resolution for container above...
539- dependency_resolution = {
540- "cache" : False ,
541- "use" : True ,
542- "default_base_path" : "/pulsar_dependencies" ,
543- "cache_dir" : "/pulsar_dependencies/_cache" ,
544- "resolvers" : [{ # TODO: add CVMFS resolution...
545- "type" : "conda" ,
546- "auto_init" : True ,
547- "auto_install" : True ,
548- "prefix" : '/pulsar_dependencies/conda' ,
549- }, {
550- "type" : "conda" ,
551- "auto_init" : True ,
552- "auto_install" : True ,
553- "prefix" : '/pulsar_dependencies/conda' ,
554- "versionless" : True ,
555- }]
556- }
557- pulsar_app_config ["dependency_resolution" ] = dependency_resolution
558609 base64_message = to_base64_json (launch_params )
559610 base64_app_conf = to_base64_json (pulsar_app_config )
560611 pulsar_container_image = self .pulsar_container_image
@@ -606,21 +657,6 @@ def _pulsar_script_args(self, manager_name, base64_job, base64_app_conf, wait_ar
606657 manager_args .extend (["--base64" , base64_job , "--app_conf_base64" , base64_app_conf ])
607658 return manager_args
608659
609- def _ensure_manager_config (self , pulsar_app_config , manager_name , manager_type ):
610- if "manager" in pulsar_app_config :
611- manager_config = pulsar_app_config ["manager" ]
612- elif "managers" in pulsar_app_config :
613- managers_config = pulsar_app_config ["managers" ]
614- if manager_name not in managers_config :
615- managers_config [manager_name ] = {}
616- manager_config = managers_config [manager_name ]
617- else :
618- manager_config = {}
619- pulsar_app_config ["manager" ] = manager_config
620- if "type" not in manager_config :
621- manager_config ["type" ] = manager_type
622- return manager_config
623-
624660 def _launch_containers (
625661 self ,
626662 pulsar_submit_container : CoexecutionContainerCommand ,
0 commit comments