Skip to content

Commit bf98d22

Browse files
committed
chore(ups): add message chunking & ups protocol
1 parent 90ce950 commit bf98d22

File tree

23 files changed

+1059
-709
lines changed

23 files changed

+1059
-709
lines changed

Cargo.lock

Lines changed: 19 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
[workspace]
33
resolver = "2"
4-
members = ["packages/common/api-builder","packages/common/api-client","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/udb-util","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-runner-ws","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/api-runtime","sdks/rust/bare_gen","sdks/rust/epoxy-protocol","sdks/rust/key-data","sdks/rust/runner-protocol","sdks/rust/tunnel-protocol"]
4+
members = ["packages/common/api-builder","packages/common/api-client","packages/common/api-types","packages/common/api-util","packages/common/cache/build","packages/common/cache/result","packages/common/clickhouse-inserter","packages/common/clickhouse-user-query","packages/common/config","packages/common/env","packages/common/error/core","packages/common/error/macros","packages/common/gasoline/core","packages/common/gasoline/macros","packages/common/logs","packages/common/metrics","packages/common/pools","packages/common/runtime","packages/common/service-manager","packages/common/test-deps","packages/common/test-deps-docker","packages/common/types","packages/common/udb-util","packages/common/universaldb","packages/common/universalpubsub","packages/common/util/core","packages/common/util/id","packages/common/versioned-data-util","packages/core/actor-kv","packages/core/api-peer","packages/core/api-public","packages/core/bootstrap","packages/core/dump-openapi","packages/core/guard/core","packages/core/guard/server","packages/core/pegboard-gateway","packages/core/pegboard-runner-ws","packages/core/pegboard-tunnel","packages/core/workflow-worker","packages/infra/engine","packages/services/epoxy","packages/services/namespace","packages/services/pegboard","sdks/rust/api-full","sdks/rust/api-runtime","sdks/rust/bare_gen","sdks/rust/epoxy-protocol","sdks/rust/key-data","sdks/rust/runner-protocol","sdks/rust/tunnel-protocol","sdks/rust/ups-protocol"]
55

66
[workspace.package]
77
version = "0.0.1"
@@ -82,14 +82,7 @@ vergen = "9.0.4"
8282

8383
[workspace.dependencies.windows]
8484
version = "0.58"
85-
features = [
86-
"Win32",
87-
"Win32_Storage",
88-
"Win32_Storage_FileSystem",
89-
"Win32_System",
90-
"Win32_System_Console",
91-
"Win32_Security",
92-
]
85+
features = ["Win32","Win32_Storage","Win32_Storage_FileSystem","Win32_System","Win32_System_Console","Win32_Security"]
9386

9487
[workspace.dependencies.pest]
9588
version = "2.7"
@@ -105,12 +98,12 @@ features = ["full"]
10598
[workspace.dependencies.rustls]
10699
version = "0.23.25"
107100
default-features = false
108-
features = ["ring", "std", "logging"]
101+
features = ["ring","std","logging"]
109102

110103
[workspace.dependencies.tokio-rustls]
111104
version = "0.26.2"
112105
default-features = false
113-
features = ["ring", "logging"]
106+
features = ["ring","logging"]
114107

115108
[workspace.dependencies.utoipa]
116109
version = "5.4.0"
@@ -278,11 +271,11 @@ path = "packages/common/error/core"
278271
[workspace.dependencies.rivet-error-macros]
279272
path = "packages/common/error/macros"
280273

281-
[workspace.dependencies.gas]
282-
package = "gasoline"
274+
[workspace.dependencies.gasoline]
283275
path = "packages/common/gasoline/core"
284276

285-
[workspace.dependencies.gasoline]
277+
[workspace.dependencies.gas]
278+
package = "gasoline"
286279
path = "packages/common/gasoline/core"
287280

288281
[workspace.dependencies.gasoline-macros]
@@ -400,6 +393,9 @@ path = "sdks/rust/runner-protocol"
400393
[workspace.dependencies.rivet-tunnel-protocol]
401394
path = "sdks/rust/tunnel-protocol"
402395

396+
[workspace.dependencies.rivet-ups-protocol]
397+
path = "sdks/rust/ups-protocol"
398+
403399
[profile.dev]
404400
overflow-checks = false
405401
debug = false

packages/common/gasoline/core/src/ctx/message.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,15 @@ impl MessageCtx {
153153
message_len = message_buf.len(),
154154
"publishing message to pubsub"
155155
);
156-
if let Err(err) = self.pubsub.publish(&subject, &(*message_buf)).await {
156+
if let Err(err) = self
157+
.pubsub
158+
.publish(
159+
&subject,
160+
&(*message_buf),
161+
universalpubsub::PublishOpts::broadcast(),
162+
)
163+
.await
164+
{
157165
tracing::warn!(?err, "publish message failed, trying again");
158166
continue;
159167
}

packages/common/gasoline/core/src/db/kv/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,14 @@ impl DatabaseKv {
6262
let spawn_res = tokio::task::Builder::new().name("wake").spawn(
6363
async move {
6464
// Fail gracefully
65-
if let Err(err) = pubsub.publish(WORKER_WAKE_SUBJECT, &Vec::new()).await {
65+
if let Err(err) = pubsub
66+
.publish(
67+
WORKER_WAKE_SUBJECT,
68+
&Vec::new(),
69+
universalpubsub::PublishOpts::broadcast(),
70+
)
71+
.await
72+
{
6673
tracing::warn!(?err, "failed to publish wake message");
6774
}
6875
}

packages/common/universalpubsub/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ deadpool-postgres.workspace = true
1414
futures-util.workspace = true
1515
moka.workspace = true
1616
rivet-error.workspace = true
17+
rivet-ups-protocol.workspace = true
1718
serde_json.workspace = true
19+
versioned-data-util.workspace = true
1820
serde.workspace = true
1921
sha2.workspace = true
2022
tokio-postgres.workspace = true
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
use std::collections::HashMap;
2+
use std::time::{Duration, Instant};
3+
4+
use anyhow::*;
5+
use rivet_ups_protocol::versioned::UpsMessage;
6+
use rivet_ups_protocol::{MessageBody, MessageChunk, MessageStart, PROTOCOL_VERSION};
7+
use versioned_data_util::OwnedVersionedData;
8+
9+
pub const CHUNK_BUFFER_GC_INTERVAL: Duration = Duration::from_secs(60);
10+
pub const CHUNK_BUFFER_MAX_AGE: Duration = Duration::from_secs(300);
11+
12+
#[derive(Debug)]
13+
pub struct ChunkBuffer {
14+
pub message_id: [u8; 16],
15+
pub received_chunks: u32,
16+
pub last_chunk_ts: Instant,
17+
pub buffer: Vec<u8>,
18+
pub chunk_count: u32,
19+
pub reply_subject: Option<String>,
20+
}
21+
22+
pub struct ChunkTracker {
23+
chunks_in_process: HashMap<[u8; 16], ChunkBuffer>,
24+
}
25+
26+
impl ChunkTracker {
27+
pub fn new() -> Self {
28+
Self {
29+
chunks_in_process: HashMap::new(),
30+
}
31+
}
32+
33+
pub fn process_chunk(
34+
&mut self,
35+
raw_message: &[u8],
36+
) -> Result<Option<(Vec<u8>, Option<String>)>> {
37+
let message = UpsMessage::deserialize_with_embedded_version(raw_message)?;
38+
39+
match message.body {
40+
MessageBody::MessageStart(msg) => {
41+
// If only one chunk, return immediately
42+
if msg.chunk_count == 1 {
43+
return Ok(Some((msg.payload, msg.reply_subject)));
44+
}
45+
46+
// Start of a multi-chunk message
47+
let buffer = ChunkBuffer {
48+
message_id: msg.message_id,
49+
received_chunks: 1,
50+
last_chunk_ts: Instant::now(),
51+
buffer: msg.payload,
52+
chunk_count: msg.chunk_count,
53+
reply_subject: msg.reply_subject,
54+
};
55+
self.chunks_in_process.insert(msg.message_id, buffer);
56+
Ok(None)
57+
}
58+
MessageBody::MessageChunk(msg) => {
59+
// Find the matching buffer using message_id
60+
let buffer = self.chunks_in_process.get_mut(&msg.message_id);
61+
62+
let Some(buffer) = buffer else {
63+
bail!(
64+
"received chunk {} for message {:?} but no matching buffer found",
65+
msg.chunk_index,
66+
msg.message_id
67+
);
68+
};
69+
70+
// Validate chunk order
71+
if buffer.received_chunks != msg.chunk_index {
72+
bail!(
73+
"received chunk {} but expected chunk {} for message {:?}",
74+
msg.chunk_index,
75+
buffer.received_chunks,
76+
msg.message_id
77+
);
78+
}
79+
80+
// Update buffer
81+
buffer.buffer.extend_from_slice(&msg.payload);
82+
buffer.received_chunks += 1;
83+
buffer.last_chunk_ts = Instant::now();
84+
let is_complete = buffer.received_chunks == buffer.chunk_count;
85+
86+
if is_complete {
87+
let completed_buffer = self.chunks_in_process.remove(&msg.message_id).unwrap();
88+
Ok(Some((
89+
completed_buffer.buffer,
90+
completed_buffer.reply_subject,
91+
)))
92+
} else {
93+
Ok(None)
94+
}
95+
}
96+
}
97+
}
98+
99+
pub fn gc(&mut self) {
100+
let now = Instant::now();
101+
let size_before = self.chunks_in_process.len();
102+
self.chunks_in_process
103+
.retain(|_, buffer| now.duration_since(buffer.last_chunk_ts) < CHUNK_BUFFER_MAX_AGE);
104+
let size_after = self.chunks_in_process.len();
105+
106+
tracing::debug!(
107+
?size_before,
108+
?size_after,
109+
"performed chunk buffer garbage collection"
110+
);
111+
}
112+
}
113+
114+
/// Splits a payload into chunks that fit within message size limits.
115+
///
116+
/// This function handles chunking by accounting for different overhead
117+
/// between the first chunk (MessageStart) and subsequent chunks (MessageChunk).
118+
///
119+
/// The first chunk carries additional metadata like the reply_subject and chunk_count,
120+
/// which means it has more protocol overhead and less room for payload data.
121+
/// Subsequent chunks only carry a chunk_index, allowing them to fit more payload.
122+
///
123+
/// This optimization ensures:
124+
/// - Reply subject is only transmitted once (in MessageStart)
125+
/// - Maximum payload utilization in each chunk
126+
/// - Efficient bandwidth usage for multi-chunk messages
127+
///
128+
/// # Returns
129+
/// A vector of payload chunks, where each chunk is sized to fit within the message limit
130+
/// after accounting for protocol overhead.
131+
pub fn split_payload_into_chunks(
132+
payload: &[u8],
133+
max_message_size: usize,
134+
message_id: [u8; 16],
135+
reply_subject: Option<&str>,
136+
) -> Result<Vec<Vec<u8>>> {
137+
// Calculate overhead for MessageStart (first chunk)
138+
let start_message = MessageStart {
139+
message_id,
140+
chunk_count: 1,
141+
reply_subject: reply_subject.map(|s| s.to_string()),
142+
payload: vec![],
143+
};
144+
let start_ups_message = rivet_ups_protocol::UpsMessage {
145+
body: MessageBody::MessageStart(start_message),
146+
};
147+
let start_overhead = UpsMessage::latest(start_ups_message)
148+
.serialize_with_embedded_version(PROTOCOL_VERSION)?
149+
.len();
150+
151+
// Calculate overhead for MessageChunk (subsequent chunks)
152+
let chunk_message = MessageChunk {
153+
message_id,
154+
chunk_index: 0,
155+
payload: vec![],
156+
};
157+
let chunk_ups_message = rivet_ups_protocol::UpsMessage {
158+
body: MessageBody::MessageChunk(chunk_message),
159+
};
160+
let chunk_overhead = UpsMessage::latest(chunk_ups_message)
161+
.serialize_with_embedded_version(PROTOCOL_VERSION)?
162+
.len();
163+
164+
// Calculate max payload sizes
165+
let first_chunk_max_payload = max_message_size.saturating_sub(start_overhead);
166+
let other_chunk_max_payload = max_message_size.saturating_sub(chunk_overhead);
167+
168+
if first_chunk_max_payload == 0 || other_chunk_max_payload == 0 {
169+
bail!("message overhead exceeds max message size");
170+
}
171+
172+
// Calculate how many chunks we need
173+
if payload.len() <= first_chunk_max_payload {
174+
// Single chunk - all data fits in first message
175+
return Ok(vec![payload.to_vec()]);
176+
}
177+
178+
// Multi-chunk: first chunk + remaining chunks
179+
let remaining_after_first = payload.len() - first_chunk_max_payload;
180+
let additional_chunks =
181+
(remaining_after_first + other_chunk_max_payload - 1) / other_chunk_max_payload;
182+
183+
let mut chunks = Vec::new();
184+
185+
// First chunk (smaller due to reply_subject overhead)
186+
chunks.push(payload[..first_chunk_max_payload].to_vec());
187+
188+
// Subsequent chunks
189+
let mut offset = first_chunk_max_payload;
190+
for _ in 0..additional_chunks {
191+
let end = std::cmp::min(offset + other_chunk_max_payload, payload.len());
192+
chunks.push(payload[offset..end].to_vec());
193+
offset = end;
194+
}
195+
196+
Ok(chunks)
197+
}
198+
199+
/// Encodes a chunk to the resulting BARE message.
200+
pub fn encode_chunk(
201+
payload: Vec<u8>,
202+
chunk_idx: u32,
203+
chunk_count: u32,
204+
message_id: [u8; 16],
205+
reply_subject: Option<String>,
206+
) -> Result<Vec<u8>> {
207+
let body = if chunk_idx == 0 {
208+
// First chunk - MessageStart
209+
MessageBody::MessageStart(MessageStart {
210+
message_id,
211+
chunk_count,
212+
reply_subject,
213+
payload,
214+
})
215+
} else {
216+
// Subsequent chunks - MessageChunk
217+
MessageBody::MessageChunk(MessageChunk {
218+
message_id,
219+
chunk_index: chunk_idx,
220+
payload,
221+
})
222+
};
223+
224+
let ups_message = rivet_ups_protocol::UpsMessage { body };
225+
UpsMessage::latest(ups_message).serialize_with_embedded_version(PROTOCOL_VERSION)
226+
}

0 commit comments

Comments
 (0)