Skip to content

Commit 45ae89b

Browse files
0xMimirdirectcuteo
authored andcommitted
Added limit for IWANT requests
Small improvements
2 parents 26354df + 414ce3e commit 45ae89b

File tree

8 files changed

+130
-51
lines changed

8 files changed

+130
-51
lines changed

frontend/src/app/core/services/web-node.service.ts

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
import { Injectable } from '@angular/core';
22
import { BehaviorSubject, filter, from, fromEvent, map, merge, Observable, of, switchMap, tap } from 'rxjs';
33
import base from 'base-x';
4-
import { any, log } from '@openmina/shared';
4+
import { any } from '@openmina/shared';
55
import { HttpClient } from '@angular/common/http';
66

77
@Injectable({
88
providedIn: 'root',
99
})
1010
export class WebNodeService {
1111

12-
private readonly backendSubject$: BehaviorSubject<any> = new BehaviorSubject<any>(null);
13-
private backend: any;
12+
private readonly webnode$: BehaviorSubject<any> = new BehaviorSubject<any>(null);
1413
private webNodeKeyPair: { publicKey: string, privateKey: string };
1514

1615
constructor(private http: HttpClient) {
@@ -42,79 +41,74 @@ export class WebNodeService {
4241
console.log(wasm);
4342
return from(wasm.run(this.webNodeKeyPair.privateKey));
4443
}),
45-
tap((jsHandle: any) => {
46-
this.backend = jsHandle;
44+
tap((webnode: any) => {
4745
console.log('----------------WEBNODE----------------');
48-
console.log(jsHandle);
49-
this.backendSubject$.next(jsHandle);
46+
console.log(webnode);
47+
this.webnode$.next(webnode);
5048
}),
51-
switchMap(() => this.backendSubject$.asObservable()),
49+
switchMap(() => this.webnode$.asObservable()),
5250
filter(Boolean),
5351
);
5452
}
5553

56-
get webNodeKeys(): { publicKey: string, privateKey: string } {
57-
return this.webNodeKeyPair;
58-
}
59-
6054
get status$(): Observable<any> {
61-
return this.backendSubject$.asObservable().pipe(
55+
return this.webnode$.asObservable().pipe(
6256
filter(Boolean),
6357
switchMap(handle => from((handle as any).status())),
6458
);
6559
}
6660

6761
get blockProducerStats$(): Observable<any> {
68-
return this.backendSubject$.asObservable().pipe(
62+
return this.webnode$.asObservable().pipe(
6963
filter(Boolean),
7064
switchMap(handle => from((handle as any).stats().block_producer())),
7165
);
7266
}
7367

7468
get peers$(): Observable<any> {
75-
return this.backendSubject$.asObservable().pipe(
69+
return this.webnode$.asObservable().pipe(
7670
filter(Boolean),
7771
switchMap(handle => from(any(handle).state().peers())),
7872
);
7973
}
8074

8175
get messageProgress$(): Observable<any> {
82-
return this.backendSubject$.asObservable().pipe(
76+
return this.webnode$.asObservable().pipe(
8377
filter(Boolean),
8478
switchMap(handle => from((handle as any).state().message_progress())),
8579
);
8680
}
8781

8882
get sync$(): Observable<any> {
89-
return this.backendSubject$.asObservable().pipe(
83+
return this.webnode$.asObservable().pipe(
9084
filter(Boolean),
9185
switchMap(handle => from((handle as any).stats().sync())),
9286
);
9387
}
9488

9589
get accounts$(): Observable<any> {
96-
return this.backendSubject$.asObservable().pipe(
90+
return this.webnode$.asObservable().pipe(
9791
filter(Boolean),
9892
switchMap(handle => from((handle as any).ledger().latest().accounts().all())),
9993
);
10094
}
10195

10296
get bestChainUserCommands$(): Observable<any> {
103-
return this.backendSubject$.asObservable().pipe(
97+
return this.webnode$.asObservable().pipe(
10498
filter(Boolean),
10599
switchMap(handle => from((handle as any).transition_frontier().best_chain().user_commands())),
106100
);
107101
}
108102

109103
sendPayment$(payment: any): Observable<any> {
110-
return this.backendSubject$.asObservable().pipe(
104+
return this.webnode$.asObservable().pipe(
111105
filter(Boolean),
112106
switchMap(handle => from((handle as any).transaction_pool().inject().payment(payment))),
113107
);
114108
}
115109

116110
get transactionPool$(): Observable<any> {
117-
return this.backendSubject$.asObservable().pipe(
111+
return this.webnode$.asObservable().pipe(
118112
filter(Boolean),
119113
switchMap(handle => from((handle as any).transaction_pool().get())),
120114
);

frontend/src/app/features/benchmarks/wallets/benchmarks-wallets.effects.ts

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@ import { Injectable } from '@angular/core';
22
import { MinaState, selectMinaState } from '@app/app.setup';
33
import { Actions, createEffect, ofType } from '@ngrx/effects';
44
import { Effect } from '@openmina/shared';
5-
import { forkJoin, map, switchMap } from 'rxjs';
5+
import { EMPTY, forkJoin, map, switchMap } from 'rxjs';
66
import { Store } from '@ngrx/store';
77
import {
8+
BENCHMARKS_WALLETS_CLOSE,
89
BENCHMARKS_WALLETS_GET_ALL_TXS,
910
BENCHMARKS_WALLETS_GET_ALL_TXS_SUCCESS,
1011
BENCHMARKS_WALLETS_GET_WALLETS,
1112
BENCHMARKS_WALLETS_GET_WALLETS_SUCCESS,
1213
BENCHMARKS_WALLETS_SEND_TX_SUCCESS,
1314
BENCHMARKS_WALLETS_SEND_TXS, BENCHMARKS_WALLETS_SEND_ZKAPPS, BENCHMARKS_WALLETS_SEND_ZKAPPS_SUCCESS,
14-
BenchmarksWalletsActions,
15+
BenchmarksWalletsActions, BenchmarksWalletsClose,
1516
BenchmarksWalletsGetWallets,
1617
BenchmarksWalletsSendTxs,
1718
} from '@benchmarks/wallets/benchmarks-wallets.actions';
@@ -38,22 +39,23 @@ export class BenchmarksWalletsEffects extends MinaRustBaseEffect<BenchmarksWalle
3839
private zkService: BenchmarksWalletsZkService,
3940
private mempoolService: MempoolService,
4041
store: Store<MinaState>) {
41-
4242
super(store, selectMinaState);
4343

4444
this.getWallets$ = createEffect(() => this.actions$.pipe(
45-
ofType(BENCHMARKS_WALLETS_GET_WALLETS),
46-
this.latestActionState<BenchmarksWalletsGetWallets>(),
47-
switchMap(({ action }) => this.benchmarksService.getAccounts().pipe(
48-
switchMap(payload => {
49-
const actions = [];
50-
if (action.payload?.initialRequest) {
51-
actions.push({ type: BENCHMARKS_WALLETS_GET_ALL_TXS });
52-
}
53-
actions.push({ type: BENCHMARKS_WALLETS_GET_WALLETS_SUCCESS, payload });
54-
return actions;
55-
}),
56-
)),
45+
ofType(BENCHMARKS_WALLETS_GET_WALLETS, BENCHMARKS_WALLETS_CLOSE),
46+
this.latestActionState<BenchmarksWalletsGetWallets | BenchmarksWalletsClose>(),
47+
switchMap(({ action }) => action.type === BENCHMARKS_WALLETS_CLOSE
48+
? EMPTY
49+
: this.benchmarksService.getAccounts().pipe(
50+
switchMap(payload => {
51+
const actions = [];
52+
if (action.payload?.initialRequest) {
53+
actions.push({ type: BENCHMARKS_WALLETS_GET_ALL_TXS });
54+
}
55+
actions.push({ type: BENCHMARKS_WALLETS_GET_WALLETS_SUCCESS, payload });
56+
return actions;
57+
}),
58+
)),
5759
catchErrorAndRepeat(MinaErrorType.GENERIC, BENCHMARKS_WALLETS_GET_WALLETS_SUCCESS, []),
5860
));
5961

frontend/src/app/features/benchmarks/wallets/benchmarks-wallets.service.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4035,7 +4035,7 @@ export class BenchmarksWalletsService {
40354035
}
40364036

40374037
getAccounts(): Observable<Pick<BenchmarksWallet, 'publicKey' | 'privateKey' | 'minaTokens' | 'nonce'>[]> {
4038-
return this.rust.get<any[]>('/accounts').pipe(
4038+
return this.rust.get<AccountsResponse[]>('/accounts').pipe(
40394039
map(wallets => wallets.map(wallet => {
40404040
return ({
40414041
privateKey: WALLETS.find(w => w.publicKey === wallet.public_key)?.privateKey,
@@ -4140,3 +4140,9 @@ export class BenchmarksWalletsService {
41404140
});
41414141
}
41424142
}
4143+
4144+
export interface AccountsResponse {
4145+
public_key: string;
4146+
nonce: number;
4147+
balance: number;
4148+
}

frontend/src/app/layout/server-status/server-status.component.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
</div>
2525
</ng-container>
2626
<div class="node-status fx-row-vert-cent bg-surface h-sm border-rad-6 p-relative z-1 mr-10"
27-
[class.can-add-nodes]="canAddNodes">
27+
[class.can-add-nodes]="switchForbidden">
2828
<ng-container *ngIf="!switchForbidden && !hideNodeStats">
2929
<div class="shine-parent overflow-hidden p-absolute z-0 border-rad-6">
3030
<div *ngIf="details.status === AppNodeStatus.CATCHUP || details.status === AppNodeStatus.BOOTSTRAP"

frontend/src/app/layout/server-status/server-status.component.scss

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@
113113
}
114114
}
115115

116+
.chip::before {
117+
border-top-right-radius: 0 !important;
118+
border-bottom-right-radius: 0 !important;
119+
}
120+
116121
.shine-parent {
117122
height: calc(100% + 3px);
118123
width: calc(100% - 2px);

frontend/src/app/layout/toolbar/loading.reducer.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import {
4545
} from '@network/bootstrap-stats/network-bootstrap-stats.actions';
4646
import { BLOCK_PRODUCTION_PREFIX } from '@block-production/block-production.actions';
4747
import {
48+
BENCHMARKS_WALLETS_CLOSE,
4849
BENCHMARKS_WALLETS_GET_ALL_TXS,
4950
BENCHMARKS_WALLETS_GET_ALL_TXS_SUCCESS,
5051
BENCHMARKS_WALLETS_GET_WALLETS, BENCHMARKS_WALLETS_GET_WALLETS_SUCCESS,
@@ -154,6 +155,8 @@ export function loadingReducer(state: LoadingState = initialState, action: Featu
154155
return remove(state, BENCHMARKS_WALLETS_GET_WALLETS);
155156
case BENCHMARKS_WALLETS_GET_ALL_TXS_SUCCESS:
156157
return remove(state, BENCHMARKS_WALLETS_GET_ALL_TXS);
158+
case BENCHMARKS_WALLETS_CLOSE:
159+
return remove(state, [BENCHMARKS_WALLETS_GET_WALLETS, BENCHMARKS_WALLETS_GET_ALL_TXS]);
157160

158161
default:
159162
return state;

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{collections::btree_map::Entry, sync::Arc};
33
use binprot::BinProtRead;
44
use mina_p2p_messages::{gossip, v2};
55
use openmina_core::{block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, Substate};
6-
use redux::Dispatcher;
6+
use redux::{Dispatcher, Timestamp};
77

88
use crate::{
99
channels::{snark::P2pChannelsSnarkAction, transaction::P2pChannelsTransactionAction},
@@ -28,7 +28,7 @@ impl P2pNetworkPubsubState {
2828
Action: crate::P2pActionTrait<State>,
2929
{
3030
let pubsub_state = state_context.get_substate_mut()?;
31-
let (action, _meta) = action.split();
31+
let (action, meta) = action.split();
3232

3333
match action {
3434
P2pNetworkPubsubAction::NewStream {
@@ -125,7 +125,7 @@ impl P2pNetworkPubsubState {
125125
seen_limit,
126126
..
127127
} => {
128-
pubsub_state.reduce_incoming_data(&peer_id, data)?;
128+
pubsub_state.reduce_incoming_data(&peer_id, data, meta.time())?;
129129

130130
let dispatcher: &mut Dispatcher<Action, State> = state_context.into_dispatcher();
131131
dispatcher.push(P2pNetworkPubsubEffectfulAction::IncomingData {
@@ -447,7 +447,12 @@ impl P2pNetworkPubsubState {
447447
Ok(())
448448
}
449449

450-
fn reduce_incoming_data(&mut self, peer_id: &PeerId, data: Data) -> Result<(), String> {
450+
fn reduce_incoming_data(
451+
&mut self,
452+
peer_id: &PeerId,
453+
data: Data,
454+
timestamp: Timestamp,
455+
) -> Result<(), String> {
451456
let Some(state) = self.clients.get_mut(peer_id) else {
452457
// TODO: investigate, cannot reproduce this
453458
// bug_condition!("State not found for action: P2pNetworkPubsubAction::IncomingData");
@@ -529,14 +534,19 @@ impl P2pNetworkPubsubState {
529534
}
530535
}
531536

532-
for ihave in &control.ihave {
533-
let message_ids = ihave
534-
.message_ids
535-
.iter()
536-
.filter(|msg_id| !self.mcache.map.contains_key(*msg_id))
537-
.cloned()
538-
.collect();
539-
if let Some(client) = self.clients.get_mut(peer_id) {
537+
for ihave in control.ihave {
538+
if self.clients.contains_key(peer_id) {
539+
let message_ids = ihave
540+
.message_ids
541+
.into_iter()
542+
.filter(|message_id| self.filter_iwant_message_ids(message_id, timestamp))
543+
.collect::<Vec<_>>();
544+
545+
let Some(client) = self.clients.get_mut(peer_id) else {
546+
bug_condition!("State not found for {}", peer_id);
547+
return Ok(());
548+
};
549+
540550
let ctr = client.message.control.get_or_insert_with(Default::default);
541551
ctr.iwant.push(pb::ControlIWant { message_ids })
542552
}

p2p/src/network/pubsub/p2p_network_pubsub_state.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
use super::pb;
22
use crate::{token::BroadcastAlgorithm, ConnectionAddr, PeerId, StreamId};
33

4-
use std::collections::{BTreeMap, VecDeque};
4+
use std::{
5+
collections::{BTreeMap, VecDeque},
6+
time::Duration,
7+
};
58

69
use mina_p2p_messages::v2;
710
use openmina_core::{snark::Snark, transaction::Transaction};
11+
use redux::Timestamp;
812
use serde::{Deserialize, Serialize};
913

14+
pub const IWANT_TIMEOUT_DURATION: Duration = Duration::from_secs(5);
15+
1016
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
1117
pub struct P2pNetworkPubsubState {
1218
pub clients: BTreeMap<PeerId, P2pNetworkPubsubClientState>,
@@ -18,12 +24,65 @@ pub struct P2pNetworkPubsubState {
1824
pub incoming_transactions: Vec<(Transaction, u32)>,
1925
pub incoming_snarks: Vec<(Snark, u32)>,
2026
pub topics: BTreeMap<String, BTreeMap<PeerId, P2pNetworkPubsubClientTopicState>>,
27+
pub iwant: VecDeque<P2pNetworkPubsubIwantRequestCount>,
28+
}
29+
30+
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
31+
pub struct P2pNetworkPubsubIwantRequestCount {
32+
pub message_id: Vec<u8>,
33+
pub count: Vec<Timestamp>,
2134
}
2235

2336
impl P2pNetworkPubsubState {
2437
pub fn prune_peer_state(&mut self, peer_id: &PeerId) {
2538
self.clients.remove(peer_id);
2639
}
40+
41+
pub fn filter_iwant_message_ids(&mut self, message_id: &Vec<u8>, timestamp: Timestamp) -> bool {
42+
if self.mcache.map.contains_key(message_id) {
43+
return false;
44+
}
45+
46+
let message_count = self
47+
.iwant
48+
.iter_mut()
49+
.find(|message| &message.message_id == message_id);
50+
51+
match message_count {
52+
Some(message) => {
53+
let message_counts = std::mem::take(&mut message.count);
54+
55+
message.count = message_counts
56+
.into_iter()
57+
.filter(|time| {
58+
timestamp
59+
.checked_sub(*time)
60+
.map_or(false, |duration| duration < IWANT_TIMEOUT_DURATION)
61+
})
62+
.collect();
63+
64+
if message.count.len() < 3 {
65+
message.count.push(timestamp);
66+
return true;
67+
}
68+
69+
false
70+
}
71+
None => {
72+
let message_count = P2pNetworkPubsubIwantRequestCount {
73+
message_id: message_id.to_owned(),
74+
count: vec![timestamp],
75+
};
76+
77+
self.iwant.push_back(message_count);
78+
if self.iwant.len() > 10 {
79+
self.iwant.pop_front();
80+
}
81+
82+
true
83+
}
84+
}
85+
}
2786
}
2887

2988
#[derive(Serialize, Deserialize, Debug, Clone)]

0 commit comments

Comments
 (0)