Skip to content

Commit 06ccf9d

Browse files
committed
feat(pb): configure total slots header for rivetkit from outbound req (#3021)
1 parent c09e161 commit 06ccf9d

File tree

1 file changed

+18
-3
lines changed
  • packages/core/pegboard-serverless/src

1 file changed

+18
-3
lines changed

packages/core/pegboard-serverless/src/lib.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use universaldb::utils::IsolationLevel::*;
2222
use vbare::OwnedVersionedData;
2323

2424
const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token");
25+
const X_RIVETKIT_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivetkit-total-slots");
2526

2627
struct OutboundConnection {
2728
handle: JoinHandle<()>,
@@ -158,6 +159,7 @@ async fn tick(
158159
url.clone(),
159160
headers.clone(),
160161
Duration::from_secs(*request_lifespan as u64),
162+
*slots_per_runner,
161163
)
162164
})
163165
.take(start_count);
@@ -183,14 +185,23 @@ fn spawn_connection(
183185
url: String,
184186
headers: HashMap<String, String>,
185187
request_lifespan: Duration,
188+
slots_per_runner: u32,
186189
) -> OutboundConnection {
187190
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
188191
let draining = Arc::new(AtomicBool::new(false));
189192

190193
let draining2 = draining.clone();
191194
let handle = tokio::spawn(async move {
192-
if let Err(err) =
193-
outbound_handler(&ctx, url, headers, request_lifespan, shutdown_rx, draining2).await
195+
if let Err(err) = outbound_handler(
196+
&ctx,
197+
url,
198+
headers,
199+
request_lifespan,
200+
slots_per_runner,
201+
shutdown_rx,
202+
draining2,
203+
)
204+
.await
194205
{
195206
tracing::error!(?err, "outbound req failed");
196207

@@ -217,6 +228,7 @@ async fn outbound_handler(
217228
url: String,
218229
headers: HashMap<String, String>,
219230
request_lifespan: Duration,
231+
slots_per_runner: u32,
220232
shutdown_rx: oneshot::Receiver<()>,
221233
draining: Arc<AtomicBool>,
222234
) -> Result<()> {
@@ -232,7 +244,10 @@ async fn outbound_handler(
232244
})
233245
.collect();
234246

235-
let mut req = client.get(url).headers(headers);
247+
let mut req = client
248+
.get(url)
249+
.headers(headers)
250+
.header(X_RIVETKIT_TOTAL_SLOTS, slots_per_runner.to_string());
236251

237252
// Add admin token if configured
238253
if let Some(auth) = &ctx.config().auth {

0 commit comments

Comments
 (0)