@@ -80,28 +80,22 @@ async def run(self, limit: int = 5, wait: int = 5):
8080
8181 def create_dynamic_environment (self , job : Mapping [str , Any ]) -> int :
8282 # TODO: implement this
83- print (f'Creating dynamic environment for job { job ["job_id" ]} ' )
8483 return 0
8584
8685 async def dispatch_composition (self , job : Mapping [str , Any ]):
87- print (f'Registered processes in dispatcher.dispatch: { app_registrar .registered_addresses } ' )
88-
8986 job_status = job ["status" ]
9087 job_id = job ["job_id" ]
9188 if job_status .lower () == "pending" :
9289 try :
9390 # install simulators required TODO: implement this
9491 self .create_dynamic_environment (job )
9592
96- # TODO: Now, search job spec for possible file uploads and read them in locally
97-
9893 # change job status to IN_PROGRESS
99- # await self.db_connector.update_job(job_id=job_id, status="IN_PROGRESS")
94+ await self .db_connector .update_job (job_id = job_id , status = "IN_PROGRESS" )
10095
10196 # get request params and parse remote file uploads if needed
10297 input_state = job ["spec" ]
10398 duration = job .get ("duration" , 1 )
104-
10599 for process_name , process_spec in input_state .items ():
106100 print (f'Process { process_name } has spec { process_spec } ' )
107101 process_config = process_spec ["config" ]
@@ -118,51 +112,39 @@ async def dispatch_composition(self, job: Mapping[str, Any]):
118112 local_fp = download_file_from_bucket (source_blob_path = source_fp , out_dir = temp_dest , bucket_name = DEFAULT_BUCKET_NAME )
119113 process_spec ["config" ]["mesh_file" ] = local_fp
120114
121- print (f'Now the input state is:\n { input_state } ' )
122- process = SimpleMembraneProcess (config = input_state ['membrane' ]['config' ], core = app_registrar .core )
123- print (process )
124115 # generate composition instance
125116 composition = self .generate_composite (input_state )
126- print (f'Composition state is:\n { composition .state } ' )
127- # get composition results and state
128117
129- # results = self.generate_composition_results(input_state, duration)
118+ # get composition results and state
119+ results = self .generate_composition_results (input_state , duration )
130120 state = self .generate_composition_state (composition )
131- print (f'Composition state is:\n { state } ' )
132- process = composition .state ['membrane' ]['instance' ]
133- print (f'Process state is:\n { process .initial_state ()} ' )
134- process .update (process .initial_state (), 1 )
135-
136- # print(f'Generated results: {results} and state: {state}')
137121
138122 # change status to complete and write results in DB
139- # await self.db_connector.update_job(
140- # job_id=job_id,
141- # status="COMPLETE",
142- # results=results
143- # )
123+ await self .db_connector .update_job (
124+ job_id = job_id ,
125+ status = "COMPLETE" ,
126+ results = results
127+ )
144128
145129 # write new result state to states collection
146- # await self.db_connector.write(
147- # collection_name="result_states",
148- # job_id=job_id,
149- # data=state,
150- # last_updated=self.db_connector.timestamp()
151- # )
130+ await self .db_connector .write (
131+ collection_name = "result_states" ,
132+ job_id = job_id ,
133+ data = state ,
134+ last_updated = self .db_connector .timestamp ()
135+ )
152136 except Exception as e :
153137 logger .error (f"Exception while dispatching { job_id } : { e } " )
154138 failed_job = self .generate_failed_job (job_id , str (e ))
155- # await self.db_connector.update_job(**failed_job)
139+ await self .db_connector .update_job (** failed_job )
156140
157141 def generate_composite (self , input_state ) -> Composite :
158- print (f'Got the input state in generate_composite:\n { input_state } ' )
159142 return Composite (
160143 config = {"state" : input_state },
161144 core = app_registrar .core
162145 )
163146
164147 def generate_composition_results (self , composition : Composite , duration : int ) -> ResultData :
165- print (f'Got the composition before results\n { composition .state } ' )
166148 # run the composition
167149 composition .run (duration )
168150
0 commit comments