Skip to content

Commit 15eb933

Browse files
authored
Merge pull request #1117 from openmina/feat/snark_worker
Rust based Snark Worker
2 parents 7e69e40 + 1379991 commit 15eb933

25 files changed

+253
-42
lines changed

ledger/src/port_ocaml/hash.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ impl Default for JaneStreetHasher {
2222
}
2323
}
2424

25+
#[allow(clippy::precedence)]
2526
fn rotl32(x: u32, n: u32) -> u32 {
2627
(x) << n | (x) >> (32 - n)
2728
}

macros/src/action_event.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,8 @@ fn filter_fields(field_spec: &FieldsSpec, fields: &FieldsNamed) -> Result<Vec<To
165165
FieldsSpec::Some(f) => f
166166
.iter()
167167
.filter(|(name, _)| {
168-
name.as_ref().map_or(true, |name| {
169-
fields.named.iter().any(|n| Some(name) == n.ident.as_ref())
170-
})
168+
name.as_ref()
169+
.is_none_or(|name| fields.named.iter().any(|n| Some(name) == n.ident.as_ref()))
171170
})
172171
.map(|(_, expr)| Ok(expr.clone()))
173172
.collect(),

node/common/src/service/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ impl NodeServiceCommonBuilder {
146146
),
147147
ledger_manager,
148148
block_producer: self.block_producer,
149+
// initialized in state machine.
150+
snark_worker: None,
149151
archive: self.archive,
150152
p2p,
151153
stats: self.gather_stats.then(Stats::new),

node/common/src/service/service.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use super::{
2323
p2p::webrtc_with_libp2p::P2pServiceCtx,
2424
replay::ReplayerState,
2525
rpc::{RpcSender, RpcService},
26+
snark_worker::SnarkWorker,
2627
snarks::SnarkBlockVerifyArgs,
2728
EventReceiver, EventSender,
2829
};
@@ -41,6 +42,7 @@ pub struct NodeService {
4142
pub snark_block_proof_verify: mpsc::TrackedUnboundedSender<SnarkBlockVerifyArgs>,
4243

4344
pub ledger_manager: LedgerManager,
45+
pub snark_worker: Option<SnarkWorker>,
4446
pub block_producer: Option<BlockProducerService>,
4547
pub archive: Option<ArchiveService>,
4648
pub p2p: P2pServiceCtx,
@@ -116,6 +118,7 @@ impl NodeService {
116118
event_receiver: mpsc::unbounded_channel().1.into(),
117119
snark_block_proof_verify: mpsc::unbounded_channel().0,
118120
ledger_manager: LedgerManager::spawn(Default::default()),
121+
snark_worker: None,
119122
block_producer: None,
120123
archive: None,
121124
p2p: P2pServiceCtx::mocked(p2p_sec_key),
Lines changed: 211 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,242 @@
1+
use ledger::proofs::provers::{TransactionProver, ZkappProver};
2+
use ledger::proofs::zkapp::ZkappParams;
3+
use ledger::scan_state::scan_state::transaction_snark::SokMessage;
14
use mina_p2p_messages::v2;
2-
use node::external_snark_worker::{ExternalSnarkWorkerError, SnarkWorkSpec};
5+
use mina_signer::CompressedPubKey;
6+
use node::core::channels::mpsc;
7+
use node::event_source::ExternalSnarkWorkerEvent;
8+
use node::external_snark_worker::{
9+
ExternalSnarkWorkerError, ExternalSnarkWorkerWorkError, SnarkWorkResult, SnarkWorkSpec,
10+
SnarkWorkSpecError,
11+
};
12+
use node::snark::TransactionVerifier;
313

414
use crate::NodeService;
515

6-
pub struct SnarkWorker {}
16+
use super::EventSender;
17+
18+
pub struct SnarkWorker {
19+
cmd_sender: mpsc::UnboundedSender<Cmd>,
20+
}
21+
22+
enum Cmd {
23+
Submit(Box<SnarkWorkSpec>),
24+
Cancel,
25+
Kill,
26+
}
727

828
impl node::service::ExternalSnarkWorkerService for NodeService {
929
fn start(
1030
&mut self,
11-
_public_key: v2::NonZeroCurvePoint,
12-
_fee: v2::CurrencyFeeStableV1,
31+
pub_key: v2::NonZeroCurvePoint,
32+
fee: v2::CurrencyFeeStableV1,
33+
work_verifier: TransactionVerifier,
1334
) -> Result<(), ExternalSnarkWorkerError> {
1435
if self.replayer.is_some() {
1536
return Ok(());
1637
}
17-
todo!()
38+
let (cmd_sender, cmd_receiver) = mpsc::unbounded_channel();
39+
// TODO(binier): improve pub key conv
40+
let sok_message = SokMessage::create(
41+
(&fee).into(),
42+
CompressedPubKey::from_address(&pub_key.to_string()).unwrap(),
43+
);
44+
self.snark_worker = Some(SnarkWorker { cmd_sender });
45+
let event_sender = self.event_sender().clone();
46+
47+
node::core::thread::Builder::new()
48+
.name("snark_worker".to_owned())
49+
.spawn(move || worker_thread(cmd_receiver, event_sender, sok_message, work_verifier))
50+
.map(|_| ())
51+
.map_err(|err| ExternalSnarkWorkerError::Error(err.to_string()))
1852
}
1953

2054
fn kill(&mut self) -> Result<(), ExternalSnarkWorkerError> {
2155
if self.replayer.is_some() {
2256
return Ok(());
2357
}
24-
todo!()
58+
59+
if self
60+
.snark_worker
61+
.as_ref()
62+
.and_then(|s| s.cmd_sender.send(Cmd::Kill).ok())
63+
.is_none()
64+
{
65+
return Err(ExternalSnarkWorkerError::NotRunning);
66+
}
67+
Ok(())
2568
}
2669

27-
fn submit(&mut self, _spec: SnarkWorkSpec) -> Result<(), ExternalSnarkWorkerError> {
70+
fn submit(&mut self, spec: SnarkWorkSpec) -> Result<(), ExternalSnarkWorkerError> {
2871
if self.replayer.is_some() {
2972
return Ok(());
3073
}
31-
todo!()
74+
75+
if self
76+
.snark_worker
77+
.as_ref()
78+
.and_then(|s| s.cmd_sender.send(Cmd::Submit(spec.into())).ok())
79+
.is_none()
80+
{
81+
return Err(ExternalSnarkWorkerError::NotRunning);
82+
}
83+
Ok(())
3284
}
3385

3486
fn cancel(&mut self) -> Result<(), ExternalSnarkWorkerError> {
3587
if self.replayer.is_some() {
3688
return Ok(());
3789
}
38-
todo!()
90+
91+
// TODO(binier): for wasm threads, call terminate:
92+
// https://developer.mozilla.org/en-US/docs/Web/API/Worker/terminate
93+
if self
94+
.snark_worker
95+
.as_ref()
96+
.and_then(|s| s.cmd_sender.send(Cmd::Cancel).ok())
97+
.is_none()
98+
{
99+
return Err(ExternalSnarkWorkerError::NotRunning);
100+
}
101+
Ok(())
102+
}
103+
}
104+
105+
fn worker_thread(
106+
mut cmd_receiver: mpsc::UnboundedReceiver<Cmd>,
107+
event_sender: EventSender,
108+
sok_message: SokMessage,
109+
work_verifier: TransactionVerifier,
110+
) {
111+
let _ = event_sender.send(ExternalSnarkWorkerEvent::Started.into());
112+
let tx_prover = TransactionProver::make(Some(work_verifier.clone()));
113+
let zkapp_prover = ZkappProver::make(Some(work_verifier));
114+
while let Some(cmd) = cmd_receiver.blocking_recv() {
115+
match cmd {
116+
Cmd::Kill => {
117+
let _ = event_sender.send(ExternalSnarkWorkerEvent::Killed.into());
118+
return;
119+
}
120+
Cmd::Cancel => {
121+
// can't cancel as it's a blocking thread. Once this
122+
// is moved to another process, kill it.
123+
let _ = event_sender.send(ExternalSnarkWorkerEvent::WorkCancelled.into());
124+
}
125+
Cmd::Submit(spec) => {
126+
let event = match prove_spec(&tx_prover, &zkapp_prover, *spec, &sok_message) {
127+
Err(err) => ExternalSnarkWorkerEvent::WorkError(err),
128+
Ok(res) => ExternalSnarkWorkerEvent::WorkResult(res),
129+
};
130+
131+
let _ = event_sender.send(event.into());
132+
}
133+
}
39134
}
40135
}
136+
137+
fn prove_spec(
138+
tx_prover: &TransactionProver,
139+
zkapp_prover: &ZkappProver,
140+
spec: SnarkWorkSpec,
141+
sok_message: &SokMessage,
142+
) -> Result<SnarkWorkResult, ExternalSnarkWorkerWorkError> {
143+
match spec {
144+
SnarkWorkSpec::One(single) => prove_single(tx_prover, zkapp_prover, single, sok_message)
145+
.map(v2::TransactionSnarkWorkTStableV2Proofs::One),
146+
SnarkWorkSpec::Two((one, two)) => Ok(v2::TransactionSnarkWorkTStableV2Proofs::Two((
147+
prove_single(tx_prover, zkapp_prover, one, sok_message)?,
148+
prove_single(tx_prover, zkapp_prover, two, sok_message)?,
149+
))),
150+
}
151+
.map(Into::into)
152+
}
153+
154+
fn invalid_bigint_err() -> ExternalSnarkWorkerWorkError {
155+
ExternalSnarkWorkerWorkError::WorkSpecError(SnarkWorkSpecError::InvalidBigInt)
156+
}
157+
158+
fn prove_single(
159+
tx_prover: &TransactionProver,
160+
zkapp_prover: &ZkappProver,
161+
single: v2::SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single,
162+
sok_message: &SokMessage,
163+
) -> Result<v2::LedgerProofProdStableV2, ExternalSnarkWorkerWorkError> {
164+
use ledger::proofs::{merge::MergeParams, transaction::TransactionParams};
165+
166+
let (snarked_ledger_state, res) = match single {
167+
v2::SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single::Transition(
168+
snarked_ledger_state,
169+
witness,
170+
) => {
171+
if let v2::MinaTransactionTransactionStableV2::Command(cmd) = &witness.transaction {
172+
if matches!(&**cmd, v2::MinaBaseUserCommandStableV2::ZkappCommand(_)) {
173+
return prove_zkapp(zkapp_prover, snarked_ledger_state, witness, sok_message);
174+
}
175+
}
176+
let res = ledger::proofs::generate_tx_proof(TransactionParams {
177+
statement: &snarked_ledger_state.0,
178+
tx_witness: &witness,
179+
message: sok_message,
180+
tx_step_prover: &tx_prover.tx_step_prover,
181+
tx_wrap_prover: &tx_prover.tx_wrap_prover,
182+
only_verify_constraints: false,
183+
expected_step_proof: None,
184+
ocaml_wrap_witness: None,
185+
});
186+
(snarked_ledger_state.0, res)
187+
}
188+
v2::SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0Single::Merge(data) => {
189+
let (snarked_ledger_state, proof_1, proof_2) = *data;
190+
let res = ledger::proofs::generate_merge_proof(MergeParams {
191+
statement: (&snarked_ledger_state.0)
192+
.try_into()
193+
.map_err(|_| invalid_bigint_err())?,
194+
proofs: &[proof_1, proof_2],
195+
message: sok_message,
196+
step_prover: &tx_prover.merge_step_prover,
197+
wrap_prover: &tx_prover.tx_wrap_prover,
198+
only_verify_constraints: false,
199+
expected_step_proof: None,
200+
ocaml_wrap_witness: None,
201+
});
202+
(snarked_ledger_state.0, res)
203+
}
204+
};
205+
res.map_err(|err| ExternalSnarkWorkerWorkError::Error(err.to_string()))
206+
.map(|proof| {
207+
v2::LedgerProofProdStableV2(v2::TransactionSnarkStableV2 {
208+
statement: v2::MinaStateSnarkedLedgerStateWithSokStableV2 {
209+
source: snarked_ledger_state.source,
210+
target: snarked_ledger_state.target,
211+
connecting_ledger_left: snarked_ledger_state.connecting_ledger_left,
212+
connecting_ledger_right: snarked_ledger_state.connecting_ledger_right,
213+
supply_increase: snarked_ledger_state.supply_increase,
214+
fee_excess: snarked_ledger_state.fee_excess,
215+
sok_digest: (&sok_message.digest()).into(),
216+
},
217+
proof: v2::TransactionSnarkProofStableV2((&proof).into()),
218+
})
219+
})
220+
}
221+
222+
fn prove_zkapp(
223+
zkapp_prover: &ZkappProver,
224+
snarked_ledger_state: v2::MinaStateSnarkedLedgerStateStableV2,
225+
witness: v2::TransactionWitnessStableV2,
226+
sok_message: &SokMessage,
227+
) -> Result<v2::LedgerProofProdStableV2, ExternalSnarkWorkerWorkError> {
228+
ledger::proofs::generate_zkapp_proof(ZkappParams {
229+
statement: &snarked_ledger_state.0,
230+
tx_witness: &witness,
231+
message: sok_message,
232+
step_opt_signed_opt_signed_prover: &zkapp_prover.step_opt_signed_opt_signed_prover,
233+
step_opt_signed_prover: &zkapp_prover.step_opt_signed_prover,
234+
step_proof_prover: &zkapp_prover.step_proof_prover,
235+
merge_step_prover: &zkapp_prover.merge_step_prover,
236+
tx_wrap_prover: &zkapp_prover.tx_wrap_prover,
237+
opt_signed_path: None,
238+
proved_path: None,
239+
})
240+
.map(|proof| (&proof).into())
241+
.map_err(|err| ExternalSnarkWorkerWorkError::Error(err.to_string()))
242+
}

node/src/event_source/event_source_effects.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub fn event_source_effects<S: Service>(store: &mut Store<S>, action: EventSourc
9797
MioEvent::IncomingDataDidReceive(addr, result) => {
9898
store.dispatch(P2pNetworkSchedulerAction::IncomingDataDidReceive {
9999
addr,
100-
result: result.map(From::from),
100+
result,
101101
});
102102
}
103103
MioEvent::OutgoingDataDidSend(_, _result) => {}

node/src/external_snark_worker_effectful/external_snark_worker_effectful_effects.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ pub fn external_snark_worker_effectful_effects<S: crate::Service>(
99
let (action, _) = action.split();
1010
match action {
1111
ExternalSnarkWorkerEffectfulAction::Start { public_key, fee } => {
12-
if let Err(err) = store.service.start(public_key, fee) {
12+
let work_verifier = store.state().snark.work_verify.verifier_index.clone();
13+
if let Err(err) = store.service.start(public_key, fee, work_verifier) {
1314
store.dispatch(ExternalSnarkWorkerAction::Error {
1415
error: err,
1516
permanent: true,

node/src/external_snark_worker_effectful/external_snark_worker_service.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use mina_p2p_messages::v2::{CurrencyFeeStableV1, NonZeroCurvePoint};
22
use serde::{Deserialize, Serialize};
3+
use snark::TransactionVerifier;
34

45
use crate::external_snark_worker::{
56
ExternalSnarkWorkerError, ExternalSnarkWorkerWorkError, SnarkWorkResult, SnarkWorkSpec,
@@ -21,6 +22,7 @@ pub trait ExternalSnarkWorkerService {
2122
&mut self,
2223
public_key: NonZeroCurvePoint,
2324
fee: CurrencyFeeStableV1,
25+
work_verifier: TransactionVerifier,
2426
) -> Result<(), ExternalSnarkWorkerError>;
2527

2628
/// Submits snark work

node/src/recorder/recorder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ fn graceful_shutdown(only_i: Option<usize>) {
160160
let files_iter = files
161161
.iter_mut()
162162
.enumerate()
163-
.filter(|(i, _)| only_i.map_or(true, |only_i| only_i == *i))
163+
.filter(|(i, _)| only_i.is_none_or(|only_i| only_i == *i))
164164
.filter_map(|(i, v)| Some((i, v.take()?)));
165165

166166
for (i, file) in files_iter {

node/src/snark_pool/candidate/snark_pool_candidate_actions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl redux::EnablingCondition<crate::State> for SnarkPoolCandidateAction {
7070
.snark_pool
7171
.candidates
7272
.get(*peer_id, &info.job_id)
73-
.map_or(true, |v| info > v)
73+
.is_none_or(|v| info > v)
7474
}
7575
SnarkPoolCandidateAction::WorkFetchAll => state.p2p.ready().is_some(),
7676
SnarkPoolCandidateAction::WorkFetchInit { peer_id, job_id } => {
@@ -104,7 +104,7 @@ impl redux::EnablingCondition<crate::State> for SnarkPoolCandidateAction {
104104
.snark_pool
105105
.candidates
106106
.get(*peer_id, &job_id)
107-
.map_or(true, |v| match work.partial_cmp(v).unwrap() {
107+
.is_none_or(|v| match work.partial_cmp(v).unwrap() {
108108
Ordering::Less => false,
109109
Ordering::Greater => true,
110110
Ordering::Equal => {

0 commit comments

Comments
 (0)