Skip to content

Commit f7f87b1

Browse files
committed
Add additional tracing instrumentation.
This commit lays the ground-work for making mini-redis a better example of an instrumented tokio application. While it does not go so far to turn mini-redis into a fully-featured guide for using `tracing`, it ensures that people who use mini-redis as a basis for experimenting with different subscribers get a more complete experience out-of-the-box.
1 parent 4b4ecf0 commit f7f87b1

File tree

12 files changed

+340
-270
lines changed

12 files changed

+340
-270
lines changed

Cargo.lock

Lines changed: 152 additions & 206 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ bytes = "1"
2727
structopt = "0.3.14"
2828
tokio = { version = "1", features = ["full"] }
2929
tokio-stream = "0.1"
30-
tracing = "0.1.13"
30+
tracing = "0.1.34"
3131
tracing-futures = { version = "0.2.3" }
32-
tracing-subscriber = "0.2.2"
32+
tracing-subscriber = "0.3"
3333

3434
[dev-dependencies]
3535
# Enable test-utilities in dev mode only. This is mostly for tests.

src/cmd/get.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ impl Get {
4747
/// ```text
4848
/// GET key
4949
/// ```
50+
#[instrument(level = "trace", name = "Get::parse_frames", skip(parse))]
5051
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Get> {
5152
// The `GET` string has already been consumed. The next value is the
5253
// name of the key to get. If the next value is not a string or the
@@ -60,7 +61,14 @@ impl Get {
6061
///
6162
/// The response is written to `dst`. This is called by the server in order
6263
/// to execute a received command.
63-
#[instrument(skip(self, db, dst))]
64+
#[instrument(
65+
level = "trace",
66+
name = "Get::apply",
67+
skip(self, db, dst),
68+
fields(
69+
key = self.key.as_str(),
70+
),
71+
)]
6472
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> {
6573
// Get the value from the shared database state
6674
let response = if let Some(value) = db.get(&self.key) {

src/cmd/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod unknown;
1717
pub use unknown::Unknown;
1818

1919
use crate::{Connection, Db, Frame, Parse, ParseError, Shutdown};
20+
use tracing::instrument;
2021

2122
/// Enumeration of supported Redis commands.
2223
///
@@ -41,6 +42,7 @@ impl Command {
4142
/// # Returns
4243
///
4344
/// On success, the command value is returned, otherwise, `Err` is returned.
45+
#[instrument(level = "trace", name = "Command::from_frame", skip(frame), err)]
4446
pub fn from_frame(frame: Frame) -> crate::Result<Command> {
4547
// The frame value is decorated with `Parse`. `Parse` provides a
4648
// "cursor" like API which makes parsing the command easier.
@@ -87,6 +89,12 @@ impl Command {
8789
///
8890
/// The response is written to `dst`. This is called by the server in order
8991
/// to execute a received command.
92+
#[instrument(
93+
level = "trace",
94+
name = "Command::apply",
95+
skip(self, db, dst, shutdown),
96+
err
97+
)]
9098
pub(crate) async fn apply(
9199
self,
92100
db: &Db,

src/cmd/ping.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ impl Ping {
3939
/// ```text
4040
/// PING [message]
4141
/// ```
42+
#[instrument(level = "trace", name = "Ping::parse_frames", skip(parse))]
4243
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Ping> {
4344
match parse.next_string() {
4445
Ok(msg) => Ok(Ping::new(Some(msg))),
@@ -51,7 +52,13 @@ impl Ping {
5152
///
5253
/// The response is written to `dst`. This is called by the server in order
5354
/// to execute a received command.
54-
#[instrument(skip(self, dst))]
55+
#[instrument(
56+
name = "Ping::apply",
57+
skip(self, dst),
58+
fields(
59+
?msg = self.msg,
60+
),
61+
)]
5562
pub(crate) async fn apply(self, dst: &mut Connection) -> crate::Result<()> {
5663
let response = match self.msg {
5764
None => Frame::Simple("PONG".to_string()),

src/cmd/publish.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{Connection, Db, Frame, Parse};
22

33
use bytes::Bytes;
4+
use tracing::instrument;
45

56
/// Posts a message to the given channel.
67
///
@@ -47,6 +48,7 @@ impl Publish {
4748
/// ```text
4849
/// PUBLISH channel message
4950
/// ```
51+
#[instrument(level = "trace", name = "Publish::parse_frames", skip(parse))]
5052
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Publish> {
5153
// The `PUBLISH` string has already been consumed. Extract the `channel`
5254
// and `message` values from the frame.
@@ -64,6 +66,14 @@ impl Publish {
6466
///
6567
/// The response is written to `dst`. This is called by the server in order
6668
/// to execute a received command.
69+
#[instrument(
70+
level = "trace",
71+
name = "Publish::apply",
72+
skip(self, db, dst),
73+
fields(
74+
channel = self.channel.as_str(),
75+
),
76+
)]
6777
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> {
6878
// The shared state contains the `tokio::sync::broadcast::Sender` for
6979
// all active channels. Calling `db.publish` dispatches the message into

src/cmd/set.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{Connection, Db, Frame};
33

44
use bytes::Bytes;
55
use std::time::Duration;
6-
use tracing::{debug, instrument};
6+
use tracing::instrument;
77

88
/// Set `key` to hold the string `value`.
99
///
@@ -77,6 +77,7 @@ impl Set {
7777
/// ```text
7878
/// SET key value [EX seconds|PX milliseconds]
7979
/// ```
80+
#[instrument(level = "trace", name = "Set::parse_frames", skip(parse))]
8081
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Set> {
8182
use ParseError::EndOfStream;
8283

@@ -124,14 +125,21 @@ impl Set {
124125
///
125126
/// The response is written to `dst`. This is called by the server in order
126127
/// to execute a received command.
127-
#[instrument(skip(self, db, dst))]
128+
#[instrument(
129+
level = "trace",
130+
name = "Set::apply",
131+
skip(self, db, dst),
132+
fields(
133+
key = self.key.as_str(),
134+
?expire = self.expire.as_ref().map(Duration::as_secs_f64),
135+
),
136+
)]
128137
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> {
129138
// Set the value in the shared database state.
130139
db.set(self.key, self.value, self.expire);
131140

132141
// Create a success response and write it to `dst`.
133142
let response = Frame::Simple("OK".to_string());
134-
debug!(?response);
135143
dst.write_frame(&response).await?;
136144

137145
Ok(())

src/cmd/subscribe.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::pin::Pin;
66
use tokio::select;
77
use tokio::sync::broadcast;
88
use tokio_stream::{Stream, StreamExt, StreamMap};
9+
use tracing::instrument;
910

1011
/// Subscribes the client to one or more channels.
1112
///
@@ -60,6 +61,7 @@ impl Subscribe {
6061
/// ```text
6162
/// SUBSCRIBE channel [channel ...]
6263
/// ```
64+
#[instrument(level = "trace", name = "Subscribe::parse_frames", skip(parse))]
6365
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Subscribe> {
6466
use ParseError::EndOfStream;
6567

@@ -99,6 +101,14 @@ impl Subscribe {
99101
/// are updated accordingly.
100102
///
101103
/// [here]: https://redis.io/topics/pubsub
104+
#[instrument(
105+
level = "trace",
106+
name = "Suscribe::apply",
107+
skip(self, db, dst),
108+
fields(
109+
channels = "UNIMPLEMENTED", // FIXME
110+
),
111+
)]
102112
pub(crate) async fn apply(
103113
mut self,
104114
db: &Db,

src/cmd/unknown.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,14 @@ impl Unknown {
2525
/// Responds to the client, indicating the command is not recognized.
2626
///
2727
/// This usually means the command is not yet implemented by `mini-redis`.
28-
#[instrument(skip(self, dst))]
28+
#[instrument(
29+
level = "trace",
30+
name = "Unknown::apply",
31+
skip(self, dst),
32+
fields(
33+
command_name = self.command_name.as_str(),
34+
),
35+
)]
2936
pub(crate) async fn apply(self, dst: &mut Connection) -> crate::Result<()> {
3037
let response = Frame::Error(format!("ERR unknown command '{}'", self.command_name));
3138

src/connection.rs

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use crate::frame::{self, Frame};
22

33
use bytes::{Buf, BytesMut};
4-
use std::io::{self, Cursor};
4+
use std::io::{self, Cursor, ErrorKind::ConnectionReset};
5+
use std::net::SocketAddr;
56
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
67
use tokio::net::TcpStream;
8+
use tracing::warn;
79

810
/// Send and receive `Frame` values from a remote peer.
911
///
@@ -28,6 +30,16 @@ pub struct Connection {
2830
buffer: BytesMut,
2931
}
3032

33+
/// The result of [`Connection::maybe_read_bytes`].
34+
enum ConnectionState {
35+
/// The connection was gracefully closed when reading was attempted.
36+
Closed,
37+
/// The connection was open when reading was attempted.
38+
Open,
39+
/// The connection was abruptly reset by the peer when reading was attempted.
40+
Reset,
41+
}
42+
3143
impl Connection {
3244
/// Create a new `Connection`, backed by `socket`. Read and write buffers
3345
/// are initialized.
@@ -42,6 +54,11 @@ impl Connection {
4254
}
4355
}
4456

57+
/// Returns the remote address that this connection is bound to.
58+
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
59+
self.stream.get_ref().peer_addr()
60+
}
61+
4562
/// Read a single `Frame` value from the underlying stream.
4663
///
4764
/// The function waits until it has retrieved enough data to parse a frame.
@@ -63,23 +80,40 @@ impl Connection {
6380

6481
// There is not enough buffered data to read a frame. Attempt to
6582
// read more data from the socket.
66-
//
67-
// On success, the number of bytes is returned. `0` indicates "end
68-
// of stream".
69-
if 0 == self.stream.read_buf(&mut self.buffer).await? {
70-
// The remote closed the connection. For this to be a clean
71-
// shutdown, there should be no data in the read buffer. If
72-
// there is, this means that the peer closed the socket while
73-
// sending a frame.
74-
if self.buffer.is_empty() {
83+
match self.maybe_read_bytes().await? {
84+
ConnectionState::Open => continue,
85+
ConnectionState::Closed | ConnectionState::Reset => {
86+
if !self.buffer.is_empty() {
87+
warn! {
88+
incomplete =? self.buffer,
89+
"connection closed with incomplete frame"
90+
};
91+
}
7592
return Ok(None);
76-
} else {
77-
return Err("connection reset by peer".into());
7893
}
7994
}
8095
}
8196
}
8297

98+
/// Attempt to read bytes from the connection.
99+
async fn maybe_read_bytes(&mut self) -> io::Result<ConnectionState> {
100+
match self.stream.read_buf(&mut self.buffer).await {
101+
// the connection was closed gracefully
102+
Ok(0) => Ok(ConnectionState::Closed),
103+
// the connection is still open
104+
Ok(_) => Ok(ConnectionState::Open),
105+
// the connection was closed abruptly by the peer
106+
Err(e) if e.kind() == ConnectionReset => {
107+
warn! {
108+
"connection closed abruptly by peer"
109+
};
110+
Ok(ConnectionState::Reset)
111+
}
112+
// reading failed for some other reason
113+
Err(err) => Err(err),
114+
}
115+
}
116+
83117
/// Tries to parse a frame from the buffer. If the buffer contains enough
84118
/// data, the frame is returned and the data removed from the buffer. If not
85119
/// enough data has been buffered yet, `Ok(None)` is returned. If the

0 commit comments

Comments
 (0)