Skip to content

Commit 5472e95

Browse files
committed
Add open_stream_direct() method and tests for true streaming functionality
1 parent cee57e5 commit 5472e95

File tree

2 files changed

+243
-0
lines changed

2 files changed

+243
-0
lines changed

httplib.h

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1481,6 +1481,7 @@ class ClientImpl {
14811481

14821482
// Mode 2: Socket direct (true streaming)
14831483
std::unique_ptr<ClientConnection> connection_; // Socket ownership
1484+
std::unique_ptr<Stream> socket_stream_; // SocketStream ownership
14841485
Stream *stream_ = nullptr; // Stream for reading
14851486
detail::BodyReader body_reader_; // Body reading state
14861487

@@ -1632,6 +1633,12 @@ class ClientImpl {
16321633
// Streaming API: Open a stream for reading response body incrementally
16331634
StreamHandle open_stream(const std::string &path);
16341635
StreamHandle open_stream(const std::string &path, const Headers &headers);
1636+
1637+
// True streaming API: Socket ownership transferred to StreamHandle
1638+
StreamHandle open_stream_direct(const std::string &path);
1639+
StreamHandle open_stream_direct(const std::string &path,
1640+
const Headers &headers);
1641+
16351642
bool send(Request &req, Response &res, Error &error);
16361643
Result send(const Request &req);
16371644

@@ -2004,6 +2011,12 @@ class Client {
20042011
ClientImpl::StreamHandle open_stream(const std::string &path);
20052012
ClientImpl::StreamHandle open_stream(const std::string &path,
20062013
const Headers &headers);
2014+
2015+
// True streaming API: Socket ownership transferred to StreamHandle
2016+
ClientImpl::StreamHandle open_stream_direct(const std::string &path);
2017+
ClientImpl::StreamHandle open_stream_direct(const std::string &path,
2018+
const Headers &headers);
2019+
20072020
bool send(Request &req, Response &res, Error &error);
20082021
Result send(const Request &req);
20092022

@@ -9155,6 +9168,121 @@ ClientImpl::open_stream(const std::string &path, const Headers &headers) {
91559168
return handle;
91569169
}
91579170

9171+
inline ClientImpl::StreamHandle
9172+
ClientImpl::open_stream_direct(const std::string &path) {
9173+
return open_stream_direct(path, Headers{});
9174+
}
9175+
9176+
inline ClientImpl::StreamHandle
9177+
ClientImpl::open_stream_direct(const std::string &path,
9178+
const Headers &headers) {
9179+
StreamHandle handle;
9180+
handle.response = detail::make_unique<Response>();
9181+
handle.error = Error::Success;
9182+
9183+
// Create socket connection
9184+
handle.connection_ = detail::make_unique<ClientConnection>();
9185+
9186+
{
9187+
std::lock_guard<std::mutex> guard(socket_mutex_);
9188+
9189+
// Check if existing socket is alive, if not create new one
9190+
auto is_alive = false;
9191+
if (socket_.is_open()) {
9192+
is_alive = detail::is_socket_alive(socket_.sock);
9193+
if (!is_alive) {
9194+
shutdown_ssl(socket_, false);
9195+
shutdown_socket(socket_);
9196+
close_socket(socket_);
9197+
}
9198+
}
9199+
9200+
if (!is_alive) {
9201+
if (!create_and_connect_socket(socket_, handle.error)) {
9202+
handle.response.reset();
9203+
return handle;
9204+
}
9205+
}
9206+
9207+
// Transfer socket ownership to StreamHandle
9208+
handle.connection_->sock = socket_.sock;
9209+
#ifdef CPPHTTPLIB_OPENSSL_SUPPORT
9210+
handle.connection_->ssl = socket_.ssl;
9211+
socket_.ssl = nullptr;
9212+
#endif
9213+
socket_.sock = INVALID_SOCKET;
9214+
}
9215+
9216+
// Create SocketStream for the transferred socket
9217+
handle.socket_stream_ = detail::make_unique<detail::SocketStream>(
9218+
handle.connection_->sock, read_timeout_sec_, read_timeout_usec_,
9219+
write_timeout_sec_, write_timeout_usec_);
9220+
handle.stream_ = handle.socket_stream_.get();
9221+
9222+
// Build and send request
9223+
Request req;
9224+
req.method = "GET";
9225+
req.path = path;
9226+
req.headers = headers;
9227+
9228+
// Add default headers
9229+
for (const auto &header : default_headers_) {
9230+
if (req.headers.find(header.first) == req.headers.end()) {
9231+
req.headers.insert(header);
9232+
}
9233+
}
9234+
9235+
// Add required headers before writing
9236+
if (req.headers.find("Host") == req.headers.end()) {
9237+
req.headers.emplace("Host", host_and_port_);
9238+
}
9239+
if (req.headers.find("Accept") == req.headers.end()) {
9240+
req.headers.emplace("Accept", "*/*");
9241+
}
9242+
if (req.headers.find("User-Agent") == req.headers.end()) {
9243+
req.headers.emplace("User-Agent", CPPHTTPLIB_VERSION);
9244+
}
9245+
9246+
// Write request line
9247+
auto &strm = *handle.stream_;
9248+
if (detail::write_request_line(strm, req.method, req.path) < 0) {
9249+
handle.error = Error::Write;
9250+
handle.response.reset();
9251+
return handle;
9252+
}
9253+
9254+
// Write headers
9255+
// Write headers
9256+
if (!detail::write_headers(strm, req.headers)) {
9257+
handle.error = Error::Write;
9258+
handle.response.reset();
9259+
return handle;
9260+
}
9261+
9262+
// Read response headers only (not body)
9263+
if (!read_response_line(strm, req, *handle.response) ||
9264+
!detail::read_headers(strm, handle.response->headers)) {
9265+
handle.error = Error::Read;
9266+
handle.response.reset();
9267+
return handle;
9268+
}
9269+
9270+
// Set up BodyReader based on response headers
9271+
handle.body_reader_.stream = handle.stream_;
9272+
9273+
auto content_length_str = handle.response->get_header_value("Content-Length");
9274+
if (!content_length_str.empty()) {
9275+
handle.body_reader_.content_length =
9276+
static_cast<size_t>(std::stoull(content_length_str));
9277+
}
9278+
9279+
auto transfer_encoding =
9280+
handle.response->get_header_value("Transfer-Encoding");
9281+
handle.body_reader_.chunked = (transfer_encoding == "chunked");
9282+
9283+
return handle;
9284+
}
9285+
91589286
inline bool ClientImpl::handle_request(Stream &strm, Request &req,
91599287
Response &res, bool close_connection,
91609288
Error &error) {
@@ -12375,6 +12503,15 @@ inline ClientImpl::StreamHandle Client::open_stream(const std::string &path,
1237512503
return cli_->open_stream(path, headers);
1237612504
}
1237712505

12506+
inline ClientImpl::StreamHandle
12507+
Client::open_stream_direct(const std::string &path) {
12508+
return cli_->open_stream_direct(path);
12509+
}
12510+
inline ClientImpl::StreamHandle
12511+
Client::open_stream_direct(const std::string &path, const Headers &headers) {
12512+
return cli_->open_stream_direct(path, headers);
12513+
}
12514+
1237812515
inline bool Client::send(Request &req, Response &res, Error &error) {
1237912516
return cli_->send(req, res, error);
1238012517
}

test/test20.cc

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -791,3 +791,109 @@ TEST_F(StreamHandleV2Test, ReadAllSocketDirect) {
791791
auto result = handle.read_all();
792792
EXPECT_EQ("Hello from socket!", result);
793793
}
794+
795+
// =============================================================================
796+
// Phase 2.5: open_stream_direct() Tests (True Streaming)
797+
// =============================================================================
798+
799+
class OpenStreamDirectTest : public ::testing::Test {
800+
protected:
801+
void SetUp() override {
802+
// Start test server
803+
svr_.Get("/hello", [](const httplib::Request &, httplib::Response &res) {
804+
res.set_content("Hello World!", "text/plain");
805+
});
806+
807+
svr_.Get("/large", [](const httplib::Request &, httplib::Response &res) {
808+
std::string body(10000, 'X');
809+
res.set_content(body, "text/plain");
810+
});
811+
812+
svr_.Get("/chunked", [](const httplib::Request &, httplib::Response &res) {
813+
res.set_chunked_content_provider(
814+
"text/plain", [](size_t offset, httplib::DataSink &sink) {
815+
if (offset < 3) {
816+
sink.write("chunk", 5);
817+
return true;
818+
}
819+
sink.done();
820+
return true;
821+
});
822+
});
823+
824+
thread_ = std::thread([this]() { svr_.listen("127.0.0.1", 8787); });
825+
svr_.wait_until_ready();
826+
}
827+
828+
void TearDown() override {
829+
svr_.stop();
830+
if (thread_.joinable()) { thread_.join(); }
831+
}
832+
833+
httplib::Server svr_;
834+
std::thread thread_;
835+
};
836+
837+
TEST_F(OpenStreamDirectTest, MethodExists) {
838+
httplib::Client cli("127.0.0.1", 8787);
839+
840+
auto handle = cli.open_stream_direct("/hello");
841+
842+
EXPECT_TRUE(handle.is_valid());
843+
EXPECT_EQ(200, handle.response->status);
844+
}
845+
846+
TEST_F(OpenStreamDirectTest, IsSocketDirectMode) {
847+
httplib::Client cli("127.0.0.1", 8787);
848+
849+
auto handle = cli.open_stream_direct("/hello");
850+
851+
EXPECT_TRUE(handle.is_valid());
852+
EXPECT_TRUE(handle.is_socket_direct_mode());
853+
}
854+
855+
TEST_F(OpenStreamDirectTest, ReadBody) {
856+
httplib::Client cli("127.0.0.1", 8787);
857+
858+
auto handle = cli.open_stream_direct("/hello");
859+
ASSERT_TRUE(handle.is_valid());
860+
861+
auto body = handle.read_all();
862+
EXPECT_EQ("Hello World!", body);
863+
}
864+
865+
TEST_F(OpenStreamDirectTest, ReadBodyInChunks) {
866+
httplib::Client cli("127.0.0.1", 8787);
867+
868+
auto handle = cli.open_stream_direct("/hello");
869+
ASSERT_TRUE(handle.is_valid());
870+
871+
std::string result;
872+
char buf[4];
873+
ssize_t n;
874+
while ((n = handle.read(buf, sizeof(buf))) > 0) {
875+
result.append(buf, static_cast<size_t>(n));
876+
}
877+
878+
EXPECT_EQ("Hello World!", result);
879+
}
880+
881+
TEST_F(OpenStreamDirectTest, LargeResponse) {
882+
httplib::Client cli("127.0.0.1", 8787);
883+
884+
auto handle = cli.open_stream_direct("/large");
885+
ASSERT_TRUE(handle.is_valid());
886+
887+
auto body = handle.read_all();
888+
EXPECT_EQ(10000u, body.size());
889+
EXPECT_EQ(std::string(10000, 'X'), body);
890+
}
891+
892+
TEST_F(OpenStreamDirectTest, ConnectionError) {
893+
httplib::Client cli("127.0.0.1", 9999); // Wrong port
894+
895+
auto handle = cli.open_stream_direct("/hello");
896+
897+
EXPECT_FALSE(handle.is_valid());
898+
EXPECT_NE(httplib::Error::Success, handle.error);
899+
}

0 commit comments

Comments
 (0)