Skip to content

Commit 33d61ca

Browse files
committed
Revert "Refactor block production control flow"
This reverts commit 599ebf0.
1 parent 548735c commit 33d61ca

File tree

1 file changed

+141
-60
lines changed

1 file changed

+141
-60
lines changed

src/lib/block_producer/block_producer.ml

Lines changed: 141 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,47 @@ end
2727
type Structured_log_events.t += Block_produced
2828
[@@deriving register_event { msg = "Successfully produced a new block" }]
2929

30+
module Singleton_supervisor : sig
31+
type ('data, 'a) t
32+
33+
val create :
34+
task:(unit Ivar.t -> 'data -> ('a, unit) Interruptible.t) -> ('data, 'a) t
35+
36+
val cancel : (_, _) t -> unit
37+
38+
val dispatch : ('data, 'a) t -> 'data -> ('a, unit) Interruptible.t
39+
end = struct
40+
type ('data, 'a) t =
41+
{ mutable task : (unit Ivar.t * ('a, unit) Interruptible.t) option
42+
; f : unit Ivar.t -> 'data -> ('a, unit) Interruptible.t
43+
}
44+
45+
let create ~task = { task = None; f = task }
46+
47+
let cancel t =
48+
match t.task with
49+
| Some (ivar, _) ->
50+
if Ivar.is_full ivar then
51+
[%log' error (Logger.create ())] "Ivar.fill bug is here!" ;
52+
Ivar.fill ivar () ;
53+
t.task <- None
54+
| None ->
55+
()
56+
57+
let dispatch t data =
58+
cancel t ;
59+
let ivar = Ivar.create () in
60+
let interruptible =
61+
let open Interruptible.Let_syntax in
62+
t.f ivar data
63+
>>| fun x ->
64+
t.task <- None ;
65+
x
66+
in
67+
t.task <- Some (ivar, interruptible) ;
68+
interruptible
69+
end
70+
3071
let time_to_ms = Fn.compose Block_time.Span.to_ms Block_time.to_span_since_epoch
3172

3273
let time_of_ms = Fn.compose Block_time.of_span_since_epoch Block_time.Span.of_ms
@@ -38,6 +79,54 @@ let lift_sync f =
3879
[%log' error (Logger.create ())] "Ivar.fill bug is here!" ;
3980
Ivar.fill ivar (f ()) ) )
4081

82+
module Singleton_scheduler : sig
83+
type t
84+
85+
val create : Block_time.Controller.t -> t
86+
87+
(** If you reschedule when already scheduled, take the min of the two schedulings *)
88+
val schedule : t -> Block_time.t -> f:(unit -> unit) -> unit
89+
end = struct
90+
type t =
91+
{ mutable timeout : unit Block_time.Timeout.t option
92+
; time_controller : Block_time.Controller.t
93+
}
94+
95+
let create time_controller = { time_controller; timeout = None }
96+
97+
let cancel t =
98+
match t.timeout with
99+
| Some timeout ->
100+
Block_time.Timeout.cancel t.time_controller timeout () ;
101+
t.timeout <- None
102+
| None ->
103+
()
104+
105+
let schedule t time ~f =
106+
let remaining_time =
107+
Option.map t.timeout ~f:Block_time.Timeout.remaining_time
108+
in
109+
cancel t ;
110+
let span_till_time =
111+
Block_time.diff time (Block_time.now t.time_controller)
112+
in
113+
let wait_span =
114+
match remaining_time with
115+
| Some remaining
116+
when Block_time.Span.(remaining > Block_time.Span.of_ms Int64.zero) ->
117+
let min a b = if Block_time.Span.(a < b) then a else b in
118+
min remaining span_till_time
119+
| None | Some _ ->
120+
span_till_time
121+
in
122+
let timeout =
123+
Block_time.Timeout.create t.time_controller wait_span ~f:(fun _ ->
124+
t.timeout <- None ;
125+
f () )
126+
in
127+
t.timeout <- Some timeout
128+
end
129+
41130
(** Sends an error to the reporting service containing as many failed transactions as we can fit. *)
42131
let report_transaction_inclusion_failures ~commit_id ~logger failed_txns =
43132
let num_failures = List.length failed_txns in
@@ -595,7 +684,7 @@ let produce ~genesis_breadcrumb ~context:(module Context : CONTEXT) ~prover
595684
~verifier ~trust_system ~get_completed_work ~transaction_resource_pool
596685
~frontier_reader ~time_controller ~transition_writer ~log_block_creation
597686
~block_reward_threshold ~block_produced_bvar ~slot_tx_end ~slot_chain_end
598-
~net ~zkapp_cmd_limit_hardcap interrupt_ivar
687+
~net ~zkapp_cmd_limit_hardcap ivar
599688
(scheduled_time, block_data, winner_pubkey) =
600689
let open Context in
601690
let module Breadcrumb = Transition_frontier.Breadcrumb in
@@ -683,9 +772,7 @@ let produce ~genesis_breadcrumb ~context:(module Context : CONTEXT) ~prover
683772
|> Sequence.map
684773
~f:Transaction_hash.User_command_with_valid_signature.data
685774
in
686-
let%bind () =
687-
Interruptible.lift (Deferred.return ()) (Ivar.read interrupt_ivar)
688-
in
775+
let%bind () = Interruptible.lift (Deferred.return ()) (Ivar.read ivar) in
689776
[%log internal] "Generate_next_state" ;
690777
let%bind next_state_opt =
691778
generate_next_state ~commit_id ~constraint_constants ~scheduled_time
@@ -1120,19 +1207,14 @@ let iteration ~schedule_next_vrf_check ~produce_block_now
11201207
~frontier_reader () ) ;
11211208
schedule_block_production (scheduled_time, data, winner_pk) )
11221209

1123-
let schedule ~time_controller time =
1124-
let span_till_time = Block_time.diff time (Block_time.now time_controller) in
1125-
Block_time.Timeout.create time_controller span_till_time ~f:Fn.id
1126-
|> Block_time.Timeout.to_deferred
1127-
11281210
let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier
11291211
~trust_system ~get_completed_work ~transaction_resource_pool
11301212
~time_controller ~consensus_local_state ~coinbase_receiver ~frontier_reader
11311213
~transition_writer ~set_next_producer_timing ~log_block_creation
11321214
~block_reward_threshold ~block_produced_bvar ~vrf_evaluation_state ~net
11331215
~zkapp_cmd_limit_hardcap =
11341216
let open Context in
1135-
O1trace.sync_thread "produce_blocks_run" (fun () ->
1217+
O1trace.sync_thread "produce_blocks" (fun () ->
11361218
let genesis_breadcrumb =
11371219
genesis_breadcrumb_creator ~context:(module Context) prover
11381220
in
@@ -1152,17 +1234,19 @@ let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier
11521234
~zkapp_cmd_limit_hardcap
11531235
in
11541236
let module Breadcrumb = Transition_frontier.Breadcrumb in
1155-
let iteration_wrapped (slot, i, prev_step) =
1237+
let production_supervisor = Singleton_supervisor.create ~task:produce in
1238+
let scheduler = Singleton_scheduler.create time_controller in
1239+
let rec check_next_block_timing slot i () =
11561240
(* Begin checking for the ability to produce a block *)
11571241
match Broadcast_pipe.Reader.peek frontier_reader with
11581242
| None ->
11591243
log_bootstrap_mode ~logger () ;
1160-
let%map () =
1161-
(* Iterates until there is some frontier *)
1162-
Broadcast_pipe.Reader.iter_until frontier_reader
1163-
~f:(Fn.compose Deferred.return Option.is_some)
1164-
in
1165-
(slot, i, prev_step)
1244+
don't_wait_for
1245+
(let%map () =
1246+
Broadcast_pipe.Reader.iter_until frontier_reader
1247+
~f:(Fn.compose Deferred.return Option.is_some)
1248+
in
1249+
check_next_block_timing slot i () )
11661250
| Some transition_frontier ->
11671251
let consensus_state =
11681252
Transition_frontier.best_tip transition_frontier
@@ -1210,54 +1294,48 @@ let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier
12101294
"Block producer will begin producing only empty blocks after \
12111295
$slot_diff slots"
12121296
slot_tx_end ;
1297+
let next_vrf_check_now =
1298+
check_next_block_timing new_global_slot i'
1299+
in
12131300
(* TODO: Re-enable this assertion when it doesn't fail dev demos
12141301
* (see #5354)
12151302
* assert (
12161303
Consensus.Hooks.required_local_state_sync
12171304
~constants:consensus_constants ~consensus_state
12181305
~local_state:consensus_local_state
12191306
= None ) ; *)
1220-
let next_vrf_check_now () =
1221-
return (new_global_slot, i', prev_step)
1222-
in
1223-
let produce_block_now data =
1224-
Option.iter !prev_step ~f:(fun ivar ->
1225-
if Ivar.is_full ivar then
1226-
[%log error] "Ivar.fill bug is here!" ;
1227-
Ivar.fill ivar () ) ;
1228-
let intr_ivar = Ivar.create () in
1229-
let this_step = ref (Some intr_ivar) in
1230-
let produce_intr =
1231-
let%map.Interruptible x = produce intr_ivar data in
1232-
this_step := None ;
1233-
x
1234-
in
1235-
let%map _ = Interruptible.force produce_intr in
1236-
(* TODO consider uncommenting below or removing interruptible usage completely *)
1237-
(* Interruptible.don't_wait_for produce_intr ; *)
1238-
(new_global_slot, i', this_step)
1239-
in
1240-
let schedule_next_vrf_check time =
1241-
let%map _ = schedule ~time_controller time in
1242-
(new_global_slot, i', prev_step)
1243-
in
1244-
let schedule_block_production (time, data, winner) =
1245-
let%bind _ = schedule ~time_controller time in
1246-
produce_block_now (time, data, winner)
1307+
let produce_block_now triple =
1308+
ignore
1309+
( Interruptible.finally
1310+
(Singleton_supervisor.dispatch production_supervisor triple)
1311+
~f:next_vrf_check_now
1312+
: (_, _) Interruptible.t )
12471313
in
1248-
iteration ~schedule_next_vrf_check ~produce_block_now
1249-
~schedule_block_production ~next_vrf_check_now ~genesis_breadcrumb
1250-
~context:(module Context)
1251-
~vrf_evaluator ~time_controller ~coinbase_receiver
1252-
~frontier_reader ~set_next_producer_timing ~transition_frontier
1253-
~vrf_evaluation_state ~epoch_data_for_vrf ~ledger_snapshot i slot
1314+
don't_wait_for
1315+
( iteration
1316+
~schedule_next_vrf_check:
1317+
(Fn.compose Deferred.return
1318+
(Singleton_scheduler.schedule scheduler
1319+
~f:next_vrf_check_now ) )
1320+
~produce_block_now:
1321+
(Fn.compose Deferred.return produce_block_now)
1322+
~schedule_block_production:(fun (time, data, winner) ->
1323+
Singleton_scheduler.schedule scheduler time ~f:(fun () ->
1324+
produce_block_now (time, data, winner) ) ;
1325+
Deferred.unit )
1326+
~next_vrf_check_now:
1327+
(Fn.compose Deferred.return next_vrf_check_now)
1328+
~genesis_breadcrumb
1329+
~context:(module Context)
1330+
~vrf_evaluator ~time_controller ~coinbase_receiver
1331+
~frontier_reader ~set_next_producer_timing
1332+
~transition_frontier ~vrf_evaluation_state ~epoch_data_for_vrf
1333+
~ledger_snapshot i slot
1334+
: unit Deferred.t )
12541335
in
1255-
let start _ =
1256-
Deferred.forever
1257-
( Mina_numbers.Global_slot_since_hard_fork.zero
1258-
, Mina_numbers.Length.zero
1259-
, ref @@ Some (Ivar.create ()) )
1260-
iteration_wrapped
1336+
let start () =
1337+
check_next_block_timing Mina_numbers.Global_slot_since_hard_fork.zero
1338+
Mina_numbers.Length.zero ()
12611339
in
12621340
let genesis_state_timestamp =
12631341
consensus_constants.genesis_state_timestamp
@@ -1266,17 +1344,20 @@ let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier
12661344
let now = Block_time.now time_controller in
12671345
if Block_time.( >= ) now genesis_state_timestamp then start ()
12681346
else
1347+
let time_till_genesis = Block_time.diff genesis_state_timestamp now in
12691348
[%log warn]
12701349
~metadata:
12711350
[ ( "time_till_genesis"
12721351
, `Int
1273-
(Int64.to_int_exn
1274-
( Block_time.Span.to_ms
1275-
@@ Block_time.diff genesis_state_timestamp now ) ) )
1352+
(Int64.to_int_exn (Block_time.Span.to_ms time_till_genesis))
1353+
)
12761354
]
12771355
"Node started before genesis: waiting $time_till_genesis \
12781356
milliseconds before starting block producer" ;
1279-
upon (schedule ~time_controller genesis_state_timestamp) start )
1357+
ignore
1358+
( Block_time.Timeout.create time_controller time_till_genesis
1359+
~f:(fun _ -> start ())
1360+
: unit Block_time.Timeout.t ) )
12801361

12811362
let run_precomputed ~context:(module Context : CONTEXT) ~verifier ~trust_system
12821363
~time_controller ~frontier_reader ~transition_writer ~precomputed_blocks =

0 commit comments

Comments
 (0)