Skip to content

Commit cf06c7f

Browse files
committed
Merge remote-tracking branch 'origin/fix-always-deregister-payment-tracker' into test
2 parents caa0cbf + f91f8cc commit cf06c7f

File tree

5 files changed

+211
-30
lines changed

5 files changed

+211
-30
lines changed

rust/lit-node/lit-node-testnet/src/validator.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -857,7 +857,6 @@ impl ValidatorCollection {
857857
.filter(|f| ports.contains(&f.node.port))
858858
.map(|v| v.socket_address())
859859
.collect();
860-
861860
let nodes_for_epoch2 = nodes_for_epoch.clone();
862861

863862
let threshold = self

rust/lit-node/lit-node/src/endpoints/versions/v2.rs

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use crate::endpoints::{admin, pkp, web_client};
44
use crate::functions::ActionStore;
55
use crate::models;
66
use crate::payment::delegated_usage::DelegatedUsageDB;
7-
use crate::payment::{payed_endpoint::PayedEndpoint, payment_tracker::PaymentTracker};
7+
use crate::payment::{
8+
payed_endpoint::PayedEndpoint,
9+
payment_tracker::{PaymentTracker, PaymentUsageGuard},
10+
};
811
use crate::peers::grpc_client_pool::GrpcClientPool;
912
use crate::tss::common::{restore::restore_state::RestoreState, tss_state::TssState};
1013
use crate::utils::rocket::guards::RequestHeaders;
@@ -62,7 +65,10 @@ pub(crate) async fn sign_session_key(
6265
tracing: Tracing,
6366
request_headers: RequestHeaders<'_>,
6467
) -> status::Custom<Value> {
65-
payment_tracker.register_usage(&PayedEndpoint::SignSessionKey);
68+
let _usage_guard = PaymentUsageGuard::new(
69+
payment_tracker.inner().clone(),
70+
PayedEndpoint::SignSessionKey,
71+
);
6672

6773
let (json_sign_session_key_request, client_session) =
6874
match client_state.json_decrypt_to_session(&json_sign_session_key_request) {
@@ -80,7 +86,7 @@ pub(crate) async fn sign_session_key(
8086
};
8187
let client_session = Arc::new(client_session);
8288

83-
let call_result = with_timeout(
89+
with_timeout(
8490
&cfg.load_full(),
8591
None,
8692
Some(client_session.clone()),
@@ -104,11 +110,7 @@ pub(crate) async fn sign_session_key(
104110
.await
105111
},
106112
)
107-
.await;
108-
109-
payment_tracker.deregister_usage(&PayedEndpoint::SignSessionKey);
110-
111-
call_result
113+
.await
112114
}
113115

114116
#[allow(clippy::too_many_arguments)]
@@ -147,7 +149,10 @@ pub(crate) async fn encryption_sign(
147149
// Err(e) => return e.handle(),
148150
// };
149151

150-
payment_tracker.register_usage(&PayedEndpoint::EncryptionSign);
152+
let _usage_guard = PaymentUsageGuard::new(
153+
payment_tracker.inner().clone(),
154+
PayedEndpoint::EncryptionSign,
155+
);
151156

152157
let (encryption_sign_request, client_session) =
153158
match client_state.json_decrypt_to_session(&encryption_sign_request) {
@@ -160,7 +165,7 @@ pub(crate) async fn encryption_sign(
160165
};
161166
let client_session = Arc::new(client_session);
162167

163-
let call_result = with_timeout(
168+
with_timeout(
164169
&cfg.load_full(),
165170
None,
166171
Some(client_session.clone()),
@@ -182,11 +187,7 @@ pub(crate) async fn encryption_sign(
182187
.await
183188
},
184189
)
185-
.await;
186-
187-
payment_tracker.deregister_usage(&PayedEndpoint::EncryptionSign);
188-
189-
call_result
190+
.await
190191
}
191192

192193
#[cfg(feature = "lit-actions")]
@@ -210,7 +211,8 @@ pub(crate) async fn execute_function(
210211
request_headers: RequestHeaders<'_>,
211212
action_store: &State<ActionStore>,
212213
) -> status::Custom<Value> {
213-
payment_tracker.register_usage(&PayedEndpoint::LitAction);
214+
let _usage_guard =
215+
PaymentUsageGuard::new(payment_tracker.inner().clone(), PayedEndpoint::LitAction);
214216

215217
let (json_execution_request, client_session) =
216218
match client_state.json_decrypt_to_session(&json_execution_request) {
@@ -230,7 +232,7 @@ pub(crate) async fn execute_function(
230232

231233
let actions_config = tss_state.chain_data_config_manager.get_actions_config();
232234

233-
let call_result = with_timeout(
235+
with_timeout(
234236
&cfg.load_full(),
235237
Some(actions_config.timeout_ms),
236238
Some(client_session.clone()),
@@ -257,11 +259,7 @@ pub(crate) async fn execute_function(
257259
.await
258260
},
259261
)
260-
.await;
261-
262-
payment_tracker.deregister_usage(&PayedEndpoint::LitAction);
263-
264-
call_result
262+
.await
265263
}
266264

267265
#[cfg(feature = "lit-actions")]
@@ -330,7 +328,8 @@ pub(crate) async fn pkp_sign(
330328
tracing: Tracing,
331329
http_client: &State<reqwest::Client>,
332330
) -> status::Custom<Value> {
333-
payment_tracker.register_usage(&PayedEndpoint::PkpSign);
331+
let _usage_guard =
332+
PaymentUsageGuard::new(payment_tracker.inner().clone(), PayedEndpoint::PkpSign);
334333

335334
let (json_pkp_signing_request, client_session) =
336335
match client_state.json_decrypt_to_session(&json_pkp_signing_request) {
@@ -343,7 +342,7 @@ pub(crate) async fn pkp_sign(
343342
};
344343
let client_session = Arc::new(client_session);
345344

346-
let call_result = with_timeout(
345+
with_timeout(
347346
&cfg.load_full(),
348347
None,
349348
Some(client_session.clone()),
@@ -366,11 +365,7 @@ pub(crate) async fn pkp_sign(
366365
.await
367366
},
368367
)
369-
.await;
370-
371-
payment_tracker.deregister_usage(&PayedEndpoint::PkpSign);
372-
373-
call_result
368+
.await
374369
}
375370

376371
#[post("/web/admin/get_blinders/v2", format = "json", data = "<auth>")]

rust/lit-node/lit-node/src/payment/payment_tracker.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::payment::{batches::Batches, payed_endpoint::PayedEndpoint};
22
use crate::version::{DataVersionReader, DataVersionWriter};
33
use sdd::AtomicShared;
4+
use std::sync::Arc;
45
use std::sync::atomic::{AtomicU64, Ordering};
56

67
#[derive(Default, Copy, Clone)]
@@ -89,3 +90,41 @@ impl PaymentTracker {
8990
DataVersionWriter::store(&self.node_capacity_config, config);
9091
}
9192
}
93+
94+
/// A guard that automatically deregisters usage when dropped, even on panics or early returns.
95+
/// This ensures that payment tracking remains accurate even when exceptions occur.
96+
pub struct PaymentUsageGuard {
97+
payment_tracker: Arc<PaymentTracker>,
98+
endpoint: PayedEndpoint,
99+
/// Whether the guard has already been manually deregistered (to avoid double deregistration)
100+
deregistered: bool,
101+
}
102+
103+
impl PaymentUsageGuard {
104+
/// Creates a new guard and registers usage for the given endpoint.
105+
pub fn new(payment_tracker: Arc<PaymentTracker>, endpoint: PayedEndpoint) -> Self {
106+
payment_tracker.register_usage(&endpoint);
107+
Self {
108+
payment_tracker,
109+
endpoint,
110+
deregistered: false,
111+
}
112+
}
113+
114+
/// Manually deregister usage. This is optional - the guard will automatically
115+
/// deregister when dropped, but you can call this explicitly if needed.
116+
pub fn deregister(&mut self) {
117+
if !self.deregistered {
118+
self.payment_tracker.deregister_usage(&self.endpoint);
119+
self.deregistered = true;
120+
}
121+
}
122+
}
123+
124+
impl Drop for PaymentUsageGuard {
125+
fn drop(&mut self) {
126+
if !self.deregistered {
127+
self.payment_tracker.deregister_usage(&self.endpoint);
128+
}
129+
}
130+
}

rust/lit-node/lit-node/tests/acceptance/payment.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1361,6 +1361,153 @@ async fn test_pending_payments_block_usage() {
13611361
.await;
13621362
}
13631363

1364+
#[tokio::test]
1365+
async fn test_payment_tracker_usage_tracking() {
1366+
// This test verifies that:
1367+
// 1. Pricing increases with concurrency (usage percentage)
1368+
// 2. After requests complete, usage tracking correctly returns to 0%
1369+
// (verifying that PaymentUsageGuard properly deregisters when requests finish)
1370+
1371+
crate::common::setup_logging();
1372+
let (testnet, _validator_collection, actions, node_set) = setup_testnet_for_payments().await;
1373+
let realm_id = ethers::types::U256::from(1);
1374+
let node_set = get_identity_pubkeys_from_node_set(&node_set).await;
1375+
1376+
let self_pay_user = EndUser::new(&testnet);
1377+
self_pay_user
1378+
.set_wallet_balance(INITIAL_FUNDING_AMOUNT)
1379+
.await;
1380+
1381+
// Get initial price at 0% usage
1382+
// 3 is the SignSessionKey product ID
1383+
let product_id = 1;
1384+
let initial_price = self_pay_user.first_node_price_from_feed(product_id).await;
1385+
info!("Initial price at 0% usage: {}", initial_price);
1386+
1387+
// Prepare valid encryption parameters for successful requests
1388+
let test_encryption_parameters = prepare_test_encryption_parameters();
1389+
1390+
let resource_ability_requests = vec![LitResourceAbilityRequest {
1391+
resource: LitResourceAbilityRequestResource {
1392+
resource: format!(
1393+
"{}/{}",
1394+
test_encryption_parameters.hashed_access_control_conditions,
1395+
test_encryption_parameters.data_to_encrypt_hash
1396+
),
1397+
resource_prefix: LitResourcePrefix::ACC.to_string(),
1398+
},
1399+
ability: LitAbility::AccessControlConditionDecryption.to_string(),
1400+
}];
1401+
1402+
// Fund the user with enough balance for multiple requests
1403+
// Use a multiplier for funding to ensure we have enough for concurrent requests
1404+
let funding_amount = initial_price * 1000 * NUM_STAKED_VALIDATORS;
1405+
self_pay_user.deposit_to_wallet_ledger(funding_amount).await;
1406+
1407+
// Step 1: Make concurrent requests to increase usage and verify pricing increases
1408+
info!("Step 1: Making concurrent requests to increase usage");
1409+
let num_concurrent_requests = 30;
1410+
let mut handles = Vec::new();
1411+
1412+
for i in 0..num_concurrent_requests {
1413+
let session_sigs_and_node_set = get_session_sigs_for_auth(
1414+
&node_set,
1415+
resource_ability_requests.clone(),
1416+
Some(self_pay_user.wallet.clone()),
1417+
None,
1418+
Some(initial_price),
1419+
);
1420+
1421+
let params = test_encryption_parameters.clone();
1422+
let epoch = actions.get_current_epoch(realm_id).await.as_u64();
1423+
1424+
let handle = tokio::spawn(async move {
1425+
// Add small delay to ensure requests are truly concurrent
1426+
tokio::time::sleep(tokio::time::Duration::from_millis(i * 10)).await;
1427+
retrieve_decryption_key_session_sigs(
1428+
params,
1429+
&session_sigs_and_node_set,
1430+
epoch,
1431+
DEFAULT_KEY_SET_NAME,
1432+
)
1433+
.await
1434+
});
1435+
handles.push(handle);
1436+
}
1437+
1438+
// Wait a bit for requests to register usage
1439+
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
1440+
1441+
// Check that price has increased due to concurrency
1442+
let price_with_concurrency = self_pay_user.first_node_price_from_feed(product_id).await;
1443+
info!(
1444+
"Price with {} concurrent requests: {}",
1445+
num_concurrent_requests, price_with_concurrency
1446+
);
1447+
1448+
// Price should be higher than initial
1449+
assert!(
1450+
price_with_concurrency >= initial_price,
1451+
"Price should increase or stay the same with concurrent requests. Initial: {}, With concurrency: {}",
1452+
initial_price,
1453+
price_with_concurrency
1454+
);
1455+
1456+
// Wait for all requests to complete
1457+
info!("Waiting for all concurrent requests to complete");
1458+
for handle in handles {
1459+
let _ = handle.await;
1460+
}
1461+
1462+
// Step 2: Wait for all requests to fully complete and verify usage returns to 0%
1463+
info!("Step 2: Waiting for usage to return to 0%");
1464+
1465+
// Give some time for all guards to drop and deregister
1466+
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
1467+
1468+
// Check that price has returned to initial (or close to it)
1469+
// Note: There might be some delay in price feed updates, so we check multiple times
1470+
let mut final_price = self_pay_user.first_node_price_from_feed(product_id).await;
1471+
let mut attempts = 0;
1472+
const MAX_ATTEMPTS: u32 = 10;
1473+
1474+
while final_price > initial_price && attempts < MAX_ATTEMPTS {
1475+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
1476+
final_price = self_pay_user.first_node_price_from_feed(product_id).await;
1477+
attempts += 1;
1478+
info!(
1479+
"Attempt {}: Final price: {}, Initial price: {}",
1480+
attempts, final_price, initial_price
1481+
);
1482+
}
1483+
1484+
info!("Final price after all requests: {}", final_price);
1485+
info!("Initial price: {}", initial_price);
1486+
1487+
// The final price should be close to the initial price (within reasonable tolerance)
1488+
// This verifies that usage tracking correctly returned to 0% after all requests completed
1489+
// Note: We allow some tolerance because price feed updates might have slight delays
1490+
let price_difference = if final_price > initial_price {
1491+
final_price - initial_price
1492+
} else {
1493+
U256::zero()
1494+
};
1495+
1496+
// Allow up to 5% difference to account for price feed update delays
1497+
let tolerance = initial_price / 20;
1498+
1499+
assert!(
1500+
price_difference <= tolerance,
1501+
"Price should return close to initial after all requests complete (including exceptions). \
1502+
Initial: {}, Final: {}, Difference: {}, Tolerance: {}. \
1503+
This indicates usage tracking correctly deregistered even after exceptions.",
1504+
initial_price,
1505+
final_price,
1506+
price_difference,
1507+
tolerance
1508+
);
1509+
}
1510+
13641511
async fn setup_testnet_for_payments() -> (Testnet, ValidatorCollection, Actions, Vec<NodeSet>) {
13651512
do_setup_testnet_for_payments(true).await
13661513
}

rust/lit-node/lit-node/tests/integration/lit_actions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,7 @@ pub mod litactions {
386386
// currently designed to handle just a single siganture.
387387
let mut shares = vec![];
388388
for resp in execute_resp {
389+
info!("resp: {:?}", resp);
389390
assert!(resp.ok);
390391
let data = resp.data.as_ref().unwrap();
391392
info!("json_object: {:?}", data);

0 commit comments

Comments
 (0)