Skip to content

Commit 599ebf0

Browse files
georgeeecjjdespres
authored andcommitted
Refactor block production control flow
1 parent 76b8999 commit 599ebf0

File tree

1 file changed

+60
-141
lines changed

1 file changed

+60
-141
lines changed

src/lib/block_producer/block_producer.ml

Lines changed: 60 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -27,47 +27,6 @@ end
2727
type Structured_log_events.t += Block_produced
2828
[@@deriving register_event { msg = "Successfully produced a new block" }]
2929

30-
module Singleton_supervisor : sig
31-
type ('data, 'a) t
32-
33-
val create :
34-
task:(unit Ivar.t -> 'data -> ('a, unit) Interruptible.t) -> ('data, 'a) t
35-
36-
val cancel : (_, _) t -> unit
37-
38-
val dispatch : ('data, 'a) t -> 'data -> ('a, unit) Interruptible.t
39-
end = struct
40-
type ('data, 'a) t =
41-
{ mutable task : (unit Ivar.t * ('a, unit) Interruptible.t) option
42-
; f : unit Ivar.t -> 'data -> ('a, unit) Interruptible.t
43-
}
44-
45-
let create ~task = { task = None; f = task }
46-
47-
let cancel t =
48-
match t.task with
49-
| Some (ivar, _) ->
50-
if Ivar.is_full ivar then
51-
[%log' error (Logger.create ())] "Ivar.fill bug is here!" ;
52-
Ivar.fill ivar () ;
53-
t.task <- None
54-
| None ->
55-
()
56-
57-
let dispatch t data =
58-
cancel t ;
59-
let ivar = Ivar.create () in
60-
let interruptible =
61-
let open Interruptible.Let_syntax in
62-
t.f ivar data
63-
>>| fun x ->
64-
t.task <- None ;
65-
x
66-
in
67-
t.task <- Some (ivar, interruptible) ;
68-
interruptible
69-
end
70-
7130
let time_to_ms = Fn.compose Block_time.Span.to_ms Block_time.to_span_since_epoch
7231

7332
let time_of_ms = Fn.compose Block_time.of_span_since_epoch Block_time.Span.of_ms
@@ -79,54 +38,6 @@ let lift_sync f =
7938
[%log' error (Logger.create ())] "Ivar.fill bug is here!" ;
8039
Ivar.fill ivar (f ()) ) )
8140

82-
module Singleton_scheduler : sig
83-
type t
84-
85-
val create : Block_time.Controller.t -> t
86-
87-
(** If you reschedule when already scheduled, take the min of the two schedulings *)
88-
val schedule : t -> Block_time.t -> f:(unit -> unit) -> unit
89-
end = struct
90-
type t =
91-
{ mutable timeout : unit Block_time.Timeout.t option
92-
; time_controller : Block_time.Controller.t
93-
}
94-
95-
let create time_controller = { time_controller; timeout = None }
96-
97-
let cancel t =
98-
match t.timeout with
99-
| Some timeout ->
100-
Block_time.Timeout.cancel t.time_controller timeout () ;
101-
t.timeout <- None
102-
| None ->
103-
()
104-
105-
let schedule t time ~f =
106-
let remaining_time =
107-
Option.map t.timeout ~f:Block_time.Timeout.remaining_time
108-
in
109-
cancel t ;
110-
let span_till_time =
111-
Block_time.diff time (Block_time.now t.time_controller)
112-
in
113-
let wait_span =
114-
match remaining_time with
115-
| Some remaining
116-
when Block_time.Span.(remaining > Block_time.Span.of_ms Int64.zero) ->
117-
let min a b = if Block_time.Span.(a < b) then a else b in
118-
min remaining span_till_time
119-
| None | Some _ ->
120-
span_till_time
121-
in
122-
let timeout =
123-
Block_time.Timeout.create t.time_controller wait_span ~f:(fun _ ->
124-
t.timeout <- None ;
125-
f () )
126-
in
127-
t.timeout <- Some timeout
128-
end
129-
13041
(** Sends an error to the reporting service containing as many failed transactions as we can fit. *)
13142
let report_transaction_inclusion_failures ~commit_id ~logger failed_txns =
13243
let num_failures = List.length failed_txns in
@@ -684,7 +595,7 @@ let produce ~genesis_breadcrumb ~context:(module Context : CONTEXT) ~prover
684595
~verifier ~trust_system ~get_completed_work ~transaction_resource_pool
685596
~frontier_reader ~time_controller ~transition_writer ~log_block_creation
686597
~block_reward_threshold ~block_produced_bvar ~slot_tx_end ~slot_chain_end
687-
~net ~zkapp_cmd_limit_hardcap ivar
598+
~net ~zkapp_cmd_limit_hardcap interrupt_ivar
688599
(scheduled_time, block_data, winner_pubkey) =
689600
let open Context in
690601
let module Breadcrumb = Transition_frontier.Breadcrumb in
@@ -772,7 +683,9 @@ let produce ~genesis_breadcrumb ~context:(module Context : CONTEXT) ~prover
772683
|> Sequence.map
773684
~f:Transaction_hash.User_command_with_valid_signature.data
774685
in
775-
let%bind () = Interruptible.lift (Deferred.return ()) (Ivar.read ivar) in
686+
let%bind () =
687+
Interruptible.lift (Deferred.return ()) (Ivar.read interrupt_ivar)
688+
in
776689
[%log internal] "Generate_next_state" ;
777690
let%bind next_state_opt =
778691
generate_next_state ~commit_id ~constraint_constants ~scheduled_time
@@ -1207,14 +1120,19 @@ let iteration ~schedule_next_vrf_check ~produce_block_now
12071120
~frontier_reader () ) ;
12081121
schedule_block_production (scheduled_time, data, winner_pk) )
12091122

1123+
let schedule ~time_controller time =
1124+
let span_till_time = Block_time.diff time (Block_time.now time_controller) in
1125+
Block_time.Timeout.create time_controller span_till_time ~f:Fn.id
1126+
|> Block_time.Timeout.to_deferred
1127+
12101128
let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier
12111129
~trust_system ~get_completed_work ~transaction_resource_pool
12121130
~time_controller ~consensus_local_state ~coinbase_receiver ~frontier_reader
12131131
~transition_writer ~set_next_producer_timing ~log_block_creation
12141132
~block_reward_threshold ~block_produced_bvar ~vrf_evaluation_state ~net
12151133
~zkapp_cmd_limit_hardcap =
12161134
let open Context in
1217-
O1trace.sync_thread "produce_blocks" (fun () ->
1135+
O1trace.sync_thread "produce_blocks_run" (fun () ->
12181136
let genesis_breadcrumb =
12191137
genesis_breadcrumb_creator ~context:(module Context) prover
12201138
in
@@ -1234,19 +1152,17 @@ let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier
12341152
~zkapp_cmd_limit_hardcap
12351153
in
12361154
let module Breadcrumb = Transition_frontier.Breadcrumb in
1237-
let production_supervisor = Singleton_supervisor.create ~task:produce in
1238-
let scheduler = Singleton_scheduler.create time_controller in
1239-
let rec check_next_block_timing slot i () =
1155+
let iteration_wrapped (slot, i, prev_step) =
12401156
(* Begin checking for the ability to produce a block *)
12411157
match Broadcast_pipe.Reader.peek frontier_reader with
12421158
| None ->
12431159
log_bootstrap_mode ~logger () ;
1244-
don't_wait_for
1245-
(let%map () =
1246-
Broadcast_pipe.Reader.iter_until frontier_reader
1247-
~f:(Fn.compose Deferred.return Option.is_some)
1248-
in
1249-
check_next_block_timing slot i () )
1160+
let%map () =
1161+
(* Iterates until there is some frontier *)
1162+
Broadcast_pipe.Reader.iter_until frontier_reader
1163+
~f:(Fn.compose Deferred.return Option.is_some)
1164+
in
1165+
(slot, i, prev_step)
12501166
| Some transition_frontier ->
12511167
let consensus_state =
12521168
Transition_frontier.best_tip transition_frontier
@@ -1294,48 +1210,54 @@ let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier
12941210
"Block producer will begin producing only empty blocks after \
12951211
$slot_diff slots"
12961212
slot_tx_end ;
1297-
let next_vrf_check_now =
1298-
check_next_block_timing new_global_slot i'
1299-
in
13001213
(* TODO: Re-enable this assertion when it doesn't fail dev demos
13011214
* (see #5354)
13021215
* assert (
13031216
Consensus.Hooks.required_local_state_sync
13041217
~constants:consensus_constants ~consensus_state
13051218
~local_state:consensus_local_state
13061219
= None ) ; *)
1307-
let produce_block_now triple =
1308-
ignore
1309-
( Interruptible.finally
1310-
(Singleton_supervisor.dispatch production_supervisor triple)
1311-
~f:next_vrf_check_now
1312-
: (_, _) Interruptible.t )
1220+
let next_vrf_check_now () =
1221+
return (new_global_slot, i', prev_step)
13131222
in
1314-
don't_wait_for
1315-
( iteration
1316-
~schedule_next_vrf_check:
1317-
(Fn.compose Deferred.return
1318-
(Singleton_scheduler.schedule scheduler
1319-
~f:next_vrf_check_now ) )
1320-
~produce_block_now:
1321-
(Fn.compose Deferred.return produce_block_now)
1322-
~schedule_block_production:(fun (time, data, winner) ->
1323-
Singleton_scheduler.schedule scheduler time ~f:(fun () ->
1324-
produce_block_now (time, data, winner) ) ;
1325-
Deferred.unit )
1326-
~next_vrf_check_now:
1327-
(Fn.compose Deferred.return next_vrf_check_now)
1328-
~genesis_breadcrumb
1329-
~context:(module Context)
1330-
~vrf_evaluator ~time_controller ~coinbase_receiver
1331-
~frontier_reader ~set_next_producer_timing
1332-
~transition_frontier ~vrf_evaluation_state ~epoch_data_for_vrf
1333-
~ledger_snapshot i slot
1334-
: unit Deferred.t )
1223+
let produce_block_now data =
1224+
Option.iter !prev_step ~f:(fun ivar ->
1225+
if Ivar.is_full ivar then
1226+
[%log error] "Ivar.fill bug is here!" ;
1227+
Ivar.fill ivar () ) ;
1228+
let intr_ivar = Ivar.create () in
1229+
let this_step = ref (Some intr_ivar) in
1230+
let produce_intr =
1231+
let%map.Interruptible x = produce intr_ivar data in
1232+
this_step := None ;
1233+
x
1234+
in
1235+
let%map _ = Interruptible.force produce_intr in
1236+
(* TODO consider uncommenting below or removing interruptible usage completely *)
1237+
(* Interruptible.don't_wait_for produce_intr ; *)
1238+
(new_global_slot, i', this_step)
1239+
in
1240+
let schedule_next_vrf_check time =
1241+
let%map _ = schedule ~time_controller time in
1242+
(new_global_slot, i', prev_step)
1243+
in
1244+
let schedule_block_production (time, data, winner) =
1245+
let%bind _ = schedule ~time_controller time in
1246+
produce_block_now (time, data, winner)
1247+
in
1248+
iteration ~schedule_next_vrf_check ~produce_block_now
1249+
~schedule_block_production ~next_vrf_check_now ~genesis_breadcrumb
1250+
~context:(module Context)
1251+
~vrf_evaluator ~time_controller ~coinbase_receiver
1252+
~frontier_reader ~set_next_producer_timing ~transition_frontier
1253+
~vrf_evaluation_state ~epoch_data_for_vrf ~ledger_snapshot i slot
13351254
in
1336-
let start () =
1337-
check_next_block_timing Mina_numbers.Global_slot_since_hard_fork.zero
1338-
Mina_numbers.Length.zero ()
1255+
let start _ =
1256+
Deferred.forever
1257+
( Mina_numbers.Global_slot_since_hard_fork.zero
1258+
, Mina_numbers.Length.zero
1259+
, ref @@ Some (Ivar.create ()) )
1260+
iteration_wrapped
13391261
in
13401262
let genesis_state_timestamp =
13411263
consensus_constants.genesis_state_timestamp
@@ -1344,20 +1266,17 @@ let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier
13441266
let now = Block_time.now time_controller in
13451267
if Block_time.( >= ) now genesis_state_timestamp then start ()
13461268
else
1347-
let time_till_genesis = Block_time.diff genesis_state_timestamp now in
13481269
[%log warn]
13491270
~metadata:
13501271
[ ( "time_till_genesis"
13511272
, `Int
1352-
(Int64.to_int_exn (Block_time.Span.to_ms time_till_genesis))
1353-
)
1273+
(Int64.to_int_exn
1274+
( Block_time.Span.to_ms
1275+
@@ Block_time.diff genesis_state_timestamp now ) ) )
13541276
]
13551277
"Node started before genesis: waiting $time_till_genesis \
13561278
milliseconds before starting block producer" ;
1357-
ignore
1358-
( Block_time.Timeout.create time_controller time_till_genesis
1359-
~f:(fun _ -> start ())
1360-
: unit Block_time.Timeout.t ) )
1279+
upon (schedule ~time_controller genesis_state_timestamp) start )
13611280

13621281
let run_precomputed ~context:(module Context : CONTEXT) ~verifier ~trust_system
13631282
~time_controller ~frontier_reader ~transition_writer ~precomputed_blocks =

0 commit comments

Comments
 (0)