1
1
use crate :: access:: check_access;
2
2
use crate :: db:: event_aggregate:: insert_event_aggregate;
3
+ use crate :: db:: get_channel_by_id;
3
4
use crate :: db:: DbPool ;
4
5
use crate :: event_reducer;
5
6
use crate :: Application ;
@@ -15,21 +16,18 @@ use std::collections::HashMap;
15
16
use std:: sync:: Arc ;
16
17
use std:: time:: Duration ;
17
18
use tokio:: time:: delay_for;
18
- use crate :: db:: { get_channel_by_id} ;
19
-
20
19
21
20
#[ derive( Debug ) ]
22
21
struct Record {
23
22
channel : Channel ,
24
- aggregate : EventAggregate
23
+ aggregate : EventAggregate ,
25
24
}
26
25
27
- type Recorder = Arc < RwLock < HashMap < ChannelId , Record > > > ;
26
+ type Recorder = Arc < RwLock < HashMap < ChannelId , Record > > > ;
28
27
29
28
#[ derive( Default , Clone ) ]
30
29
pub struct EventAggregator {
31
- // aggregate: Aggregate,
32
- recorder : Recorder
30
+ recorder : Recorder ,
33
31
}
34
32
35
33
pub fn new_aggr ( channel_id : & ChannelId ) -> EventAggregate {
@@ -45,12 +43,12 @@ async fn store(db: &DbPool, channel_id: &ChannelId, logger: &Logger, recorder: R
45
43
let record: Option < & Record > = channel_recorder. get ( channel_id) ;
46
44
if let Some ( data) = record {
47
45
if let Err ( e) = insert_event_aggregate ( & db, & channel_id, & data. aggregate ) . await {
48
- error ! ( & logger, "{}" , e; "event_aggregator" => "store" ) ;
46
+ error ! ( & logger, "{}" , e; "module" => " event_aggregator" , "in " => "store" ) ;
49
47
} else {
50
48
// reset aggr record
51
49
let record = Record {
52
50
channel : data. channel . to_owned ( ) ,
53
- aggregate : new_aggr ( & channel_id)
51
+ aggregate : new_aggr ( & channel_id) ,
54
52
} ;
55
53
channel_recorder. insert ( channel_id. to_owned ( ) , record) ;
56
54
} ;
@@ -76,24 +74,24 @@ impl EventAggregator {
76
74
None => {
77
75
// fetch channel
78
76
let channel = get_channel_by_id ( & app. pool , & channel_id)
79
- . await ?
80
- . ok_or_else ( || ResponseError :: NotFound ) ?;
77
+ . await ?
78
+ . ok_or_else ( || ResponseError :: NotFound ) ?;
81
79
82
80
let withdraw_period_start = channel. spec . withdraw_period_start ;
83
81
let channel_id = channel. id ;
84
82
let record = Record {
85
83
channel,
86
- aggregate : new_aggr ( & channel_id)
84
+ aggregate : new_aggr ( & channel_id) ,
87
85
} ;
88
86
89
87
// insert into
90
88
channel_recorder. insert ( channel_id. to_owned ( ) , record) ;
91
89
92
- //
90
+ //
93
91
// spawn async task that persists
94
92
// the channel events to database
95
- let recorder = recorder. clone ( ) ;
96
93
if aggr_throttle > 0 {
94
+ let recorder = recorder. clone ( ) ;
97
95
tokio:: spawn ( async move {
98
96
loop {
99
97
// break loop if the
0 commit comments