@@ -3,6 +3,7 @@
 #include <iostream>
 #include <sstream>
 #include <string>
+#include "utils/json_helper.h"
 #include "utils/logging_utils.h"
 namespace remote_engine {
 namespace {
@@ -14,8 +15,138 @@ constexpr const int kFileLoggerOption = 0;
 bool is_anthropic(const std::string& model) {
   return model.find("claude") != std::string::npos;
 }
+
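+// Parses a single Anthropic SSE line into the fields needed to rebuild an
+// OpenAI-style chunk. An illustrative input line from Anthropic's streaming
+// Messages API looks like:
+//   data: {"type":"content_block_delta","index":0,
+//          "delta":{"type":"text_delta","text":"Hello"}}
+// Any event type other than message_start, content_block_delta, and
+// message_delta is flagged with should_ignore so the caller can skip it.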
+struct AnthropicChunk {
+  std::string type;
+  std::string id;
+  int index = 0;
+  std::string msg;
+  std::string model;
+  std::string stop_reason;
+  bool should_ignore = false;
+
+  explicit AnthropicChunk(const std::string& str) {
+    if (str.size() > 6) {
+      // Strip the leading "data: " prefix before parsing the JSON payload.
+      std::string s = str.substr(6);
+      try {
+        auto root = json_helper::ParseJsonString(s);
+        type = root["type"].asString();
+        if (type == "message_start") {
+          id = root["message"]["id"].asString();
+          model = root["message"]["model"].asString();
+        } else if (type == "content_block_delta") {
+          index = root["index"].asInt();
+          if (root["delta"]["type"].asString() == "text_delta") {
+            msg = root["delta"]["text"].asString();
+          }
+        } else if (type == "message_delta") {
+          stop_reason = root["delta"]["stop_reason"].asString();
+        } else {
+          // Ignore other event types (ping, content_block_start, ...).
+          should_ignore = true;
+        }
+      } catch (const std::exception& e) {
+        should_ignore = true;
+        CTL_WRN("JSON parse error: " << e.what());
+      }
+    } else {
+      should_ignore = true;
+    }
+  }
+
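+  // Re-serializes the chunk as an OpenAI-compatible chat.completion.chunk
+  // SSE payload. Illustrative output (fields mirror OpenAI's streaming API):
+  //   data: {"id":"msg_01...","object":"chat.completion.chunk",
+  //          "choices":[{"index":0,"delta":{"content":"Hello"},
+  //          "finish_reason":null}],...}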
+  std::string ToOpenAiFormatString() const {
+    Json::Value root;
+    root["id"] = id;
+    root["object"] = "chat.completion.chunk";
+    root["created"] = Json::Value();
+    root["model"] = model;
+    root["system_fingerprint"] = "fp_e76890f0c3";
+    Json::Value choices(Json::arrayValue);
+    Json::Value choice;
+    Json::Value content;
+    choice["index"] = 0;
+    content["content"] = msg;
+    if (type == "message_start") {
+      // Only the first chunk carries the assistant role, matching OpenAI.
+      content["role"] = "assistant";
+      content["refusal"] = Json::Value();
+    }
+    choice["delta"] = content;
+    choice["finish_reason"] =
+        stop_reason.empty() ? Json::Value() : Json::Value(stop_reason);
+    choices.append(choice);
+    root["choices"] = choices;
+    return "data: " + json_helper::DumpJsonString(root);
+  }
+};
+
 }  // namespace
 
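+// Intended as a libcurl write callback (CURLOPT_WRITEFUNCTION). curl may
+// deliver partial SSE lines, so incoming bytes are appended to
+// context->buffer and only complete, newline-terminated lines are handled;
+// returning anything other than size * nmemb would make curl abort the
+// transfer.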
+size_t StreamWriteCallback(char* ptr, size_t size, size_t nmemb,
+                           void* userdata) {
+  auto* context = static_cast<StreamContext*>(userdata);
+  std::string chunk(ptr, size * nmemb);
+
+  context->buffer += chunk;
+
+  // Process complete lines
+  size_t pos;
+  while ((pos = context->buffer.find('\n')) != std::string::npos) {
+    std::string line = context->buffer.substr(0, pos);
+    context->buffer = context->buffer.substr(pos + 1);
+    CTL_TRC(line);
+
+    // Skip empty lines and SSE "event:" lines
+    if (line.empty() || line == "\r" ||
+        line.find("event:") != std::string::npos)
+      continue;
+
+    // Report a final status once the stream finishes: [DONE] for
+    // OpenAI-style streams, message_stop for Anthropic.
+    if (line == "data: [DONE]" ||
+        line.find("message_stop") != std::string::npos) {
+      Json::Value status;
+      status["is_done"] = true;
+      status["has_error"] = false;
+      status["is_stream"] = true;
+      status["status_code"] = 200;
+      (*context->callback)(std::move(status), Json::Value());
+      break;
+    }
+
+    Json::Value chunk_json;
+    if (is_anthropic(context->model)) {
+      AnthropicChunk ac(line);
+      if (ac.should_ignore)
+        continue;
+      ac.model = context->model;
+      // message_start carries the message id; reuse it for later chunks.
+      if (ac.type == "message_start") {
+        context->id = ac.id;
+      } else {
+        ac.id = context->id;
+      }
+      chunk_json["data"] = ac.ToOpenAiFormatString() + "\n\n";
+    } else {
+      // Non-Anthropic providers already stream OpenAI-formatted chunks.
+      chunk_json["data"] = line + "\n\n";
+    }
+
+    Json::Value status;
+    status["is_done"] = false;
+    status["has_error"] = false;
+    status["is_stream"] = true;
+    status["status_code"] = 200;
+    (*context->callback)(std::move(status), std::move(chunk_json));
+  }
+
+  return size * nmemb;
+}
+
 CurlResponse RemoteEngine::MakeStreamingChatCompletionRequest(
     const ModelConfig& config, const std::string& body,
     const std::function<void(Json::Value&&, Json::Value&&)>& callback) {
@@ -37,6 +168,11 @@ CurlResponse RemoteEngine::MakeStreamingChatCompletionRequest(
     headers = curl_slist_append(headers, api_key_template_.c_str());
   }
 
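+  // Anthropic's Messages API additionally requires an anthropic-version
+  // header; the value is taken from the model's config (config.version).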
+  if (is_anthropic(config.model)) {
+    std::string v = "anthropic-version: " + config.version;
+    headers = curl_slist_append(headers, v.c_str());
+  }
+
   headers = curl_slist_append(headers, "Content-Type: application/json");
   headers = curl_slist_append(headers, "Accept: text/event-stream");
   headers = curl_slist_append(headers, "Cache-Control: no-cache");
@@ -45,7 +181,7 @@ CurlResponse RemoteEngine::MakeStreamingChatCompletionRequest(
   StreamContext context{
       std::make_shared<std::function<void(Json::Value&&, Json::Value&&)>>(
           callback),
-      ""};
+      "", "", config.model};
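+  // Field order assumed from usage above: {callback, buffer, id, model};
+  // model is what routes chunks through the Anthropic conversion path in
+  // StreamWriteCallback.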
49185
50186 curl_easy_setopt (curl, CURLOPT_URL, full_url.c_str ());
51187 curl_easy_setopt (curl, CURLOPT_HTTPHEADER, headers);
@@ -249,6 +385,7 @@ bool RemoteEngine::LoadModelConfig(const std::string& model,
       std::unique_lock lock(models_mutex_);
       models_[model] = std::move(model_config);
     }
+    CTL_DBG("LoadModelConfig succeeded: " << model << ", " << yaml_path);
 
     return true;
   } catch (const YAML::Exception& e) {
@@ -339,6 +476,7 @@ void RemoteEngine::LoadModel(
   status["is_stream"] = false;
   status["status_code"] = k200OK;
   callback(std::move(status), std::move(response));
+  CTL_INF("Model loaded successfully: " << model);
 }
 
 void RemoteEngine::UnloadModel(