-
Notifications
You must be signed in to change notification settings - Fork 114
chore(ups): add message chunking & ups protocol #2874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore(ups): add message chunking & ups protocol #2874
Conversation
The latest updates on your projects. Learn more about Vercel for GitHub.
1 Skipped Deployment
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
commit: |
async fn next(&mut self) -> Result<DriverOutput> { | ||
loop { | ||
tokio::select! { | ||
biased; | ||
// Prefer local messages to reduce latency | ||
res = self.local_rx.recv() => { | ||
match res { | ||
std::result::Result::Ok(payload) => { | ||
return Ok(DriverOutput::Message { subject: self.subject.clone(), payload }); | ||
} | ||
std::result::Result::Err(broadcast::error::RecvError::Lagged(_)) => { | ||
// Skip lagged and continue | ||
continue; | ||
} | ||
std::result::Result::Err(broadcast::error::RecvError::Closed) => { | ||
// Local channel closed; fall back to driver only | ||
// Replace with a closed receiver to avoid busy loop | ||
// We simply continue and rely on driver | ||
} | ||
} | ||
} | ||
res = self.driver.next() => { | ||
return res; | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There appears to be a potential issue in the error handling for broadcast::error::RecvError::Closed
. The comment indicates that the code should replace the receiver with a closed one to avoid a busy loop, but the implementation doesn't actually do this. When the local channel is closed, the code continues the loop without modifying self.local_rx
, which means it will repeatedly hit the same closed channel error.
Consider replacing self.local_rx
with a permanently closed receiver when this error is encountered, perhaps by creating a new closed channel:
Err(broadcast::error::RecvError::Closed) => {
// Replace with a permanently closed receiver to avoid busy loop
let (tx, rx) = broadcast::channel::<Vec<u8>>(1);
drop(tx); // Close the channel
self.local_rx = rx;
continue;
}
async fn next(&mut self) -> Result<DriverOutput> { | |
loop { | |
tokio::select! { | |
biased; | |
// Prefer local messages to reduce latency | |
res = self.local_rx.recv() => { | |
match res { | |
std::result::Result::Ok(payload) => { | |
return Ok(DriverOutput::Message { subject: self.subject.clone(), payload }); | |
} | |
std::result::Result::Err(broadcast::error::RecvError::Lagged(_)) => { | |
// Skip lagged and continue | |
continue; | |
} | |
std::result::Result::Err(broadcast::error::RecvError::Closed) => { | |
// Local channel closed; fall back to driver only | |
// Replace with a closed receiver to avoid busy loop | |
// We simply continue and rely on driver | |
} | |
} | |
} | |
res = self.driver.next() => { | |
return res; | |
} | |
} | |
} | |
} | |
async fn next(&mut self) -> Result<DriverOutput> { | |
loop { | |
tokio::select! { | |
biased; | |
// Prefer local messages to reduce latency | |
res = self.local_rx.recv() => { | |
match res { | |
std::result::Result::Ok(payload) => { | |
return Ok(DriverOutput::Message { subject: self.subject.clone(), payload }); | |
} | |
std::result::Result::Err(broadcast::error::RecvError::Lagged(_)) => { | |
// Skip lagged and continue | |
continue; | |
} | |
std::result::Result::Err(broadcast::error::RecvError::Closed) => { | |
// Local channel closed; fall back to driver only | |
// Replace with a closed receiver to avoid busy loop | |
let (tx, rx) = broadcast::channel::<Vec<u8>>(1); | |
drop(tx); // Close the channel | |
self.local_rx = rx; | |
continue; | |
} | |
} | |
} | |
res = self.driver.next() => { | |
return res; | |
} | |
} | |
} | |
} |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
9d59211
to
bf98d22
Compare
cf0534f
to
90ce950
Compare
Claude encountered an error —— View job I'll analyze this and get back to you. |
Claude encountered an error —— View job I'll analyze this and get back to you. |
c140b96
to
f452a88
Compare
bf98d22
to
c9cff29
Compare
Claude encountered an error —— View job I'll analyze this and get back to you. |
c9cff29
to
a294c2e
Compare
Claude encountered an error —— View job I'll analyze this and get back to you. |
Merge activity
|
Fixes RVT-5129 Fixes RVT-5118
Fixes RVT-5129
Fixes RVT-5118