
Commit d2123eb

llama-router: fix SSE streaming termination and use-after-free
- Fix use-after-free: capture request data by value (path, method, body) instead of by reference, so the captures stay valid when proxy_request() returns while the upstream thread is still running
- Use shared_ptr<httplib::Client> to keep the client alive during async operations
- Fix streaming termination: explicitly call sink.done() when the upstream completes, signalling connection closure to httplib (fixes an infinite hang)
- Add unlock() before all provider returns to prevent a mutex deadlock
- Handle spurious wakeups: pause and retry when the queue is temporarily empty
1 parent 3d14f32 commit d2123eb
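
As a rough illustration of the lifetime issue the first two bullets address, here is a minimal standalone sketch (not the router code; FakeClient and start_request are hypothetical stand-ins) contrasting the broken by-reference capture with the fixed by-value / shared_ptr capture:

#include <chrono>
#include <cstdio>
#include <memory>
#include <string>
#include <thread>

// Hypothetical stand-in for the upstream HTTP client; not the httplib API.
struct FakeClient {
    explicit FakeClient(std::string base) : base(std::move(base)) {}
    void post(const std::string & path, const std::string & body) const {
        std::printf("POST %s%s (%zu bytes)\n", base.c_str(), path.c_str(), body.size());
    }
    std::string base;
};

void start_request(const std::string & path, const std::string & body) {
    auto client = std::make_shared<FakeClient>("http://127.0.0.1:8080");

    // Broken pattern: [&] captures path, body and client by reference.
    // Once start_request() returns, those stack objects are destroyed and the
    // detached worker reads freed memory (use-after-free):
    //
    //     std::thread([&]() { client->post(path, body); }).detach();

    // Fixed pattern: copy the request data and share ownership of the client,
    // so everything the worker touches outlives the caller's stack frame.
    std::thread([client, path, body]() {
        client->post(path, body);
    }).detach();
}

int main() {
    start_request("/v1/chat/completions", "{\"stream\":true}");
    // Give the detached worker a moment to run before the process exits.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    return 0;
}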


tools/router/router-proxy.cpp

Lines changed: 41 additions & 14 deletions
@@ -6,6 +6,7 @@
 #include <cpp-httplib/httplib.h>
 
 #include <condition_variable>
+#include <memory>
 #include <mutex>
 #include <queue>
 #include <thread>
@@ -47,14 +48,16 @@ bool proxy_request(const httplib::Request & req,
     }
 
     LOG_INF("Proxying %s %s to upstream %s\n", req.method.c_str(), req.path.c_str(), upstream_base.c_str());
-    httplib::Client client(upstream_base.c_str());
-    client.set_connection_timeout(opts.connection_timeout_s, 0);
-    client.set_read_timeout(opts.read_timeout_s, 0);
+    auto client = std::make_shared<httplib::Client>(upstream_base.c_str());
+    client->set_connection_timeout(opts.connection_timeout_s, 0);
+    client->set_read_timeout(opts.read_timeout_s, 0);
 
     httplib::Headers headers = req.headers;
     headers.erase("Host");
 
-    const std::string path = !req.target.empty() ? req.target : req.path;
+    const std::string path = !req.target.empty() ? req.target : req.path;
+    const std::string method = req.method;
+    const std::string request_body = req.body;
 
     if (!matches_any_endpoint(path, proxy_endpoints)) {
         LOG_WRN("Request %s not proxied because it does not match configured endpoints\n", path.c_str());
@@ -63,7 +66,7 @@ bool proxy_request(const httplib::Request & req,
         return false;
     }
 
-    std::string content_type = req.get_header_value("Content-Type", "application/json");
+    const std::string content_type = req.get_header_value("Content-Type", "application/json");
 
     const auto accept_header = req.get_header_value("Accept");
     const bool wants_stream = accept_header.find("text/event-stream") != std::string::npos ||
@@ -93,9 +96,17 @@ bool proxy_request(const httplib::Request & req,
             return true;
         };
 
-        auto upstream_thread = std::make_shared<std::thread>([&, state_ptr]() {
-            if (req.method == "POST") {
-                result = client.Post(path.c_str(), headers, req.body, content_type.c_str(), content_receiver);
+        auto upstream_thread = std::make_shared<std::thread>([state_ptr,
+                                                              client,
+                                                              path,
+                                                              headers,
+                                                              content_type,
+                                                              method,
+                                                              request_body,
+                                                              content_receiver]() {
+            httplib::Result result;
+            if (method == "POST") {
+                result = client->Post(path.c_str(), headers, request_body, content_type.c_str(), content_receiver);
                 if (result) {
                     std::lock_guard<std::mutex> lock(state_ptr->mutex);
                     state_ptr->status = result->status;
@@ -112,7 +123,7 @@ bool proxy_request(const httplib::Request & req,
                     state_ptr->content_type = upstream.get_header_value("Content-Type", "text/event-stream");
                     return true;
                 };
-                result = client.Get(path.c_str(), headers, response_handler, content_receiver);
+                result = client->Get(path.c_str(), headers, response_handler, content_receiver);
             }
 
             std::lock_guard<std::mutex> lock(state_ptr->mutex);
@@ -134,20 +145,36 @@ bool proxy_request(const httplib::Request & req,
             state_ptr->cv.wait(lock, [&] { return !state_ptr->chunks.empty() || state_ptr->done; });
 
             if (!state_ptr->chunks.empty()) {
+
+                // Chunks available: send next chunk to client
                 auto chunk = std::move(state_ptr->chunks.front());
                 state_ptr->chunks.pop();
                 if (!state_ptr->upstream_headers.empty()) {
+
+                    // Apply response headers on first chunk
                     res.status = state_ptr->status;
                     res.reason = state_ptr->reason;
                     copy_response_headers(state_ptr->upstream_headers, res);
                     state_ptr->upstream_headers.clear();
                     res.set_header("Content-Type", state_ptr->content_type);
                 }
                 lock.unlock();
+
+                // sink.write() returns true -> provider continues immediately
                 return sink.write(chunk.data(), chunk.size());
             }
 
-            return state_ptr->done;
+            // No chunks available: determine if stream should continue or terminate
+            if (state_ptr->done) {
+                // Upstream finished and all chunks have been sent
+                lock.unlock();
+                sink.done(); // Explicitly signal stream completion to httplib
+                return false; // Stop provider -> httplib closes connection gracefully
+            }
+
+            // Spurious wakeup or transient empty queue: upstream still processing
+            lock.unlock();
+            return false; // Pause provider -> httplib retries after timeout/new data
         },
         [state_ptr, upstream_thread](bool) {
             (void) state_ptr;
@@ -159,14 +186,14 @@ bool proxy_request(const httplib::Request & req,
         return true;
     }
 
-    if (req.method == "POST") {
-        result = client.Post(path.c_str(), headers, req.body, content_type.c_str());
+    if (method == "POST") {
+        result = client->Post(path.c_str(), headers, request_body, content_type.c_str());
     } else {
-        result = client.Get(path.c_str(), headers);
+        result = client->Get(path.c_str(), headers);
     }
 
     if (!result) {
-        LOG_ERR("Upstream %s unavailable for %s %s\n", upstream_base.c_str(), req.method.c_str(), path.c_str());
+        LOG_ERR("Upstream %s unavailable for %s %s\n", upstream_base.c_str(), method.c_str(), path.c_str());
         res.status = 502;
         res.set_content("{\"error\":\"upstream unavailable\"}", "application/json");
         return false;
return false;
