@@ -4,7 +4,7 @@ use tokio::time::{self, Duration, Instant};
4
4
use bytes:: Bytes ;
5
5
use std:: collections:: { BTreeMap , HashMap } ;
6
6
use std:: sync:: { Arc , Mutex } ;
7
- use tracing:: debug;
7
+ use tracing:: { debug, instrument } ;
8
8
9
9
/// A wrapper around a `Db` instance. This exists to allow orderly cleanup
10
10
/// of the `Db` by signalling the background purge task to shut down when
@@ -150,6 +150,7 @@ impl Db {
150
150
/// Returns `None` if there is no value associated with the key. This may be
151
151
/// due to never having assigned a value to the key or a previously assigned
152
152
/// value expired.
153
+ #[ instrument( level = "trace" , name = "Db::get" , skip( self ) ) ]
153
154
pub ( crate ) fn get ( & self , key : & str ) -> Option < Bytes > {
154
155
// Acquire the lock, get the entry and clone the value.
155
156
//
@@ -163,6 +164,7 @@ impl Db {
163
164
/// Duration.
164
165
///
165
166
/// If a value is already associated with the key, it is removed.
167
+ #[ instrument( level = "trace" , name = "Db::set" , skip( self , value) ) ]
166
168
pub ( crate ) fn set ( & self , key : String , value : Bytes , expire : Option < Duration > ) {
167
169
let mut state = self . shared . state . lock ( ) . unwrap ( ) ;
168
170
@@ -231,6 +233,7 @@ impl Db {
231
233
///
232
234
/// The returned `Receiver` is used to receive values broadcast by `PUBLISH`
233
235
/// commands.
236
+ #[ instrument( level = "trace" , name = "Db::subscribe" , skip( self , key) ) ]
234
237
pub ( crate ) fn subscribe ( & self , key : String ) -> broadcast:: Receiver < Bytes > {
235
238
use std:: collections:: hash_map:: Entry ;
236
239
@@ -262,6 +265,7 @@ impl Db {
262
265
263
266
/// Publish a message to the channel. Returns the number of subscribers
264
267
/// listening on the channel.
268
+ #[ instrument( level = "trace" , name = "Db::publish" , skip( self , value) ) ]
265
269
pub ( crate ) fn publish ( & self , key : & str , value : Bytes ) -> usize {
266
270
let state = self . shared . state . lock ( ) . unwrap ( ) ;
267
271
@@ -279,6 +283,7 @@ impl Db {
279
283
280
284
/// Signals the purge background task to shut down. This is called by the
281
285
/// `DbShutdown`s `Drop` implementation.
286
+ #[ instrument( name = "Db::shutdown_purge_task" , skip( self ) ) ]
282
287
fn shutdown_purge_task ( & self ) {
283
288
// The background task must be signaled to shut down. This is done by
284
289
// setting `State::shutdown` to `true` and signalling the task.
@@ -296,6 +301,7 @@ impl Db {
296
301
impl Shared {
297
302
/// Purge all expired keys and return the `Instant` at which the **next**
298
303
/// key will expire. The background task will sleep until this instant.
304
+ #[ instrument( name = "Shared::purge_expired_keys" , skip( self ) ) ]
299
305
fn purge_expired_keys ( & self ) -> Option < Instant > {
300
306
let mut state = self . state . lock ( ) . unwrap ( ) ;
301
307
@@ -323,6 +329,10 @@ impl Shared {
323
329
}
324
330
325
331
// The key expired, remove it
332
+ debug ! {
333
+ key = & key. as_str( ) ,
334
+ "purged_expired_key" ,
335
+ }
326
336
state. entries . remove ( key) ;
327
337
state. expirations . remove ( & ( when, id) ) ;
328
338
}
@@ -352,6 +362,7 @@ impl State {
352
362
///
353
363
/// Wait to be notified. On notification, purge any expired keys from the shared
354
364
/// state handle. If `shutdown` is set, terminate the task.
365
+ #[ instrument( skip( shared) ) ]
355
366
async fn purge_expired_tasks ( shared : Arc < Shared > ) {
356
367
// If the shutdown flag is set, then the task should exit.
357
368
while !shared. is_shutdown ( ) {
0 commit comments