Skip to content

Commit 899da67

Browse files
sky92zwqmatejk
authored andcommitted
fix(Net): reactor-based http server not response when requests with body (#5227)
* fix: reactor-based http server, send error when exp, recv req body * add POST interface in reactor server * fix: exit stuck * fix: reactor-based http server redefination * fix: tsan about socket reactor testConcurrentHandlerRemoval * address review, add unit tests * initialize _stopped, and coding style
1 parent e772a96 commit 899da67

14 files changed

+708
-24
lines changed

Net/include/Poco/Net/HTTPReactorServer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
#ifndef Net_HTTPReactorServer_INCLUDED
2+
#define Net_HTTPReactorServer_INCLUDED
3+
14
#include "Poco/Logger.h"
25
#include "Poco/Net/HTTPRequestHandlerFactory.h"
36
#include "Poco/Net/HTTPServerParams.h"
@@ -17,6 +20,7 @@ class Net_API HTTPReactorServer
1720
~HTTPReactorServer();
1821
void start();
1922
void stop();
23+
int port() const { return _tcpReactorServer.port(); }
2024
void onMessage(const TcpReactorConnectionPtr& conn);
2125
void onError(const Poco::Exception& ex);
2226
void sendErrorResponse(HTTPSession& session, HTTPResponse::HTTPStatus status);
@@ -31,3 +35,4 @@ class Net_API HTTPReactorServer
3135

3236
}} // namespace Poco::Net
3337

38+
#endif // Net_HTTPReactorServer_INCLUDED

Net/include/Poco/Net/HTTPReactorServerSession.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ class Net_API HTTPReactorServerSession : public HTTPSession
4343

4444
int peek() override;
4545

46+
int read(char* buffer, std::streamsize length) override;
47+
4648
int write(const char* buffer, std::streamsize length) override;
4749

4850
bool parseHeaders(std::size_t pos, std::size_t& bodyStart, std::size_t& contentLength, bool& isChunked);

Net/include/Poco/Net/TCPReactorAcceptor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class Net_API TCPReactorAcceptor : public Poco::Net::SocketAcceptor<TCPReactorSe
2222

2323
~TCPReactorAcceptor();
2424
SocketReactor& reactor();
25+
void stop();
2526

2627
void setRecvMessageCallback(const RecvMessageCallback& cb)
2728
{
@@ -40,6 +41,7 @@ class Net_API TCPReactorAcceptor : public Poco::Net::SocketAcceptor<TCPReactorSe
4041
std::shared_ptr<ThreadPool> _threadPool;
4142
RecvMessageCallback _recvMessageCallback;
4243
TCPServerParams::Ptr _pParams;
44+
std::atomic<bool> _stopped{false};
4345
};
4446

4547
}} // namespace Poco::Net

Net/include/Poco/Net/TCPReactorServer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "Poco/Net/TCPReactorAcceptor.h"
77
#include "Poco/Net/TCPServerParams.h"
88
#include "Poco/ThreadPool.h"
9+
#include <atomic>
910
#include <vector>
1011

1112
namespace Poco { namespace Net {
@@ -32,6 +33,8 @@ class Net_API TCPReactorServer
3233

3334
void stop();
3435

36+
int port() const { return _port; }
37+
3538
void setRecvMessageCallback(const RecvMessageCallback& cb);
3639

3740
private:
@@ -41,6 +44,7 @@ class Net_API TCPReactorServer
4144
std::vector<ServerSocket> _sockets;
4245
TCPServerParams::Ptr _pParams;
4346
int _port;
47+
std::atomic<bool> _stopped;
4448
};
4549

4650
}} // namespace Poco::Net

Net/samples/HTTPReactorTimeServer/src/HTTPReactorTimeServer.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
#include "Poco/DateTimeFormat.h"
2525
#include "Poco/Exception.h"
2626
#include "Poco/ThreadPool.h"
27+
#include "Poco/StreamCopier.h"
2728
#include "Poco/Util/ServerApplication.h"
2829
#include "Poco/Util/Option.h"
2930
#include "Poco/Util/OptionSet.h"
3031
#include "Poco/Util/HelpFormatter.h"
3132
#include <cstdio>
3233
#include <iostream>
34+
#include <sstream>
3335

3436
#include "Poco/Net/HTTPReactorServer.h"
3537

@@ -89,6 +91,36 @@ class TimeRequestHandler: public HTTPRequestHandler
8991
};
9092

9193

94+
class BodyEchoRequestHandler: public HTTPRequestHandler
95+
/// Echo the request body back to the client.
96+
{
97+
public:
98+
BodyEchoRequestHandler() = default;
99+
100+
void handleRequest(HTTPServerRequest& request, HTTPServerResponse& response)
101+
{
102+
Application& app = Application::instance();
103+
app.logger().information("Request from " + request.clientAddress().toString());
104+
105+
app.logger().information("Method: " + request.getMethod() + ", URI: " + request.getURI());
106+
107+
std::string contentType = request.get("Content-Type", "text/plain");
108+
app.logger().information("Content-Type: " + contentType);
109+
110+
std::ostringstream ostrBody;
111+
Poco::StreamCopier::copyStream(request.stream(), ostrBody);
112+
std::string body = ostrBody.str();
113+
app.logger().information("Request body length: " + std::to_string(body.length()));
114+
115+
response.setChunkedTransferEncoding(true);
116+
response.setContentType(contentType);
117+
118+
std::ostream& ostr = response.send();
119+
ostr << body;
120+
}
121+
};
122+
123+
92124
class TimeRequestHandlerFactory: public HTTPRequestHandlerFactory
93125
{
94126
public:
@@ -100,6 +132,8 @@ class TimeRequestHandlerFactory: public HTTPRequestHandlerFactory
100132
{
101133
if (request.getURI() == "/")
102134
return new TimeRequestHandler(_format, _delay);
135+
else if (request.getURI() == "/echo" || request.getURI() == "/body")
136+
return new BodyEchoRequestHandler();
103137
else
104138
return nullptr;
105139
}

Net/src/HTTPReactorServer.cpp

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,19 @@
44
#include "Poco/Net/HTTPSession.h"
55
#include <cstring>
66

7-
namespace Poco { namespace Net {
7+
namespace Poco {
8+
namespace Net {
89

910
HTTPReactorServer::HTTPReactorServer(int port, HTTPServerParams::Ptr pParams, HTTPRequestHandlerFactory::Ptr pFactory)
1011
: _tcpReactorServer(port, pParams)
1112
{
1213
_pParams = pParams;
1314
_pFactory = pFactory;
14-
_tcpReactorServer.setRecvMessageCallback([this](const TcpReactorConnectionPtr& conn) {
15-
// Handle incoming message
16-
this->onMessage(conn);
17-
});
15+
_tcpReactorServer.setRecvMessageCallback(
16+
[this](const TcpReactorConnectionPtr& conn)
17+
{
18+
this->onMessage(conn);
19+
});
1820
}
1921

2022
HTTPReactorServer::~HTTPReactorServer()
@@ -35,16 +37,15 @@ void HTTPReactorServer::onMessage(const TcpReactorConnectionPtr& conn)
3537
{
3638
try
3739
{
38-
// Handle read event
3940
HTTPReactorServerSession session(conn->socket(), conn->buffer(), _pParams);
4041
if (!session.checkRequestComplete())
4142
{
4243
return;
4344
}
44-
// Create request and response objects
45+
4546
HTTPServerResponseImpl response(session);
46-
HTTPServerRequestImpl request(response, session, _pParams);
47-
// Process request and generate response
47+
HTTPServerRequestImpl request(response, session, _pParams);
48+
4849
Poco::Timestamp now;
4950
response.setDate(now);
5051
response.setVersion(request.getVersion());
@@ -54,6 +55,7 @@ void HTTPReactorServer::onMessage(const TcpReactorConnectionPtr& conn)
5455
{
5556
response.set("Server", server);
5657
}
58+
5759
try
5860
{
5961
session.requestTrailer().clear();
@@ -66,19 +68,23 @@ void HTTPReactorServer::onMessage(const TcpReactorConnectionPtr& conn)
6668

6769
pHandler->handleRequest(request, response);
6870
session.setKeepAlive(_pParams->getKeepAlive() && response.getKeepAlive());
71+
session.popCompletedRequest();
6972
}
7073
else
74+
{
7175
sendErrorResponse(session, HTTPResponse::HTTP_NOT_IMPLEMENTED);
76+
session.popCompletedRequest();
77+
}
7278
}
7379
catch (Poco::Exception& e)
7480
{
7581
if (!response.sent())
7682
{
7783
try
7884
{
79-
sendErrorResponse(
80-
session,
81-
e.code() == 0 ? HTTPResponse::HTTP_INTERNAL_SERVER_ERROR : HTTPResponse::HTTPStatus(e.code()));
85+
sendErrorResponse(session, e.code() == 0 ? HTTPResponse::HTTP_INTERNAL_SERVER_ERROR
86+
: HTTPResponse::HTTPStatus(e.code()));
87+
session.popCompletedRequest();
8288
}
8389
catch (...)
8490
{
@@ -99,13 +105,14 @@ void HTTPReactorServer::sendErrorResponse(HTTPSession& session, HTTPResponse::HT
99105
response.setVersion(HTTPMessage::HTTP_1_1);
100106
response.setStatusAndReason(status);
101107
response.setKeepAlive(false);
108+
response.setContentLength(0);
109+
response.send();
102110

103111
session.setKeepAlive(false);
104112
}
105113

106114
void HTTPReactorServer::onError(const Poco::Exception& ex)
107115
{
108-
// Handle error
109116
throw ex;
110117
}
111118

Net/src/HTTPReactorServerSession.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,19 @@ int HTTPReactorServerSession::peek()
209209
}
210210
}
211211

212+
int HTTPReactorServerSession::read(char* buffer, std::streamsize length)
213+
{
214+
if (_idx < _complete)
215+
{
216+
int n = static_cast<int>(_complete - _idx);
217+
if (n > static_cast<int>(length)) n = static_cast<int>(length);
218+
std::memcpy(buffer, _buf.data() + _idx, n);
219+
_idx += n;
220+
return n;
221+
}
222+
return 0;
223+
}
224+
212225
int HTTPReactorServerSession::write(const char* buffer, std::streamsize length)
213226
{
214227
try

Net/src/TCPReactorAcceptor.cpp

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ namespace Poco {
66
namespace Net {
77

88

9-
TCPReactorAcceptor::TCPReactorAcceptor(
10-
Poco::Net::ServerSocket& socket, Poco::Net::SocketReactor& reactor, TCPServerParams::Ptr pParams)
9+
TCPReactorAcceptor::TCPReactorAcceptor(Poco::Net::ServerSocket& socket, Poco::Net::SocketReactor& reactor,
10+
TCPServerParams::Ptr pParams)
1111
: Poco::Net::SocketAcceptor<TCPReactorServerConnection>(socket, reactor),
1212
_selfReactor(reactor),
1313
_useSelfReactor(pParams->getUseSelfReactor()),
@@ -16,18 +16,39 @@ TCPReactorAcceptor::TCPReactorAcceptor(
1616
int workerThreads = _useSelfReactor ? 0 : _pParams->getMaxThreads();
1717
if (workerThreads > 0)
1818
{
19-
_threadPool = std::make_shared<Poco::ThreadPool>("TCPRA", workerThreads, workerThreads);
19+
_threadPool = std::make_shared<Poco::ThreadPool>("TCPRW", workerThreads, workerThreads);
20+
for (int i = 0; i < workerThreads; i++)
21+
{
22+
std::shared_ptr<SocketReactor> workerReactor(std::make_shared<SocketReactor>());
23+
_wokerReactors.push_back(workerReactor);
24+
_threadPool->start(*workerReactor);
25+
}
2026
}
21-
for (int i = 0; i < workerThreads; i++)
27+
else
2228
{
23-
std::shared_ptr<SocketReactor> workerReactor(std::make_shared<SocketReactor>());
24-
_wokerReactors.push_back(workerReactor);
25-
_threadPool->start(*workerReactor);
29+
_useSelfReactor = true;
2630
}
2731
}
2832

2933
TCPReactorAcceptor::~TCPReactorAcceptor()
3034
{
35+
stop();
36+
}
37+
38+
void TCPReactorAcceptor::stop()
39+
{
40+
if (_stopped.exchange(true))
41+
{
42+
return;
43+
}
44+
for (auto& worker : _wokerReactors)
45+
{
46+
worker->stop();
47+
}
48+
if (_threadPool)
49+
{
50+
_threadPool->joinAll();
51+
}
3152
}
3253

3354
SocketReactor& TCPReactorAcceptor::reactor()

Net/src/TCPReactorServer.cpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,20 @@ namespace Net {
99

1010

1111
TCPReactorServer::TCPReactorServer(int port, TCPServerParams::Ptr pParams)
12-
: _threadPool("TCPR", pParams->getAcceptorNum()),
12+
: _threadPool("TCPRA", pParams->getAcceptorNum()),
1313
_reactors(pParams->getAcceptorNum()),
1414
_pParams(pParams),
15-
_port(port)
15+
_port(port),
16+
_stopped(false)
1617
{
1718
for (auto& reactor : _reactors)
1819
{
1920
ServerSocket socket(_port);
2021
_sockets.push_back(socket);
22+
if (_sockets.size() == 1)
23+
{
24+
_port = socket.address().port();
25+
}
2126
auto acceptor = std::make_shared<TCPReactorAcceptor>(socket, reactor, _pParams);
2227
_acceptors.push_back(acceptor);
2328
}
@@ -46,6 +51,14 @@ void TCPReactorServer::setRecvMessageCallback(const RecvMessageCallback& cb)
4651

4752
void TCPReactorServer::stop()
4853
{
54+
if (_stopped.exchange(true))
55+
{
56+
return;
57+
}
58+
for (auto& acceptor : _acceptors)
59+
{
60+
acceptor->stop();
61+
}
4962
for (auto& reactor : _reactors)
5063
{
5164
reactor.stop();

Net/testsuite/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ objects = \
1515
HTTPRequestTest MessageHeaderTest NetTestSuite UDPEchoServer \
1616
HTTPResponseTest MessagesTestSuite NetworkInterfaceTest \
1717
HTTPServerTest MulticastEchoServer SocketAddressTest \
18-
HTTPReactorServerSessionTest HTTPReactorServerTestSuite \
18+
HTTPReactorServerSessionTest HTTPReactorServerTest HTTPReactorServerTestSuite \
1919
HTTPCookieTest HTTPCredentialsTest HTMLFormTest HTMLTestSuite \
2020
MediaTypeTest QuotedPrintableTest DialogSocketTest \
2121
HTTPClientTestSuite FTPClientTestSuite FTPClientSessionTest \

0 commit comments

Comments
 (0)