Skip to content

Commit 60bca9c

Browse files
committed
Squashed commit of the following:
commit 98c4a8d Author: Mason M <[email protected]> Date: Wed Feb 19 11:05:18 2025 -0400 Refactor MCP transport callback mechanism commit 2b3d1f6 Author: Mason M <[email protected]> Date: Tue Feb 18 18:01:54 2025 -0400 add message_to_json function commit 3c7ae27 Author: Mason M <[email protected]> Date: Tue Feb 18 15:32:26 2025 -0400 Implement send routine commit 9cec1e0 Author: Mason M <[email protected]> Date: Tue Feb 18 10:12:17 2025 -0400 Fix include paths commit b5642f0 Author: Mason M <[email protected]> Date: Tue Feb 18 09:56:52 2025 -0400 Use log API commit 7a83b2b Author: Mason M <[email protected]> Date: Mon Feb 17 19:32:48 2025 -0400 Fix build errors commit cc7fd66 Author: Mason M <[email protected]> Date: Mon Feb 17 19:03:43 2025 -0400 Use condition variable to wait for endpoint event commit 73ccdd1 Author: Mason M <[email protected]> Date: Mon Feb 17 17:18:09 2025 -0400 Process SSE data asynchronously commit e9c37a3 Author: Mason M <[email protected]> Date: Mon Feb 17 14:01:56 2025 -0400 Add keep-alive header to sse handler commit 57f84e6 Author: Mason M <[email protected]> Date: Mon Feb 17 13:37:59 2025 -0400 Add methods for handling endpoint/message events commit 5c160f6 Author: Mason M <[email protected]> Date: Mon Feb 17 13:25:18 2025 -0400 Process sse values commit f51b493 Author: Mason M <[email protected]> Date: Mon Feb 17 13:07:38 2025 -0400 Clean up sse_read algorithm commit 29d6875 Author: Mason M <[email protected]> Date: Sun Feb 16 19:04:39 2025 -0400 WIP: implementing SSE protocol
1 parent 7b93c31 commit 60bca9c

File tree

10 files changed

+356
-22
lines changed

10 files changed

+356
-22
lines changed

common/toolcall/handler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11

2-
#include "../json.hpp"
2+
#include <json.hpp>
33
#include "handler.hpp"
44
#include "params.hpp"
55

common/toolcall/handler.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include "../json.hpp"
3+
#include <json.hpp> // TODO: remove dependence on this
44
#include "params.hpp" // TODO: make foreward decl.
55
#include <string>
66
#include <variant>

common/toolcall/mcp_messages.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,3 +256,19 @@ void mcp::tools_list_response::refreshResult() {
256256

257257
this->result(result);
258258
}
259+
260+
static bool has_initialized_response(const nlohmann::json & data) {
261+
return data["result"].contains("serverInfo");
262+
}
263+
264+
bool mcp::create_message(const std::string & data, mcp::message_variant & message) {
265+
json j = json::parse(data);
266+
267+
if (has_initialized_response(j)) {
268+
message = mcp::initialize_response::fromJson(j);
269+
270+
} else {
271+
return false;
272+
}
273+
return true;
274+
}

common/toolcall/mcp_messages.hpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#include <optional>
33
#include <vector>
44
#include <variant>
5-
#include "../json.hpp"
5+
#include <json.hpp>
66

77
namespace mcp
88
{
@@ -214,7 +214,13 @@ namespace mcp
214214
std::string next_cursor_;
215215
};
216216

217-
using message_variant = std::variant<
218-
initialize_request, initialize_response, initialized_notification,
219-
tools_list_request, tools_list_response>;
217+
using message_variant =
218+
std::variant<std::monostate,
219+
initialize_request,
220+
initialize_response,
221+
initialized_notification,
222+
tools_list_request,
223+
tools_list_response>;
224+
225+
bool create_message(const std::string & data, message_variant & message);
220226
}
Lines changed: 246 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,260 @@
11

2+
#include <iostream>
3+
#include <sstream>
24
#include "mcp_sse_transport.hpp"
5+
#include <log.h>
6+
#include <chrono>
7+
8+
toolcall::mcp_sse_transport::~mcp_sse_transport() {
9+
if (endpoint_headers_) {
10+
curl_slist_free_all(endpoint_headers_);
11+
}
12+
if (endpoint_) {
13+
curl_easy_cleanup(endpoint_);
14+
}
15+
}
316

417
toolcall::mcp_sse_transport::mcp_sse_transport(std::string server_uri)
5-
: server_uri_(std::move(server_uri))
18+
: server_uri_(std::move(server_uri)),
19+
running_(false),
20+
sse_thread_(),
21+
endpoint_(nullptr),
22+
endpoint_headers_(nullptr),
23+
endpoint_errbuf_(CURL_ERROR_SIZE),
24+
event_{"", "", ""},
25+
sse_buffer_(""),
26+
sse_cursor_(0),
27+
sse_last_id_(""),
28+
initializing_mutex_(),
29+
initializing_()
630
{
31+
curl_global_init(CURL_GLOBAL_DEFAULT);
732
}
833

934
void toolcall::mcp_sse_transport::start() {
35+
if (running_) return;
36+
running_ = true;
37+
38+
std::unique_lock<std::mutex> lock(initializing_mutex_);
39+
sse_thread_ = std::thread(&toolcall::mcp_sse_transport::sse_run, this);
40+
initializing_.wait(lock);
41+
42+
if (endpoint_ == nullptr) {
43+
running_ = false;
44+
LOG_ERR("SSE: Connection to \"%s\" failed", server_uri_.c_str());
45+
throw std::runtime_error("Connection to \"" + server_uri_ + "\" failed");
46+
}
1047
}
1148

1249
void toolcall::mcp_sse_transport::stop() {
50+
running_ = false;
51+
}
52+
53+
bool toolcall::mcp_sse_transport::send(const std::string & request_json) {
54+
if (! running_ || endpoint_ == nullptr) {
55+
return false;
56+
}
57+
58+
curl_easy_setopt(endpoint_, CURLOPT_POSTFIELDS, request_json.c_str());
59+
60+
CURLcode code = curl_easy_perform(endpoint_);
61+
if (code != CURLE_OK) {
62+
size_t len = strlen(&endpoint_errbuf_[0]);
63+
LOG_ERR("%s", (len > 0 ? &endpoint_errbuf_[0] : curl_easy_strerror(code)));
64+
return false;
65+
}
66+
return true;
67+
}
68+
69+
static size_t sse_callback(char * data, size_t size, size_t nmemb, void * clientp) {
70+
auto transport = static_cast<toolcall::mcp_sse_transport*>(clientp);
71+
size_t len = size * nmemb;
72+
return transport->sse_read(data, len);
1373
}
1474

15-
bool toolcall::mcp_sse_transport::send(const mcp::message_variant & /*request*/) {
16-
return false;
75+
void toolcall::mcp_sse_transport::parse_field_value(std::string field, std::string value) {
76+
if (field == "event") {
77+
// Set the event type buffer to field value.
78+
event_.type = std::move(value);
79+
80+
} else if (field == "data") {
81+
// Append the field value to the data buffer,
82+
// then append a single U+000A LINE FEED (LF)
83+
// character to the data buffer.
84+
value += '\n';
85+
event_.data.insert(event_.data.end(), value.begin(), value.end());
86+
87+
} else if (field == "id") {
88+
// If the field value does not contain U+0000 NULL,
89+
// then set the last event ID buffer to the field value.
90+
// Otherwise, ignore the field.
91+
if (! value.empty()) {
92+
event_.id = std::move(value);
93+
}
94+
95+
} else if (field == "retry") {
96+
// If the field value consists of only ASCII digits,
97+
// then interpret the field value as an integer in base
98+
// ten, and set the event stream's reconnection time to
99+
// that integer. Otherwise, ignore the field.
100+
101+
LOG_INF("SSE: Retry field is not currently implemented");
102+
103+
} else {
104+
LOG_WRN("SSE: Unsupported field \"%s\" received", field.c_str());
105+
}
106+
}
107+
108+
void toolcall::mcp_sse_transport::on_endpoint_event() {
109+
endpoint_ = curl_easy_init();
110+
if (! endpoint_) {
111+
LOG_ERR("SSE: Failed to create endpoint handle");
112+
running_ = false;
113+
return;
114+
}
115+
116+
curl_easy_setopt(endpoint_, CURLOPT_URL, event_.data.c_str());
117+
118+
endpoint_headers_ =
119+
curl_slist_append(endpoint_headers_, "Content-Type: application/json");
120+
curl_slist_append(endpoint_headers_, "Connection: keep-alive");
121+
curl_easy_setopt(endpoint_, CURLOPT_HTTPHEADER, endpoint_headers_);
122+
curl_easy_setopt(endpoint_, CURLOPT_ERRORBUFFER, &endpoint_errbuf_[0]);
123+
124+
// Later calls to send will reuse the endpoint_ handle
125+
}
126+
127+
void toolcall::mcp_sse_transport::on_message_event() {
128+
mcp::message_variant message;
129+
if (mcp::create_message(event_.data, message)) {
130+
notify_if<mcp::initialize_response>(message);
131+
notify_if<mcp::tools_list_response>(message);
132+
}
133+
}
134+
135+
size_t toolcall::mcp_sse_transport::sse_read(const char * data, size_t len) {
136+
sse_buffer_.insert(sse_buffer_.end(), data, data + len);
137+
138+
for (; sse_cursor_ < sse_buffer_.length(); ++sse_cursor_) {
139+
if (sse_buffer_[sse_cursor_] == '\r' || sse_buffer_[sse_cursor_] == '\n') {
140+
auto last = sse_buffer_.begin() + sse_cursor_;
141+
142+
std::string line(sse_buffer_.begin(), last);
143+
if (line.empty()) { // Dispatch event
144+
if (event_.type == "endpoint") {
145+
on_endpoint_event();
146+
147+
} else if(event_.type == "message") {
148+
on_message_event();
149+
150+
} else {
151+
LOG_WRN("SSE: Unsupported event \"%s\" received", event_.type.c_str());
152+
}
153+
154+
sse_last_id_ = event_.id;
155+
event_ = {"", "", ""};
156+
157+
} else if(line[0] != ':') { // : denotes a comment
158+
// Set field/value
159+
auto sep_index = line.find(':');
160+
if (sep_index != std::string::npos) {
161+
auto sep_i = line.begin() + sep_index;
162+
163+
std::string field (line.begin(), sep_i);
164+
std::string value (sep_i + 1, line.end());
165+
166+
parse_field_value(std::move(field), std::move(value));
167+
}
168+
}
169+
170+
if (last++ != sse_buffer_.end()) { // Consume line-end
171+
if (*last == '\n') {
172+
last ++; // In the CRLF case consume one more
173+
}
174+
sse_buffer_ = std::string(last, sse_buffer_.end());
175+
176+
} else {
177+
sse_buffer_.clear();
178+
}
179+
sse_cursor_ = 0; // Prepare to scan for next line-end
180+
}
181+
}
182+
return len;
183+
}
184+
185+
void toolcall::mcp_sse_transport::sse_run() {
186+
std::unique_lock<std::mutex> lock(initializing_mutex_);
187+
char errbuf[CURL_ERROR_SIZE];
188+
size_t errlen;
189+
CURLMcode mcode;
190+
int num_handles;
191+
struct CURLMsg * m;
192+
int msgs_in_queue = 0;
193+
CURLM * async_handle = nullptr;
194+
struct curl_slist * headers = nullptr;
195+
CURL * sse = nullptr;
196+
197+
sse = curl_easy_init();
198+
if (! sse) {
199+
LOG_ERR("SSE: Failed to initialize handle");
200+
goto cleanup;
201+
}
202+
203+
headers = curl_slist_append(headers, "Connection: keep-alive");
204+
205+
curl_easy_setopt(sse, CURLOPT_HTTPHEADER, headers);
206+
curl_easy_setopt(sse, CURLOPT_ERRORBUFFER, errbuf);
207+
curl_easy_setopt(sse, CURLOPT_URL, server_uri_.c_str());
208+
curl_easy_setopt(sse, CURLOPT_TCP_KEEPALIVE, 1L);
209+
curl_easy_setopt(sse, CURLOPT_WRITEFUNCTION, sse_callback);
210+
curl_easy_setopt(sse, CURLOPT_WRITEDATA, this);
211+
212+
async_handle = curl_multi_init();
213+
if (! async_handle) {
214+
LOG_ERR("SSE: Failed to initialize async handle");
215+
goto cleanup;
216+
}
217+
curl_multi_add_handle(async_handle, sse);
218+
219+
do {
220+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
221+
222+
mcode = curl_multi_perform(async_handle, &num_handles);
223+
if (mcode != CURLM_OK) {
224+
LOG_ERR("SSE: %s", curl_multi_strerror(mcode));
225+
break;
226+
}
227+
while ((m = curl_multi_info_read(async_handle, &msgs_in_queue)) != nullptr) {
228+
if (m->msg == CURLMSG_DONE) {
229+
if (m->data.result != CURLE_OK) {
230+
errlen = strlen(errbuf);
231+
if (errlen) {
232+
LOG_ERR("SSE: %s", errbuf);
233+
234+
} else {
235+
LOG_ERR("SSE: %s", curl_easy_strerror(m->data.result));
236+
}
237+
running_ = false;
238+
break;
239+
}
240+
}
241+
}
242+
if (endpoint_ && lock.owns_lock()) { // TODO: timeout if endpoint not received
243+
lock.unlock();
244+
initializing_.notify_one();
245+
}
246+
247+
} while (running_);
248+
249+
cleanup:
250+
if (headers) {
251+
curl_slist_free_all(headers);
252+
}
253+
if (async_handle) {
254+
curl_multi_remove_handle(async_handle, sse);
255+
curl_multi_cleanup(async_handle);
256+
}
257+
if (sse) {
258+
curl_easy_cleanup(sse);
259+
}
17260
}
Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,49 @@
11
#pragma once
22

33
#include "mcp_transport.hpp"
4+
#include <condition_variable>
5+
#include <mutex>
6+
#include <thread>
7+
#include <curl/curl.h>
48

59
namespace toolcall
610
{
711
class mcp_sse_transport : public mcp_transport {
812
public:
13+
~mcp_sse_transport();
14+
915
mcp_sse_transport(std::string server_uri);
1016

1117
virtual void start() override;
1218
virtual void stop() override;
13-
virtual bool send(const mcp::message_variant & request) override;
19+
virtual bool send(const std::string & request_json) override;
20+
21+
size_t sse_read(const char * data, size_t len);
1422

1523
private:
24+
void sse_run();
25+
void parse_field_value(std::string field, std::string value);
26+
void on_endpoint_event();
27+
void on_message_event();
28+
1629
std::string server_uri_;
30+
bool running_;
31+
std::thread sse_thread_;
32+
CURL * endpoint_;
33+
struct curl_slist * endpoint_headers_;
34+
std::vector<char> endpoint_errbuf_;
35+
36+
struct sse_event {
37+
std::string type;
38+
std::string data;
39+
std::string id;
40+
} event_;
41+
42+
std::string sse_buffer_;
43+
size_t sse_cursor_;
44+
std::string sse_last_id_;
45+
46+
std::mutex initializing_mutex_;
47+
std::condition_variable initializing_;
1748
};
1849
}

common/toolcall/mcp_stdio_transport.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@ toolcall::mcp_stdio_transport::mcp_stdio_transport(std::vector<std::string> argv
1616
throw std::logic_error(std::string("Function not implemented: ") + __func__);
1717
}
1818

19-
[[noreturn]] bool toolcall::mcp_stdio_transport::send(const mcp::message_variant & /*request*/) {
19+
[[noreturn]] bool toolcall::mcp_stdio_transport::send(const std::string & /*request_json*/) {
2020
throw std::logic_error(std::string("Function not implemented: ") + __func__);
2121
}

common/toolcall/mcp_stdio_transport.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace toolcall
1313

1414
[[noreturn]] virtual void start() override;
1515
[[noreturn]] virtual void stop() override;
16-
[[noreturn]] virtual bool send(const mcp::message_variant & request) override;
16+
[[noreturn]] virtual bool send(const std::string & request_json) override;
1717

1818
private:
1919
std::vector<std::string> argv_;

0 commit comments

Comments
 (0)