|
1 | 1 | use std::cmp; |
2 | 2 | use std::collections::{HashMap, VecDeque}; |
3 | 3 | use std::mem; |
4 | | -use std::sync::Mutex; |
5 | 4 | use std::time::Duration; |
6 | 5 |
|
7 | 6 | use graph::components::ethereum::{ |
@@ -122,7 +121,7 @@ impl<S, C> Clone for BlockStreamContext<S, C> { |
122 | 121 | } |
123 | 122 |
|
124 | 123 | pub struct BlockStream<S, C> { |
125 | | - state: Mutex<BlockStreamState>, |
| 124 | + state: BlockStreamState, |
126 | 125 | consecutive_err_count: u32, |
127 | 126 | chain_head_update_stream: ChainHeadUpdateStream, |
128 | 127 | ctx: BlockStreamContext<S, C>, |
@@ -159,7 +158,7 @@ where |
159 | 158 | metrics: Arc<BlockStreamMetrics>, |
160 | 159 | ) -> Self { |
161 | 160 | BlockStream { |
162 | | - state: Mutex::new(BlockStreamState::BeginReconciliation), |
| 161 | + state: BlockStreamState::BeginReconciliation, |
163 | 162 | consecutive_err_count: 0, |
164 | 163 | chain_head_update_stream: chain_store.chain_head_updates(), |
165 | 164 | ctx: BlockStreamContext { |
@@ -563,11 +562,8 @@ impl<S: Store, C: ChainStore> Stream for BlockStream<S, C> { |
563 | 562 | type Error = Error; |
564 | 563 |
|
565 | 564 | fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { |
566 | | - // Lock Mutex to perform a state transition |
567 | | - let mut state_lock = self.state.lock().unwrap(); |
568 | | - |
569 | 565 | let mut state = BlockStreamState::Transition; |
570 | | - mem::swap(&mut *state_lock, &mut state); |
| 566 | + mem::swap(&mut self.state, &mut state); |
571 | 567 |
|
572 | 568 | let result = loop { |
573 | 569 | match state { |
@@ -717,7 +713,7 @@ impl<S: Store, C: ChainStore> Stream for BlockStream<S, C> { |
717 | 713 | } |
718 | 714 | }; |
719 | 715 |
|
720 | | - *state_lock = state; |
| 716 | + self.state = state; |
721 | 717 |
|
722 | 718 | result |
723 | 719 | } |
|
0 commit comments