Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 50 additions & 5 deletions crates/full-node/sov-sequencer/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use sov_modules_api::{FullyBakedTx, RawTx, RuntimeEventProcessor, RuntimeEventRe
use sov_rest_utils::handle_bad_ws_request;
use sov_rest_utils::send_json;
use sov_rest_utils::{
errors, preconfigured_router_layers, serve_generic_ws_subscription, ApiResult, FilterQuery,
PageSelection, PaginatedResponse, Pagination, Path, Query,
errors, preconfigured_router_layers, serve_generic_ws_subscription,
serve_generic_ws_subscription_with_config, ApiResult, FilterQuery, PageSelection,
PaginatedResponse, Pagination, Path, Query, WsSubscriptionConfig,
};
use sov_rest_utils::{get_client_ip, WsMessage};
use sov_rollup_interface::da::{DaBlobHash, DaSpec};
Expand Down Expand Up @@ -75,6 +76,34 @@ pub struct StartFrom {
start_from: u64,
}

/// Compression mode for WebSocket subscriptions.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompressionMode {
/// No compression (default). Messages are sent as individual JSON text frames.
#[default]
None,
/// Gzip compression. Messages are batched into JSON arrays, compressed, and sent as binary frames.
Gzip,
}

/// Query parameters for WebSocket subscriptions that support compression.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct CompressionQuery {
/// The compression mode to use for this subscription.
#[serde(default)]
pub compression: CompressionMode,
}

impl CompressionQuery {
/// Converts the query into a [`WsSubscriptionConfig`].
pub fn to_config(&self) -> WsSubscriptionConfig {
WsSubscriptionConfig {
compress: self.compression == CompressionMode::Gzip,
}
}
}

/// Provides REST APIs for any [`Sequencer`]. See [`SequencerApis::rest_api_server`].
#[derive(derivative::Derivative)]
#[derivative(Clone(bound = ""))]
Expand Down Expand Up @@ -480,10 +509,12 @@ impl<Seq: Sequencer> SequencerApis<Seq> {
async fn subscribe_to_events(
State(state): State<Self>,
filter: FilterQuery,
compression: Option<Query<CompressionQuery>>,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
use futures::future;
ws.on_upgrade(|socket| async move {
let config = compression.map(|q| q.0.to_config()).unwrap_or_default();
ws.on_upgrade(move |socket| async move {
let stream = state
.sequencer
.subscribe_events()
Expand All @@ -494,20 +525,34 @@ impl<Seq: Sequencer> SequencerApis<Seq> {
(Ok(event), Some(filter)) => future::ready(filter.matches(&event.key)),
(_, _) => future::ready(true),
});
serve_generic_ws_subscription(socket, stream, state.shutdown_receiver.clone()).await;
serve_generic_ws_subscription_with_config(
socket,
stream,
state.shutdown_receiver.clone(),
config,
)
.await;
})
}

async fn subscribe_to_transactions(
State(state): State<Self>,
start_from: Option<Query<StartFrom>>,
compression: Option<Query<CompressionQuery>>,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
let start_from = start_from.map(|start_from| start_from.0.start_from);
let config = compression.map(|q| q.0.to_config()).unwrap_or_default();
ws.on_upgrade(move |socket| async move {
let stream =
Self::subscribe_txs_starting_from(start_from, state.sequencer.clone()).await;
serve_generic_ws_subscription(socket, stream, state.shutdown_receiver.clone()).await;
serve_generic_ws_subscription_with_config(
socket,
stream,
state.shutdown_receiver.clone(),
config,
)
.await;
})
}

Expand Down
2 changes: 2 additions & 0 deletions crates/utils/sov-rest-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ anyhow = { workspace = true }
client-ip = "0.1.1"
axum = { workspace = true, features = ["query", "ws", "json", "original-uri"] }
derive_more = { workspace = true, default-features = true, features = ["deref", "display"] }
flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
futures = { workspace = true }
proptest = { workspace = true, features = ["std"], optional = true }
proptest-derive = { workspace = true, optional = true }
Expand All @@ -35,6 +36,7 @@ tokio-tungstenite = { version = "0.28.0", default-features = false, features = [
tokio-stream = { workspace = true }
tungstenite = "0.28.0"
axum = { workspace = true, features = ["query", "ws", "json", "original-uri", "tokio", "http1"] }
flate2 = { version = "1", default-features = false, features = ["rust_backend"] }

[features]
arbitrary = ["proptest", "proptest-derive", "sov-rest-utils/arbitrary"]
183 changes: 163 additions & 20 deletions crates/utils/sov-rest-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ pub fn cors_layer_opt(

const MAX_BATCH_SIZE: usize = 128;

/// Configuration for WebSocket subscription behavior.
#[derive(Debug, Clone, Copy, Default)]
pub struct WsSubscriptionConfig {
/// When true, messages are batched into arrays and gzip-compressed before sending.
pub compress: bool,
}

/// Interval between ping frames sent to the client for keepalive.
const PING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);

Expand All @@ -195,9 +202,42 @@ const PONG_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// - Graceful shutdown on server shutdown signal
/// - Proper handling of client disconnection and half-closed connections
pub async fn serve_generic_ws_subscription<S, M, E>(
socket: WebSocket,
subscription: S,
shutdown_receiver: tokio::sync::watch::Receiver<()>,
) where
S: futures::Stream<Item = Result<M, E>> + Unpin,
E: ReportableWsError,
M: Clone + serde::Serialize + Send + Sync + 'static,
{
serve_generic_ws_subscription_with_config(
socket,
subscription,
shutdown_receiver,
WsSubscriptionConfig::default(),
)
.await
}

/// A utility function for serving some data inside a [`futures::Stream`] over a
/// WebSocket connection, with configurable behavior.
///
/// This function handles:
/// - Sending data from the subscription stream to the client
/// - Periodic ping/pong keepalive to detect dead connections
/// - Graceful shutdown on server shutdown signal
/// - Proper handling of client disconnection and half-closed connections
/// - Optional gzip compression of batched messages (when `config.compress` is true)
///
/// When compression is enabled:
/// - Data messages are batched into JSON arrays, gzip-compressed, and sent as binary frames
/// - Error messages are also gzip-compressed and sent as binary frames
/// - Clients can uniformly decompress all binary frames (gzip magic bytes: `0x1f 0x8b`)
pub async fn serve_generic_ws_subscription_with_config<S, M, E>(
mut socket: WebSocket,
subscription: S,
mut shutdown_receiver: tokio::sync::watch::Receiver<()>,
config: WsSubscriptionConfig,
) where
S: futures::Stream<Item = Result<M, E>> + Unpin,
E: ReportableWsError,
Expand Down Expand Up @@ -272,31 +312,67 @@ pub async fn serve_generic_ws_subscription<S, M, E>(
chunk_opt = chunked_subscription.next() => {
match chunk_opt {
Some(chunk) => {
for item in chunk {
match item {
Ok(data) => {
let serialized = match serde_json::to_string(&data) {
Ok(serialized) => serialized,
Err(err) => {
error!(?err, "Failed to serialize data for WebSocket; this is a bug, please report it");
if config.compress {
// Compressed mode: batch successful items, compress, send as binary
let mut batch: Vec<M> = Vec::with_capacity(chunk.len());
for item in chunk {
match item {
Ok(data) => {
batch.push(data);
}
Err(err) => {
// Flush any accumulated batch before sending error
if !batch.is_empty() {
if send_compressed_batch(&mut socket, &batch).await.is_err() {
break 'outer;
}
batch.clear();
}

// Send compressed error
if send_compressed_bytes(&mut socket, err.to_json().as_bytes()).await.is_err() {
break 'outer;
}

if !err.is_recoverable() {
break 'outer;
}
};
if let Err(err) = socket.feed(serialized.into()).await {
warn!(?err, "WebSocket send error - disconnecting client");
break 'outer;
}
}
Err(err) => {
// Send error notification to the client
if let Err(send_err) = socket.send(err.to_json().into()).await {
warn!(err=?send_err, "WebSocket send error - disconnecting client");
break 'outer;
}

// Send remaining batch
if !batch.is_empty() && send_compressed_batch(&mut socket, &batch).await.is_err() {
break 'outer;
}
} else {
// Uncompressed mode: send individual text messages (original behavior)
for item in chunk {
match item {
Ok(data) => {
let serialized = match serde_json::to_string(&data) {
Ok(serialized) => serialized,
Err(err) => {
error!(?err, "Failed to serialize data for WebSocket; this is a bug, please report it");
break 'outer;
}
};
if let Err(err) = socket.feed(serialized.into()).await {
warn!(?err, "WebSocket send error - disconnecting client");
break 'outer;
}
}
// For recoverable errors (e.g., lag), continue streaming
// For non-recoverable errors, disconnect
if !err.is_recoverable() {
break 'outer;
Err(err) => {
// Send error notification to the client
if let Err(send_err) = socket.send(err.to_json().into()).await {
warn!(err=?send_err, "WebSocket send error - disconnecting client");
break 'outer;
}
// For recoverable errors (e.g., lag), continue streaming
// For non-recoverable errors, disconnect
if !err.is_recoverable() {
break 'outer;
}
}
}
}
Expand Down Expand Up @@ -343,6 +419,73 @@ pub async fn serve_generic_ws_subscription<S, M, E>(
socket.close().await.ok();
}

/// Compresses bytes with gzip.
fn compress_bytes(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Write;

let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(data)?;
encoder.finish()
}

/// Serializes the value as JSON and compresses it with gzip.
fn compress_json<T: Serialize + ?Sized>(value: &T) -> Result<Vec<u8>, std::io::Error> {
let json = serde_json::to_vec(value)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
compress_bytes(&json)
}

/// Compresses and sends a batch of items as a binary WebSocket message.
/// Returns Err(()) if the send fails or compression fails.
async fn send_compressed_batch<T: Serialize>(
socket: &mut WebSocket,
batch: &[T],
) -> Result<(), ()> {
use axum::extract::ws::Message;

match compress_json(batch) {
Ok(compressed) => {
if let Err(err) = socket.feed(Message::Binary(compressed)).await {
warn!(?err, "WebSocket send error - disconnecting client");
return Err(());
}
Ok(())
}
Err(err) => {
error!(
?err,
"Failed to serialize/compress data for WebSocket; this is a bug, please report it"
);
Err(())
}
}
}

/// Compresses and sends raw bytes as a binary WebSocket message.
/// Returns Err(()) if the send fails or compression fails.
async fn send_compressed_bytes(socket: &mut WebSocket, data: &[u8]) -> Result<(), ()> {
use axum::extract::ws::Message;

match compress_bytes(data) {
Ok(compressed) => {
if let Err(err) = socket.feed(Message::Binary(compressed)).await {
warn!(?err, "WebSocket send error - disconnecting client");
return Err(());
}
Ok(())
}
Err(err) => {
error!(
?err,
"Failed to compress data for WebSocket; this is a bug, please report it"
);
Err(())
}
}
}

/// A message that can be received via websocket.
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
pub struct WsMessage<Contents> {
Expand Down
Loading
Loading