Skip to content

Commit f57b3ba

Browse files
author
Zoran Cvetkov
committed
add block tracking
1 parent 38c4286 commit f57b3ba

File tree

3 files changed

+42
-29
lines changed

3 files changed

+42
-29
lines changed

core/src/subgraph/instance_manager.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,11 +212,11 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
212212
max_spec_version: Version,
213213
is_runner_test: bool,
214214
) -> anyhow::Result<Vec<(DeploymentHash, Arc<dyn SourceableStore>)>> {
215-
let mut writable_stores = Vec::new();
215+
let mut sourceable_stores = Vec::new();
216216
let subgraph_store = self.subgraph_store.clone();
217217

218218
if is_runner_test {
219-
return Ok(writable_stores);
219+
return Ok(sourceable_stores);
220220
}
221221

222222
for hash in hashes {
@@ -235,7 +235,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
235235
.active_locator(&hash)?
236236
.ok_or_else(|| anyhow!("no active deployment for hash {}", hash))?;
237237

238-
let readable_store = subgraph_store
238+
let sourceable_store = subgraph_store
239239
.clone()
240240
.sourceable(
241241
logger.clone(),
@@ -244,10 +244,10 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
244244
)
245245
.await?;
246246

247-
writable_stores.push((loc.hash, readable_store));
247+
sourceable_stores.push((loc.hash, sourceable_store));
248248
}
249249

250-
Ok(writable_stores)
250+
Ok(sourceable_stores)
251251
}
252252

253253
pub async fn build_subgraph_runner<C>(

graph/src/components/store/traits.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -269,27 +269,6 @@ impl<T: ?Sized + ReadStore> ReadStore for Arc<T> {
269269
}
270270
}
271271

272-
pub trait SourceableStore: Send + Sync + 'static {
273-
/// Returns all versions of entities of the given entity_type that were
274-
/// changed in the given block_range.
275-
fn get_range(
276-
&self,
277-
entity_type: &EntityType,
278-
block_range: Range<BlockNumber>,
279-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError>;
280-
}
281-
282-
// This silly impl is needed until https://github.com/rust-lang/rust/issues/65991 is stable.
283-
impl<T: ?Sized + SourceableStore> SourceableStore for Arc<T> {
284-
fn get_range(
285-
&self,
286-
entity_type: &EntityType,
287-
block_range: Range<BlockNumber>,
288-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
289-
(**self).get_range(entity_type, block_range)
290-
}
291-
}
292-
293272
pub trait DeploymentCursorTracker: Sync + Send + 'static {
294273
fn input_schema(&self) -> InputSchema;
295274

@@ -316,6 +295,27 @@ impl<T: ?Sized + DeploymentCursorTracker> DeploymentCursorTracker for Arc<T> {
316295
}
317296
}
318297

298+
pub trait SourceableStore: DeploymentCursorTracker {
299+
/// Returns all versions of entities of the given entity_type that were
300+
/// changed in the given block_range.
301+
fn get_range(
302+
&self,
303+
entity_type: &EntityType,
304+
block_range: Range<BlockNumber>,
305+
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError>;
306+
}
307+
308+
// This silly impl is needed until https://github.com/rust-lang/rust/issues/65991 is stable.
309+
impl<T: ?Sized + SourceableStore> SourceableStore for Arc<T> {
310+
fn get_range(
311+
&self,
312+
entity_type: &EntityType,
313+
block_range: Range<BlockNumber>,
314+
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
315+
(**self).get_range(entity_type, block_range)
316+
}
317+
}
318+
319319
/// A view of the store for indexing. All indexing-related operations need
320320
/// to go through this trait. Methods in this trait will never return a
321321
/// `StoreError::DatabaseUnavailable`. Instead, they will retry the

store/postgres/src/writable.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1572,19 +1572,19 @@ impl ReadStore for WritableStore {
15721572
pub struct SourceableStore {
15731573
site: Arc<Site>,
15741574
store: Arc<DeploymentStore>,
1575-
_writable: Arc<dyn store::WritableStore>,
1575+
writable: Arc<dyn store::WritableStore>,
15761576
}
15771577

15781578
impl SourceableStore {
15791579
pub fn new(
15801580
site: Arc<Site>,
15811581
store: Arc<DeploymentStore>,
1582-
_writable: Arc<dyn store::WritableStore>,
1582+
writable: Arc<dyn store::WritableStore>,
15831583
) -> Self {
15841584
Self {
15851585
site,
15861586
store,
1587-
_writable,
1587+
writable,
15881588
}
15891589
}
15901590
}
@@ -1599,6 +1599,19 @@ impl store::SourceableStore for SourceableStore {
15991599
.get_range(self.site.clone(), entity_type, block_range)
16001600
}
16011601
}
1602+
impl DeploymentCursorTracker for SourceableStore {
1603+
fn input_schema(&self) -> InputSchema {
1604+
ReadStore::input_schema(&self.writable)
1605+
}
1606+
1607+
fn block_ptr(&self) -> Option<BlockPtr> {
1608+
self.writable.block_ptr()
1609+
}
1610+
1611+
fn firehose_cursor(&self) -> FirehoseCursor {
1612+
self.writable.firehose_cursor()
1613+
}
1614+
}
16021615

16031616
impl DeploymentCursorTracker for WritableStore {
16041617
fn block_ptr(&self) -> Option<BlockPtr> {

0 commit comments

Comments
 (0)