Skip to content

Commit 8644a21

Browse files
router : add real-time model swap notifications via SSE
Implement a notification sink to stream lifecycle events during model swaps. Notifications are sent via delta.reasoning_content (OpenAI-compatible). Progress events are emitted during ensure_running(): unloading the previous model(s), loading the new model, and backend-ready confirmation. proxy_request() is refactored to invoke ensure_running() with an optional sink attached for streaming feedback. Enabled via config: { "router": { "notify_model_swap": true } }
1 parent c5f5589 commit 8644a21

File tree

7 files changed

+173
-102
lines changed

7 files changed

+173
-102
lines changed

tools/router/router-app.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ bool RouterApp::ensure_running(const std::string & model_name, std::string & err
5858
target_group.c_str());
5959

6060
terminate_process(it_proc->second);
61+
notify_progress("[llama-router] Unloading " + it_proc->first + " (" + running_group + ")\n");
6162
wait_for_process_exit(it_proc->second, ROUTER_PROCESS_SHUTDOWN_TIMEOUT_MS);
6263
model_ports.erase(it_proc->first);
6364
it_proc = processes.erase(it_proc);
@@ -102,6 +103,8 @@ bool RouterApp::ensure_running(const std::string & model_name, std::string & err
102103
last_spawned_model = model_name;
103104
LOG_INF("Spawned %s (group '%s') with %zu args\n", model_name.c_str(), target_group.c_str(), command.size());
104105

106+
notify_progress("[llama-router] Loading " + model_name + " (" + target_group + ")\n");
107+
105108
const std::string health_endpoint = spawn_cfg.health_endpoint.empty() ? "/health" : spawn_cfg.health_endpoint;
106109
if (!wait_for_backend_ready(port, health_endpoint, ROUTER_BACKEND_READY_TIMEOUT_MS, &proc_it->second)) {
107110
error = "backend not ready";
@@ -116,6 +119,7 @@ bool RouterApp::ensure_running(const std::string & model_name, std::string & err
116119
}
117120

118121
LOG_INF("Backend ready on port %d\n", port);
122+
notify_progress("[llama-router] Backend ready, generating response...\n");
119123
return true;
120124
}
121125

@@ -162,3 +166,20 @@ void RouterApp::stop_all() {
162166
}
163167
processes.clear();
164168
}
169+
170+
void RouterApp::set_notification_sink(NotificationSink sink) {
171+
std::lock_guard<std::mutex> lock(notification_mutex);
172+
notification_sink = std::move(sink);
173+
}
174+
175+
void RouterApp::clear_notification_sink() {
176+
std::lock_guard<std::mutex> lock(notification_mutex);
177+
notification_sink.reset();
178+
}
179+
180+
void RouterApp::notify_progress(const std::string & message) {
181+
std::lock_guard<std::mutex> lock(notification_mutex);
182+
if (notification_sink) {
183+
(*notification_sink)(ProgressNotification{message});
184+
}
185+
}

tools/router/router-app.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55

66
#include <atomic>
77
#include <mutex>
8+
#include <optional>
89
#include <string>
910
#include <unordered_map>
11+
#include <vector>
1012

1113
class RouterApp {
1214
public:
@@ -20,16 +22,22 @@ class RouterApp {
2022
void stop_all();
2123
void update_config(RouterConfig cfg);
2224

25+
void set_notification_sink(NotificationSink sink);
26+
void clear_notification_sink();
27+
2328
const RouterConfig & get_config() const { return config; }
2429

2530
private:
2631
RouterConfig config;
2732
std::atomic<int> next_port;
2833
std::mutex mutex;
34+
std::optional<NotificationSink> notification_sink;
35+
std::mutex notification_mutex;
2936
std::unordered_map<std::string, ModelConfig> model_lookup;
3037
std::unordered_map<std::string, ProcessHandle> processes;
3138
std::unordered_map<std::string, int> model_ports;
3239
std::string last_spawned_model;
3340

3441
SpawnConfig resolve_spawn_config(const ModelConfig & cfg) const;
42+
void notify_progress(const std::string & message);
3543
};

tools/router/router-config.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ const RouterOptions & get_default_router_options() {
135135
/*connection_timeout_s =*/ 5,
136136
/*read_timeout_s =*/ 600,
137137
/*admin_token =*/ "",
138+
/*notify_model_swap =*/ false,
138139
};
139140

140141
return opts;
@@ -192,6 +193,10 @@ void write_config_file(const RouterConfig & cfg, const std::string & path) {
192193
out["router"]["admin_token"] = cfg.router.admin_token;
193194
}
194195

196+
if (cfg.router.notify_model_swap) {
197+
out["router"]["notify_model_swap"] = true;
198+
}
199+
195200
out["models"] = json::array();
196201
for (const auto & m : cfg.models) {
197202
json obj;
@@ -316,6 +321,7 @@ RouterConfig load_config(const std::string & path) {
316321
if (r.contains("connection_timeout_s")) cfg.router.connection_timeout_s = r["connection_timeout_s"].get<int>();
317322
if (r.contains("read_timeout_s")) cfg.router.read_timeout_s = r["read_timeout_s"].get<int>();
318323
if (r.contains("admin_token")) cfg.router.admin_token = r["admin_token"].get<std::string>();
324+
if (r.contains("notify_model_swap")) cfg.router.notify_model_swap = r["notify_model_swap"].get<bool>();
319325
}
320326
if (data.contains("models")) {
321327
for (const auto & m : data["models"]) {

tools/router/router-config.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
#pragma once
22

3+
#include <functional>
4+
#include <optional>
35
#include <string>
46
#include <vector>
57

8+
struct ProgressNotification {
9+
std::string message;
10+
};
11+
12+
using NotificationSink = std::function<void(const ProgressNotification &)>;
13+
614
struct SpawnConfig {
715
std::vector<std::string> command;
816
std::vector<std::string> proxy_endpoints;
@@ -28,6 +36,7 @@ struct RouterOptions {
2836
int connection_timeout_s = 5;
2937
int read_timeout_s = 600;
3038
std::string admin_token;
39+
bool notify_model_swap = false;
3140
};
3241

3342
struct RouterConfig {

tools/router/router-endpoints.cpp

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,9 @@ void register_routes(httplib::Server & server, RouterApp & app) {
4545
res.set_content("no models running", "text/plain");
4646
return;
4747
}
48-
49-
std::string error;
50-
if (!app.ensure_running(model, error)) {
51-
LOG_WRN("Failed to ensure last spawned model %s: %s\n", model.c_str(), error.c_str());
52-
res.status = 503;
53-
res.set_content("no models running", "text/plain");
54-
return;
55-
}
5648
LOG_INF("Proxying %s to last spawned model %s\n", req.path.c_str(), model.c_str());
5749
const auto spawn_cfg = app.get_spawn_config(model);
58-
proxy_request(req, res, app.upstream_for(model), app.get_config().router, spawn_cfg.proxy_endpoints);
50+
proxy_request(req, res, app, model, spawn_cfg.proxy_endpoints);
5951
};
6052

6153
server.Get("/props", proxy_last_spawned);
@@ -68,17 +60,10 @@ void register_routes(httplib::Server & server, RouterApp & app) {
6860
std::string model_name = model_it != req.matches.end() ? model_it->str() : std::string();
6961
++model_it;
7062
const std::string endpoint_suffix = model_it != req.matches.end() ? model_it->str() : std::string();
71-
std::string error;
72-
if (!app.ensure_running(model_name, error)) {
73-
LOG_WRN("Model %s unavailable: %s\n", model_name.c_str(), error.c_str());
74-
res.status = 404;
75-
res.set_content("{\"error\":\"model unavailable\"}", "application/json");
76-
return;
77-
}
7863
LOG_INF("Proxying %s for model %s\n", req.path.c_str(), model_name.c_str());
7964
const auto spawn_cfg = app.get_spawn_config(model_name);
8065
const std::string corrected_path = "/" + endpoint_suffix;
81-
proxy_request(req, res, app.upstream_for(model_name), app.get_config().router, spawn_cfg.proxy_endpoints, corrected_path);
66+
proxy_request(req, res, app, model_name, spawn_cfg.proxy_endpoints, corrected_path);
8267
});
8368

8469
server.Post("/v1/chat/completions", [&app](const httplib::Request & req, httplib::Response & res) {
@@ -90,17 +75,9 @@ void register_routes(httplib::Server & server, RouterApp & app) {
9075
return;
9176
}
9277

93-
std::string error;
94-
if (!app.ensure_running(model, error)) {
95-
LOG_WRN("Model %s not available: %s\n", model.c_str(), error.c_str());
96-
res.status = 404;
97-
res.set_content("{\"error\":\"" + error + "\"}", "application/json");
98-
return;
99-
}
100-
10178
LOG_INF("Proxying chat completion for model %s\n", model.c_str());
10279
const auto spawn_cfg = app.get_spawn_config(model);
103-
proxy_request(req, res, app.upstream_for(model), app.get_config().router, spawn_cfg.proxy_endpoints);
80+
proxy_request(req, res, app, model, spawn_cfg.proxy_endpoints);
10481
});
10582

10683
server.set_error_handler([](const httplib::Request &, httplib::Response & res) {

0 commit comments

Comments (0)