Skip to content

Commit 5124830

Browse files
authored
chore(socket): consistent builder pattern across repo (#123)
2 parents f6de6eb + b9bd969 commit 5124830

File tree

26 files changed

+175
-136
lines changed

26 files changed

+175
-136
lines changed

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,10 @@ all = "warn"
108108
[workspace.lints.rust]
109109
missing_debug_implementations = "warn"
110110
missing_docs = "warn"
111-
rust-2018-idioms = { level = "deny", priority = -1 }
112-
unreachable-pub = "warn"
113-
unused-must-use = "deny"
111+
rust_2018_idioms = { level = "deny", priority = -1 }
112+
unreachable_pub = "warn"
113+
unused_must_use = "deny"
114+
rust_2024_incompatible_pat = "warn"
114115

115116
[workspace.lints.clippy]
116117
# These are some of clippy's nursery (i.e., experimental) lints that we like.

book/src/usage/authentication.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async fn main() {
5555
// The identifier will be sent to the server when the connection is established.
5656
let mut req = ReqSocket::with_options(
5757
Tcp::default(),
58-
ReqOptions::default().auth_token(Bytes::from("client1")),
58+
ReqOptions::default().with_auth_token(Bytes::from("client1")),
5959
);
6060

6161
...

msg-socket/src/connection/backoff.rs

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
use futures::{FutureExt, Stream};
2-
use std::{pin::Pin, task::Poll, time::Duration};
2+
use std::{
3+
pin::Pin,
4+
task::{Context, Poll},
5+
time::Duration,
6+
};
37
use tokio::time::sleep;
48

59
/// Helper trait alias for backoff streams.
610
/// We define any stream that yields `Duration`s as a backoff
711
pub trait Backoff: Stream<Item = Duration> + Unpin {}
812

9-
/// Blanket implementation of `Backoff` for any stream that yields `Duration`s.
13+
// Blanket implementation of `Backoff` for any stream that yields `Duration`s.
1014
impl<T> Backoff for T where T: Stream<Item = Duration> + Unpin {}
1115

1216
/// A stream that yields exponentially increasing backoff durations.
@@ -23,6 +27,7 @@ pub struct ExponentialBackoff {
2327
}
2428

2529
impl ExponentialBackoff {
30+
/// Creates a new exponential backoff stream with the given initial duration and max retries.
2631
pub fn new(initial: Duration, max_retries: usize) -> Self {
2732
Self { retry_count: 0, max_retries, backoff: initial, timeout: None }
2833
}
@@ -38,38 +43,36 @@ impl Stream for ExponentialBackoff {
3843

3944
/// Polls the exponential backoff stream. Returns `Poll::Ready` with the current backoff
4045
/// duration if the backoff timeout has elapsed, otherwise returns `Poll::Pending`.
41-
fn poll_next(
42-
self: Pin<&mut Self>,
43-
cx: &mut std::task::Context<'_>,
44-
) -> Poll<Option<Self::Item>> {
46+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4547
let this = self.get_mut();
4648

4749
loop {
48-
if let Some(ref mut timeout) = this.timeout {
49-
if timeout.poll_unpin(cx).is_ready() {
50-
// Timeout has elapsed, so reset the timeout and double the backoff
51-
this.backoff *= 2;
52-
this.retry_count += 1;
50+
let Some(ref mut timeout) = this.timeout else {
51+
// Set the initial timeout
52+
this.reset_timeout();
53+
continue;
54+
};
55+
56+
if timeout.poll_unpin(cx).is_ready() {
57+
// Timeout has elapsed, so reset the timeout and double the backoff
58+
this.backoff *= 2;
59+
this.retry_count += 1;
5360

54-
// Close the stream
55-
if this.retry_count >= this.max_retries {
56-
return Poll::Ready(None);
57-
}
61+
// Close the stream
62+
if this.retry_count >= this.max_retries {
63+
return Poll::Ready(None);
64+
}
5865

59-
this.reset_timeout();
66+
this.reset_timeout();
6067

61-
// Wake up the task to poll the timeout again
62-
cx.waker().wake_by_ref();
68+
// Wake up the task to poll the timeout again
69+
cx.waker().wake_by_ref();
6370

64-
// Return the current backoff duration
65-
return Poll::Ready(Some(this.backoff));
66-
} else {
67-
// Timeout has not elapsed, so return pending
68-
return Poll::Pending;
69-
}
71+
// Return the current backoff duration
72+
return Poll::Ready(Some(this.backoff));
7073
} else {
71-
// Set initial timeout
72-
this.reset_timeout();
74+
// Timeout has not elapsed, so return pending
75+
return Poll::Pending;
7376
}
7477
}
7578
}

msg-socket/src/connection/state.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
use msg_transport::Address;
2-
3-
use super::Backoff;
4-
51
/// Abstraction to represent the state of a connection.
62
///
73
/// * `C` is the channel type, which is used to send and receive generic messages.
84
/// * `B` is the backoff type, used to control the backoff state for inactive connections.
9-
pub enum ConnectionState<C, B, A: Address> {
5+
/// * `A` is the address type, used to represent the address of the connection.
6+
pub enum ConnectionState<C, B, A> {
107
Active {
118
/// Channel to control the underlying connection. This is used to send
129
/// and receive any kind of message in any direction.
@@ -19,15 +16,13 @@ pub enum ConnectionState<C, B, A: Address> {
1916
},
2017
}
2118

22-
impl<C, B: Backoff, A: Address> ConnectionState<C, B, A> {
19+
impl<C, B, A> ConnectionState<C, B, A> {
2320
/// Returns `true` if the connection is active.
24-
#[allow(unused)]
2521
pub fn is_active(&self) -> bool {
2622
matches!(self, Self::Active { .. })
2723
}
2824

2925
/// Returns `true` if the connection is inactive.
30-
#[allow(unused)]
3126
pub fn is_inactive(&self) -> bool {
3227
matches!(self, Self::Inactive { .. })
3328
}

msg-socket/src/pub/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,35 +73,35 @@ impl Default for PubOptions {
7373

7474
impl PubOptions {
7575
/// Sets the maximum number of concurrent clients.
76-
pub fn max_clients(mut self, max_clients: usize) -> Self {
76+
pub fn with_max_clients(mut self, max_clients: usize) -> Self {
7777
self.max_clients = Some(max_clients);
7878
self
7979
}
8080

8181
/// Sets the session channel buffer size. This is the amount of messages that can be buffered
8282
/// per session before messages start being dropped.
83-
pub fn session_buffer_size(mut self, session_buffer_size: usize) -> Self {
83+
pub fn with_session_buffer_size(mut self, session_buffer_size: usize) -> Self {
8484
self.session_buffer_size = session_buffer_size;
8585
self
8686
}
8787

8888
/// Sets the maximum number of bytes that can be buffered in the session before being flushed.
8989
/// This internally sets [`Framed::set_backpressure_boundary`](tokio_util::codec::Framed).
90-
pub fn backpressure_boundary(mut self, backpressure_boundary: usize) -> Self {
90+
pub fn with_backpressure_boundary(mut self, backpressure_boundary: usize) -> Self {
9191
self.backpressure_boundary = backpressure_boundary;
9292
self
9393
}
9494

9595
/// Sets the interval at which each session should be flushed. If this is `None`,
9696
/// the session will be flushed on every publish, which can add a lot of overhead.
97-
pub fn flush_interval(mut self, flush_interval: std::time::Duration) -> Self {
97+
pub fn with_flush_interval(mut self, flush_interval: std::time::Duration) -> Self {
9898
self.flush_interval = Some(flush_interval);
9999
self
100100
}
101101

102102
/// Sets the minimum payload size in bytes for compression to be used. If the payload is smaller
103103
/// than this threshold, it will not be compressed.
104-
pub fn min_compress_size(mut self, min_compress_size: usize) -> Self {
104+
pub fn with_min_compress_size(mut self, min_compress_size: usize) -> Self {
105105
self.min_compress_size = min_compress_size;
106106
self
107107
}
@@ -224,7 +224,7 @@ mod tests {
224224

225225
let mut sub_socket = SubSocket::with_options(
226226
Tcp::default(),
227-
SubOptions::default().auth_token(Bytes::from("client1")),
227+
SubOptions::default().with_auth_token(Bytes::from("client1")),
228228
);
229229

230230
pub_socket.bind("0.0.0.0:0").await.unwrap();
@@ -250,7 +250,7 @@ mod tests {
250250

251251
let mut sub_socket = SubSocket::with_options(
252252
Quic::default(),
253-
SubOptions::default().auth_token(Bytes::from("client1")),
253+
SubOptions::default().with_auth_token(Bytes::from("client1")),
254254
);
255255

256256
pub_socket.bind("0.0.0.0:0").await.unwrap();
@@ -389,7 +389,7 @@ mod tests {
389389
let _ = tracing_subscriber::fmt::try_init();
390390

391391
let mut pub_socket =
392-
PubSocket::with_options(Tcp::default(), PubOptions::default().max_clients(1));
392+
PubSocket::with_options(Tcp::default(), PubOptions::default().with_max_clients(1));
393393

394394
pub_socket.bind("0.0.0.0:0").await.unwrap();
395395

msg-socket/src/rep/mod.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,19 @@ impl Default for RepOptions {
5050

5151
impl RepOptions {
5252
/// Sets the number of maximum concurrent clients.
53-
pub fn max_clients(mut self, max_clients: usize) -> Self {
53+
pub fn with_max_clients(mut self, max_clients: usize) -> Self {
5454
self.max_clients = Some(max_clients);
5555
self
5656
}
5757

5858
/// Sets the minimum payload size for compression.
5959
/// If the payload is smaller than this value, it will not be compressed.
60-
pub fn min_compress_size(mut self, min_compress_size: usize) -> Self {
60+
pub fn with_min_compress_size(mut self, min_compress_size: usize) -> Self {
6161
self.min_compress_size = min_compress_size;
6262
self
6363
}
6464

65-
pub fn backpressure_boundary(mut self, backpressure_boundary: usize) -> Self {
65+
pub fn with_backpressure_boundary(mut self, backpressure_boundary: usize) -> Self {
6666
self.backpressure_boundary = backpressure_boundary;
6767
self
6868
}
@@ -210,7 +210,7 @@ mod tests {
210210
// Initialize socket with a client ID. This will implicitly enable authentication.
211211
let mut req = ReqSocket::with_options(
212212
Tcp::default(),
213-
ReqOptions::default().auth_token(Bytes::from("REQ")),
213+
ReqOptions::default().with_auth_token(Bytes::from("REQ")),
214214
);
215215

216216
req.connect(rep.local_addr().unwrap()).await.unwrap();
@@ -247,7 +247,8 @@ mod tests {
247247
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
248248
async fn rep_max_connections() {
249249
let _ = tracing_subscriber::fmt::try_init();
250-
let mut rep = RepSocket::with_options(Tcp::default(), RepOptions::default().max_clients(1));
250+
let mut rep =
251+
RepSocket::with_options(Tcp::default(), RepOptions::default().with_max_clients(1));
251252
rep.bind("127.0.0.1:0").await.unwrap();
252253
let addr = rep.local_addr().unwrap();
253254

@@ -264,15 +265,19 @@ mod tests {
264265

265266
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
266267
async fn test_basic_reqrep_with_compression() {
267-
let mut rep =
268-
RepSocket::with_options(Tcp::default(), RepOptions::default().min_compress_size(0))
269-
.with_compressor(SnappyCompressor);
268+
let mut rep = RepSocket::with_options(
269+
Tcp::default(),
270+
RepOptions::default().with_min_compress_size(0),
271+
)
272+
.with_compressor(SnappyCompressor);
270273

271274
rep.bind("0.0.0.0:4445").await.unwrap();
272275

273-
let mut req =
274-
ReqSocket::with_options(Tcp::default(), ReqOptions::default().min_compress_size(0))
275-
.with_compressor(GzipCompressor::new(6));
276+
let mut req = ReqSocket::with_options(
277+
Tcp::default(),
278+
ReqOptions::default().with_min_compress_size(0),
279+
)
280+
.with_compressor(GzipCompressor::new(6));
276281

277282
req.connect("0.0.0.0:4445").await.unwrap();
278283

msg-socket/src/req/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,25 +72,25 @@ pub struct ReqOptions {
7272

7373
impl ReqOptions {
7474
/// Sets the authentication token for the socket.
75-
pub fn auth_token(mut self, auth_token: Bytes) -> Self {
75+
pub fn with_auth_token(mut self, auth_token: Bytes) -> Self {
7676
self.auth_token = Some(auth_token);
7777
self
7878
}
7979

8080
/// Sets the timeout for the socket.
81-
pub fn timeout(mut self, timeout: Duration) -> Self {
81+
pub fn with_timeout(mut self, timeout: Duration) -> Self {
8282
self.timeout = timeout;
8383
self
8484
}
8585

8686
/// Enables blocking initial connections to the target.
87-
pub fn blocking_connect(mut self) -> Self {
87+
pub fn with_blocking_connect(mut self) -> Self {
8888
self.blocking_connect = true;
8989
self
9090
}
9191

9292
/// Sets the backoff duration for the socket.
93-
pub fn backoff_duration(mut self, backoff_duration: Duration) -> Self {
93+
pub fn with_backoff_duration(mut self, backoff_duration: Duration) -> Self {
9494
self.backoff_duration = backoff_duration;
9595
self
9696
}
@@ -99,29 +99,29 @@ impl ReqOptions {
9999
/// throughput, but at the cost of higher latency. Note that this behaviour can be
100100
/// completely useless if the `backpressure_boundary` is set too low (which will trigger a
101101
/// flush before the interval is reached).
102-
pub fn flush_interval(mut self, flush_interval: Duration) -> Self {
102+
pub fn with_flush_interval(mut self, flush_interval: Duration) -> Self {
103103
self.flush_interval = Some(flush_interval);
104104
self
105105
}
106106

107107
/// Sets the backpressure boundary for the socket. This is the maximum number of bytes that can
108108
/// be buffered in the session before being flushed. This internally sets
109109
/// [`Framed::set_backpressure_boundary`](tokio_util::codec::Framed).
110-
pub fn backpressure_boundary(mut self, backpressure_boundary: usize) -> Self {
110+
pub fn with_backpressure_boundary(mut self, backpressure_boundary: usize) -> Self {
111111
self.backpressure_boundary = backpressure_boundary;
112112
self
113113
}
114114

115115
/// Sets the maximum number of retry attempts. If `None`, all connections will be retried
116116
/// indefinitely.
117-
pub fn retry_attempts(mut self, retry_attempts: usize) -> Self {
117+
pub fn with_retry_attempts(mut self, retry_attempts: usize) -> Self {
118118
self.retry_attempts = Some(retry_attempts);
119119
self
120120
}
121121

122122
/// Sets the minimum payload size in bytes for compression to be used. If the payload is smaller
123123
/// than this threshold, it will not be compressed.
124-
pub fn min_compress_size(mut self, min_compress_size: usize) -> Self {
124+
pub fn with_min_compress_size(mut self, min_compress_size: usize) -> Self {
125125
self.min_compress_size = min_compress_size;
126126
self
127127
}

msg-socket/src/req/socket.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ where
113113
/// Tries to connect to the target endpoint with the default options.
114114
/// A ReqSocket can only be connected to a single address.
115115
pub async fn try_connect(&mut self, endpoint: A) -> Result<(), ReqError> {
116-
// Initialize communication channels
117116
let (to_driver, from_socket) = mpsc::channel(DEFAULT_BUFFER_SIZE);
118117

119118
// TODO: Don't panic, return error

msg-socket/src/sub/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,27 +71,27 @@ pub struct SubOptions {
7171
impl SubOptions {
7272
/// Sets the authentication token for this socket. This will activate the authentication layer
7373
/// and send the token to the publisher.
74-
pub fn auth_token(mut self, auth_token: Bytes) -> Self {
74+
pub fn with_auth_token(mut self, auth_token: Bytes) -> Self {
7575
self.auth_token = Some(auth_token);
7676
self
7777
}
7878

7979
/// Sets the ingress buffer size. This is the maximum amount of incoming messages that will be
8080
/// buffered. If the consumer cannot keep up with the incoming messages, messages will start
8181
/// being dropped.
82-
pub fn ingress_buffer_size(mut self, ingress_buffer_size: usize) -> Self {
82+
pub fn with_ingress_buffer_size(mut self, ingress_buffer_size: usize) -> Self {
8383
self.ingress_buffer_size = ingress_buffer_size;
8484
self
8585
}
8686

8787
/// Sets the read buffer size. This sets the size of the read buffer for each session.
88-
pub fn read_buffer_size(mut self, read_buffer_size: usize) -> Self {
88+
pub fn with_read_buffer_size(mut self, read_buffer_size: usize) -> Self {
8989
self.read_buffer_size = read_buffer_size;
9090
self
9191
}
9292

9393
/// Set the initial backoff for reconnecting to a publisher.
94-
pub fn initial_backoff(mut self, initial_backoff: Duration) -> Self {
94+
pub fn with_initial_backoff(mut self, initial_backoff: Duration) -> Self {
9595
self.initial_backoff = initial_backoff;
9696
self
9797
}

0 commit comments

Comments
 (0)