Skip to content

Commit 8543114

Browse files
committed
all: Change how WritableStore tracks the block pointer and cursor
1 parent baad0fe commit 8543114

File tree

6 files changed

+33
-21
lines changed

6 files changed

+33
-21
lines changed

chain/ethereum/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ impl Blockchain for Chain {
217217
.new(o!("component" => "FirehoseBlockStream"));
218218

219219
let firehose_mapper = Arc::new(FirehoseMapper {});
220-
let firehose_cursor = writable.block_cursor()?;
220+
let firehose_cursor = writable.block_cursor();
221221

222222
Ok(Box::new(FirehoseBlockStream::new(
223223
firehose_endpoint,

chain/near/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl Blockchain for Chain {
127127
.new(o!("component" => "FirehoseBlockStream"));
128128

129129
let firehose_mapper = Arc::new(FirehoseMapper {});
130-
let firehose_cursor = store.block_cursor()?;
130+
let firehose_cursor = store.block_cursor();
131131

132132
Ok(Box::new(FirehoseBlockStream::new(
133133
firehose_endpoint,

core/src/subgraph/instance_manager.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -386,11 +386,7 @@ where
386386

387387
// Initialize deployment_head with current deployment head. Any sort of trouble in
388388
// getting the deployment head ptr leads to initializing with 0
389-
let deployment_head = store
390-
.block_ptr()
391-
.ok()
392-
.and_then(|ptr| ptr.map(|ptr| ptr.number))
393-
.unwrap_or(0) as f64;
389+
let deployment_head = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0) as f64;
394390
block_stream_metrics.deployment_head.set(deployment_head);
395391

396392
let host_builder = graph_runtime_wasm::RuntimeHostBuilder::new(
@@ -481,7 +477,7 @@ async fn new_block_stream<C: Blockchain>(
481477
inputs.unified_api_version.clone(),
482478
),
483479
false => {
484-
let start_block = inputs.store.block_ptr()?;
480+
let start_block = inputs.store.block_ptr();
485481

486482
chain.new_polling_block_stream(
487483
inputs.deployment.clone(),
@@ -677,7 +673,7 @@ where
677673
if should_try_unfail_deterministic {
678674
should_try_unfail_deterministic = false;
679675

680-
if let Some(current_ptr) = inputs.store.block_ptr()? {
676+
if let Some(current_ptr) = inputs.store.block_ptr() {
681677
if let Some(parent_ptr) =
682678
inputs.triggers_adapter.parent_ptr(&current_ptr).await?
683679
{

graph/src/components/store.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,11 +1024,11 @@ pub trait SubgraphStore: Send + Sync + 'static {
10241024
#[async_trait]
10251025
pub trait WritableStore: Send + Sync + 'static {
10261026
/// Get a pointer to the most recently processed block in the subgraph.
1027-
fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError>;
1027+
fn block_ptr(&self) -> Option<BlockPtr>;
10281028

10291029
/// Returns the Firehose `cursor` this deployment is currently at in the block stream of events. This
10301030
/// is used when re-connecting a Firehose stream to start back exactly where we left off.
1031-
fn block_cursor(&self) -> Result<Option<String>, StoreError>;
1031+
fn block_cursor(&self) -> Option<String>;
10321032

10331033
/// Start an existing subgraph deployment.
10341034
fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError>;

graph/tests/entity_cache.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ impl MockStore {
4646
// The store trait must be implemented manually because mockall does not support async_trait, nor borrowing from arguments.
4747
#[async_trait]
4848
impl WritableStore for MockStore {
49-
fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
49+
fn block_ptr(&self) -> Option<BlockPtr> {
5050
unimplemented!()
5151
}
5252

53-
fn block_cursor(&self) -> Result<Option<String>, StoreError> {
53+
fn block_cursor(&self) -> Option<String> {
5454
unimplemented!()
5555
}
5656

store/postgres/src/writable.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::Mutex;
12
use std::time::Duration;
23
use std::{collections::BTreeMap, sync::Arc};
34

@@ -325,6 +326,8 @@ fn same_subgraph(mods: &Vec<EntityModification>, id: &DeploymentHash) -> bool {
325326
#[allow(dead_code)]
326327
pub struct WritableAgent {
327328
store: Arc<WritableStore>,
329+
block_ptr: Mutex<Option<BlockPtr>>,
330+
block_cursor: Mutex<Option<String>>,
328331
}
329332

330333
impl WritableAgent {
@@ -333,21 +336,26 @@ impl WritableAgent {
333336
logger: Logger,
334337
site: Arc<Site>,
335338
) -> Result<Self, StoreError> {
339+
let store = Arc::new(WritableStore::new(subgraph_store, logger, site)?);
340+
let block_ptr = Mutex::new(store.block_ptr()?);
341+
let block_cursor = Mutex::new(store.block_cursor()?);
336342
Ok(Self {
337-
store: Arc::new(WritableStore::new(subgraph_store, logger, site)?),
343+
store,
344+
block_ptr,
345+
block_cursor,
338346
})
339347
}
340348
}
341349

342350
#[allow(unused_variables)]
343351
#[async_trait::async_trait]
344352
impl WritableStoreTrait for WritableAgent {
345-
fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
346-
self.store.block_ptr()
353+
fn block_ptr(&self) -> Option<BlockPtr> {
354+
self.block_ptr.lock().unwrap().clone()
347355
}
348356

349-
fn block_cursor(&self) -> Result<Option<String>, StoreError> {
350-
self.store.block_cursor()
357+
fn block_cursor(&self) -> Option<String> {
358+
self.block_cursor.lock().unwrap().clone()
351359
}
352360

353361
fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError> {
@@ -356,6 +364,9 @@ impl WritableStoreTrait for WritableAgent {
356364
}
357365

358366
fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError> {
367+
*self.block_ptr.lock().unwrap() = Some(block_ptr_to.clone());
368+
// FIXME: What about the firehose cursor? Why doesn't that get updated?
369+
359370
// TODO: If we haven't written the block yet, revert in memory. If
360371
// we have, revert in the database
361372
self.store.revert_block_operations(block_ptr_to)
@@ -396,13 +407,18 @@ impl WritableStoreTrait for WritableAgent {
396407
deterministic_errors: Vec<SubgraphError>,
397408
) -> Result<(), StoreError> {
398409
self.store.transact_block_operations(
399-
block_ptr_to,
400-
firehose_cursor,
410+
block_ptr_to.clone(),
411+
firehose_cursor.clone(),
401412
mods,
402413
stopwatch,
403414
data_sources,
404415
deterministic_errors,
405-
)
416+
)?;
417+
418+
*self.block_ptr.lock().unwrap() = Some(block_ptr_to);
419+
*self.block_cursor.lock().unwrap() = firehose_cursor;
420+
421+
Ok(())
406422
}
407423

408424
fn get_many(

0 commit comments

Comments
 (0)