Skip to content

Commit 5eb7ff9

Browse files
authored
Merge pull request #356 from input-output-hk/whankinsiv/totals-endpoints
feat: address totals REST endpoint
2 parents 2adf0dc + 1b3cbb1 commit 5eb7ff9

File tree

6 files changed

+298
-58
lines changed

6 files changed

+298
-58
lines changed

common/src/types.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ pub type PolicyId = [u8; 28];
255255
pub type NativeAssets = Vec<(PolicyId, Vec<NativeAsset>)>;
256256
pub type NativeAssetsDelta = Vec<(PolicyId, Vec<NativeAssetDelta>)>;
257257
pub type NativeAssetsMap = HashMap<PolicyId, HashMap<AssetName, u64>>;
258+
pub type NativeAssetsDeltaMap = HashMap<PolicyId, HashMap<AssetName, i64>>;
258259

259260
#[derive(
260261
Debug,
@@ -402,6 +403,63 @@ impl AddAssign for ValueMap {
402403
}
403404
}
404405

406+
/// Hashmap representation of ValueDelta (lovelace + multiasset)
407+
pub struct ValueDeltaMap {
408+
pub lovelace: i64,
409+
pub assets: NativeAssetsDeltaMap,
410+
}
411+
412+
impl From<ValueDelta> for ValueDeltaMap {
413+
fn from(value: ValueDelta) -> Self {
414+
let mut assets = HashMap::new();
415+
416+
for (policy, asset_list) in value.assets {
417+
let policy_entry = assets.entry(policy).or_insert_with(HashMap::new);
418+
for asset in asset_list {
419+
*policy_entry.entry(asset.name).or_insert(0) += asset.amount;
420+
}
421+
}
422+
423+
ValueDeltaMap {
424+
lovelace: value.lovelace,
425+
assets,
426+
}
427+
}
428+
}
429+
430+
impl AddAssign<ValueDelta> for ValueDeltaMap {
431+
fn add_assign(&mut self, delta: ValueDelta) {
432+
self.lovelace += delta.lovelace;
433+
434+
for (policy, assets) in delta.assets {
435+
let policy_entry = self.assets.entry(policy).or_default();
436+
for asset in assets {
437+
*policy_entry.entry(asset.name).or_insert(0) += asset.amount;
438+
}
439+
}
440+
}
441+
}
442+
443+
impl From<ValueDeltaMap> for ValueDelta {
444+
fn from(map: ValueDeltaMap) -> Self {
445+
let mut assets_vec = Vec::with_capacity(map.assets.len());
446+
447+
for (policy, asset_map) in map.assets {
448+
let inner_assets = asset_map
449+
.into_iter()
450+
.map(|(name, amount)| NativeAssetDelta { name, amount })
451+
.collect();
452+
453+
assets_vec.push((policy, inner_assets));
454+
}
455+
456+
ValueDelta {
457+
lovelace: map.lovelace,
458+
assets: assets_vec,
459+
}
460+
}
461+
}
462+
405463
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
406464
pub struct ValueDelta {
407465
pub lovelace: i64,

modules/address_state/src/address_state.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,7 @@ impl AddressState {
117117
}
118118

119119
// Add deltas to volatile
120-
if let Err(e) = state.apply_address_deltas(&address_deltas_msg.deltas) {
121-
error!("address deltas handling error: {e:#}");
122-
}
120+
state.apply_address_deltas(&address_deltas_msg.deltas);
123121

124122
store = state.immutable.clone();
125123
config = state.config.clone();

modules/address_state/src/state.rs

Lines changed: 99 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use std::{
2-
collections::HashSet,
2+
collections::{HashMap, HashSet},
33
path::{Path, PathBuf},
44
sync::Arc,
55
};
66

77
use acropolis_common::{
88
Address, AddressDelta, AddressTotals, BlockInfo, ShelleyAddress, TxIdentifier, UTxOIdentifier,
9-
ValueDelta,
9+
ValueDelta, ValueDeltaMap,
1010
};
1111
use anyhow::Result;
1212

@@ -172,10 +172,14 @@ impl State {
172172
&& block_info.number > self.volatile.epoch_start_block + self.volatile.security_param_k
173173
}
174174

175-
pub fn apply_address_deltas(&mut self, deltas: &[AddressDelta]) -> Result<()> {
175+
pub fn apply_address_deltas(&mut self, deltas: &[AddressDelta]) {
176176
let addresses = self.volatile.window.back_mut().expect("window should never be empty");
177177

178+
// Keeps track seen txs to avoid overcounting totals tx count and duplicating tx identifiers
179+
let mut seen: HashMap<Address, HashSet<TxIdentifier>> = HashMap::new();
180+
178181
for delta in deltas {
182+
let tx_id = TxIdentifier::from(delta.utxo);
179183
let entry = addresses.entry(delta.address.clone()).or_default();
180184

181185
if self.config.store_info {
@@ -187,18 +191,33 @@ impl State {
187191
}
188192
}
189193

190-
if self.config.store_transactions {
191-
let txs = entry.transactions.get_or_insert(Vec::new());
192-
txs.push(TxIdentifier::from(delta.utxo))
193-
}
194+
if self.config.store_transactions || self.config.store_totals {
195+
let seen_for_addr = seen.entry(delta.address.clone()).or_default();
194196

195-
if self.config.store_totals {
196-
let totals = entry.totals.get_or_insert(Vec::new());
197-
totals.push(delta.value.clone());
197+
if self.config.store_transactions {
198+
let txs = entry.transactions.get_or_insert(Vec::new());
199+
if !seen_for_addr.contains(&tx_id) {
200+
txs.push(tx_id);
201+
}
202+
}
203+
if self.config.store_totals {
204+
let totals = entry.totals.get_or_insert(Vec::new());
205+
206+
if seen_for_addr.contains(&tx_id) {
207+
if let Some(last_total) = totals.last_mut() {
208+
// Create temporary map for summing same tx deltas efficiently
209+
// TODO: Potentially move upstream to address deltas publisher
210+
let mut map = ValueDeltaMap::from(last_total.clone());
211+
map += delta.value.clone();
212+
*last_total = ValueDelta::from(map);
213+
}
214+
} else {
215+
totals.push(delta.value.clone());
216+
}
217+
}
218+
seen_for_addr.insert(tx_id);
198219
}
199220
}
200-
201-
Ok(())
202221
}
203222

204223
pub async fn get_addresses_totals(
@@ -277,7 +296,7 @@ mod tests {
277296
let deltas = vec![delta(&addr, &utxo, 1)];
278297

279298
// Apply deltas
280-
state.apply_address_deltas(&deltas)?;
299+
state.apply_address_deltas(&deltas);
281300

282301
// Verify UTxO is retrievable when in volatile
283302
let utxos = state.get_address_utxos(&addr).await?;
@@ -319,7 +338,7 @@ mod tests {
319338
let created = vec![delta(&addr, &utxo, 1)];
320339

321340
// Apply delta to volatile
322-
state.apply_address_deltas(&created)?;
341+
state.apply_address_deltas(&created);
323342

324343
// Drain volatile to immutable pending
325344
state.volatile.epoch_start_block = 1;
@@ -333,7 +352,7 @@ mod tests {
333352
assert_eq!(after_persist.as_ref().unwrap(), &[utxo]);
334353

335354
state.volatile.next_block();
336-
state.apply_address_deltas(&[delta(&addr, &utxo, -1)])?;
355+
state.apply_address_deltas(&[delta(&addr, &utxo, -1)]);
337356

338357
// Verify UTxO was removed while in volatile
339358
let after_spend_volatile = state.get_address_utxos(&addr).await?;
@@ -368,9 +387,9 @@ mod tests {
368387

369388
state.volatile.epoch_start_block = 1;
370389

371-
state.apply_address_deltas(&[delta(&addr, &utxo_old, 1)])?;
390+
state.apply_address_deltas(&[delta(&addr, &utxo_old, 1)]);
372391
state.volatile.next_block();
373-
state.apply_address_deltas(&[delta(&addr, &utxo_old, -1), delta(&addr, &utxo_new, 1)])?;
392+
state.apply_address_deltas(&[delta(&addr, &utxo_old, -1), delta(&addr, &utxo_new, 1)]);
374393

375394
// Verify Create and spend both in volatile is not included in address utxos
376395
let volatile = state.get_address_utxos(&addr).await?;
@@ -400,4 +419,67 @@ mod tests {
400419

401420
Ok(())
402421
}
422+
423+
#[tokio::test]
424+
async fn test_same_tx_deltas_sums_totals_in_volatile() -> Result<()> {
425+
let _ = tracing_subscriber::fmt::try_init();
426+
427+
let mut state = setup_state_and_store().await?;
428+
429+
let addr = dummy_address();
430+
let delta_1 = UTxOIdentifier::new(0, 1, 0);
431+
let delta_2 = UTxOIdentifier::new(0, 1, 1);
432+
433+
state.volatile.epoch_start_block = 1;
434+
435+
state.apply_address_deltas(&[delta(&addr, &delta_1, 1), delta(&addr, &delta_2, 1)]);
436+
437+
// Verify only 1 totals entry with delta of 2
438+
let volatile = state
439+
.volatile
440+
.window
441+
.back()
442+
.expect("Window should have a delta")
443+
.get(&addr)
444+
.expect("Entry should be populated")
445+
.totals
446+
.as_ref()
447+
.expect("Totals should be populated");
448+
449+
assert_eq!(volatile.len(), 1);
450+
assert_eq!(volatile.first().expect("Should be populated").lovelace, 2);
451+
452+
Ok(())
453+
}
454+
455+
#[tokio::test]
456+
async fn test_same_tx_deltas_prevents_duplicate_identifier_in_volatile() -> Result<()> {
457+
let _ = tracing_subscriber::fmt::try_init();
458+
459+
let mut state = setup_state_and_store().await?;
460+
461+
let addr = dummy_address();
462+
let delta_1 = UTxOIdentifier::new(0, 1, 0);
463+
let delta_2 = UTxOIdentifier::new(0, 1, 1);
464+
465+
state.volatile.epoch_start_block = 1;
466+
467+
state.apply_address_deltas(&[delta(&addr, &delta_1, 1), delta(&addr, &delta_2, 1)]);
468+
469+
// Verify only 1 transactions entry
470+
let volatile = state
471+
.volatile
472+
.window
473+
.back()
474+
.expect("Window should have a delta")
475+
.get(&addr)
476+
.expect("Entry should be populated")
477+
.transactions
478+
.as_ref()
479+
.expect("Transactions should be populated");
480+
481+
assert_eq!(volatile.len(), 1);
482+
483+
Ok(())
484+
}
403485
}

modules/rest_blockfrost/src/handlers/accounts.rs

Lines changed: 75 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use std::sync::Arc;
33

44
use crate::handlers_config::HandlersConfig;
55
use crate::types::{
6-
AccountAddressREST, AccountRewardREST, AccountUTxOREST, AccountWithdrawalREST, AmountList,
7-
DelegationUpdateREST, RegistrationUpdateREST,
6+
AccountAddressREST, AccountRewardREST, AccountTotalsREST, AccountUTxOREST,
7+
AccountWithdrawalREST, AmountList, DelegationUpdateREST, RegistrationUpdateREST,
88
};
99
use acropolis_common::messages::{Message, RESTResponse, StateQuery, StateQueryResponse};
1010
use acropolis_common::queries::accounts::{AccountsStateQuery, AccountsStateQueryResponse};
@@ -563,7 +563,7 @@ pub async fn handle_account_assets_blockfrost(
563563
.await?;
564564

565565
let Some(addresses) = addresses else {
566-
return Ok(RESTResponse::with_text(404, "Account not found"));
566+
return Err(RESTError::not_found("Account not found"));
567567
};
568568

569569
// Get utxos from address state
@@ -621,11 +621,78 @@ pub async fn handle_account_assets_blockfrost(
621621

622622
/// Handle `/accounts/{stake_address}/addresses/total` Blockfrost-compatible endpoint
623623
pub async fn handle_account_totals_blockfrost(
624-
_context: Arc<Context<Message>>,
625-
_params: Vec<String>,
626-
_handlers_config: Arc<HandlersConfig>,
624+
context: Arc<Context<Message>>,
625+
params: Vec<String>,
626+
handlers_config: Arc<HandlersConfig>,
627627
) -> Result<RESTResponse, RESTError> {
628-
Err(RESTError::not_implemented("Account totals not implemented"))
628+
let account = parse_stake_address(&params)?;
629+
630+
// Get addresses from historical accounts state
631+
let msg = Arc::new(Message::StateQuery(StateQuery::Accounts(
632+
AccountsStateQuery::GetAccountAssociatedAddresses {
633+
account: account.clone(),
634+
},
635+
)));
636+
let addresses = query_state(
637+
&context,
638+
&handlers_config.historical_accounts_query_topic,
639+
msg,
640+
|message| match message {
641+
Message::StateQueryResponse(StateQueryResponse::Accounts(
642+
AccountsStateQueryResponse::AccountAssociatedAddresses(addresses),
643+
)) => Ok(Some(addresses)),
644+
Message::StateQueryResponse(StateQueryResponse::Accounts(
645+
AccountsStateQueryResponse::Error(QueryError::NotFound { .. }),
646+
)) => Ok(None),
647+
Message::StateQueryResponse(StateQueryResponse::Addresses(
648+
AddressStateQueryResponse::Error(e),
649+
)) => Err(e),
650+
_ => Err(QueryError::internal_error(
651+
"Unexpected message type while retrieving account addresses",
652+
)),
653+
},
654+
)
655+
.await?;
656+
657+
let Some(addresses) = addresses else {
658+
return Err(RESTError::not_found("Account not found"));
659+
};
660+
661+
// Get totals from address state
662+
let msg = Arc::new(Message::StateQuery(StateQuery::Addresses(
663+
AddressStateQuery::GetAddressesTotals { addresses },
664+
)));
665+
let totals = query_state(
666+
&context,
667+
&handlers_config.addresses_query_topic,
668+
msg,
669+
|message| match message {
670+
Message::StateQueryResponse(StateQueryResponse::Addresses(
671+
AddressStateQueryResponse::AddressesTotals(totals),
672+
)) => Ok(totals),
673+
Message::StateQueryResponse(StateQueryResponse::Addresses(
674+
AddressStateQueryResponse::Error(e),
675+
)) => Err(e),
676+
_ => Err(QueryError::internal_error(
677+
"Unexpected message type while retrieving account totals",
678+
)),
679+
},
680+
)
681+
.await?;
682+
683+
// TODO: Query historical accounts state to retrieve account tx count instead of
684+
// using the addresses totals as the addresses totals does not deduplicate
685+
// for multi-address transactions, overstating count
686+
687+
let rest_response = AccountTotalsREST {
688+
stake_address: account.to_string()?,
689+
received_sum: totals.received.into(),
690+
sent_sum: totals.sent.into(),
691+
tx_count: totals.tx_count,
692+
};
693+
694+
let json = serde_json::to_string_pretty(&rest_response)?;
695+
Ok(RESTResponse::with_json(200, &json))
629696
}
630697

631698
/// Handle `/accounts/{stake_address}/utxos` Blockfrost-compatible endpoint
@@ -659,7 +726,7 @@ pub async fn handle_account_utxos_blockfrost(
659726
.await?;
660727

661728
let Some(addresses) = addresses else {
662-
return Ok(RESTResponse::with_text(404, "Account not found"));
729+
return Err(RESTError::not_found("Account not found"));
663730
};
664731

665732
// Get utxos from address state

0 commit comments

Comments
 (0)