Skip to content

Commit 5424fef

Browse files
committed
refactor: DEFAULT_QUEUE_SIZE
1 parent 4758403 commit 5424fef

File tree

10 files changed

+62
-59
lines changed

10 files changed

+62
-59
lines changed

msg-socket/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ pub use connection::*;
3636
/// The default buffer size for a socket.
3737
pub const DEFAULT_BUFFER_SIZE: usize = 8192;
3838

39+
/// The default queue size for a channel.
40+
pub const DEFAULT_QUEUE_SIZE: usize = 8192;
41+
3942
/// A request Identifier.
4043
pub struct RequestId(u32);
4144

msg-socket/src/rep/mod.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,14 @@ use msg_transport::Address;
66
use thiserror::Error;
77
use tokio::sync::oneshot;
88

9-
use crate::DEFAULT_BUFFER_SIZE;
10-
119
mod driver;
1210
mod socket;
13-
mod stats;
14-
use crate::{Profile, stats::SocketStats};
1511
pub use socket::*;
12+
13+
mod stats;
1614
use stats::RepStats;
1715

18-
const DEFAULT_MIN_COMPRESS_SIZE: usize = 8192;
16+
use crate::{DEFAULT_BUFFER_SIZE, DEFAULT_QUEUE_SIZE, Profile, stats::SocketStats};
1917

2018
/// Errors that can occur when using a reply socket.
2119
#[derive(Debug, Error)]
@@ -47,23 +45,29 @@ impl RepError {
4745
pub struct RepOptions {
4846
/// The maximum number of concurrent clients.
4947
pub(crate) max_clients: Option<usize>,
48+
/// Minimum payload size in bytes for compression to be used.
49+
///
50+
/// If the payload is smaller than this threshold, it will not be compressed.
5051
pub(crate) min_compress_size: usize,
52+
/// The size of the write buffer in bytes.
5153
pub(crate) write_buffer_size: usize,
54+
/// The maximum duration between flushes to the underlying transport
5255
pub(crate) write_buffer_linger: Option<Duration>,
53-
/// High-water mark for pending responses per peer. When this limit is reached,
54-
/// new requests will not be read from the underlying connection until pending
55-
/// responses are fulfilled.
56+
/// High-water mark for pending responses per peer.
57+
///
58+
/// When this limit is reached, new requests will not be read from the underlying connection
59+
/// until pending responses are fulfilled.
5660
pub(crate) max_pending_responses: usize,
5761
}
5862

5963
impl Default for RepOptions {
6064
fn default() -> Self {
6165
Self {
6266
max_clients: None,
63-
min_compress_size: DEFAULT_MIN_COMPRESS_SIZE,
64-
write_buffer_size: 8192,
67+
min_compress_size: DEFAULT_BUFFER_SIZE,
68+
write_buffer_size: DEFAULT_BUFFER_SIZE,
6569
write_buffer_linger: Some(Duration::from_micros(100)),
66-
max_pending_responses: DEFAULT_BUFFER_SIZE,
70+
max_pending_responses: DEFAULT_QUEUE_SIZE,
6771
}
6872
}
6973
}
@@ -115,6 +119,8 @@ impl RepOptions {
115119

116120
/// Sets the minimum payload size for compression.
117121
/// If the payload is smaller than this value, it will not be compressed.
122+
///
123+
/// Default: [`DEFAULT_BUFFER_SIZE`]
118124
pub fn with_min_compress_size(mut self, min_compress_size: usize) -> Self {
119125
self.min_compress_size = min_compress_size;
120126
self
@@ -123,7 +129,7 @@ impl RepOptions {
123129
/// Sets the size (max capacity) of the write buffer in bytes. When the buffer is full, it will
124130
/// be flushed to the underlying transport.
125131
///
126-
/// Default: 8KiB
132+
/// Default: [`DEFAULT_BUFFER_SIZE`]
127133
pub fn with_write_buffer_size(mut self, size: usize) -> Self {
128134
self.write_buffer_size = size;
129135
self
@@ -142,7 +148,7 @@ impl RepOptions {
142148
/// new requests will not be read from the underlying connection until pending
143149
/// responses are fulfilled.
144150
///
145-
/// Default: [`DEFAULT_BUFFER_SIZE`]
151+
/// Default: [`DEFAULT_QUEUE_SIZE`]
146152
pub fn with_max_pending_responses(mut self, hwm: usize) -> Self {
147153
self.max_pending_responses = hwm;
148154
self

msg-socket/src/rep/socket.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use tokio_stream::StreamMap;
1616
use tracing::{debug, warn};
1717

1818
use crate::{
19-
Authenticator, DEFAULT_BUFFER_SIZE, RepOptions, Request,
19+
Authenticator, DEFAULT_QUEUE_SIZE, RepOptions, Request,
2020
rep::{RepError, SocketState, driver::RepDriver},
2121
};
2222

@@ -110,8 +110,8 @@ where
110110

111111
/// Binds the socket to the given address. This spawns the socket driver task.
112112
pub async fn try_bind(&mut self, addresses: Vec<A>) -> Result<(), RepError> {
113-
let (to_socket, from_backend) = mpsc::channel(DEFAULT_BUFFER_SIZE);
114-
let (control_tx, control_rx) = mpsc::channel(DEFAULT_BUFFER_SIZE);
113+
let (to_socket, from_backend) = mpsc::channel(DEFAULT_QUEUE_SIZE);
114+
let (control_tx, control_rx) = mpsc::channel(DEFAULT_QUEUE_SIZE);
115115

116116
let mut transport = self.transport.take().expect("transport has been moved already");
117117

msg-socket/src/req/mod.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub use socket::*;
2323
use crate::{Profile, stats::SocketStats};
2424
use stats::ReqStats;
2525

26-
use crate::DEFAULT_BUFFER_SIZE;
26+
use crate::{DEFAULT_BUFFER_SIZE, DEFAULT_QUEUE_SIZE};
2727

2828
pub(crate) static DRIVER_ID: AtomicUsize = AtomicUsize::new(0);
2929

@@ -194,6 +194,8 @@ impl ReqOptions {
194194
/// Sets the minimum payload size in bytes for compression to be used.
195195
///
196196
/// If the payload is smaller than this threshold, it will not be compressed.
197+
///
198+
/// Default: [`DEFAULT_BUFFER_SIZE`]
197199
pub fn with_min_compress_size(mut self, min_compress_size: usize) -> Self {
198200
self.min_compress_size = min_compress_size;
199201
self
@@ -202,7 +204,7 @@ impl ReqOptions {
202204
/// Sets the size (max capacity) of the write buffer in bytes.
203205
/// When the buffer is full, it will be flushed to the underlying transport.
204206
///
205-
/// Default: 8KiB
207+
/// Default: [`DEFAULT_BUFFER_SIZE`]
206208
pub fn with_write_buffer_size(mut self, size: usize) -> Self {
207209
self.write_buffer_size = size;
208210
self
@@ -221,7 +223,7 @@ impl ReqOptions {
221223
/// This controls how many requests can be queued, on top of the current pending requests,
222224
/// before the socket returns [`ReqError::HighWaterMarkReached`].
223225
///
224-
/// Default: [`DEFAULT_BUFFER_SIZE`]
226+
/// Default: [`DEFAULT_QUEUE_SIZE`]
225227
pub fn with_max_queue_size(mut self, size: usize) -> Self {
226228
self.max_queue_size = size;
227229
self
@@ -231,7 +233,7 @@ impl ReqOptions {
231233
/// will not be processed and will be queued up to [`Self::with_max_queue_size`] elements.
232234
/// Once both limits are reached, new requests will return [`ReqError::HighWaterMarkReached`].
233235
///
234-
/// Default: [`DEFAULT_BUFFER_SIZE`]
236+
/// Default: [`DEFAULT_QUEUE_SIZE`]
235237
pub fn with_max_pending_requests(mut self, hwm: usize) -> Self {
236238
self.max_pending_requests = hwm;
237239
self
@@ -244,11 +246,11 @@ impl Default for ReqOptions {
244246
conn: ConnOptions::default(),
245247
timeout: Duration::from_secs(5),
246248
blocking_connect: false,
247-
min_compress_size: 8192,
248-
write_buffer_size: 8192,
249+
min_compress_size: DEFAULT_BUFFER_SIZE,
250+
write_buffer_size: DEFAULT_BUFFER_SIZE,
249251
write_buffer_linger: Some(Duration::from_micros(100)),
250-
max_queue_size: DEFAULT_BUFFER_SIZE,
251-
max_pending_requests: DEFAULT_BUFFER_SIZE,
252+
max_queue_size: DEFAULT_QUEUE_SIZE,
253+
max_pending_requests: DEFAULT_QUEUE_SIZE,
252254
}
253255
}
254256
}

msg-socket/src/sub/mod.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,14 @@ mod socket;
1212
pub use socket::*;
1313

1414
mod stats;
15-
16-
use crate::stats::SocketStats;
1715
use stats::SubStats;
1816

1917
mod stream;
2018

2119
use msg_transport::Address;
2220
use msg_wire::pubsub;
2321

24-
use crate::DEFAULT_BUFFER_SIZE;
22+
use crate::{DEFAULT_BUFFER_SIZE, DEFAULT_QUEUE_SIZE, stats::SocketStats};
2523

2624
#[derive(Debug, Error)]
2725
pub enum SubError {
@@ -61,7 +59,7 @@ pub struct SubOptions {
6159
auth_token: Option<Bytes>,
6260
/// The maximum amount of incoming messages that will be buffered before being dropped due to
6361
/// a slow consumer.
64-
ingress_buffer_size: usize,
62+
ingress_queue_size: usize,
6563
/// The read buffer size for each session.
6664
read_buffer_size: usize,
6765
/// The initial backoff for reconnecting to a publisher.
@@ -78,15 +76,19 @@ impl SubOptions {
7876
self
7977
}
8078

81-
/// Sets the ingress buffer size. This is the maximum amount of incoming messages that will be
79+
/// Sets the ingress queue size. This is the maximum amount of incoming messages that will be
8280
/// buffered. If the consumer cannot keep up with the incoming messages, messages will start
8381
/// being dropped.
84-
pub fn with_ingress_buffer_size(mut self, ingress_buffer_size: usize) -> Self {
85-
self.ingress_buffer_size = ingress_buffer_size;
82+
///
83+
/// Default: [`DEFAULT_QUEUE_SIZE`]
84+
pub fn with_ingress_queue_size(mut self, ingress_queue_size: usize) -> Self {
85+
self.ingress_queue_size = ingress_queue_size;
8686
self
8787
}
8888

8989
/// Sets the read buffer size. This sets the size of the read buffer for each session.
90+
///
91+
/// Default: [`DEFAULT_BUFFER_SIZE`]
9092
pub fn with_read_buffer_size(mut self, read_buffer_size: usize) -> Self {
9193
self.read_buffer_size = read_buffer_size;
9294
self
@@ -110,7 +112,7 @@ impl Default for SubOptions {
110112
fn default() -> Self {
111113
Self {
112114
auth_token: None,
113-
ingress_buffer_size: DEFAULT_BUFFER_SIZE,
115+
ingress_queue_size: DEFAULT_QUEUE_SIZE,
114116
read_buffer_size: 8192,
115117
initial_backoff: Duration::from_millis(100),
116118
retry_attempts: Some(24),

msg-socket/src/sub/socket.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,9 @@ use tokio::{
1717
use msg_common::{IpAddrExt, JoinMap};
1818
use msg_transport::{Address, Transport};
1919

20-
// ADDED: Import the specific SubStats struct for the API
21-
use super::stats::SubStats;
22-
// Import the rest from the parent module (sub/mod.rs)
23-
use super::{
24-
// REMOVED: Old/removed stats structs
25-
// Command, PubMessage, SocketState, SocketStats, SocketWideStats, SubDriver, SubError,
26-
Command,
27-
DEFAULT_BUFFER_SIZE,
28-
PubMessage,
29-
SocketState,
30-
SubDriver,
31-
SubError,
32-
SubOptions,
20+
use crate::sub::{
21+
Command, DEFAULT_BUFFER_SIZE, PubMessage, SocketState, SubDriver, SubError, SubOptions,
22+
stats::SubStats,
3323
};
3424

3525
/// A subscriber socket. This socket implements [`Stream`] and yields incoming [`PubMessage`]s.
@@ -136,7 +126,7 @@ where
136126
/// Creates a new subscriber socket with the given transport and options.
137127
pub fn with_options(transport: T, options: SubOptions) -> Self {
138128
let (to_driver, from_socket) = mpsc::channel(DEFAULT_BUFFER_SIZE);
139-
let (to_socket, from_driver) = mpsc::channel(options.ingress_buffer_size);
129+
let (to_socket, from_driver) = mpsc::channel(options.ingress_queue_size);
140130

141131
let options = Arc::new(options);
142132

msg-socket/tests/it/reqrep.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::time::Duration;
22

33
use bytes::Bytes;
4-
use msg_socket::{DEFAULT_BUFFER_SIZE, RepSocket, ReqOptions, ReqSocket};
4+
use msg_socket::{DEFAULT_QUEUE_SIZE, RepSocket, ReqOptions, ReqSocket};
55
use msg_transport::{
66
tcp::Tcp,
77
tcp_tls::{self, TcpTls},
@@ -273,11 +273,11 @@ async fn reqrep_hwm_reached() {
273273
// Share req via Arc for concurrent access
274274
let req = std::sync::Arc::new(req);
275275

276-
const TOTAL_CAPACITY: usize = HWM + DEFAULT_BUFFER_SIZE;
276+
const TOTAL_CAPACITY: usize = HWM + DEFAULT_QUEUE_SIZE;
277277

278278
// Send requests until the channel is full (HighWaterMarkReached error)
279279
// - HWM requests will be moved to pending_requests
280-
// - DEFAULT_BUFFER_SIZE requests will be buffered in the channel (driver stops polling at HWM)
280+
// - DEFAULT_QUEUE_SIZE requests will be buffered in the channel (driver stops polling at HWM)
281281
// - The next request will fail with HighWaterMarkReached
282282
let mut success_receivers = Vec::new();
283283
let mut sent_count = 0;

msg/benches/pubsub.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ fn pubsub_single_thread_tcp(c: &mut Criterion) {
159159
Tcp::default(),
160160
SubOptions::default()
161161
.with_read_buffer_size(buffer_size)
162-
.with_ingress_buffer_size(N_REQS * 2),
162+
.with_ingress_queue_size(N_REQS * 2),
163163
);
164164

165165
let mut bench = PairBenchmark {
@@ -201,7 +201,7 @@ fn pubsub_multi_thread_tcp(c: &mut Criterion) {
201201
Tcp::default(),
202202
SubOptions::default()
203203
.with_read_buffer_size(buffer_size)
204-
.with_ingress_buffer_size(N_REQS * 2),
204+
.with_ingress_queue_size(N_REQS * 2),
205205
);
206206

207207
let mut bench = PairBenchmark {
@@ -242,7 +242,7 @@ fn pubsub_single_thread_quic(c: &mut Criterion) {
242242
Quic::default(),
243243
SubOptions::default()
244244
.with_read_buffer_size(buffer_size)
245-
.with_ingress_buffer_size(N_REQS * 2),
245+
.with_ingress_queue_size(N_REQS * 2),
246246
);
247247

248248
let mut bench = PairBenchmark {
@@ -284,7 +284,7 @@ fn pubsub_multi_thread_quic(c: &mut Criterion) {
284284
Quic::default(),
285285
SubOptions::default()
286286
.with_read_buffer_size(buffer_size)
287-
.with_ingress_buffer_size(N_REQS * 2),
287+
.with_ingress_queue_size(N_REQS * 2),
288288
);
289289

290290
let mut bench = PairBenchmark {
@@ -325,7 +325,7 @@ fn pubsub_single_thread_ipc(c: &mut Criterion) {
325325
Ipc::default(),
326326
SubOptions::default()
327327
.with_read_buffer_size(buffer_size)
328-
.with_ingress_buffer_size(N_REQS * 2),
328+
.with_ingress_queue_size(N_REQS * 2),
329329
);
330330

331331
let mut bench = PairBenchmark {
@@ -367,7 +367,7 @@ fn pubsub_multi_thread_ipc(c: &mut Criterion) {
367367
Ipc::default(),
368368
SubOptions::default()
369369
.with_read_buffer_size(buffer_size)
370-
.with_ingress_buffer_size(N_REQS * 2),
370+
.with_ingress_queue_size(N_REQS * 2),
371371
);
372372

373373
let mut bench = PairBenchmark {

msg/examples/pubsub.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ async fn main() {
2121
// Configure the subscribers with options
2222
let mut sub1 = SubSocket::with_options(
2323
Tcp::default(),
24-
SubOptions::default().with_ingress_buffer_size(1024),
24+
SubOptions::default().with_ingress_queue_size(1024),
2525
);
2626

2727
let mut sub2 = SubSocket::with_options(
2828
// TCP transport with blocking connect, usually connection happens in the background.
2929
Tcp::default(),
30-
SubOptions::default().with_ingress_buffer_size(1024),
30+
SubOptions::default().with_ingress_queue_size(1024),
3131
);
3232

3333
tracing::info!("Setting up the sockets...");

msg/examples/quic_vs_tcp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ async fn run_tcp() {
2727
// Configure the subscribers with options
2828
let mut sub1 = SubSocket::with_options(
2929
Tcp::default(),
30-
SubOptions::default().with_ingress_buffer_size(1024),
30+
SubOptions::default().with_ingress_queue_size(1024),
3131
);
3232

3333
tracing::info!("Setting up the sockets...");
@@ -60,7 +60,7 @@ async fn run_quic() {
6060
let mut sub1 = SubSocket::with_options(
6161
// TCP transport with blocking connect, usually connection happens in the background.
6262
Quic::default(),
63-
SubOptions::default().with_ingress_buffer_size(1024),
63+
SubOptions::default().with_ingress_queue_size(1024),
6464
);
6565

6666
tracing::info!("Setting up the sockets...");

0 commit comments

Comments
 (0)