Skip to content

Commit ce8336e

Browse files
committed
store events: Refactor to abstract event sink
1 parent 1863bce commit ce8336e

File tree

1 file changed

+19
-3
lines changed

1 file changed

+19
-3
lines changed

store/postgres/src/store_events.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,27 @@ impl StoreEventListener {
8383
}
8484
}
8585

86+
#[async_trait]
87+
trait EventSink: Send + Sync {
88+
async fn send(&self, event: Arc<StoreEvent>) -> Result<(), Error>;
89+
fn is_closed(&self) -> bool;
90+
}
91+
92+
#[async_trait]
93+
impl EventSink for Sender<Arc<StoreEvent>> {
94+
async fn send(&self, event: Arc<StoreEvent>) -> Result<(), Error> {
95+
Ok(self.send(event).await?)
96+
}
97+
98+
fn is_closed(&self) -> bool {
99+
self.is_closed()
100+
}
101+
}
102+
86103
/// Manage subscriptions to the `StoreEvent` stream. Keep a list of
87104
/// currently active subscribers and forward new events to each of them
88105
pub struct SubscriptionManager {
89-
subscriptions:
90-
Arc<RwLock<HashMap<String, (Arc<Vec<SubscriptionFilter>>, Sender<Arc<StoreEvent>>)>>>,
106+
subscriptions: Arc<RwLock<HashMap<String, (Arc<Vec<SubscriptionFilter>>, Arc<dyn EventSink>)>>>,
91107

92108
/// Keep the notification listener alive
93109
listener: StoreEventListener,
@@ -182,7 +198,7 @@ impl SubscriptionManagerTrait for SubscriptionManager {
182198
self.subscriptions
183199
.write()
184200
.unwrap()
185-
.insert(id, (Arc::new(entities.clone()), sender));
201+
.insert(id, (Arc::new(entities.clone()), Arc::new(sender)));
186202

187203
// Return the subscription ID and entity change stream
188204
StoreEventStream::new(Box::new(ReceiverStream::new(receiver).map(Ok).compat()))

0 commit comments

Comments
 (0)