Skip to content

Commit d0331ca

Browse files
committed
Revert "*: Add --readonly-db flag disabling WS and db listeners"
This reverts commit 9e49c1c.
1 parent 212d272 commit d0331ca

File tree

10 files changed

+199
-237
lines changed

10 files changed

+199
-237
lines changed

chain/ethereum/src/block_stream.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,7 @@ where
170170
BlockStream {
171171
state: Mutex::new(BlockStreamState::New),
172172
consecutive_err_count: 0,
173-
chain_head_update_stream: chain_store
174-
.chain_head_updates()
175-
.expect("running against a readonly database"),
173+
chain_head_update_stream: chain_store.chain_head_updates(),
176174
ctx: BlockStreamContext {
177175
subgraph_store,
178176
chain_store,

core/src/subgraph/registrar.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ where
154154
.subscribe(vec![
155155
SubgraphDeploymentAssignmentEntity::subgraph_entity_pair(),
156156
])
157-
.expect("running against a readonly database")
158157
.map_err(|()| format_err!("Entity change stream failed"))
159158
.map(|event| {
160159
// We're only interested in the SubgraphDeploymentAssignment change; we

graph/src/components/store.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,7 @@ pub trait Store: Send + Sync + 'static {
881881
/// Subscribe to changes for specific subgraphs and entities.
882882
///
883883
/// Returns a stream of store events that match the input arguments.
884-
fn subscribe(&self, entities: Vec<SubgraphEntityPair>) -> Option<StoreEventStreamBox>;
884+
fn subscribe(&self, entities: Vec<SubgraphEntityPair>) -> StoreEventStreamBox;
885885

886886
fn resolve_subgraph_name_to_id(
887887
&self,
@@ -1304,7 +1304,7 @@ impl Store for MockStore {
13041304
unimplemented!()
13051305
}
13061306

1307-
fn subscribe(&self, _entities: Vec<SubgraphEntityPair>) -> Option<StoreEventStreamBox> {
1307+
fn subscribe(&self, _entities: Vec<SubgraphEntityPair>) -> StoreEventStreamBox {
13081308
unimplemented!()
13091309
}
13101310

@@ -1403,7 +1403,7 @@ pub trait ChainStore: Send + Sync + 'static {
14031403
fn attempt_chain_head_update(&self, ancestor_count: u64) -> Result<Vec<H256>, Error>;
14041404

14051405
/// Subscribe to chain head updates.
1406-
fn chain_head_updates(&self) -> Option<ChainHeadUpdateStream>;
1406+
fn chain_head_updates(&self) -> ChainHeadUpdateStream;
14071407

14081408
/// Get the current head block pointer for this chain.
14091409
/// Any changes to the head block pointer will be to a block with a larger block number, never

graph/src/data/query/error.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ pub enum QueryExecutionError {
6161
Panic(String),
6262
EventStreamError,
6363
FulltextQueryRequiresFilter,
64-
SubscriptionsDisabled,
6564
}
6665

6766
impl Error for QueryExecutionError {
@@ -209,7 +208,6 @@ impl fmt::Display for QueryExecutionError {
209208
Panic(msg) => write!(f, "panic processing query: {}", msg),
210209
EventStreamError => write!(f, "error in the subscription event stream"),
211210
FulltextQueryRequiresFilter => write!(f, "fulltext search queries can only use EntityFilter::Equal"),
212-
SubscriptionsDisabled => write!(f, "subscriptions are disabled"),
213211
TooExpensive => write!(f, "query is too expensive")
214212
}
215213
}

graphql/src/store/resolver.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -234,15 +234,11 @@ where
234234

235235
// Subscribe to the store and return the entity change stream
236236
let deployment_id = parse_subgraph_id(object_type)?;
237-
238-
match self.store.subscribe(entities) {
239-
Some(stream) => Ok(stream.throttle_while_syncing(
240-
&self.logger,
241-
self.store.clone(),
242-
deployment_id,
243-
*SUBSCRIPTION_THROTTLE_INTERVAL,
244-
)),
245-
None => Err(QueryExecutionError::SubscriptionsDisabled),
246-
}
237+
Ok(self.store.subscribe(entities).throttle_while_syncing(
238+
&self.logger,
239+
self.store.clone(),
240+
deployment_id,
241+
*SUBSCRIPTION_THROTTLE_INTERVAL,
242+
))
247243
}
248244
}

mock/src/store.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ mock! {
3636

3737
fn attempt_chain_head_update(&self, ancestor_count: u64) -> Result<Vec<H256>, Error>;
3838

39-
fn chain_head_updates(&self) -> Option<ChainHeadUpdateStream>;
39+
fn chain_head_updates(&self) -> ChainHeadUpdateStream;
4040

4141
fn chain_head_ptr(&self) -> Result<Option<EthereumBlockPointer>, Error>;
4242

@@ -145,7 +145,7 @@ impl Store for MockStore {
145145
unimplemented!()
146146
}
147147

148-
fn subscribe(&self, _entities: Vec<SubgraphEntityPair>) -> Option<StoreEventStreamBox> {
148+
fn subscribe(&self, _entities: Vec<SubgraphEntityPair>) -> StoreEventStreamBox {
149149
unimplemented!()
150150
}
151151

0 commit comments

Comments
 (0)