@@ -254,123 +254,129 @@ module Make (S : Terrat_vcs_provider2.S) = struct
254254 in
255255 Fc. with_finally
256256 (fun () ->
257- let open Irm in
258- with_conn storage ~f: (fun db ->
259- let open Irm in
260- Pgsql_io. tx db ~f: (fun () -> S.Db. store_account_repository ~request_id db account repo)
261- >> = fun () ->
262- let open Abb.Future.Infix_monad in
263- Builder.State. make
264- ~log_id: request_id
265- ~config
266- ~store
267- ~db
268- ~exec
269- ~tasks: (Tasks_pr. tasks tasks)
270- ()
271- >> = fun s ->
272- let open Irm in
273- log_err ~request_id @@ Builder. eval s Keys. update_context_branch_hashes
274- >> = fun () ->
275- Pgsql_io. tx db ~f: (fun () ->
276- let open Abb.Future.Infix_monad in
277- Builder.State. make
278- ~log_id: request_id
279- ~config
280- ~store
281- ~db
282- ~exec
283- ~tasks: (Tasks_pr. tasks tasks)
284- ()
285- >> = fun s ->
286- let open Irm in
287- log_err ~request_id @@ Builder. eval s Keys. get_context_for_pull_request
288- >> = fun context ->
289- let s =
290- s
291- |> Builder.State. orig_store
292- |> Keys.Key. add Keys. context context
293- |> CCFun. flip Builder.State. set_orig_store s
294- in
295- Abb.Future. return (Ok s))
296- >> = fun s ->
297- Pgsql_io. tx db ~f: (fun () ->
298- Logs. info (fun m ->
299- m
300- " %s : target=%s"
301- (Builder. log_id s)
302- (Hmap.Key. info Keys. eval_pull_request_event));
303- log_err ~request_id @@ Builder. eval s Keys. eval_pull_request_event))
304- >> = fun job ->
305- let open Abb.Future.Infix_monad in
306- with_conn storage ~f: (fun db ->
307- Pgsql_io. tx db ~f: (fun () ->
308- let open Abb.Future.Infix_monad in
309- let store = store |> Keys.Key. add Keys. job job in
310- Builder.State. make
311- ~log_id: (Builder. mk_log_id ~request_id job.Tjc.Job. id)
312- ~config
313- ~store
314- ~db
315- ~exec
316- ~tasks: (Tasks_pr. tasks tasks)
317- ()
318- >> = fun s ->
319- let open Irm in
320- log_err ~request_id @@ tx_safe ~request_id @@ Builder. eval s Keys. iter_job
321- >> = fun r -> Abb.Future. return (Ok (s, r))))
322- >> = fun iter_result ->
323- (* Call maybe_complete_job in a separate transaction *)
324- with_conn storage ~f: (fun db ->
325- Pgsql_io. tx db ~f: (fun () ->
326- match iter_result with
327- | Ok (s , _ ) ->
328- let store = store |> Keys.Key. add Keys. job job in
329- let open Abb.Future.Infix_monad in
330- let store = store |> Tasks_base. forward_std_keys s in
331- Builder.State. make
332- ~log_id: (Builder. mk_log_id ~request_id job.Tjc.Job. id)
333- ~config
334- ~store
335- ~db
336- ~exec
337- ~tasks: (Tasks_pr. tasks tasks)
338- ()
339- >> = fun s' ->
340- let open Irm in
341- log_err ~request_id
342- @@ tx_safe ~request_id
343- @@ Builder. eval s' Keys. maybe_complete_job
344- | Error _ -> Abb.Future. return (Ok `Noop )))
345- >> = fun _ ->
346- match iter_result with
347- | Ok (s , `Ok _ ) ->
348- with_conn storage ~f: (fun db ->
349- Pgsql_io. tx db ~f: (fun () ->
350- let store = store |> Keys.Key. add Keys. job job in
351- let open Abb.Future.Infix_monad in
352- let store = store |> Tasks_base. forward_std_keys s in
353- Builder.State. make
354- ~log_id: (Builder. mk_log_id ~request_id job.Tjc.Job. id)
355- ~config
356- ~store
357- ~db
358- ~exec
359- ~tasks: (Tasks_pr. tasks tasks)
360- ()
361- >> = fun s ->
362- log_err ~request_id @@ tx_safe ~request_id @@ Builder. eval s Keys. run_next_layer))
363- | Ok (_ , ((`Suspend_eval _ | `Noop ) as r )) -> Abb.Future. return (Ok r)
364- | Error err ->
365- Logs. info (fun m ->
366- m
367- " %s : JOB : FAILED : job_id= %a : %a"
368- request_id
369- Uuidm. pp
370- job.Tjc.Job. id
371- Builder. pp_err
372- err);
373- Abb.Future. return (Ok `Noop ))
257+ let run =
258+ let open Irm in
259+ with_conn storage ~f: (fun db ->
260+ let open Irm in
261+ Pgsql_io. tx db ~f: (fun () ->
262+ S.Db. store_account_repository ~request_id db account repo)
263+ >> = fun () ->
264+ let open Abb.Future.Infix_monad in
265+ Builder.State. make
266+ ~log_id: request_id
267+ ~config
268+ ~store
269+ ~db
270+ ~exec
271+ ~tasks: (Tasks_pr. tasks tasks)
272+ ()
273+ >> = fun s ->
274+ let open Irm in
275+ Builder. eval s Keys. update_context_branch_hashes
276+ >> = fun () ->
277+ Pgsql_io. tx db ~f: (fun () ->
278+ let open Abb.Future.Infix_monad in
279+ Builder.State. make
280+ ~log_id: request_id
281+ ~config
282+ ~store
283+ ~db
284+ ~exec
285+ ~tasks: (Tasks_pr. tasks tasks)
286+ ()
287+ >> = fun s ->
288+ let open Irm in
289+ Builder. eval s Keys. get_context_for_pull_request
290+ >> = fun context ->
291+ let s =
292+ s
293+ |> Builder.State. orig_store
294+ |> Keys.Key. add Keys. context context
295+ |> CCFun. flip Builder.State. set_orig_store s
296+ in
297+ Abb.Future. return (Ok s))
298+ >> = fun s ->
299+ Pgsql_io. tx db ~f: (fun () ->
300+ Logs. info (fun m ->
301+ m
302+ " %s : target=%s"
303+ (Builder. log_id s)
304+ (Hmap.Key. info Keys. eval_pull_request_event));
305+ Builder. eval s Keys. eval_pull_request_event))
306+ >> = fun job ->
307+ let open Abb.Future.Infix_monad in
308+ with_conn storage ~f: (fun db ->
309+ Pgsql_io. tx db ~f: (fun () ->
310+ let open Abb.Future.Infix_monad in
311+ let store = store |> Keys.Key. add Keys. job job in
312+ Builder.State. make
313+ ~log_id: (Builder. mk_log_id ~request_id job.Tjc.Job. id)
314+ ~config
315+ ~store
316+ ~db
317+ ~exec
318+ ~tasks: (Tasks_pr. tasks tasks)
319+ ()
320+ >> = fun s ->
321+ let open Irm in
322+ tx_safe ~request_id @@ Builder. eval s Keys. iter_job
323+ >> = fun r -> Abb.Future. return (Ok (s, r))))
324+ >> = function
325+ | Ok (s , (`Ok _ | `Noop )) -> (
326+ let open Irm in
327+ (* Call maybe_complete_job in a separate transaction *)
328+ with_conn storage ~f: (fun db ->
329+ Pgsql_io. tx db ~f: (fun () ->
330+ let open Abb.Future.Infix_monad in
331+ let store =
332+ store |> Keys.Key. add Keys. job job |> Tasks_base. forward_std_keys s
333+ in
334+ Builder.State. make
335+ ~log_id: (Builder. mk_log_id ~request_id job.Tjc.Job. id)
336+ ~config
337+ ~store
338+ ~db
339+ ~exec
340+ ~tasks: (Tasks_pr. tasks tasks)
341+ ()
342+ >> = fun s' -> tx_safe ~request_id @@ Builder. eval s' Keys. maybe_complete_job))
343+ >> = function
344+ | `Ok () ->
345+ with_conn storage ~f: (fun db ->
346+ Pgsql_io. tx db ~f: (fun () ->
347+ let store = store |> Keys.Key. add Keys. job job in
348+ let open Abb.Future.Infix_monad in
349+ let store = store |> Tasks_base. forward_std_keys s in
350+ Builder.State. make
351+ ~log_id: (Builder. mk_log_id ~request_id job.Tjc.Job. id)
352+ ~config
353+ ~store
354+ ~db
355+ ~exec
356+ ~tasks: (Tasks_pr. tasks tasks)
357+ ()
358+ >> = fun s -> tx_safe ~request_id @@ Builder. eval s Keys. run_next_layer))
359+ | (`Suspend_eval _ | `Noop ) as r -> Abb.Future. return (Ok r))
360+ | Ok (_ , (`Suspend_eval _ as r )) -> Abb.Future. return (Ok r)
361+ | Error err ->
362+ let open Irm in
363+ Logs. info (fun m ->
364+ m
365+ " %s : JOB : FAILED : job_id= %a : %a"
366+ request_id
367+ Uuidm. pp
368+ job.Tjc.Job. id
369+ Builder. pp_err
370+ err);
371+ with_conn storage ~f: (fun db ->
372+ S.Job_context.Job. update_state
373+ ~request_id
374+ db
375+ ~job_id: job.Tjc.Job. id
376+ Tjc.Job.State. Failed )
377+ >> = fun () -> Abb.Future. return (Ok `Noop )
378+ in
379+ log_err ~request_id run)
374380 ~finally: (fun () ->
375381 Fc. ignore
376382 @@ Abb.Future. fork
0 commit comments