Skip to content

Commit 4d9ae06

Browse files
authored
Merge pull request #17802 from MinaProtocol/lyh/simplify-wire-data-snark-worker
Update SNARK worker & minimize data pased on wire - compatible
2 parents 73b805c + 1a26087 commit 4d9ae06

27 files changed

+489
-485
lines changed

changes/17724.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
## Abstract
2+
Updated SNARK worker protocol. Now more parallelism can be utilized when constructing proofs for a zkApp command. Redeployment of SNARK worker and SNARK coordinator(i.e. any daemon that's attached to a SNARK worker) would be needed for this to take effect.
3+
4+
## Metrics
5+
The metrics related to SNARK workers has been updated as well.
6+
7+
Following metrics are maintained by the worker:
8+
- `snark_worker_sub_zkapp_command_segment_time`
9+
- `snark_worker_sub_zkapp_command_merge_time`
10+
- `snark_worker_merge_time`
11+
- `snark_worker_nonzkapp_transition_time`
12+
13+
addtionally, the coordinator maintains all metrics above, plus an extra metric:
14+
- `snark_worker_zkapp_transition_time`
15+
16+
While having same name, these shared metrics have different meanings across coordinator and worker: Any time tracked by a worker represents the time this specific worker spent on that genre, while any time tracked by a coordinator represents the sum of time any worker attached to that coordinator spent on that genre.
17+
18+
## Foot Note
19+
This is part of the Slot Reduction MIP from Mesa Hardfork.

src/app/cli/src/init/mina_run.ml

Lines changed: 31 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -380,56 +380,41 @@ let setup_local_server ?(client_trustlist = []) ?rest_server_port
380380
in
381381
let snark_worker_impls =
382382
[ implement Snark_worker.Rpcs_versioned.Get_work.Latest.rpc (fun () () ->
383-
Deferred.return
384-
(let open Option.Let_syntax in
385-
let%bind key =
386-
Option.merge
387-
(Mina_lib.snark_worker_key mina)
388-
(Mina_lib.snark_coordinator_key mina)
389-
~f:Fn.const
390-
in
391-
let%map work = Mina_lib.request_work mina in
392-
let work =
393-
Snark_work_lib.Work.Spec.map work
394-
~f:
395-
(Snark_work_lib.Work.Single.Spec.map
396-
~f_proof:Ledger_proof.Cached.read_proof_from_disk
397-
~f_witness:Transaction_witness.read_all_proofs_from_disk )
398-
in
399-
[%log trace]
400-
~metadata:
401-
[ ( "work_spec"
402-
, Snark_work_lib.Selector.Spec.Stable.Latest.to_yojson work )
403-
]
404-
"responding to a Get_work request with some new work" ;
405-
Mina_metrics.(Counter.inc_one Snark_work.snark_work_assigned_rpc) ;
406-
(work, key)) )
407-
; implement Snark_worker.Rpcs_versioned.Submit_work.Latest.rpc
408-
(fun () (work : Snark_work_lib.Selector.Result.Stable.Latest.t) ->
409-
[%log trace] "received completed work from a snark worker"
410-
~metadata:
411-
[ ( "work_spec"
412-
, Snark_work_lib.Selector.Spec.Stable.Latest.to_yojson work.spec
413-
)
414-
] ;
383+
match Mina_lib.request_work mina with
384+
| None ->
385+
Deferred.return None
386+
| Some (Ok spec) ->
387+
[%log debug] "responding to a Get_work request with some new work"
388+
~metadata:
389+
[ ( "work_id"
390+
, Snark_work_lib.(
391+
Spec.Partitioned.Poly.get_id spec |> Id.Any.to_yojson)
392+
)
393+
] ;
415394

395+
Mina_metrics.(Counter.inc_one Snark_work.snark_work_assigned_rpc) ;
396+
Deferred.return (Some spec)
397+
| Some (Error (`Failed_to_generate_inputs (zkapp_cmd, e))) ->
398+
let open Mina_base.Zkapp_command in
399+
[%log error]
400+
"Mina_lib.request_work failed to generate inputs for a zkapp \
401+
command"
402+
~metadata:
403+
[ ("error", `String (Error.to_string_hum e))
404+
; ( "zkapp_cmd"
405+
, Stable.Latest.to_yojson
406+
@@ read_all_proofs_from_disk zkapp_cmd )
407+
] ;
408+
Deferred.return None )
409+
; implement Snark_worker.Rpcs_versioned.Submit_work.Latest.rpc
410+
(fun () (result : Snark_work_lib.Result.Partitioned.Stable.Latest.t) ->
411+
[%log debug] "received completed work from a snark worker"
412+
~metadata:[ ("work_id", Snark_work_lib.Id.Any.to_yojson result.id) ] ;
416413
Mina_metrics.(
417414
Counter.inc_one Snark_work.completed_snark_work_received_rpc) ;
418-
One_or_two.zip_exn work.spec.instances work.metrics
419-
|> One_or_two.iter ~f:(fun (single_spec, (elapsed, _tag)) ->
420-
Snark_work_lib.Metrics.(
421-
emit_single_metrics ~logger ~single_spec
422-
~data:{ data = elapsed; proof = () }
423-
~on_zkapp_command:emit_zkapp_metrics_legacy ()) ) ;
424-
Deferred.return @@ Mina_lib.add_work mina work )
415+
Deferred.return @@ Mina_lib.add_work mina result )
425416
; implement Snark_worker.Rpcs_versioned.Failed_to_generate_snark.Latest.rpc
426-
(fun
427-
()
428-
((error, _work_spec, _prover_public_key) :
429-
Error.t
430-
* Snark_work_lib.Selector.Spec.Stable.Latest.t
431-
* Signature_lib.Public_key.Compressed.t )
432-
->
417+
(fun () (error, _) ->
433418
[%str_log error]
434419
(Snark_worker.Events.Generating_snark_work_failed
435420
{ error = Error_json.error_to_yojson error } ) ;

src/lib/mina_graphql/mina_graphql.ml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2351,7 +2351,7 @@ module Queries = struct
23512351
~args:Arg.[]
23522352
~typ:(non_null @@ list @@ non_null Types.pending_work)
23532353
~resolve:(fun { ctx = mina; _ } () ->
2354-
let snark_job_state = Mina_lib.snark_job_state mina in
2354+
let snark_job_state = Mina_lib.work_selector mina in
23552355
let snark_pool = Mina_lib.snark_pool mina in
23562356
let fee_opt =
23572357
Mina_lib.(
@@ -2381,7 +2381,7 @@ module Queries = struct
23812381
]
23822382
~typ:(non_null @@ list @@ non_null Types.pending_work_spec)
23832383
~resolve:(fun { ctx = mina; _ } () start_idx end_idx ->
2384-
let snark_job_state = Mina_lib.snark_job_state mina in
2384+
let snark_job_state = Mina_lib.work_selector mina in
23852385
let snark_pool = Mina_lib.snark_pool mina in
23862386
let all_work = Work_selector.all_work ~snark_pool snark_job_state in
23872387
let work_size = all_work |> List.length |> Unsigned.UInt32.of_int in

src/lib/mina_lib/mina_lib.ml

Lines changed: 90 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ type t =
107107
; pipes : pipes
108108
; wallets : Secrets.Wallets.t
109109
; coinbase_receiver : Consensus.Coinbase_receiver.t ref
110-
; snark_job_state : Work_selector.State.t
110+
; work_selector : Work_selector.State.t
111+
; work_partitioner : Work_partitioner.t
111112
; mutable next_producer_timing :
112113
Daemon_rpcs.Types.Status.Next_producer_timing.t option
113114
; subscriptions : Mina_subscriptions.t
@@ -796,7 +797,7 @@ let get_inferred_nonce_from_transaction_pool_and_ledger t
796797
let%map account = get_account t account_id in
797798
account.Account.nonce
798799

799-
let snark_job_state t = t.snark_job_state
800+
let work_selector t = t.work_selector
800801

801802
let add_block_subscriber t public_key =
802803
Mina_subscriptions.add_block_subscriber t.subscriptions public_key
@@ -869,46 +870,104 @@ let best_chain ?max_length t =
869870

870871
let request_work t =
871872
let (module Work_selection_method) = t.config.work_selection_method in
873+
let%bind.Option prover =
874+
Option.first_some (snark_worker_key t) (snark_coordinator_key t)
875+
in
872876
let fee = snark_work_fee t in
873-
let instances_opt =
874-
Work_selection_method.work ~logger:t.config.logger ~fee
875-
~snark_pool:(snark_pool t) (snark_job_state t)
877+
let sok_message = Sok_message.create ~fee ~prover in
878+
[%log' debug t.config.logger] "Received work request"
879+
~metadata:[ ("sok_message", Sok_message.to_yojson sok_message) ] ;
880+
let work_from_selector =
881+
lazy
882+
(Work_selection_method.work ~snark_pool:(snark_pool t) ~fee
883+
~logger:t.config.logger t.work_selector )
876884
in
877-
Option.map instances_opt ~f:(fun instances ->
878-
{ Snark_work_lib.Work.Spec.instances; fee } )
885+
Work_partitioner.request_partitioned_work ~work_from_selector ~sok_message
886+
~partitioner:t.work_partitioner
879887

880888
let work_selection_method t = t.config.work_selection_method
881889

882-
let add_work t (work : Snark_work_lib.Selector.Result.Stable.Latest.t) =
890+
let add_complete_work ~logger ~fee ~prover
891+
~(results :
892+
( Snark_work_lib.Spec.Single.t
893+
, Ledger_proof.Cached.t )
894+
Snark_work_lib.Result.Single.Poly.t
895+
One_or_two.t ) t =
883896
let update_metrics () =
884897
let snark_pool = snark_pool t in
885898
let fee_opt =
886899
Option.map (snark_worker_key t) ~f:(fun _ -> snark_work_fee t)
887900
in
888901
let pending_work =
889-
Work_selector.pending_work_statements ~snark_pool ~fee_opt
890-
t.snark_job_state
902+
Work_selector.pending_work_statements ~snark_pool ~fee_opt t.work_selector
891903
|> List.length
892904
in
893905
Mina_metrics.(
894906
Gauge.set Snark_work.pending_snark_work (Int.to_float pending_work))
895907
in
896-
let spec =
897-
One_or_two.map work.spec.instances
898-
~f:Snark_work_lib.Work.Single.Spec.statement
908+
let proofs = One_or_two.map ~f:(fun { proof; _ } -> proof) results in
909+
let fee_with_prover = Fee_with_prover.{ fee; prover } in
910+
let stmts =
911+
One_or_two.map
912+
~f:(fun { spec; _ } -> Snark_work_lib.Spec.Single.Poly.statement spec)
913+
results
899914
in
900-
let cb _ =
901-
(* remove it from seen jobs after attempting to adding it to the pool to avoid this work being reassigned
915+
[%log debug] "Partitioner combined work"
916+
~metadata:
917+
[ ("work_ids", Transaction_snark_work.Statement.compact_json stmts)
918+
; ("fee_with_prover", Fee_with_prover.to_yojson fee_with_prover)
919+
] ;
920+
Or_error.try_with update_metrics
921+
|> Result.iter_error ~f:(fun err ->
922+
[%log warn] "Failed to update metrics on adding work"
923+
~metadata:[ ("error", `String (Error.to_string_hum err)) ] ) ;
924+
let cb result =
925+
(* remove it from seen jobs after attempting to adding it to the pool to
926+
avoid this work being reassigned.
902927
* If the diff is accepted then remove it from the seen jobs.
903-
* If not then the work should have already been in the pool with a lower fee or the statement isn't referenced anymore or any other error. In any case remove it from the seen jobs so that it can be picked up if needed *)
904-
Work_selector.remove t.snark_job_state spec
928+
* If not then the work should have already been in the pool with a
929+
lower fee or the statement isn't referenced anymore or any other
930+
error. In any case remove it from the seen jobs so that it can be
931+
picked up if needed *)
932+
Work_selector.remove t.work_selector stmts ;
933+
Result.iter_error result ~f:(fun err ->
934+
(* Possible reasons of failure: receiving pipe's capacity exceeded,
935+
fee that isn't the lowest, failure in verification or application to the pool *)
936+
[%log warn] "Failed to add completed work to the pool"
937+
~metadata:
938+
[ ("work_ids", Transaction_snark_work.Statement.compact_json stmts)
939+
; ("error", `String (Error.to_string_hum err))
940+
] )
905941
in
906-
ignore (Or_error.try_with (fun () -> update_metrics ()) : unit Or_error.t) ;
907942
Network_pool.Snark_pool.(
908943
Local_sink.push t.pipes.snark_local_sink
909-
(Resource_pool.Diff.of_result work, cb))
944+
( Add_solved_work
945+
( stmts
946+
, Network_pool.Priced_proof.
947+
{ proof =
948+
proofs
949+
|> One_or_two.map ~f:Ledger_proof.Cached.read_proof_from_disk
950+
; fee = fee_with_prover
951+
} )
952+
, cb ))
910953
|> Deferred.don't_wait_for
911954

955+
let add_work t (work : Snark_work_lib.Result.Partitioned.Stable.Latest.t) =
956+
let logger = t.config.logger in
957+
match
958+
Work_partitioner.submit_partitioned_work ~result:work
959+
~partitioner:t.work_partitioner
960+
with
961+
| SpecUnmatched ->
962+
`SpecUnmatched
963+
| Removed ->
964+
`Removed
965+
| Processed None ->
966+
`Ok
967+
| Processed (Some { results; fee; prover }) ->
968+
add_complete_work ~logger ~fee ~prover ~results t ;
969+
`Ok
970+
912971
let add_work_graphql t diff =
913972
let results_ivar = Ivar.create () in
914973
Network_pool.Snark_pool.Local_sink.push t.pipes.snark_local_sink
@@ -2076,12 +2135,20 @@ let create ~commit_id ?wallets (config : Config.t) =
20762135
; constraint_constants
20772136
}
20782137
in
2079-
let snark_jobs_state =
2138+
let work_selector =
20802139
Work_selector.State.init
20812140
~reassignment_wait:config.work_reassignment_wait
20822141
~frontier_broadcast_pipe:frontier_broadcast_pipe_r
20832142
~logger:config.logger
20842143
in
2144+
(* NOTE: [reassignment_wait] is interpreted as milliseconds *)
2145+
let work_partitioner =
2146+
Work_partitioner.create
2147+
~signature_kind:Mina_signature_kind.t_DEPRECATED
2148+
~reassignment_timeout:
2149+
(Time.Span.of_ms (Float.of_int config.work_reassignment_wait))
2150+
~logger:config.logger ~proof_cache_db
2151+
in
20852152
let sinks = (block_sink, tx_remote_sink, snark_remote_sink) in
20862153
let%bind net =
20872154
O1trace.thread "mina_networking" (fun () ->
@@ -2091,7 +2158,7 @@ let create ~commit_id ?wallets (config : Config.t) =
20912158
~get_transition_frontier:(fun () ->
20922159
Broadcast_pipe.Reader.peek frontier_broadcast_pipe_r )
20932160
~get_snark_pool:(fun () -> Some snark_pool)
2094-
~snark_job_state:(fun () -> Some snark_jobs_state)
2161+
~snark_job_state:(fun () -> Some work_selector)
20952162
~get_node_status )
20962163
in
20972164
let user_command_input_reader, user_command_input_writer =
@@ -2515,7 +2582,8 @@ let create ~commit_id ?wallets (config : Config.t) =
25152582
}
25162583
; wallets
25172584
; coinbase_receiver = ref config.coinbase_receiver
2518-
; snark_job_state = snark_jobs_state
2585+
; work_selector
2586+
; work_partitioner
25192587
; subscriptions
25202588
; sync_status
25212589
; precomputed_block_writer

src/lib/mina_lib/mina_lib.mli

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ val current_epoch_delegators :
9595
val last_epoch_delegators :
9696
t -> pk:Public_key.Compressed.t -> Mina_base.Account.t list option
9797

98+
(** [replace_snark_worker_key t key_opt] Replace all SNARK worker's key
99+
associated with current coordinator.
100+
- If the new key is [None], SNARK worker will be turned off if it's running;
101+
- If the new key is [Some k], SNARK worker will be turn on if it's not running. *)
98102
val replace_snark_worker_key :
99103
t -> Public_key.Compressed.t option -> unit Deferred.t
100104

@@ -113,11 +117,19 @@ val snark_work_fee : t -> Currency.Fee.t
113117

114118
val set_snark_work_fee : t -> Currency.Fee.t -> unit
115119

116-
val request_work : t -> Work_selector.work Snark_work_lib.Work.Spec.t option
120+
val request_work :
121+
t
122+
-> ( Snark_work_lib.Spec.Partitioned.Stable.Latest.t
123+
, Work_partitioner.Snark_worker_shared.Failed_to_generate_inputs.t )
124+
Result.t
125+
option
117126

118127
val work_selection_method : t -> (module Work_selector.Selection_method_intf)
119128

120-
val add_work : t -> Snark_work_lib.Selector.Result.Stable.Latest.t -> unit
129+
val add_work :
130+
t
131+
-> Snark_work_lib.Result.Partitioned.Stable.Latest.t
132+
-> [> `Ok | `Removed | `SpecUnmatched ]
121133

122134
val add_work_graphql :
123135
t
@@ -127,7 +139,7 @@ val add_work_graphql :
127139
* Network_pool.Snark_pool.Resource_pool.Diff.rejected )
128140
Deferred.Or_error.t
129141

130-
val snark_job_state : t -> Work_selector.State.t
142+
val work_selector : t -> Work_selector.State.t
131143

132144
val get_current_nonce :
133145
t
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
type t =
2-
{ spec_with_proof : (Single_spec.t * Ledger_proof.t) One_or_two.t
2+
{ results :
3+
(Single_spec.t, Ledger_proof.Cached.t) Single_result.Poly.t One_or_two.t
34
; fee : Currency.Fee.t
45
; prover : Signature_lib.Public_key.Compressed.t
56
}

src/lib/snark_work_lib/id.ml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,17 @@ module Sub_zkapp = struct
5353
let to_single ({ which_one; pairing_id; _ } : t) : Single.t =
5454
{ which_one; pairing_id }
5555
end
56+
57+
module Any = struct
58+
[%%versioned
59+
module Stable = struct
60+
module V1 = struct
61+
type t =
62+
| Single of Single.Stable.V1.t
63+
| Sub_zkapp of Sub_zkapp.Stable.V1.t
64+
[@@deriving compare, hash, sexp, yojson, equal]
65+
66+
let to_latest = Fn.id
67+
end
68+
end]
69+
end

src/lib/snark_work_lib/id.mli

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,17 @@ module Sub_zkapp : sig
5151

5252
val to_single : t -> Single.t
5353
end
54+
55+
module Any : sig
56+
[%%versioned:
57+
module Stable : sig
58+
module V1 : sig
59+
type t =
60+
| Single of Single.Stable.V1.t
61+
| Sub_zkapp of Sub_zkapp.Stable.V1.t
62+
[@@deriving compare, hash, sexp, yojson, equal]
63+
64+
val to_latest : t -> t
65+
end
66+
end]
67+
end

0 commit comments

Comments
 (0)