1
- use std:: time:: Duration ;
1
+ use std:: { cmp :: Ordering , time:: Duration } ;
2
2
3
3
use mina_p2p_messages:: v2:: MinaLedgerSyncLedgerQueryStableV1 ;
4
4
use node:: {
5
5
event_source:: Event ,
6
6
ledger:: LedgerAddress ,
7
7
p2p:: {
8
8
channels:: {
9
- rpc:: { P2pRpcKind , P2pRpcRequest , RpcChannelMsg } ,
9
+ rpc:: { P2pRpcRequest , RpcChannelMsg } ,
10
10
ChannelMsg ,
11
11
} ,
12
12
P2pChannelEvent , P2pEvent ,
13
13
} ,
14
- State ,
14
+ ActionKind , State ,
15
15
} ;
16
16
17
17
use crate :: {
18
18
cluster:: ClusterNodeId ,
19
19
node:: RustNodeTestingConfig ,
20
20
scenario:: { ListenerNode , ScenarioStep } ,
21
- scenarios:: cluster_runner:: ClusterRunner ,
21
+ scenarios:: { cluster_runner:: ClusterRunner , RunDecision } ,
22
22
} ;
23
23
24
24
/// Set up single Rust node and sync up root snarked ledger.
@@ -49,94 +49,59 @@ impl SoloNodeSyncRootSnarkedLedger {
49
49
} )
50
50
. await
51
51
. unwrap ( ) ;
52
- eprintln ! ( "node: {node_id} dialing to replayer: {REPLAYER_1}" ) ;
52
+ eprintln ! ( "node( {node_id}) dialing to replayer: {REPLAYER_1}" ) ;
53
53
runner
54
54
. exec_step ( ScenarioStep :: ConnectNodes {
55
55
dialer : node_id,
56
56
listener : ListenerNode :: Custom ( REPLAYER_2 . parse ( ) . unwrap ( ) ) ,
57
57
} )
58
58
. await
59
59
. unwrap ( ) ;
60
- eprintln ! ( "node: {node_id} dialing to replayer: {REPLAYER_2}" ) ;
60
+ eprintln ! ( "node( {node_id}) dialing to replayer: {REPLAYER_2}" ) ;
61
61
62
- loop {
63
- if !runner
64
- . wait_for_pending_events_with_timeout ( Duration :: from_secs ( 10 ) )
65
- . await
66
- {
67
- panic ! ( "waiting for connection event timed out" ) ;
68
- }
69
- let ( state, events) = runner. node_pending_events ( node_id) . unwrap ( ) ;
70
-
71
- let connected_peer_count = state
72
- . p2p
73
- . ready_peers_iter ( )
74
- . filter ( |( _, p) | p. channels . rpc . is_ready ( ) )
75
- . count ( ) ;
76
-
77
- // Break loop if both replayers got connected and we started
78
- // sending ledger queries.
79
- if connected_peer_count >= 2 {
80
- let has_sent_ledger_query = state
81
- . p2p
82
- . ready_peers_iter ( )
83
- . filter_map ( |( _, p) | p. channels . rpc . pending_local_rpc_kind ( ) )
84
- . any ( |rpc_kind| matches ! ( rpc_kind, P2pRpcKind :: LedgerQuery ) ) ;
85
-
86
- if has_sent_ledger_query {
87
- break ;
88
- }
89
- }
90
-
91
- let events = events
92
- . filter_map ( |( _, event) | {
93
- // Don't dispatch ledger query responses yet. We want
94
- // to later manually control their order.
95
- Some ( ( ) )
96
- . filter ( |_| self . event_ledger_query_addr ( state, event) . is_none ( ) )
97
- . map ( |_| event. to_string ( ) )
98
- } )
99
- . collect :: < Vec < _ > > ( ) ;
62
+ // Wait for both peers to be connected, hiding p2p ledger query
63
+ // responses for now, as we want to control their order.
64
+ runner
65
+ . run (
66
+ Duration :: from_secs ( 10 ) ,
67
+ |_, state, event| {
68
+ if self . event_ledger_query_addr ( state, event) . is_some ( ) {
69
+ // skip/hide ledger query events.
70
+ return RunDecision :: Skip ;
71
+ }
72
+ RunDecision :: ContinueExec
73
+ } ,
74
+ |_, state, _, _| {
75
+ let connected_peer_count = state
76
+ . p2p
77
+ . ready_peers_iter ( )
78
+ . filter ( |( _, p) | p. channels . rpc . is_ready ( ) )
79
+ . count ( ) ;
80
+
81
+ // exit if both peers ready.
82
+ connected_peer_count >= 2
83
+ } ,
84
+ )
85
+ . await
86
+ . expect ( "waiting for 2 replayer peers to be connected timed out" ) ;
100
87
101
- for event in events {
102
- runner
103
- . exec_step ( ScenarioStep :: Event { node_id, event } )
104
- . await
105
- . unwrap ( ) ;
106
- }
107
- }
108
88
eprintln ! ( "2 replayers are now connected" ) ;
109
89
110
90
// Exec ledger query responses until we are deep enough for there
111
91
// to be more than 1 hash in the same height.
112
92
eprintln ! ( "exec ledger query responses until we are deep enough for there to be more than 1 hash in the same height" ) ;
113
- loop {
114
- if !runner
115
- . wait_for_pending_events_with_timeout ( Duration :: from_secs ( 5 ) )
116
- . await
117
- {
118
- panic ! ( "waiting for events event timed out" ) ;
119
- }
120
- let ( state, events) = runner. node_pending_events ( node_id) . unwrap ( ) ;
121
-
122
- let snarked_state = state
123
- . transition_frontier
124
- . sync
125
- . ledger ( )
126
- . unwrap ( )
127
- . snarked ( )
128
- . unwrap ( ) ;
129
- if snarked_state. fetch_pending ( ) . unwrap ( ) . len ( ) >= 2 {
130
- break ;
131
- }
132
-
133
- for event in events. map ( |( _, e) | e. to_string ( ) ) . collect :: < Vec < _ > > ( ) {
134
- runner
135
- . exec_step ( ScenarioStep :: Event { node_id, event } )
136
- . await
137
- . unwrap ( ) ;
138
- }
139
- }
93
+ runner
94
+ . run (
95
+ Duration :: from_secs ( 10 ) ,
96
+ |_, _, _| RunDecision :: ContinueExec ,
97
+ // |_, _, _| RunDecision::ContinueExec,
98
+ move |_, state, _, action| {
99
+ matches ! ( action. action( ) . kind( ) , ActionKind :: CheckTimeouts )
100
+ && self . fetch_pending_count ( state) >= 2
101
+ } ,
102
+ )
103
+ . await
104
+ . expect ( "time out" ) ;
140
105
141
106
eprintln ! ( "receive all hashes before first..." ) ;
142
107
self . receive_all_hashes_before_first ( & mut runner, node_id)
@@ -147,60 +112,62 @@ impl SoloNodeSyncRootSnarkedLedger {
147
112
eprintln ! ( "success" ) ;
148
113
}
149
114
115
+ fn fetch_pending_count ( self , state : & State ) -> usize {
116
+ None . or_else ( || {
117
+ let snarked_state = state. transition_frontier . sync . ledger ( ) ?. snarked ( ) ?;
118
+ Some ( snarked_state. fetch_pending ( ) . unwrap ( ) . len ( ) )
119
+ } )
120
+ . unwrap_or ( 0 )
121
+ }
122
+
123
+ async fn receive_single_hash ( self , runner : & mut ClusterRunner < ' _ > , node_id : ClusterNodeId ) {
124
+ runner
125
+ . run (
126
+ Duration :: from_secs ( 5 ) ,
127
+ |cur_node_id, state, event| {
128
+ if cur_node_id == node_id
129
+ && self . event_ledger_query_addr ( state, event) . is_some ( )
130
+ {
131
+ return RunDecision :: StopExec ;
132
+ }
133
+ RunDecision :: Skip
134
+ } ,
135
+ |_, _, _, _| false ,
136
+ )
137
+ . await
138
+ . expect ( "timeout" ) ;
139
+ }
140
+
150
141
async fn receive_all_hashes_before_first (
151
142
self ,
152
143
runner : & mut ClusterRunner < ' _ > ,
153
144
node_id : ClusterNodeId ,
154
145
) {
155
146
self . receive_all_hashes_except_first ( runner, node_id) . await ;
156
- let ( _state, events) = runner. node_pending_events ( node_id) . unwrap ( ) ;
157
- for event in events. map ( |( _, e) | e. to_string ( ) ) . collect :: < Vec < _ > > ( ) {
158
- runner
159
- . exec_step ( ScenarioStep :: Event { node_id, event } )
160
- . await
161
- . unwrap ( ) ;
162
- }
147
+ self . receive_single_hash ( runner, node_id) . await ;
163
148
}
164
149
165
150
async fn receive_all_hashes_except_first (
166
151
self ,
167
152
runner : & mut ClusterRunner < ' _ > ,
168
- node_id : ClusterNodeId ,
153
+ _node_id : ClusterNodeId ,
169
154
) {
170
- loop {
171
- if !runner
172
- . wait_for_pending_events_with_timeout ( Duration :: from_secs ( 5 ) )
173
- . await
174
- {
175
- panic ! ( "waiting for events event timed out" ) ;
176
- }
177
- let ( state, events) = runner. node_pending_events ( node_id) . unwrap ( ) ;
178
-
179
- let snarked_state = state
180
- . transition_frontier
181
- . sync
182
- . ledger ( )
183
- . unwrap ( )
184
- . snarked ( )
185
- . unwrap ( ) ;
186
- if snarked_state. fetch_pending ( ) . unwrap ( ) . len ( ) == 1 {
187
- break ;
188
- }
189
-
190
- let events = events. filter ( |( _, e) | !self . is_event_first_ledger_query ( state, e) ) ;
191
-
192
- for event in events. map ( |( _, e) | e. to_string ( ) ) . collect :: < Vec < _ > > ( ) {
193
- runner
194
- . exec_step ( ScenarioStep :: Event { node_id, event } )
195
- . await
196
- . unwrap ( ) ;
197
- }
198
- }
199
-
200
155
runner
201
- . exec_step ( ScenarioStep :: CheckTimeouts { node_id } )
156
+ . run (
157
+ Duration :: from_secs ( 10 ) ,
158
+ |_, state, event| {
159
+ if self . is_event_first_ledger_query ( state, event) {
160
+ return RunDecision :: Skip ;
161
+ }
162
+ RunDecision :: ContinueExec
163
+ } ,
164
+ move |_, state, _, action| {
165
+ matches ! ( action. action( ) . kind( ) , ActionKind :: CheckTimeouts )
166
+ && self . fetch_pending_count ( state) == 1
167
+ } ,
168
+ )
202
169
. await
203
- . unwrap ( ) ;
170
+ . expect ( "timeout" ) ;
204
171
}
205
172
206
173
async fn receive_all_hashes_before_last (
@@ -209,60 +176,46 @@ impl SoloNodeSyncRootSnarkedLedger {
209
176
node_id : ClusterNodeId ,
210
177
) {
211
178
self . receive_all_hashes_except_last ( runner, node_id) . await ;
212
- let ( _state, events) = runner. node_pending_events ( node_id) . unwrap ( ) ;
213
- for event in events. map ( |( _, e) | e. to_string ( ) ) . collect :: < Vec < _ > > ( ) {
214
- runner
215
- . exec_step ( ScenarioStep :: Event { node_id, event } )
216
- . await
217
- . unwrap ( ) ;
218
- }
179
+ self . receive_single_hash ( runner, node_id) . await ;
219
180
}
220
181
221
182
async fn receive_all_hashes_except_last (
222
183
self ,
223
184
runner : & mut ClusterRunner < ' _ > ,
224
185
node_id : ClusterNodeId ,
225
186
) {
226
- loop {
227
- if !runner
228
- . wait_for_pending_events_with_timeout ( Duration :: from_secs ( 5 ) )
187
+ let mut biggest_addr = None ;
188
+ while self . fetch_pending_count ( runner. node ( node_id) . unwrap ( ) . state ( ) ) > 1 {
189
+ runner
190
+ . run (
191
+ Duration :: from_secs ( 10 ) ,
192
+ |_, state, event| {
193
+ let Some ( addr) = self . event_ledger_query_addr ( state, event) else {
194
+ return RunDecision :: Skip ;
195
+ } ;
196
+ match biggest_addr. as_mut ( ) {
197
+ None => {
198
+ biggest_addr = Some ( addr) ;
199
+ RunDecision :: Skip
200
+ }
201
+ Some ( biggest_addr) => match addr. cmp ( biggest_addr) {
202
+ Ordering :: Less => RunDecision :: ContinueExec ,
203
+ Ordering :: Equal => RunDecision :: Skip ,
204
+ Ordering :: Greater => {
205
+ * biggest_addr = addr;
206
+ RunDecision :: Stop
207
+ }
208
+ } ,
209
+ }
210
+ } ,
211
+ move |_, state, _, action| {
212
+ matches ! ( action. action( ) . kind( ) , ActionKind :: CheckTimeouts )
213
+ && self . fetch_pending_count ( state) == 1
214
+ } ,
215
+ )
229
216
. await
230
- {
231
- panic ! ( "waiting for events event timed out" ) ;
232
- }
233
- let ( state, events) = runner. node_pending_events ( node_id) . unwrap ( ) ;
234
-
235
- let snarked_state = state
236
- . transition_frontier
237
- . sync
238
- . ledger ( )
239
- . unwrap ( )
240
- . snarked ( )
241
- . unwrap ( ) ;
242
- if snarked_state. fetch_pending ( ) . unwrap ( ) . len ( ) == 1 {
243
- break ;
244
- }
245
-
246
- let mut events = events
247
- . filter_map ( |( _, e) | Some ( ( e, self . event_ledger_query_addr ( state, e) ?) ) )
248
- . collect :: < Vec < _ > > ( ) ;
249
-
250
- events. sort_by ( |( _, addr1) , ( _, addr2) | addr1. cmp ( addr2) ) ;
251
-
252
- let events = events. into_iter ( ) . rev ( ) . skip ( 1 ) ;
253
-
254
- for event in events. map ( |( e, _) | e. to_string ( ) ) . collect :: < Vec < _ > > ( ) {
255
- runner
256
- . exec_step ( ScenarioStep :: Event { node_id, event } )
257
- . await
258
- . unwrap ( ) ;
259
- }
217
+ . expect ( "timeout" ) ;
260
218
}
261
-
262
- runner
263
- . exec_step ( ScenarioStep :: CheckTimeouts { node_id } )
264
- . await
265
- . unwrap ( ) ;
266
219
}
267
220
268
221
fn event_ledger_query_addr ( self , state : & State , event : & Event ) -> Option < LedgerAddress > {
0 commit comments