Skip to content

Commit 65d1457

Browse files
committed
fix: add last_allocation_id to indexing_agreements, and improve get_by_id
1 parent abcd7fe commit 65d1457

7 files changed

+87
-56
lines changed

.sqlx/query-47f848e049f3ff22e65bb53edc7ddc1646e68d6db58124b6e0d780c037f73513.json

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-cb01d485323ed4b697d3c0c645f5174cc42ea3884e5b54ffb5c94605637d0baa.json renamed to .sqlx/query-5e4575f3b015895c5ba3f066c5d010a09c48290e2200fc1c6426234192ce0f82.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-d0258d53c19174720f2d4d9cde481e457931b65eca471efed650b10d70d2282a.json

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/dips/src/lib.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -310,11 +310,11 @@ pub async fn validate_and_cancel_agreement(
310310
decoded_request.request.agreement_id.into(),
311311
))
312312
.await?;
313-
let (agreement, cancelled) = result.ok_or(DipsError::AgreementNotFound)?;
314-
if cancelled {
313+
let stored_agreement = result.ok_or(DipsError::AgreementNotFound)?;
314+
if stored_agreement.cancelled {
315315
return Err(DipsError::AgreementCancelled);
316316
}
317-
let expected_signer = agreement.voucher.payer;
317+
let expected_signer = stored_agreement.voucher.voucher.payer;
318318
let id = Uuid::from_bytes(decoded_request.request.agreement_id.into());
319319
decoded_request.validate(domain, &expected_signer)?;
320320

@@ -391,11 +391,10 @@ mod test {
391391
.unwrap();
392392
assert_eq!(actual_id, id);
393393

394-
let actual = store.get_by_id(actual_id).await.unwrap();
394+
let stored_agreement = store.get_by_id(actual_id).await.unwrap().unwrap();
395395

396-
let (actual_voucher, actual_cancelled) = actual.unwrap();
397-
assert_eq!(voucher, actual_voucher);
398-
assert!(!actual_cancelled);
396+
assert_eq!(voucher, stored_agreement.voucher);
397+
assert!(!stored_agreement.cancelled);
399398
Ok(())
400399
}
401400

@@ -596,9 +595,8 @@ mod test {
596595
assert_eq!(agreement_id, cancelled_id);
597596

598597
// Verify agreement is cancelled
599-
let result = store.get_by_id(agreement_id).await?;
600-
let (_, cancelled) = result.ok_or(DipsError::AgreementNotFound)?;
601-
assert!(cancelled);
598+
let stored_agreement = store.get_by_id(agreement_id).await?.unwrap();
599+
assert!(stored_agreement.cancelled);
602600

603601
Ok(())
604602
}

crates/dips/src/store.rs

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,18 @@ use crate::{
1111
SubgraphIndexingVoucherMetadata,
1212
};
1313

14+
#[derive(Debug, Clone)]
15+
pub struct StoredIndexingAgreement {
16+
pub voucher: SignedIndexingAgreementVoucher,
17+
pub metadata: SubgraphIndexingVoucherMetadata,
18+
pub cancelled: bool,
19+
pub current_allocation_id: Option<String>,
20+
pub last_allocation_id: Option<String>,
21+
}
22+
1423
#[async_trait]
1524
pub trait AgreementStore: Sync + Send + std::fmt::Debug {
16-
async fn get_by_id(
17-
&self,
18-
id: Uuid,
19-
) -> Result<Option<(SignedIndexingAgreementVoucher, bool)>, DipsError>;
25+
async fn get_by_id(&self, id: Uuid) -> Result<Option<StoredIndexingAgreement>, DipsError>;
2026
async fn create_agreement(
2127
&self,
2228
agreement: SignedIndexingAgreementVoucher,
@@ -30,15 +36,12 @@ pub trait AgreementStore: Sync + Send + std::fmt::Debug {
3036

3137
#[derive(Default, Debug)]
3238
pub struct InMemoryAgreementStore {
33-
pub data: tokio::sync::RwLock<HashMap<Uuid, (SignedIndexingAgreementVoucher, bool)>>,
39+
pub data: tokio::sync::RwLock<HashMap<Uuid, StoredIndexingAgreement>>,
3440
}
3541

3642
#[async_trait]
3743
impl AgreementStore for InMemoryAgreementStore {
38-
async fn get_by_id(
39-
&self,
40-
id: Uuid,
41-
) -> Result<Option<(SignedIndexingAgreementVoucher, bool)>, DipsError> {
44+
async fn get_by_id(&self, id: Uuid) -> Result<Option<StoredIndexingAgreement>, DipsError> {
4245
Ok(self
4346
.data
4447
.try_read()
@@ -49,15 +52,20 @@ impl AgreementStore for InMemoryAgreementStore {
4952
async fn create_agreement(
5053
&self,
5154
agreement: SignedIndexingAgreementVoucher,
52-
_medatadata: SubgraphIndexingVoucherMetadata,
55+
metadata: SubgraphIndexingVoucherMetadata,
5356
) -> Result<(), DipsError> {
57+
let id = Uuid::from_bytes(agreement.voucher.agreement_id.into());
58+
let stored_agreement = StoredIndexingAgreement {
59+
voucher: agreement,
60+
metadata,
61+
cancelled: false,
62+
current_allocation_id: None,
63+
last_allocation_id: None,
64+
};
5465
self.data
5566
.try_write()
5667
.map_err(|e| DipsError::UnknownError(e.into()))?
57-
.insert(
58-
Uuid::from_bytes(agreement.voucher.agreement_id.into()),
59-
(agreement.clone(), false),
60-
);
68+
.insert(id, stored_agreement);
6169

6270
Ok(())
6371
}
@@ -67,7 +75,7 @@ impl AgreementStore for InMemoryAgreementStore {
6775
) -> Result<Uuid, DipsError> {
6876
let id = Uuid::from_bytes(signed_cancellation.request.agreement_id.into());
6977

70-
let agreement = {
78+
let mut agreement = {
7179
let read_lock = self
7280
.data
7381
.try_read()
@@ -78,11 +86,17 @@ impl AgreementStore for InMemoryAgreementStore {
7886
.ok_or(DipsError::AgreementNotFound)?
7987
};
8088

89+
if agreement.cancelled {
90+
return Err(DipsError::AgreementCancelled);
91+
}
92+
93+
agreement.cancelled = true;
94+
8195
let mut write_lock = self
8296
.data
8397
.try_write()
8498
.map_err(|e| DipsError::UnknownError(e.into()))?;
85-
write_lock.insert(id, (agreement.0, true));
99+
write_lock.insert(id, agreement);
86100

87101
Ok(id)
88102
}

crates/service/src/database/dips.rs

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use std::str::FromStr;
66
use axum::async_trait;
77
use build_info::chrono::{DateTime, Utc};
88
use indexer_dips::{
9-
store::AgreementStore, DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher,
9+
store::{AgreementStore, StoredIndexingAgreement},
10+
DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher,
1011
SubgraphIndexingVoucherMetadata,
1112
};
1213
use sqlx::{types::BigDecimal, PgPool};
@@ -25,10 +26,7 @@ fn uint256_to_bigdecimal(value: &uint256, field: &str) -> Result<BigDecimal, Dip
2526

2627
#[async_trait]
2728
impl AgreementStore for PsqlAgreementStore {
28-
async fn get_by_id(
29-
&self,
30-
id: Uuid,
31-
) -> Result<Option<(SignedIndexingAgreementVoucher, bool)>, DipsError> {
29+
async fn get_by_id(&self, id: Uuid) -> Result<Option<StoredIndexingAgreement>, DipsError> {
3230
let item = sqlx::query!("SELECT * FROM indexing_agreements WHERE id=$1", id,)
3331
.fetch_one(&self.pool)
3432
.await;
@@ -41,8 +39,17 @@ impl AgreementStore for PsqlAgreementStore {
4139

4240
let signed = SignedIndexingAgreementVoucher::abi_decode(item.signed_payload.as_ref(), true)
4341
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
42+
let metadata =
43+
SubgraphIndexingVoucherMetadata::abi_decode(signed.voucher.metadata.as_ref(), true)
44+
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
4445
let cancelled = item.cancelled_at.is_some();
45-
Ok(Some((signed, cancelled)))
46+
Ok(Some(StoredIndexingAgreement {
47+
voucher: signed,
48+
metadata,
49+
cancelled,
50+
current_allocation_id: item.current_allocation_id,
51+
last_allocation_id: item.last_allocation_id,
52+
}))
4653
}
4754
async fn create_agreement(
4855
&self,
@@ -72,7 +79,7 @@ impl AgreementStore for PsqlAgreementStore {
7279
let min_epochs_per_collection: i64 = agreement.voucher.minEpochsPerCollection.into();
7380
let max_epochs_per_collection: i64 = agreement.voucher.maxEpochsPerCollection.into();
7481
sqlx::query!(
75-
"INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null)",
82+
"INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null,null)",
7683
id,
7784
agreement.signature.as_ref(),
7885
bs,
@@ -130,7 +137,7 @@ pub(crate) mod test {
130137
use sqlx::PgPool;
131138
use thegraph_core::alloy::{
132139
primitives::{ruint::aliases::U256, Address},
133-
sol_types::{SolType, SolValue},
140+
sol_types::SolValue,
134141
};
135142
use uuid::Uuid;
136143

@@ -226,19 +233,15 @@ pub(crate) mod test {
226233
.unwrap();
227234

228235
// Retrieve agreement
229-
let (retrieved_signed_voucher, cancelled) = store.get_by_id(id).await.unwrap().unwrap();
230-
231-
let retrieved_voucher = &retrieved_signed_voucher.voucher;
232-
let retrieved_metadata =
233-
<indexer_dips::SubgraphIndexingVoucherMetadata as SolType>::abi_decode(
234-
retrieved_voucher.metadata.as_ref(),
235-
true,
236-
)
237-
.unwrap();
236+
let stored_agreement = store.get_by_id(id).await.unwrap().unwrap();
237+
238+
let retrieved_voucher = &stored_agreement.voucher;
239+
let retrieved_metadata = stored_agreement.metadata;
240+
238241
// Verify retrieved agreement matches original
239-
assert_eq!(retrieved_signed_voucher.signature, agreement.signature);
242+
assert_eq!(retrieved_voucher.signature, agreement.signature);
240243
assert_eq!(
241-
retrieved_voucher.durationEpochs,
244+
retrieved_voucher.voucher.durationEpochs,
242245
agreement.voucher.durationEpochs
243246
);
244247
assert_eq!(retrieved_metadata.protocolNetwork, metadata.protocolNetwork);
@@ -247,26 +250,29 @@ pub(crate) mod test {
247250
retrieved_metadata.subgraphDeploymentId,
248251
metadata.subgraphDeploymentId
249252
);
250-
assert_eq!(retrieved_voucher.payer, agreement.voucher.payer);
251-
assert_eq!(retrieved_voucher.recipient, agreement.voucher.recipient);
252-
assert_eq!(retrieved_voucher.service, agreement.voucher.service);
253+
assert_eq!(retrieved_voucher.voucher.payer, agreement.voucher.payer);
254+
assert_eq!(
255+
retrieved_voucher.voucher.recipient,
256+
agreement.voucher.recipient
257+
);
258+
assert_eq!(retrieved_voucher.voucher.service, agreement.voucher.service);
253259
assert_eq!(
254-
retrieved_voucher.maxInitialAmount,
260+
retrieved_voucher.voucher.maxInitialAmount,
255261
agreement.voucher.maxInitialAmount
256262
);
257263
assert_eq!(
258-
retrieved_voucher.maxOngoingAmountPerEpoch,
264+
retrieved_voucher.voucher.maxOngoingAmountPerEpoch,
259265
agreement.voucher.maxOngoingAmountPerEpoch
260266
);
261267
assert_eq!(
262-
retrieved_voucher.maxEpochsPerCollection,
268+
retrieved_voucher.voucher.maxEpochsPerCollection,
263269
agreement.voucher.maxEpochsPerCollection
264270
);
265271
assert_eq!(
266-
retrieved_voucher.minEpochsPerCollection,
272+
retrieved_voucher.voucher.minEpochsPerCollection,
267273
agreement.voucher.minEpochsPerCollection
268274
);
269-
assert!(!cancelled);
275+
assert!(!stored_agreement.cancelled);
270276
}
271277

272278
#[sqlx::test(migrations = "../../migrations")]

migrations/20241030141929_dips.up.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ CREATE TABLE IF NOT EXISTS indexing_agreements (
2828
cancelled_at TIMESTAMP WITH TIME ZONE,
2929
signed_cancellation_payload BYTEA,
3030

31-
current_allocation_id CHAR(40)
31+
current_allocation_id CHAR(40),
32+
last_allocation_id CHAR(40)
3233
);
3334

3435
CREATE UNIQUE INDEX IX_UNIQ_SIGNATURE_AGREEMENT on indexing_agreements(signature);

0 commit comments

Comments
 (0)