Skip to content

Commit bea3536

Browse files
authored
Merge pull request #6 from gpu-mode/feat/streaming-response
Feat: streaming response
2 parents 86f599e + 4f3e49d commit bea3536

File tree

3 files changed

+111
-10
lines changed

3 files changed

+111
-10
lines changed

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,7 @@ serde_yaml = "0.9"
2020
webbrowser = "0.8"
2121
base64-url = "3.0.0"
2222
urlencoding = "2.1.3"
23+
bytes = "1.10.1"
24+
futures-util = "0.3.31"
2325

2426

src/service/mod.rs

Lines changed: 94 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use serde_json::Value;
66
use std::env;
77
use std::path::Path;
88
use std::time::Duration;
9+
use tokio::io::AsyncWriteExt;
910

1011
use crate::models::{GpuItem, LeaderboardItem};
1112

@@ -125,22 +126,105 @@ pub async fn submit_solution<P: AsRef<Path>>(
125126
let resp = client
126127
.post(&url)
127128
.multipart(form)
128-
.timeout(Duration::from_secs(180))
129+
.timeout(Duration::from_secs(300))
129130
.send()
130131
.await?;
131132

132133
let status = resp.status();
133134
if !status.is_success() {
134135
let error_text = resp.text().await?;
135-
return Err(anyhow!("Server returned status {}: {}", status, error_text));
136+
let detail = serde_json::from_str::<Value>(&error_text)
137+
.ok()
138+
.and_then(|v| v.get("detail").and_then(|d| d.as_str()).map(str::to_string));
139+
140+
return Err(anyhow!(
141+
"Server returned status {}: {}",
142+
status,
143+
detail.unwrap_or(error_text)
144+
));
136145
}
137146

138-
let result: Value = resp.json().await?;
139-
140-
let pretty_result = match result.get("results") {
141-
Some(result_obj) => serde_json::to_string_pretty(result_obj)?,
142-
None => return Err(anyhow!("Invalid response structure")),
143-
};
144-
145-
Ok(pretty_result)
147+
if resp
148+
.headers()
149+
.get(reqwest::header::CONTENT_TYPE)
150+
.and_then(|v| v.to_str().ok())
151+
.map_or(false, |s| s.starts_with("text/event-stream"))
152+
{
153+
let mut resp = resp;
154+
let mut buffer = String::new();
155+
let mut stderr = tokio::io::stderr();
156+
157+
while let Some(chunk) = resp.chunk().await? {
158+
buffer.push_str(&String::from_utf8_lossy(&chunk));
159+
160+
while let Some(pos) = buffer.find("\n\n") {
161+
let message_str = buffer.drain(..pos + 2).collect::<String>();
162+
let mut event_type = None;
163+
let mut data_json = None;
164+
165+
for line in message_str.lines() {
166+
if line.starts_with("event:") {
167+
event_type = Some(line["event:".len()..].trim());
168+
} else if line.starts_with("data:") {
169+
data_json = Some(line["data:".len()..].trim());
170+
}
171+
}
172+
173+
if let (Some(event), Some(data)) = (event_type, data_json) {
174+
match event {
175+
"status" => (),
176+
"result" => {
177+
let result_val: Value = serde_json::from_str(data)?;
178+
let pretty_result = match result_val.get("results") {
179+
Some(result_obj) => serde_json::to_string_pretty(result_obj)?,
180+
None => {
181+
return Err(anyhow!(
182+
"Invalid 'result' event structure: missing 'results' field"
183+
))
184+
}
185+
};
186+
return Ok(pretty_result);
187+
}
188+
"error" => {
189+
let error_val: Value = serde_json::from_str(data)?;
190+
let detail = error_val
191+
.get("detail")
192+
.and_then(|d| d.as_str())
193+
.unwrap_or("Unknown server error");
194+
let status_code = error_val.get("status_code").and_then(|s| s.as_i64());
195+
let raw_error = error_val.get("raw_error").and_then(|e| e.as_str());
196+
197+
let mut error_msg = format!("Server processing error: {}", detail);
198+
if let Some(sc) = status_code {
199+
error_msg.push_str(&format!(" (Status Code: {})", sc));
200+
}
201+
if let Some(re) = raw_error {
202+
error_msg.push_str(&format!(" | Raw Error: {}", re));
203+
}
204+
205+
return Err(anyhow!(error_msg));
206+
}
207+
_ => {
208+
stderr
209+
.write_all(
210+
format!("Ignoring unknown SSE event: {}\n", event).as_bytes(),
211+
)
212+
.await?;
213+
stderr.flush().await?;
214+
}
215+
}
216+
}
217+
}
218+
}
219+
Err(anyhow!(
220+
"Stream ended unexpectedly without a final result or error event."
221+
))
222+
} else {
223+
let result: Value = resp.json().await?;
224+
let pretty_result = match result.get("results") {
225+
Some(result_obj) => serde_json::to_string_pretty(result_obj)?,
226+
None => return Err(anyhow!("Invalid non-streaming response structure")),
227+
};
228+
Ok(pretty_result)
229+
}
146230
}

0 commit comments

Comments
 (0)