
Conversation

MasterPtato (Contributor)

No description provided.

vercel bot commented Sep 3, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

| Project | Deployment | Preview | Comments | Updated (UTC) |
| --- | --- | --- | --- | --- |
| rivet-site | Ready | Preview | Comment | Sep 8, 2025 5:20am |

1 Skipped Deployment

| Project | Deployment | Preview | Comments | Updated (UTC) |
| --- | --- | --- | --- | --- |
| rivet-studio | Ignored | Preview | | Sep 8, 2025 5:20am |

claude bot commented Sep 3, 2025

Claude encountered an error. View job


I'll analyze this and get back to you.

MasterPtato (Contributor, Author) commented Sep 3, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the merge-queue label to this PR to place it in the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

pkg-pr-new bot commented Sep 3, 2025

Open in StackBlitz

```sh
npm i https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@2858
npm i https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner-protocol@2858
npm i https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-tunnel-protocol@2858
```

commit: e472192

claude bot commented Sep 4, 2025

Claude encountered an error. View job


I'll analyze this and get back to you.

claude bot commented Sep 4, 2025

Claude encountered an error. View job


I'll analyze this and get back to you.

claude bot commented Sep 4, 2025

Claude encountered an error. View job


I'll analyze this and get back to you.

Comment on lines +597 to +612
```rust
if let Some(sub) = driver.subscriptions.get(&subject).await {
    if sub.tx.receiver_count() == 0 {
        driver.subscriptions.invalidate(&subject).await;

        let mut hasher = DefaultHasher::new();
        subject.hash(&mut hasher);
        let subject_hash = BASE64.encode(&hasher.finish().to_be_bytes());

        let sql = format!("UNLISTEN {}", quote_ident(&subject_hash));
        let unlisten_res = driver.client.batch_execute(&sql).await;

        if let std::result::Result::Err(err) = unlisten_res {
            tracing::error!(%subject, ?err, "failed to unlisten subject");
        }
    }
}
```

There appears to be a race condition in the subscription cleanup logic. Between checking sub.tx.receiver_count() == 0 and performing the invalidation and UNLISTEN, new receivers can be added to the subscription. This is a time-of-check-to-time-of-use (TOCTOU) race in which an actively used subscription can be incorrectly torn down.

Consider using a more atomic approach for this cleanup operation, such as:

  1. Acquiring a lock before checking the receiver count
  2. Using a compare-and-swap pattern if available
  3. Moving this logic into a synchronized context where subscription additions are also managed

This would prevent the scenario where a subscription is invalidated while new clients are attempting to use it.
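
A minimal sketch of the lock-based option, assuming a single async mutex shared by the subscribe and cleanup paths (all names and types below are illustrative, not from this PR):

```rust
use std::collections::HashMap;
use tokio::sync::{broadcast, Mutex};

struct Subscription {
    tx: broadcast::Sender<Vec<u8>>,
}

struct Subscriptions {
    // One lock guards both subscribe and cleanup, so the receiver_count()
    // check and the removal are atomic with respect to new subscribers.
    inner: Mutex<HashMap<String, Subscription>>,
}

impl Subscriptions {
    async fn subscribe(&self, subject: &str) -> broadcast::Receiver<Vec<u8>> {
        let mut subs = self.inner.lock().await;
        subs.entry(subject.to_string())
            .or_insert_with(|| Subscription {
                tx: broadcast::channel(64).0,
            })
            .tx
            .subscribe()
    }

    async fn cleanup(&self, subject: &str) {
        // Holding the same lock as subscribe() closes the TOCTOU window:
        // no receiver can appear between the count check and the removal.
        let mut subs = self.inner.lock().await;
        if let Some(sub) = subs.get(subject) {
            if sub.tx.receiver_count() == 0 {
                subs.remove(subject);
                // The UNLISTEN statement would be issued here, while the
                // lock is still held, so a racing subscribe() can only
                // re-LISTEN after the old channel is fully torn down.
            }
        }
    }
}
```

The tradeoff is that the UNLISTEN round trip would run under the lock; if that latency matters, a per-entry generation counter or compare-and-swap gives the same guarantee without blocking subscribers.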

Spotted by Diamond

Comment on lines +89 to +127
```rust
let (client, mut conn) = tokio_postgres::connect(&conn_str, tokio_postgres::NoTls).await?;
tokio::spawn(async move {
    // NOTE: This loop will stop automatically when client is dropped
    loop {
        match poll_fn(|cx| conn.poll_message(cx)).await {
            Some(std::result::Result::Ok(AsyncMessage::Notification(note))) => {
                if let Some(sub) = subscriptions2.get(note.channel()).await {
                    let env = match serde_json::from_str::<Envelope>(&note.payload()) {
                        std::result::Result::Ok(env) => env,
                        std::result::Result::Err(err) => {
                            tracing::error!(?err, "failed deserializing envelope");
                            break;
                        }
                    };
                    let payload = match BASE64
                        .decode(env.payload)
                        .context("invalid base64 payload")
                    {
                        std::result::Result::Ok(p) => p,
                        std::result::Result::Err(err) => {
                            tracing::error!(?err, "failed deserializing envelope");
                            break;
                        }
                    };

                    let _ = sub.tx.send((payload, env.reply_subject));
                }
            }
            Some(std::result::Result::Ok(_)) => continue,
            Some(std::result::Result::Err(err)) => {
                tracing::error!(?err, "ups poll loop failed");
                break;
            }
            None => break,
        }
    }

    tracing::info!("ups poll loop stopped");
});
```

The error handling in this connection polling task needs improvement to prevent resource leaks and silent failures. Currently, when deserialization errors occur (lines 96-101 and 108-111), the task breaks out of the loop and terminates without proper cleanup or recovery. This creates several issues:

  1. The connection remains in an undefined state when the task terminates early
  2. Message processing stops permanently with no notification to subscribers
  3. The client connection resource may leak if the task exits unexpectedly

Consider implementing:

  • Robust error handling that logs but continues processing other messages
  • A reconnection mechanism if the connection fails
  • A way to notify the system when message delivery is compromised
  • Proper resource cleanup in all termination paths

This would make the system more resilient to transient errors and prevent silent degradation of the pub/sub functionality.
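
As a sketch of the first two points, the loop below treats per-message decode failures as skippable and only exits on connection-level errors, returning them so a supervisor task can reconnect and re-issue LISTENs (the function shape and names are assumptions, not code from this PR):

```rust
use std::future::poll_fn;
use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, Notification, Socket};

// Poll the connection until it closes. Malformed payloads are handled inside
// `on_note` (log + skip that one message); only connection-level errors end
// the loop, and they are surfaced to the caller instead of being swallowed.
async fn poll_messages(
    mut conn: Connection<Socket, NoTlsStream>,
    on_note: impl Fn(Notification),
) -> Result<(), tokio_postgres::Error> {
    loop {
        match poll_fn(|cx| conn.poll_message(cx)).await {
            Some(Ok(AsyncMessage::Notification(note))) => on_note(note),
            // Other async messages (e.g. notices) are not fatal.
            Some(Ok(_)) => continue,
            // Connection-level failure: bubble up so a supervisor can
            // rebuild the client and notify subscribers.
            Some(Err(err)) => return Err(err),
            // Client was dropped: clean shutdown.
            None => return Ok(()),
        }
    }
}
```

With this shape, a failed serde_json or base64 decode inside `on_note` would log and skip that single notification rather than break the whole loop.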

Spotted by Diamond

claude bot commented Sep 5, 2025

Claude encountered an error. View job


I'll analyze this and get back to you.

claude bot commented Sep 5, 2025

Claude encountered an error. View job


I'll analyze this and get back to you.

graphite-app bot (Contributor) commented Sep 8, 2025

Merge activity

  • Sep 8, 7:09 AM UTC: NathanFlurry added this pull request to the Graphite merge queue.
  • Sep 8, 7:09 AM UTC: CI is running for this pull request on a draft pull request (#2881) due to your merge queue CI optimization settings.
  • Sep 8, 7:11 AM UTC: Merged by the Graphite merge queue via draft PR: #2881.

graphite-app bot pushed a commit that referenced this pull request Sep 8, 2025
graphite-app bot closed this Sep 8, 2025
graphite-app bot deleted the 09-03-fix_tunnel_fix_ups_race_condition branch September 8, 2025 07:11
This was referenced Sep 9, 2025
This was referenced Sep 15, 2025