Skip to content

Commit fcccf3c

Browse files
committed
fix: refactor async persist task
1 parent 954c1ff commit fcccf3c

File tree

1 file changed

+31
-22
lines changed

1 file changed

+31
-22
lines changed

sentry/src/event_aggregator.rs

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ use crate::Application;
66
use crate::ResponseError;
77
use crate::Session;
88
use async_std::sync::RwLock;
9-
use chrono::{Duration, Utc};
9+
use chrono::Utc;
1010
use primitives::adapter::Adapter;
1111
use primitives::sentry::{Event, EventAggregate};
1212
use primitives::{Channel, ChannelId};
1313
use std::collections::HashMap;
1414
use std::sync::Arc;
15-
use std::time::Duration as TimeDuration;
15+
use std::time::Duration;
1616
use tokio::time::delay_for;
1717

1818
#[derive(Default, Clone)]
@@ -66,45 +66,54 @@ impl EventAggregator {
6666
}
6767

6868
let mut recorder = self.aggregate.write().await;
69+
let aggr_throttle = app.config.aggr_throttle;
70+
let dbpool = app.pool.clone();
71+
let aggregate = self.aggregate.clone();
72+
let withdraw_period_start = channel.spec.withdraw_period_start;
73+
let channel_id = channel.id;
74+
6975
let mut aggr: &mut EventAggregate =
7076
if let Some(aggr) = recorder.get_mut(&channel.id.to_string()) {
7177
aggr
7278
} else {
7379
// insert into
7480
recorder.insert(channel.id.to_string(), new_aggr(&channel.id));
81+
82+
// spawn async task that persists
83+
// the channel events to database
84+
if aggr_throttle > 0 {
85+
tokio::spawn(async move {
86+
loop {
87+
// break loop if the
88+
// channel withdraw period has started
89+
// since no event is allowed once a channel
90+
// is in withdraw period
91+
92+
if Utc::now() > withdraw_period_start {
93+
break;
94+
}
95+
96+
delay_for(Duration::from_secs(aggr_throttle as u64)).await;
97+
store(&dbpool, &channel_id, aggregate.clone()).await;
98+
}
99+
});
100+
}
101+
75102
recorder
76103
.get_mut(&channel.id.to_string())
77104
.expect("should have aggr, we just inserted")
78105
};
79106

80-
// if aggr is none
81107
events
82108
.iter()
83109
.for_each(|ev| event_reducer::reduce(&channel, &mut aggr, ev));
84-
let created = aggr.created;
85-
let dbpool = app.pool.clone();
86-
let aggr_throttle = app.config.aggr_throttle;
87-
let aggregate = self.aggregate.clone();
88110

89111
// drop write access to RwLock
90112
// this is required to prevent a deadlock in store
91113
drop(recorder);
92114

93-
// Checks if aggr_throttle is set
94-
// and if current time is greater than aggr.created plus throttle seconds
95-
//
96-
// This approach spawns an async task every > AGGR_THROTTLE seconds
97-
// Each spawned task resolves after AGGR_THROTTLE seconds
98-
//
99-
100-
if aggr_throttle > 0 && Utc::now() > (created + Duration::seconds(aggr_throttle as i64)) {
101-
// spawn a tokio task for saving to database
102-
tokio::spawn(async move {
103-
delay_for(TimeDuration::from_secs(aggr_throttle as u64)).await;
104-
store(&dbpool, &channel.id, aggregate).await;
105-
});
106-
} else {
107-
store(&app.pool, &channel.id, aggregate).await;
115+
if aggr_throttle == 0 {
116+
store(&app.pool, &channel.id, self.aggregate.clone()).await;
108117
}
109118

110119
Ok(())

0 commit comments

Comments
 (0)