Skip to content

Commit 990ef4d

Browse files
authored
Store synced_at_block_number when a deployment syncs (#5610)
This is a follow-up to #5566
1 parent 6cfc444 commit 990ef4d

File tree

12 files changed

+44
-19
lines changed

12 files changed

+44
-19
lines changed

graph/src/components/store/traits.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker {
350350
) -> Result<(), StoreError>;
351351

352352
/// Force synced status, used for testing.
353-
fn deployment_synced(&self) -> Result<(), StoreError>;
353+
fn deployment_synced(&self, block_ptr: BlockPtr) -> Result<(), StoreError>;
354354

355355
/// Return true if the deployment with the given id is fully synced, and return false otherwise.
356356
/// Cheap, cached operation.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE subgraphs.subgraph_deployment DROP COLUMN synced_at_block_number;
2+
ALTER TABLE unused_deployments DROP COLUMN synced_at_block_number;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE subgraphs.subgraph_deployment ADD COLUMN synced_at_block_number INT4;
2+
ALTER TABLE unused_deployments ADD COLUMN synced_at_block_number INT4;

store/postgres/src/deployment.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ table! {
133133
failed -> Bool,
134134
health -> crate::deployment::SubgraphHealthMapping,
135135
synced_at -> Nullable<Timestamptz>,
136+
synced_at_block_number -> Nullable<Int4>,
136137
fatal_error -> Nullable<Text>,
137138
non_fatal_errors -> Array<Text>,
138139
earliest_block_number -> Integer,
@@ -731,15 +732,22 @@ pub fn state(conn: &mut PgConnection, id: DeploymentHash) -> Result<DeploymentSt
731732
}
732733

733734
/// Mark the deployment `id` as synced
734-
pub fn set_synced(conn: &mut PgConnection, id: &DeploymentHash) -> Result<(), StoreError> {
735+
pub fn set_synced(
736+
conn: &mut PgConnection,
737+
id: &DeploymentHash,
738+
block_ptr: BlockPtr,
739+
) -> Result<(), StoreError> {
735740
use subgraph_deployment as d;
736741

737742
update(
738743
d::table
739744
.filter(d::deployment.eq(id.as_str()))
740745
.filter(d::synced_at.is_null()),
741746
)
742-
.set(d::synced_at.eq(now))
747+
.set((
748+
d::synced_at.eq(now),
749+
d::synced_at_block_number.eq(block_ptr.number),
750+
))
743751
.execute(conn)?;
744752
Ok(())
745753
}

store/postgres/src/deployment_store.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,9 +563,13 @@ impl DeploymentStore {
563563
deployment::exists_and_synced(&mut conn, id.as_str())
564564
}
565565

566-
pub(crate) fn deployment_synced(&self, id: &DeploymentHash) -> Result<(), StoreError> {
566+
pub(crate) fn deployment_synced(
567+
&self,
568+
id: &DeploymentHash,
569+
block_ptr: BlockPtr,
570+
) -> Result<(), StoreError> {
567571
let mut conn = self.get_conn()?;
568-
conn.transaction(|conn| deployment::set_synced(conn, id))
572+
conn.transaction(|conn| deployment::set_synced(conn, id, block_ptr))
569573
}
570574

571575
/// Look up the on_sync action for this deployment

store/postgres/src/detail.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ pub struct DeploymentDetail {
5050
pub failed: bool,
5151
health: HealthType,
5252
pub synced_at: Option<DateTime<Utc>>,
53+
pub synced_at_block_number: Option<i32>,
5354
fatal_error: Option<String>,
5455
non_fatal_errors: Vec<String>,
5556
/// The earliest block for which we have history

store/postgres/src/primary.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ table! {
180180
latest_ethereum_block_number -> Nullable<Integer>,
181181
failed -> Bool,
182182
synced_at -> Nullable<Timestamptz>,
183+
synced_at_block_number -> Nullable<Int4>,
183184
}
184185
}
185186

@@ -233,6 +234,7 @@ pub struct UnusedDeployment {
233234
pub latest_ethereum_block_number: Option<i32>,
234235
pub failed: bool,
235236
pub synced_at: Option<DateTime<Utc>>,
237+
pub synced_at_block_number: Option<i32>,
236238
}
237239

238240
#[derive(Clone, Debug, PartialEq, Eq, Hash, AsExpression, FromSqlRow)]
@@ -1681,6 +1683,7 @@ impl<'a> Connection<'a> {
16811683
u::latest_ethereum_block_number.eq(latest_number),
16821684
u::failed.eq(detail.failed),
16831685
u::synced_at.eq(detail.synced_at),
1686+
u::synced_at_block_number.eq(detail.synced_at_block_number.clone()),
16841687
))
16851688
.execute(self.conn.as_mut())?;
16861689
}

store/postgres/src/writable.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ impl SyncStore {
420420
}
421421
}
422422

423-
fn deployment_synced(&self) -> Result<(), StoreError> {
423+
fn deployment_synced(&self, block_ptr: BlockPtr) -> Result<(), StoreError> {
424424
retry::forever(&self.logger, "deployment_synced", || {
425425
let event = {
426426
// Make sure we drop `pconn` before we call into the deployment
@@ -452,7 +452,8 @@ impl SyncStore {
452452
}
453453
}
454454

455-
self.writable.deployment_synced(&self.site.deployment)?;
455+
self.writable
456+
.deployment_synced(&self.site.deployment, block_ptr.clone())?;
456457

457458
self.store.send_store_event(&event)
458459
})
@@ -1659,7 +1660,7 @@ impl WritableStoreTrait for WritableStore {
16591660
is_caught_up_with_chain_head: bool,
16601661
) -> Result<(), StoreError> {
16611662
if is_caught_up_with_chain_head {
1662-
self.deployment_synced()?;
1663+
self.deployment_synced(block_ptr_to.clone())?;
16631664
} else {
16641665
self.writer.start_batching();
16651666
}
@@ -1696,10 +1697,10 @@ impl WritableStoreTrait for WritableStore {
16961697
/// - Disable the time-to-sync metrics gathering.
16971698
/// - Stop batching writes.
16981699
/// - Promote it to 'synced' status in the DB, if that hasn't been done already.
1699-
fn deployment_synced(&self) -> Result<(), StoreError> {
1700+
fn deployment_synced(&self, block_ptr: BlockPtr) -> Result<(), StoreError> {
17001701
self.writer.deployment_synced();
17011702
if !self.is_deployment_synced.load(Ordering::SeqCst) {
1702-
self.store.deployment_synced()?;
1703+
self.store.deployment_synced(block_ptr)?;
17031704
self.is_deployment_synced.store(true, Ordering::SeqCst);
17041705
}
17051706
Ok(())

store/test-store/tests/graph/entity_cache.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ impl WritableStore for MockStore {
156156
unimplemented!()
157157
}
158158

159-
fn deployment_synced(&self) -> Result<(), StoreError> {
159+
fn deployment_synced(&self, _block_ptr: BlockPtr) -> Result<(), StoreError> {
160160
unimplemented!()
161161
}
162162

store/test-store/tests/postgres/graft.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ fn on_sync() {
482482
.await?;
483483

484484
writable.start_subgraph_deployment(&LOGGER).await?;
485-
writable.deployment_synced()?;
485+
writable.deployment_synced(BLOCKS[0].clone())?;
486486

487487
let mut primary = primary_connection();
488488
let src_site = primary.locate_site(src)?.unwrap();
@@ -539,7 +539,7 @@ fn on_sync() {
539539
store.activate(&dst)?;
540540
store.remove_deployment(src.id.into())?;
541541

542-
let res = writable.deployment_synced();
542+
let res = writable.deployment_synced(BLOCKS[2].clone());
543543
assert!(res.is_ok());
544544
}
545545
Ok(())

0 commit comments

Comments
 (0)