Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions httplib.h
Original file line number Diff line number Diff line change
Expand Up @@ -7333,6 +7333,16 @@ inline ClientImpl::ClientImpl(const std::string &host, int port,
client_cert_path_(client_cert_path), client_key_path_(client_key_path) {}

inline ClientImpl::~ClientImpl() {
// Wait until all the requests in flight are handled.
size_t retry_count = 10;
while (retry_count-- > 0) {
{
std::lock_guard<std::mutex> guard(socket_mutex_);
if (socket_requests_in_flight_ == 0) { break; }
}
std::this_thread::sleep_for(std::chrono::milliseconds{1});
}

std::lock_guard<std::mutex> guard(socket_mutex_);
shutdown_socket(socket_);
close_socket(socket_);
Expand Down
83 changes: 83 additions & 0 deletions test/test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8351,3 +8351,86 @@ TEST(MaxTimeoutTest, ContentStreamSSL) {
max_timeout_test(svr, cli, timeout, threshold);
}
#endif

class EventDispatcher {
public:
EventDispatcher() {}

void wait_event(DataSink *sink) {
unique_lock<mutex> lk(m_);
int id = id_;
cv_.wait(lk, [&] { return cid_ == id; });
sink->write(message_.data(), message_.size());
}

void send_event(const string &message) {
lock_guard<mutex> lk(m_);
cid_ = id_++;
message_ = message;
cv_.notify_all();
}

private:
mutex m_;
condition_variable cv_;
atomic_int id_{0};
atomic_int cid_{-1};
string message_;
};

TEST(ClientInThreadTest, Issue2068) {
EventDispatcher ed;

Server svr;
svr.Get("/event1", [&](const Request & /*req*/, Response &res) {
res.set_chunked_content_provider("text/event-stream",
[&](size_t /*offset*/, DataSink &sink) {
ed.wait_event(&sink);
return true;
});
});

auto listen_thread = std::thread([&svr]() { svr.listen(HOST, PORT); });

svr.wait_until_ready();

thread event_thread([&] {
int id = 0;
while (svr.is_running()) {
this_thread::sleep_for(chrono::milliseconds(500));

std::stringstream ss;
ss << "data: " << id << "\n\n";
ed.send_event(ss.str());
id++;
}
});

auto se = detail::scope_exit([&] {
svr.stop();

listen_thread.join();
event_thread.join();

ASSERT_FALSE(svr.is_running());
});

{
auto client = detail::make_unique<Client>(HOST, PORT);
client->set_read_timeout(std::chrono::minutes(10));

std::atomic<bool> stop{false};

std::thread t([&] {
client->Get("/event1",
[&](const char *, size_t) -> bool { return !stop; });
});

std::this_thread::sleep_for(std::chrono::seconds(2));
stop = true;
client->stop();
client.reset();

t.join();
}
}
Loading