Skip to content

Commit 42be0d2

Browse files
authored
Merge pull request #17801 from MinaProtocol/lyh/refactor-work-selector
Redesign work selector
2 parents 4d9ae06 + 31576b0 commit 42be0d2

File tree

9 files changed

+88
-185
lines changed

9 files changed

+88
-185
lines changed

src/lib/mina_lib/mina_lib.ml

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -920,25 +920,7 @@ let add_complete_work ~logger ~fee ~prover
920920
Or_error.try_with update_metrics
921921
|> Result.iter_error ~f:(fun err ->
922922
[%log warn] "Failed to update metrics on adding work"
923-
~metadata:[ ("error", `String (Error.to_string_hum err)) ] ) ;
924-
let cb result =
925-
(* remove it from seen jobs after attempting to adding it to the pool to
926-
avoid this work being reassigned.
927-
* If the diff is accepted then remove it from the seen jobs.
928-
* If not then the work should have already been in the pool with a
929-
lower fee or the statement isn't referenced anymore or any other
930-
error. In any case remove it from the seen jobs so that it can be
931-
picked up if needed *)
932-
Work_selector.remove t.work_selector stmts ;
933-
Result.iter_error result ~f:(fun err ->
934-
(* Possible reasons of failure: receiving pipe's capacity exceeded,
935-
fee that isn't the lowest, failure in verification or application to the pool *)
936-
[%log warn] "Failed to add completed work to the pool"
937-
~metadata:
938-
[ ("work_ids", Transaction_snark_work.Statement.compact_json stmts)
939-
; ("error", `String (Error.to_string_hum err))
940-
] )
941-
in
923+
~metadata:[ ("error", Error_json.error_to_yojson err) ] ) ;
942924
Network_pool.Snark_pool.(
943925
Local_sink.push t.pipes.snark_local_sink
944926
( Add_solved_work
@@ -949,7 +931,15 @@ let add_complete_work ~logger ~fee ~prover
949931
|> One_or_two.map ~f:Ledger_proof.Cached.read_proof_from_disk
950932
; fee = fee_with_prover
951933
} )
952-
, cb ))
934+
, Result.iter_error ~f:(fun err ->
935+
(* Possible reasons of failure: receiving pipe's capacity exceeded,
936+
fee that isn't the lowest, failure in verification or application to the pool *)
937+
[%log warn] "Failed to add completed work to the pool"
938+
~metadata:
939+
[ ( "work_ids"
940+
, Transaction_snark_work.Statement.compact_json stmts )
941+
; ("error", Error_json.error_to_yojson err)
942+
] ) ))
953943
|> Deferred.don't_wait_for
954944

955945
let add_work t (work : Snark_work_lib.Result.Partitioned.Stable.Latest.t) =
@@ -1778,9 +1768,8 @@ let create ~commit_id ?wallets (config : Config.t) =
17781768
| Ok () ->
17791769
()
17801770
| Error err ->
1781-
[%log' warn config.logger]
1782-
"Error when setting ITN logger data: %s"
1783-
(Error.to_string_hum err)
1771+
[%log' warn config.logger] "Error when setting ITN logger data"
1772+
~metadata:[ ("error", Error_json.error_to_yojson err) ]
17841773
else Deferred.unit
17851774
in
17861775
O1trace.thread "mina_lib" (fun () ->
@@ -1866,13 +1855,13 @@ let create ~commit_id ?wallets (config : Config.t) =
18661855
Or_error.iter_error result ~f:(fun error ->
18671856
[%log' warn config.logger]
18681857
"Failed to toggle verifier internal tracing: $error"
1869-
~metadata:[ ("error", `String (Error.to_string_hum error)) ] ) ) ;
1858+
~metadata:[ ("error", Error_json.error_to_yojson error) ] ) ) ;
18701859
Internal_tracing.register_toggle_callback (fun enabled ->
18711860
let%map result = Prover.toggle_internal_tracing prover enabled in
18721861
Or_error.iter_error result ~f:(fun error ->
18731862
[%log' warn config.logger]
18741863
"Failed to toggle prover internal tracing: $error"
1875-
~metadata:[ ("error", `String (Error.to_string_hum error)) ] ) ) ;
1864+
~metadata:[ ("error", Error_json.error_to_yojson error) ] ) ) ;
18761865
let%bind vrf_evaluator =
18771866
Monitor.try_with ~here:[%here]
18781867
~rest:
@@ -2137,7 +2126,6 @@ let create ~commit_id ?wallets (config : Config.t) =
21372126
in
21382127
let work_selector =
21392128
Work_selector.State.init
2140-
~reassignment_wait:config.work_reassignment_wait
21412129
~frontier_broadcast_pipe:frontier_broadcast_pipe_r
21422130
~logger:config.logger
21432131
in

src/lib/work_selector/intf.ml

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,7 @@ module type State_intf = sig
100100
type transition_frontier
101101

102102
val init :
103-
reassignment_wait:int
104-
-> frontier_broadcast_pipe:
103+
frontier_broadcast_pipe:
105104
transition_frontier option Pipe_lib.Broadcast_pipe.Reader.t
106105
-> logger:Logger.t
107106
-> t
@@ -116,45 +115,30 @@ module type Lib_intf = sig
116115
include
117116
State_intf with type transition_frontier := Inputs.Transition_frontier.t
118117

119-
val remove_old_assignments : t -> logger:Logger.t -> unit
120-
121-
(**Jobs that have not been assigned yet*)
122-
val all_unseen_works :
118+
(** [mark_scheduled t work] Mark [work] as scheduled in [t] *)
119+
val mark_scheduled :
123120
t
124121
-> ( Transaction_witness.t
125122
, Ledger_proof.Cached.t )
126123
Snark_work_lib.Work.Single.Spec.t
127124
One_or_two.t
128-
list
129-
130-
val remove : t -> Transaction_snark.Statement.t One_or_two.t -> unit
125+
-> unit
131126

132-
val set :
133-
t
127+
(** [all_unscheduled_expensive_works ~snark_pool ~fee t] filters out all
128+
works in the list that satisfy the predicate
129+
[does_not_have_better_fee ~snark_pool ~fee], and are not scheduled yet
130+
*)
131+
val all_unscheduled_expensive_works :
132+
snark_pool:Snark_pool.t
133+
-> fee:Fee.t
134+
-> t
134135
-> ( Transaction_witness.t
135136
, Ledger_proof.Cached.t )
136137
Snark_work_lib.Work.Single.Spec.t
137138
One_or_two.t
138-
-> unit
139+
list
139140
end
140141

141-
(** [get_expensive_work ~snark_pool ~fee works] filters out all works in the
142-
list that satisfy the predicate
143-
[does_not_have_better_fee ~snark_pool ~fee] *)
144-
val get_expensive_work :
145-
snark_pool:Snark_pool.t
146-
-> fee:Fee.t
147-
-> ( Transaction_witness.t
148-
, Ledger_proof.Cached.t )
149-
Snark_work_lib.Work.Single.Spec.t
150-
One_or_two.t
151-
list
152-
-> ( Transaction_witness.t
153-
, Ledger_proof.Cached.t )
154-
Snark_work_lib.Work.Single.Spec.t
155-
One_or_two.t
156-
list
157-
158142
(**jobs that are not in the snark pool yet*)
159143
val pending_work_statements :
160144
snark_pool:Snark_pool.t

src/lib/work_selector/random.ml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
open Core_kernel
22

33
module Make (Lib : Intf.Lib_intf) = struct
4-
let work ~snark_pool ~fee ~logger (state : Lib.State.t) =
5-
Lib.State.remove_old_assignments state ~logger ;
6-
let unseen_jobs = Lib.State.all_unseen_works state in
7-
match Lib.get_expensive_work ~snark_pool ~fee unseen_jobs with
4+
let work ~snark_pool ~fee ~logger:_ (state : Lib.State.t) =
5+
match Lib.State.all_unscheduled_expensive_works ~snark_pool ~fee state with
86
| [] ->
97
None
108
| expensive_work ->
119
let i = Random.int (List.length expensive_work) in
1210
let x = List.nth_exn expensive_work i in
13-
Lib.State.set state x ; Some x
11+
Lib.State.mark_scheduled state x ;
12+
Some x
1413
end
1514

1615
let%test_module "test" =

src/lib/work_selector/random_offset.ml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,15 @@ module Make (Lib : Intf.Lib_intf) = struct
4242
List.nth_exn expensive_work !offset
4343
end
4444

45-
let work ~snark_pool ~fee ~logger (state : Lib.State.t) =
46-
Lib.State.remove_old_assignments state ~logger ;
47-
let unseen_jobs = Lib.State.all_unseen_works state in
48-
match Lib.get_expensive_work ~snark_pool ~fee unseen_jobs with
45+
let work ~snark_pool ~fee ~logger:_ (state : Lib.State.t) =
46+
match Lib.State.all_unscheduled_expensive_works ~snark_pool ~fee state with
4947
| [] ->
5048
None
5149
| expensive_work ->
5250
Offset.update ~new_length:(List.length expensive_work) ;
5351
let x = Offset.get_nth expensive_work in
54-
Lib.State.set state x ; Some x
52+
Lib.State.mark_scheduled state x ;
53+
Some x
5554
end
5655

5756
let%test_module "test" =

src/lib/work_selector/sequence.ml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
module Make (Lib : Intf.Lib_intf) = struct
2-
let work ~snark_pool ~fee ~logger (state : Lib.State.t) =
3-
Lib.State.remove_old_assignments state ~logger ;
4-
let unseen_jobs = Lib.State.all_unseen_works state in
5-
match Lib.get_expensive_work ~snark_pool ~fee unseen_jobs with
2+
let work ~snark_pool ~fee ~logger:_ (state : Lib.State.t) =
3+
match Lib.State.all_unscheduled_expensive_works ~snark_pool ~fee state with
64
| [] ->
75
None
86
| x :: _ ->
9-
Lib.State.set state x ; Some x
7+
Lib.State.mark_scheduled state x ;
8+
Some x
109
end
1110

1211
let%test_module "test" =

src/lib/work_selector/test.ml

Lines changed: 4 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ open Pipe_lib
66
module Make_test (Make_selection_method : Intf.Make_selection_method_intf) =
77
struct
88
module T = Inputs.Test_inputs
9-
10-
let reassignment_wait = 2000
11-
129
module Lib = Work_lib.Make (T)
1310
module Selection_method = Make_selection_method (Lib)
1411

@@ -19,11 +16,10 @@ struct
1916

2017
let precomputed_values = Precomputed_values.for_unit_tests
2118

22-
let init_state sl reassignment_wait logger =
19+
let init_state sl logger =
2320
let tf_reader, tf_writer = Broadcast_pipe.create None in
2421
let work_state =
25-
Lib.State.init ~reassignment_wait ~frontier_broadcast_pipe:tf_reader
26-
~logger
22+
Lib.State.init ~frontier_broadcast_pipe:tf_reader ~logger
2723
in
2824
let%map () = Broadcast_pipe.Writer.write tf_writer (Some sl) in
2925
work_state
@@ -37,7 +33,7 @@ struct
3733
Quickcheck.test gen_staged_ledger ~trials:100 ~f:(fun sl ->
3834
Async.Thread_safe.block_on_async_exn (fun () ->
3935
let open Deferred.Let_syntax in
40-
let%bind work_state = init_state sl reassignment_wait logger in
36+
let%bind work_state = init_state sl logger in
4137
let rec go i =
4238
[%test_result: Bool.t]
4339
~message:"Exceeded time expected to exhaust work" ~expect:true
@@ -49,34 +45,6 @@ struct
4945
in
5046
go 0 ) )
5147

52-
let%test_unit "Reassign work after the wait time" =
53-
Backtrace.elide := false ;
54-
let snark_pool = T.Snark_pool.create () in
55-
let fee = Currency.Fee.zero in
56-
let logger = Logger.null () in
57-
let send_work work_state =
58-
let rec go all_work =
59-
let stuff = Selection_method.work ~snark_pool ~fee ~logger work_state in
60-
match stuff with
61-
| None ->
62-
all_work
63-
| Some work ->
64-
go (One_or_two.to_list work @ all_work)
65-
in
66-
go []
67-
in
68-
Quickcheck.test gen_staged_ledger ~trials:10 ~f:(fun sl ->
69-
Async.Thread_safe.block_on_async_exn (fun () ->
70-
let open Deferred.Let_syntax in
71-
let%bind work_state = init_state sl reassignment_wait logger in
72-
let work_sent = send_work work_state in
73-
(*wait for wait_time after which all the work will be reassigned*)
74-
let%map () =
75-
Async.after (Time.Span.of_ms (Float.of_int reassignment_wait))
76-
in
77-
let work_sent_again = send_work work_state in
78-
assert (List.length work_sent = List.length work_sent_again) ) )
79-
8048
let gen_snark_pool (works : ('a, 'b) Lib.Work_spec.t One_or_two.t list) fee =
8149
let open Quickcheck.Generator.Let_syntax in
8250
let cheap_work_fee = Option.value_exn Fee.(sub fee one) in
@@ -123,7 +91,7 @@ struct
12391
~trials:100 ~f:(fun (sl, snark_pool) ->
12492
Async.Thread_safe.block_on_async_exn (fun () ->
12593
let open Deferred.Let_syntax in
126-
let%bind work_state = init_state sl reassignment_wait logger in
94+
let%bind work_state = init_state sl logger in
12795
let rec go i =
12896
[%test_result: Bool.t]
12997
~message:"Exceeded time expected to exhaust work" ~expect:true

0 commit comments

Comments
 (0)