Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 38 additions & 39 deletions mux/SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ MUX is a symmetric stream multiplexing protocol designed to run over reliable, o

- Lightweight framing with minimal overhead (14-byte headers)
- Per-stream flow control with automatic window tuning
- Graceful and abrupt stream termination
- Fast stream open/close with ID reuse
- Connection-level resource limits
- Symmetric operation (no client/server distinction)
- Deterministic stream IDs derived from user-defined identifiers
Expand Down Expand Up @@ -98,15 +98,14 @@ Flags modify frame behavior. Multiple flags MAY be set simultaneously by combini

| Flag | Value | Applicable Types | Description |
|------|-------|------------------|-------------|
| FIN | `0x01` | Data, Window Update | Half-closes the stream in the sender's direction. |
| RST | `0x02` | Data, Window Update | Immediately resets (terminates) the stream. |
| FIN | `0x01` | Data | Signals end-of-stream marker (see Section 5.4). |
| SYN | `0x04` | Ping | Ping request. |
| ACK | `0x08` | Ping | Ping response. |

### 4.1 Flag Constraints

- FIN and RST MUST NOT be set together. If both are set, RST takes precedence.
- SYN and ACK are only valid on Ping frames.
- FIN is only valid on Data frames.

## 5. Stream Management

Expand Down Expand Up @@ -141,46 +140,45 @@ Streams are created implicitly when the first frame for a Stream ID is sent or r

### 5.3 Stream Lifecycle

```
+-------+
| Open |
+-------+
/ | \
FIN← / | \ →FIN
/ | \
+----------+ | +----------+
|RecvClosed| | |SendClosed|
+----------+ | +----------+
\ | /
FIN→ \ | / ←FIN
\ | /
+-------+
|Closed |
+-------+
RST (any state)
```
Streams have a simplified lifecycle optimized for fast open/close with ID reuse:

**Local stream states:**
- **Active**: Application holds a handle to the stream. Can read/write.
- **Buffering**: No handle, but buffered data exists. Data continues to be received.
- **Removed**: No handle and no buffered data. Stream ID available for reuse.

**Opening a stream:**
1. Compute Stream ID from user identifier.
2. If stream exists in Buffering state, transition to Active (access buffered data).
3. If stream does not exist, create new stream in Active state.

| State | Description |
|-------|-------------|
| Open | Bidirectional data flow. |
| SendClosed | Local side sent FIN. Can still receive data. |
| RecvClosed | Remote side sent FIN. Can still send data. |
| Closed | Both directions closed. Stream resources may be released. |
**Closing a stream:**
1. Application drops the stream handle.
2. If buffer is empty, remove stream (ID available for reuse).
3. If buffer is non-empty, transition to Buffering state.

### 5.4 Half-Close (FIN)
**Key properties:**
- Stream IDs can be reused after the stream is removed.
- Dropping a handle does NOT send any protocol message.
- Remote peer is not notified when local handle is dropped.
- Flow control remains active for Buffering streams.

Sending FIN indicates the sender will transmit no more data on this stream. The stream transitions:
- `Open` → `SendClosed`
- `RecvClosed` → `Closed`
### 5.4 End-of-Stream Marker (FIN)

Receiving FIN transitions:
- `Open` → `RecvClosed`
- `SendClosed` → `Closed`
FIN is an in-band marker signaling the end of a logical message or stream segment. Unlike traditional half-close:

### 5.5 Reset (RST)
- FIN is buffered in-order with data frames.
- Reading FIN returns EOF (0 bytes).
- **Data MAY be sent after FIN.** FIN does not prevent further transmission.
- Applications use FIN for framing; the protocol does not enforce termination.

**Example usage:**
```
Sender: Data("hello") → FIN → Data("world") → FIN
Reader: reads "hello" → reads EOF → reads "world" → reads EOF
```

RST immediately terminates a stream from any state. Both sides SHOULD release stream resources upon sending or receiving RST. No further frames SHOULD be sent on a reset stream.
This allows FIN to delimit messages within a long-lived stream.

## 6. Flow Control

Expand All @@ -195,6 +193,7 @@ Each stream maintains an independent receive window representing the number of b
**Behavior:**
- Senders MUST NOT send more data than the receiver's advertised window.
- Each byte of Data payload consumes one byte of window.
- Each FIN marker consumes 32 bytes of window (to prevent FIN spam attacks).
- Window Update frames replenish the window.

### 6.2 Window Updates
Expand Down Expand Up @@ -267,7 +266,7 @@ Upon detecting a protocol violation, implementations MUST:

### 8.2 Stream Errors vs Connection Errors

- **Stream errors** (e.g., application-level errors) SHOULD be handled with RST on that stream.
- **Stream errors** (e.g., application-level errors) are handled by the application dropping the stream handle. The remote peer is not explicitly notified.
- **Connection errors** (e.g., protocol violations) MUST be handled with GoAway and connection closure.

## 9. Constants Summary
Expand Down
67 changes: 53 additions & 14 deletions mux/mux/src/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,21 @@

use std::{collections::VecDeque, io};

/// A sequence of [`Chunk`] values.
/// An element in the buffer - either data or a FIN marker.
#[derive(Debug)]
pub(crate) enum ChunkOrFin {
Chunk(Chunk),
Fin,
}

/// A sequence of [`ChunkOrFin`] values.
///
/// [`Chunks::len`] considers all [`Chunk`] elements and computes the total
/// result, i.e. the length of all bytes, by summing up the lengths of all
/// [`Chunk`] elements.
/// [`Chunk`] elements. FIN markers don't contribute to length.
#[derive(Debug)]
pub(crate) struct Chunks {
seq: VecDeque<Chunk>,
seq: VecDeque<ChunkOrFin>,
len: usize,
}

Expand All @@ -34,29 +41,61 @@ impl Chunks {

/// The total length of bytes yet-to-be-read in all `Chunk`s.
pub(crate) fn len(&self) -> usize {
self.len - self.seq.front().map(|c| c.offset()).unwrap_or(0)
let front_offset = self
.seq
.front()
.and_then(|e| match e {
ChunkOrFin::Chunk(c) => Some(c.offset()),
ChunkOrFin::Fin => None,
})
.unwrap_or(0);
self.len - front_offset
}

/// Returns true if there is no data in the buffer.
///
/// Note: A buffer with only FIN markers is considered empty.
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
}

/// Add another chunk of bytes to the end.
pub(crate) fn push(&mut self, x: Vec<u8>) {
self.len += x.len();
if !x.is_empty() {
self.seq.push_back(Chunk {
self.seq.push_back(ChunkOrFin::Chunk(Chunk {
cursor: io::Cursor::new(x),
})
}))
}
}

/// Add a FIN marker to the end.
pub(crate) fn push_fin(&mut self) {
self.seq.push_back(ChunkOrFin::Fin);
}

/// Remove and return the first element.
pub(crate) fn pop(&mut self) -> Option<ChunkOrFin> {
let elem = self.seq.pop_front();
if let Some(ChunkOrFin::Chunk(ref c)) = elem {
self.len -= c.len() + c.offset();
}
elem
}

/// Remove and return the first chunk.
pub(crate) fn pop(&mut self) -> Option<Chunk> {
let chunk = self.seq.pop_front();
self.len -= chunk.as_ref().map(|c| c.len() + c.offset()).unwrap_or(0);
chunk
/// Get a reference to the first element.
pub(crate) fn front(&self) -> Option<&ChunkOrFin> {
self.seq.front()
}

/// Get a mutable reference to the first chunk.
pub(crate) fn front_mut(&mut self) -> Option<&mut Chunk> {
self.seq.front_mut()
/// Get a mutable reference to the first chunk, if it is a chunk.
///
/// Returns None if buffer is empty or front is a FIN marker.
pub(crate) fn front_chunk_mut(&mut self) -> Option<&mut Chunk> {
match self.seq.front_mut() {
Some(ChunkOrFin::Chunk(c)) => Some(c),
_ => None,
}
}
}

Expand Down
20 changes: 12 additions & 8 deletions mux/mux/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
matches!(self.inner, ConnectionState::Closed(_))
}

/// Get a handle for creating streams concurrently.
/// Get a handle for obtaining streams concurrently.
///
/// The handle can be cloned and used from multiple tasks while the
/// Connection is being polled.
Expand All @@ -95,17 +95,21 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
}
}

/// Create a new stream with the given user ID.
/// Get a stream handle for the given user ID.
///
/// Streams are implicit - if data has already been received for this ID,
/// returns a handle to the existing stream with buffered data. Otherwise,
/// creates a new stream.
///
/// The stream ID is computed from the user ID using BLAKE3.
/// Either side can create streams with the same user ID - they will
/// automatically merge into the same stream.
/// Either side can get streams with the same user ID - they will
/// automatically refer to the same stream.
///
/// The `user_id` parameter is a required user-defined stream identifier
/// (1-256 bytes). User IDs must be unique within the session.
pub fn new_stream(&mut self, user_id: &[u8]) -> Result<Stream> {
/// The `user_id` parameter is a user-defined stream identifier (1-256
/// bytes).
pub fn get_stream(&mut self, user_id: &[u8]) -> Result<Stream> {
match &mut self.inner {
ConnectionState::Active(active) => active.new_stream(user_id),
ConnectionState::Active(active) => active.get_stream(user_id),
_ => Err(ConnectionError::Closed),
}
}
Expand Down
Loading
Loading