Skip to content

Commit cea35db

Browse files
authored
Fix lost statistics (#125)
* Lost events * more logging
1 parent a3aefab commit cea35db

File tree

2 files changed

+37
-17
lines changed

2 files changed

+37
-17
lines changed

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ async fn main() {
133133
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
134134

135135
// Statistics reporting.
136-
let (tx, rx) = mpsc::channel(100);
136+
let (tx, rx) = mpsc::channel(100_000);
137137
REPORTER.store(Arc::new(Reporter::new(tx.clone())));
138138

139139
// Connection pool that allows to query all shards and replicas.

src/stats.rs

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use arc_swap::ArcSwap;
22
/// Statistics and reporting.
3-
use log::info;
3+
use log::{error, info, trace};
44
use once_cell::sync::Lazy;
55
use parking_lot::Mutex;
66
use std::collections::HashMap;
7+
use tokio::sync::mpsc::error::TrySendError;
78
use tokio::sync::mpsc::{channel, Receiver, Sender};
89

910
use crate::pool::get_number_of_addresses;
@@ -43,7 +44,7 @@ enum EventName {
4344

4445
/// Event data sent to the collector
4546
/// from clients and servers.
46-
#[derive(Debug)]
47+
#[derive(Debug, Clone)]
4748
pub struct Event {
4849
/// The name of the event being reported.
4950
name: EventName,
@@ -79,6 +80,25 @@ impl Reporter {
7980
Reporter { tx: tx }
8081
}
8182

83+
/// Send statistics to the task keeping track of stats.
84+
fn send(&self, event: Event) {
85+
let name = event.name;
86+
let result = self.tx.try_send(event);
87+
88+
match result {
89+
Ok(_) => trace!(
90+
"{:?} event reported successfully, capacity: {}",
91+
name,
92+
self.tx.capacity()
93+
),
94+
95+
Err(err) => match err {
96+
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
97+
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
98+
},
99+
};
100+
}
101+
82102
/// Report a query executed by a client against
83103
/// a server identified by the `address_id`.
84104
pub fn query(&self, process_id: i32, address_id: usize) {
@@ -89,7 +109,7 @@ impl Reporter {
89109
address_id: address_id,
90110
};
91111

92-
let _ = self.tx.try_send(event);
112+
self.send(event);
93113
}
94114

95115
/// Report a transaction executed by a client against
@@ -102,7 +122,7 @@ impl Reporter {
102122
address_id: address_id,
103123
};
104124

105-
let _ = self.tx.try_send(event);
125+
self.send(event)
106126
}
107127

108128
/// Report data sent to a server identified by `address_id`.
@@ -115,7 +135,7 @@ impl Reporter {
115135
address_id: address_id,
116136
};
117137

118-
let _ = self.tx.try_send(event);
138+
self.send(event)
119139
}
120140

121141
/// Report data received from a server identified by `address_id`.
@@ -128,7 +148,7 @@ impl Reporter {
128148
address_id: address_id,
129149
};
130150

131-
let _ = self.tx.try_send(event);
151+
self.send(event)
132152
}
133153

134154
/// Time spent waiting to get a healthy connection from the pool
@@ -142,7 +162,7 @@ impl Reporter {
142162
address_id: address_id,
143163
};
144164

145-
let _ = self.tx.try_send(event);
165+
self.send(event)
146166
}
147167

148168
/// Reports a client identified by `process_id` waiting for a connection
@@ -155,7 +175,7 @@ impl Reporter {
155175
address_id: address_id,
156176
};
157177

158-
let _ = self.tx.try_send(event);
178+
self.send(event)
159179
}
160180

161181
/// Reports a client identified by `process_id` is done waiting for a connection
@@ -168,7 +188,7 @@ impl Reporter {
168188
address_id: address_id,
169189
};
170190

171-
let _ = self.tx.try_send(event);
191+
self.send(event)
172192
}
173193

174194
/// Reports a client identified by `process_id` is done querying the server
@@ -181,7 +201,7 @@ impl Reporter {
181201
address_id: address_id,
182202
};
183203

184-
let _ = self.tx.try_send(event);
204+
self.send(event)
185205
}
186206

187207
/// Reports a client identified by `process_id` is disconecting from the pooler.
@@ -194,7 +214,7 @@ impl Reporter {
194214
address_id: address_id,
195215
};
196216

197-
let _ = self.tx.try_send(event);
217+
self.send(event)
198218
}
199219

200220
/// Reports a server connection identified by `process_id` for
@@ -208,7 +228,7 @@ impl Reporter {
208228
address_id: address_id,
209229
};
210230

211-
let _ = self.tx.try_send(event);
231+
self.send(event)
212232
}
213233

214234
/// Reports a server connection identified by `process_id` for
@@ -222,7 +242,7 @@ impl Reporter {
222242
address_id: address_id,
223243
};
224244

225-
let _ = self.tx.try_send(event);
245+
self.send(event)
226246
}
227247

228248
/// Reports a server connection identified by `process_id` for
@@ -236,7 +256,7 @@ impl Reporter {
236256
address_id: address_id,
237257
};
238258

239-
let _ = self.tx.try_send(event);
259+
self.send(event)
240260
}
241261

242262
/// Reports a server connection identified by `process_id` for
@@ -250,7 +270,7 @@ impl Reporter {
250270
address_id: address_id,
251271
};
252272

253-
let _ = self.tx.try_send(event);
273+
self.send(event)
254274
}
255275

256276
/// Reports a server connection identified by `process_id` is disconecting from the pooler.
@@ -263,7 +283,7 @@ impl Reporter {
263283
address_id: address_id,
264284
};
265285

266-
let _ = self.tx.try_send(event);
286+
self.send(event)
267287
}
268288
}
269289

0 commit comments

Comments
 (0)