 import logging
 import os
 import uuid
+import urllib.error
+import urllib.request
 from typing import Any, Dict, Optional
 
 import boto3
 NSC_KEY_ENV = "NSC_KEY"
 NV_KEY_ENV = "NV_KEY"
 
+DEFAULT_TASK_SIZE = "standard"
+
 
 def _log(job_id: str, message: str, **details: Any) -> None:
     payload = {"job_id": job_id, "message": message, **details}
     # Ensure consistent JSON logging for ingestion/filtering.
     logger.info(json.dumps(payload))
 
 
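+# Resolve the compose API base URL for the requested environment.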
+def _compose_api_base_url(environment: str) -> str:
+    env = (environment or "production").lower()
+    if env == "staging":
+        return "https://synth.neurostore.xyz/api"
+    if env == "local":
+        return "http://localhost:81/api"
+    return "https://compose.neurosynth.org/api"
+
+
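+# Fetch the meta-analysis document (with its nested specification) from the
+# compose API; network or decoding failures are logged and None is returned.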
+def _fetch_meta_analysis(meta_analysis_id: str, environment: str) -> Optional[Dict[str, Any]]:
+    base_url = _compose_api_base_url(environment).rstrip("/")
+    url = f"{base_url}/meta-analyses/{meta_analysis_id}?nested=true"
+    request = urllib.request.Request(url, headers={"User-Agent": "compose-runner/submit"})
+    try:
+        with urllib.request.urlopen(request, timeout=10) as response:
+            return json.load(response)
+    except (urllib.error.URLError, urllib.error.HTTPError, json.JSONDecodeError) as exc:
+        logger.warning("Failed to fetch meta-analysis %s: %s", meta_analysis_id, exc)
+        return None
+
+
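+# A specification requires the large task size when it requests FWECorrector
+# with the Monte Carlo method.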
+def _requires_large_task(specification: Dict[str, Any]) -> bool:
+    if not isinstance(specification, dict):
+        return False
+    corrector = specification.get("corrector")
+    if not isinstance(corrector, dict):
+        return False
+    if corrector.get("type") != "FWECorrector":
+        return False
+    args = corrector.get("args")
+    if not isinstance(args, dict):
+        return False
+    method = args.get("method")
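+    # The method may also be nested under the literal "**kwargs" key.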
+    if method is None:
+        kwargs = args.get("**kwargs")
+        if isinstance(kwargs, dict):
+            method = kwargs.get("method")
+    if isinstance(method, str) and method.lower() == "montecarlo":
+        return True
+    return False
+
+
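+# Decide which task size to request for a run; fetch or evaluation failures
+# fall back to DEFAULT_TASK_SIZE.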
+def _select_task_size(meta_analysis_id: str, environment: str, artifact_prefix: str) -> str:
+    doc = _fetch_meta_analysis(meta_analysis_id, environment)
+    if not doc:
+        return DEFAULT_TASK_SIZE
+    specification = doc.get("specification")
+    try:
+        if _requires_large_task(specification):
+            _log(
+                artifact_prefix,
+                "workflow.task_size_selected",
+                task_size="large",
+                reason="montecarlo_fwe",
+            )
+            return "large"
+    except Exception as exc:  # noqa: broad-except
+        logger.warning("Failed to evaluate specification for %s: %s", meta_analysis_id, exc)
+    return DEFAULT_TASK_SIZE
+
+
 def _job_input(
     payload: Dict[str, Any],
     artifact_prefix: str,
     bucket: Optional[str],
     prefix: Optional[str],
     nsc_key: Optional[str],
     nv_key: Optional[str],
+    task_size: str,
 ) -> Dict[str, Any]:
     no_upload_flag = bool(payload.get("no_upload", False))
     doc: Dict[str, Any] = {
@@ -44,6 +110,7 @@ def _job_input(
44110 "environment" : payload .get ("environment" , "production" ),
45111 "no_upload" : "true" if no_upload_flag else "false" ,
46112 "results" : {"bucket" : bucket or "" , "prefix" : prefix or "" },
113+ "task_size" : task_size ,
47114 }
48115 n_cores = payload .get ("n_cores" )
49116 doc ["n_cores" ] = str (n_cores ) if n_cores is not None else ""
@@ -76,7 +143,10 @@ def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
     nsc_key = payload.get("nsc_key") or os.environ.get(NSC_KEY_ENV)
     nv_key = payload.get("nv_key") or os.environ.get(NV_KEY_ENV)
 
-    job_input = _job_input(payload, artifact_prefix, bucket, prefix, nsc_key, nv_key)
+    environment = payload.get("environment", "production")
+    task_size = _select_task_size(payload["meta_analysis_id"], environment, artifact_prefix)
+
+    job_input = _job_input(payload, artifact_prefix, bucket, prefix, nsc_key, nv_key, task_size)
     params = {
         "stateMachineArn": os.environ[STATE_MACHINE_ARN_ENV],
         "name": artifact_prefix,