@@ -67,7 +67,7 @@ async def _(**pipeline_kwargs) -> CompPipelineAtDB:
6767 yield _
6868
6969 # cleanup
70- async with sqlalchemy_async_engine .connect () as conn :
70+ async with sqlalchemy_async_engine .begin () as conn :
7171 await conn .execute (
7272 comp_pipeline .delete ().where (
7373 comp_pipeline .c .project_id .in_ (created_pipeline_ids )
@@ -123,7 +123,7 @@ async def _(
123123 ),
124124 "node_class" : to_node_class (node_data .key ),
125125 "internal_id" : internal_id + 1 ,
126- "submit" : datetime .datetime .now (tz = datetime . UTC ),
126+ "submit" : datetime .datetime .now (),
127127 "job_id" : generate_dask_job_id (
128128 service_key = node_data .key ,
129129 service_version = node_data .version ,
@@ -133,7 +133,7 @@ async def _(
133133 ),
134134 }
135135 task_config .update (** overrides_kwargs )
136- async with sqlalchemy_async_engine .connect () as conn :
136+ async with sqlalchemy_async_engine .begin () as conn :
137137 result = await conn .execute (
138138 comp_tasks .insert ()
139139 .values (** task_config )
@@ -147,7 +147,7 @@ async def _(
147147 yield _
148148
149149 # cleanup
150- async with sqlalchemy_async_engine .connect () as conn :
150+ async with sqlalchemy_async_engine .begin () as conn :
151151 await conn .execute (
152152 comp_tasks .delete ().where (comp_tasks .c .task_id .in_ (created_task_ids ))
153153 )
@@ -197,14 +197,14 @@ async def _(
197197 ) -> CompRunsAtDB :
198198 run_config = {
199199 "project_uuid" : f"{ project .uuid } " ,
200- "user_id" : f" { user ['id' ] } " ,
200+ "user_id" : user ["id" ] ,
201201 "iteration" : 1 ,
202202 "result" : StateType .NOT_STARTED ,
203203 "metadata" : run_metadata ,
204204 "use_on_demand_clusters" : False ,
205205 }
206206 run_config .update (** run_kwargs )
207- async with sqlalchemy_async_engine .connect () as conn :
207+ async with sqlalchemy_async_engine .begin () as conn :
208208 result = await conn .execute (
209209 comp_runs .insert ()
210210 .values (** jsonable_encoder (run_config ))
@@ -217,7 +217,7 @@ async def _(
217217 yield _
218218
219219 # cleanup
220- async with sqlalchemy_async_engine .connect () as conn :
220+ async with sqlalchemy_async_engine .begin () as conn :
221221 await conn .execute (
222222 comp_runs .delete ().where (comp_runs .c .run_id .in_ (created_run_ids ))
223223 )
@@ -242,7 +242,7 @@ async def _(user: dict[str, Any], **cluster_kwargs) -> Cluster:
242242 new_cluster = Cluster .model_validate (cluster_config )
243243 assert new_cluster
244244
245- async with sqlalchemy_async_engine .connect () as conn :
245+ async with sqlalchemy_async_engine .begin () as conn :
246246 # insert basic cluster
247247 created_cluster = (
248248 await conn .execute (
@@ -298,7 +298,7 @@ async def _(user: dict[str, Any], **cluster_kwargs) -> Cluster:
298298 yield _
299299
300300 # cleanup
301- async with sqlalchemy_async_engine .connect () as conn :
301+ async with sqlalchemy_async_engine .begin () as conn :
302302 await conn .execute (
303303 clusters .delete ().where (clusters .c .id .in_ (created_cluster_ids ))
304304 )
@@ -318,6 +318,7 @@ async def publish_project(
318318 async def _ () -> PublishedProject :
319319 created_project = await project (user , workbench = fake_workbench_without_outputs )
320320 return PublishedProject (
321+ user = user ,
321322 project = created_project ,
322323 pipeline = await create_pipeline (
323324 project_id = f"{ created_project .uuid } " ,
@@ -352,6 +353,7 @@ async def running_project(
352353 created_project = await project (user , workbench = fake_workbench_without_outputs )
353354 now_time = arrow .utcnow ().datetime
354355 return RunningProject (
356+ user = user ,
355357 project = created_project ,
356358 pipeline = await create_pipeline (
357359 project_id = f"{ created_project .uuid } " ,
@@ -388,6 +390,7 @@ async def running_project_mark_for_cancellation(
388390 created_project = await project (user , workbench = fake_workbench_without_outputs )
389391 now_time = arrow .utcnow ().datetime
390392 return RunningProject (
393+ user = user ,
391394 project = created_project ,
392395 pipeline = await create_pipeline (
393396 project_id = f"{ created_project .uuid } " ,
0 commit comments