Skip to content
This repository was archived by the owner on Feb 16, 2026. It is now read-only.

Commit a013e6c

Browse files
authored
feat: Configurable option for compression (#151)
1 parent 750650e commit a013e6c

File tree

3 files changed

+52
-11
lines changed

3 files changed

+52
-11
lines changed

src/append_session.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::{
2929
async fn connect(
3030
stream_client: &StreamClient,
3131
frame_signal: FrameSignal,
32+
compression: bool,
3233
) -> Result<
3334
(
3435
mpsc::Sender<types::AppendInput>,
@@ -44,6 +45,7 @@ async fn connect(
4445
.frame_monitoring_stream_service_client(frame_signal.clone()),
4546
&stream_client.stream,
4647
ReceiverStream::new(input_rx),
48+
compression,
4749
);
4850

4951
Ok((input_tx, stream_client.inner.send(service_req).await?))
@@ -214,6 +216,7 @@ async fn session_inner<S>(
214216
frame_signal: FrameSignal,
215217
stream_client: StreamClient,
216218
output_tx: mpsc::Sender<Result<types::AppendOutput, ClientError>>,
219+
compression: bool,
217220
) -> Result<(), ClientError>
218221
where
219222
S: 'static + Send + Unpin + futures::Stream<Item = types::AppendInput>,
@@ -230,7 +233,8 @@ where
230233

231234
assert!(*inflight_size <= stream_client.inner.config.max_append_inflight_bytes);
232235

233-
let (s2_input_tx, mut s2_ack_stream) = connect(&stream_client, frame_signal.clone()).await?;
236+
let (s2_input_tx, mut s2_ack_stream) =
237+
connect(&stream_client, frame_signal.clone(), compression).await?;
234238
let batch_ack_deadline = stream_client.inner.config.request_timeout;
235239

236240
if !inflight.is_empty() {
@@ -346,6 +350,7 @@ pub(crate) async fn manage_session<S>(
346350
stream_client: StreamClient,
347351
input: S,
348352
output_tx: mpsc::Sender<Result<types::AppendOutput, ClientError>>,
353+
compression: bool,
349354
) where
350355
S: 'static + Send + Unpin + futures::Stream<Item = types::AppendInput>,
351356
{
@@ -367,6 +372,7 @@ pub(crate) async fn manage_session<S>(
367372
frame_signal.clone(),
368373
stream_client.clone(),
369374
output_tx.clone(),
375+
compression,
370376
)
371377
.await
372378
{

src/client.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ pub struct ClientConfig {
246246
pub(crate) retry_backoff_duration: Duration,
247247
pub(crate) max_attempts: usize,
248248
pub(crate) max_append_inflight_bytes: u64,
249+
pub(crate) compression: bool,
249250
}
250251

251252
impl ClientConfig {
@@ -263,6 +264,7 @@ impl ClientConfig {
263264
retry_backoff_duration: Duration::from_millis(100),
264265
max_attempts: 3,
265266
max_append_inflight_bytes: 100 * MIB_BYTES,
267+
compression: false,
266268
}
267269
}
268270

@@ -353,6 +355,15 @@ impl ClientConfig {
353355
..self
354356
}
355357
}
358+
359+
/// Configure compression for requests and responses.
360+
/// Disabled by default.
361+
pub fn with_compression(self, compression: bool) -> Self {
362+
Self {
363+
compression,
364+
..self
365+
}
366+
}
356367
}
357368

358369
/// Error from client operations.
@@ -616,6 +627,7 @@ impl StreamClient {
616627
self.inner.stream_service_client(),
617628
&self.stream,
618629
req,
630+
self.inner.config.compression,
619631
))
620632
.await
621633
}
@@ -625,8 +637,12 @@ impl StreamClient {
625637
&self,
626638
req: types::ReadSessionRequest,
627639
) -> Result<Streaming<types::ReadOutput>, ClientError> {
628-
let request =
629-
ReadSessionServiceRequest::new(self.inner.stream_service_client(), &self.stream, req);
640+
let request = ReadSessionServiceRequest::new(
641+
self.inner.stream_service_client(),
642+
&self.stream,
643+
req,
644+
self.inner.config.compression,
645+
);
630646
self.inner
631647
.send_retryable(request.clone())
632648
.await
@@ -653,6 +669,7 @@ impl StreamClient {
653669
frame_signal,
654670
&self.stream,
655671
req,
672+
self.inner.config.compression,
656673
))
657674
.await
658675
}
@@ -671,6 +688,7 @@ impl StreamClient {
671688
self.clone(),
672689
req,
673690
response_tx,
691+
self.inner.config.compression,
674692
));
675693

676694
Ok(Box::pin(ReceiverStream::new(response_rx)))
@@ -817,6 +835,7 @@ impl ClientInner {
817835
pub(crate) fn stream_service_client(&self) -> StreamServiceClient<Channel> {
818836
StreamServiceClient::new(self.channel.clone())
819837
}
838+
820839
pub(crate) fn frame_monitoring_stream_service_client(
821840
&self,
822841
frame_signal: FrameSignal,

src/service/stream.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,16 @@ pub struct ReadServiceRequest {
6464

6565
impl ReadServiceRequest {
6666
pub fn new(
67-
client: StreamServiceClient<Channel>,
67+
mut client: StreamServiceClient<Channel>,
6868
stream: impl Into<String>,
6969
req: types::ReadRequest,
70+
compression: bool,
7071
) -> Self {
72+
if compression {
73+
client = client.accept_compressed(CompressionEncoding::Zstd);
74+
}
7175
Self {
72-
client: client.accept_compressed(CompressionEncoding::Zstd),
76+
client,
7377
stream: stream.into(),
7478
req,
7579
}
@@ -111,12 +115,16 @@ pub struct ReadSessionServiceRequest {
111115

112116
impl ReadSessionServiceRequest {
113117
pub fn new(
114-
client: StreamServiceClient<Channel>,
118+
mut client: StreamServiceClient<Channel>,
115119
stream: impl Into<String>,
116120
req: types::ReadSessionRequest,
121+
compression: bool,
117122
) -> Self {
123+
if compression {
124+
client = client.accept_compressed(CompressionEncoding::Zstd);
125+
}
118126
Self {
119-
client: client.accept_compressed(CompressionEncoding::Zstd),
127+
client,
120128
stream: stream.into(),
121129
req,
122130
}
@@ -181,14 +189,18 @@ pub struct AppendServiceRequest {
181189

182190
impl AppendServiceRequest {
183191
pub fn new(
184-
client: StreamServiceClient<RequestFrameMonitor>,
192+
mut client: StreamServiceClient<RequestFrameMonitor>,
185193
append_retry_policy: AppendRetryPolicy,
186194
frame_signal: FrameSignal,
187195
stream: impl Into<String>,
188196
req: types::AppendInput,
197+
compression: bool,
189198
) -> Self {
199+
if compression {
200+
client = client.send_compressed(CompressionEncoding::Zstd);
201+
}
190202
Self {
191-
client: client.send_compressed(CompressionEncoding::Zstd),
203+
client,
192204
append_retry_policy,
193205
frame_signal,
194206
stream: stream.into(),
@@ -259,12 +271,16 @@ where
259271
S: Send + futures::Stream<Item = types::AppendInput> + Unpin,
260272
{
261273
pub fn new(
262-
client: StreamServiceClient<RequestFrameMonitor>,
274+
mut client: StreamServiceClient<RequestFrameMonitor>,
263275
stream: impl Into<String>,
264276
req: S,
277+
compression: bool,
265278
) -> Self {
279+
if compression {
280+
client = client.send_compressed(CompressionEncoding::Zstd);
281+
}
266282
Self {
267-
client: client.send_compressed(CompressionEncoding::Zstd),
283+
client,
268284
stream: stream.into(),
269285
req: Some(req),
270286
}

0 commit comments

Comments
 (0)