Skip to content

Commit ddaa3da

Browse files
Cornel Ratmeta-codesync[bot]
authored andcommitted
Add ZeroCopyEnableFunc support to coro HTTPServer
Summary: Add generic zero copy (MSG_ZEROCOPY) write support to the proxygen coro HTTPServer infrastructure. - Add `zeroCopyEnableFunc` field to `HTTPServer::Config` and `socketConfig.useZeroCopy` to enable `SO_ZEROCOPY` on the server socket. - Add `setZeroCopyEnableFunc` to `HTTPCoroAcceptor` so that accepted transports are configured with zero copy and the per-write enable callback. - Add `//folly/io/async:async_transport` dependency for the `AsyncWriter::ZeroCopyEnableFunc` type. Reviewed By: dmm-fb Differential Revision: D95869208 fbshipit-source-id: b28a8ba26b28b329ac559685a0ee8555f5c6c5e9
1 parent cddc940 commit ddaa3da

File tree

5 files changed

+214
-6
lines changed

5 files changed

+214
-6
lines changed

proxygen/lib/http/coro/server/HTTPCoroAcceptor.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ void HTTPCoroAcceptor::onNewConnection(
159159
}
160160

161161
auto eventBase = transport->getEventBase();
162+
163+
if (zeroCopyEnableThreshold_ > 0) {
164+
transport->setZeroCopy(true);
165+
transport->setZeroCopyEnableThreshold(zeroCopyEnableThreshold_);
166+
}
167+
162168
auto session = factory_.makeUniplexSession(std::move(transport),
163169
peerAddress,
164170
nextProtocol,

proxygen/lib/http/coro/server/HTTPCoroAcceptor.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ class HTTPCoroAcceptor : public wangle::Acceptor {
121121
onConnectionsDrainedFn_ = std::move(fn);
122122
}
123123

124+
void setZeroCopyEnableThreshold(size_t threshold) {
125+
zeroCopyEnableThreshold_ = threshold;
126+
}
127+
124128
void stopAcceptingQuic() {
125129
acceptStopped();
126130
}
@@ -167,6 +171,7 @@ class HTTPCoroAcceptor : public wangle::Acceptor {
167171
std::function<void()> onConnectionsDrainedFn_;
168172
folly::Synchronized<folly::Executor::KeepAlive<folly::EventBase>> keepAlive_;
169173
std::shared_ptr<wangle::FizzLoggingCallback> loggingCallback_;
174+
size_t zeroCopyEnableThreshold_{0};
170175
};
171176

172177
} // namespace proxygen::coro

proxygen/lib/http/coro/server/HTTPServer.cpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,16 @@ HTTPCoroAcceptor* HTTPServer::createAcceptor(
174174
folly::EventBase* evb,
175175
std::shared_ptr<const AcceptorConfiguration> acceptorConfig) {
176176
auto [it, _] = acceptors_.try_emplace(evb, std::list<HTTPCoroAcceptor>());
177-
return &(*it->second.emplace(it->second.end(),
178-
std::move(acceptorConfig),
179-
handler_,
180-
&config_.newConnectionFilter,
181-
/* codecFactory = */ nullptr,
182-
config_.fizzLoggingCallback));
177+
auto* acceptor = &(*it->second.emplace(it->second.end(),
178+
std::move(acceptorConfig),
179+
handler_,
180+
&config_.newConnectionFilter,
181+
/* codecFactory = */ nullptr,
182+
config_.fizzLoggingCallback));
183+
if (config_.zeroCopyEnableThreshold > 0) {
184+
acceptor->setZeroCopyEnableThreshold(config_.zeroCopyEnableThreshold);
185+
}
186+
return acceptor;
183187
}
184188

185189
HTTPCoroAcceptor* FOLLY_NULLABLE
@@ -206,6 +210,9 @@ void HTTPServer::startTcp(const KeepAliveEventBaseVec& keepAliveEvbs) {
206210
new folly::AsyncServerSocket(&eventBase_));
207211
try {
208212
serverSocket->setReusePortEnabled(setReusePortSocketOption_);
213+
if (config_.socketConfig.useZeroCopy) {
214+
serverSocket->setZeroCopy(true);
215+
}
209216
if (config_.preboundSocket.has_value()) {
210217
serverSocket->useExistingSocket(
211218
folly::NetworkSocket::fromFd(config_.preboundSocket.value()));

proxygen/lib/http/coro/server/HTTPServer.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ class HTTPServer : public quic::QuicHandshakeSocketHolder::Callback {
7575
NewConnectionFilter newConnectionFilter;
7676
ServerFilterFactoryList filterFactories;
7777
std::shared_ptr<wangle::FizzLoggingCallback> fizzLoggingCallback;
78+
79+
/**
80+
* Zero copy enable threshold. When set to a non-zero value (along with
81+
* socketConfig.useZeroCopy = true), each accepted TCP connection will
82+
* have zero copy enabled and MSG_ZEROCOPY will be used for writes
83+
* whose total length meets or exceeds this threshold.
84+
*/
85+
size_t zeroCopyEnableThreshold{0};
7886
};
7987

8088
static wangle::SSLContextConfig getDefaultTLSConfig() {

proxygen/lib/http/coro/server/test/HTTPServerTest.cpp

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222

2323
#include <chrono>
2424
#include <folly/coro/GtestHelpers.h>
25+
#include <folly/io/async/ScopedEventBaseThread.h>
2526
#include <folly/portability/GMock.h>
2627
#include <folly/portability/GTest.h>
28+
#include <folly/portability/Sockets.h>
2729
#include <folly/testing/TestUtil.h>
2830
#include <proxygen/lib/http/coro/test/TestUtils.h>
2931
#include <quic/api/test/Mocks.h>
@@ -728,4 +730,184 @@ TEST(StatsFilterFactory, LatencyOnDestruction) {
728730
EXPECT_EQ(stats.latencies.at(0).count(), 0);
729731
}
730732

733+
class ZeroCopyHTTPServerTest : public HTTPServerTests {};
734+
735+
// Verify that configuring zero copy on the server does not break normal
736+
// request handling. This exercises the full path: Config -> startTcp() ->
737+
// createAcceptor() -> setZeroCopyEnableThreshold -> onNewConnection() ->
738+
// transport zero copy setup.
739+
TEST_P(ZeroCopyHTTPServerTest, TestZeroCopyConfigBasicRequest) {
740+
serverConfig_.socketConfig.useZeroCopy = true;
741+
serverConfig_.zeroCopyEnableThreshold = 32768;
742+
743+
startServer(nullptr, /*expectRequest=*/true);
744+
initClient();
745+
746+
auto url = fmt::format("https://{}/test", server_->address()->describe());
747+
auto useQuic = GetParam() == TransportType::QUIC;
748+
EventBase evb;
749+
auto response = blockingWait(
750+
HTTPClient::get(&evb, url, std::chrono::milliseconds(500), useQuic),
751+
&evb);
752+
EXPECT_NE(response.headers.get(), nullptr);
753+
EXPECT_EQ(response.headers->getStatusCode(), 200);
754+
stopServer();
755+
}
756+
757+
// Verify that only socketConfig.useZeroCopy (without zeroCopyEnableThreshold)
758+
// also works. The server socket gets SO_ZEROCOPY but accepted connections
759+
// won't have the per-write threshold.
760+
TEST_P(ZeroCopyHTTPServerTest, TestZeroCopySocketOnlyBasicRequest) {
761+
serverConfig_.socketConfig.useZeroCopy = true;
762+
// No zeroCopyEnableThreshold set
763+
764+
startServer(nullptr, /*expectRequest=*/true);
765+
initClient();
766+
767+
auto url = fmt::format("https://{}/test", server_->address()->describe());
768+
auto useQuic = GetParam() == TransportType::QUIC;
769+
EventBase evb;
770+
auto response = blockingWait(
771+
HTTPClient::get(&evb, url, std::chrono::milliseconds(500), useQuic),
772+
&evb);
773+
EXPECT_NE(response.headers.get(), nullptr);
774+
EXPECT_EQ(response.headers->getStatusCode(), 200);
775+
stopServer();
776+
}
777+
778+
INSTANTIATE_TEST_SUITE_P(ZeroCopy,
779+
ZeroCopyHTTPServerTest,
780+
testing::Values(TransportType::QUIC,
781+
TransportType::TLS),
782+
transportTypeToString);
783+
784+
namespace {
785+
786+
// Creates a pair of connected TCP sockets (server-side, client-side).
787+
std::pair<folly::NetworkSocket, folly::NetworkSocket> createTcpSocketPair() {
788+
int listenFd = ::socket(AF_INET, SOCK_STREAM, 0);
789+
CHECK_GE(listenFd, 0);
790+
791+
struct sockaddr_in addr{};
792+
addr.sin_family = AF_INET;
793+
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
794+
addr.sin_port = 0;
795+
CHECK_EQ(::bind(listenFd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)),
796+
0);
797+
CHECK_EQ(::listen(listenFd, 1), 0);
798+
799+
socklen_t addrLen = sizeof(addr);
800+
CHECK_EQ(
801+
::getsockname(listenFd, reinterpret_cast<sockaddr*>(&addr), &addrLen), 0);
802+
803+
int clientFd = ::socket(AF_INET, SOCK_STREAM, 0);
804+
CHECK_GE(clientFd, 0);
805+
CHECK_EQ(
806+
::connect(clientFd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)), 0);
807+
808+
int serverFd = ::accept(listenFd, nullptr, nullptr);
809+
CHECK_GE(serverFd, 0);
810+
::close(listenFd);
811+
812+
return {folly::NetworkSocket::fromFd(serverFd),
813+
folly::NetworkSocket::fromFd(clientFd)};
814+
}
815+
816+
// Exposes the protected TCP onNewConnection for direct unit testing.
817+
class TestableHTTPCoroAcceptor : public HTTPCoroAcceptor {
818+
public:
819+
using HTTPCoroAcceptor::HTTPCoroAcceptor;
820+
using HTTPCoroAcceptor::onNewConnection;
821+
};
822+
823+
} // namespace
824+
825+
// Verify that zero copy is applied to accepted TCP connections when the
826+
// zeroCopyEnableThreshold is set on the acceptor.
827+
TEST(ZeroCopyAcceptor, ZeroCopyAppliedToAcceptedTransport) {
828+
auto [serverFd, clientFd] = createTcpSocketPair();
829+
830+
auto accConfig = std::make_shared<AcceptorConfiguration>();
831+
accConfig->plaintextProtocol = "http/1.1";
832+
auto handler = std::make_shared<TestHandler>();
833+
834+
folly::ScopedEventBaseThread evbThread;
835+
auto* evb = evbThread.getEventBase();
836+
837+
auto acceptor =
838+
std::make_unique<TestableHTTPCoroAcceptor>(accConfig, handler);
839+
evb->runInEventBaseThreadAndWait([&]() { acceptor->init(nullptr, evb); });
840+
acceptor->setZeroCopyEnableThreshold(32768);
841+
842+
std::atomic<bool> zeroCopyApplied{false};
843+
844+
evb->runInEventBaseThreadAndWait([&]() {
845+
auto socket = folly::AsyncSocket::newSocket(evb, serverFd);
846+
auto* rawSocket = socket.get();
847+
848+
folly::SocketAddress addr;
849+
wangle::TransportInfo tinfo;
850+
acceptor->onNewConnection(
851+
folly::AsyncTransport::UniquePtr(socket.release()),
852+
&addr,
853+
"http/1.1",
854+
wangle::SecureTransportType::NONE,
855+
tinfo);
856+
857+
zeroCopyApplied = rawSocket->getZeroCopy();
858+
});
859+
860+
EXPECT_TRUE(zeroCopyApplied.load());
861+
862+
::close(clientFd.toFd());
863+
// forceStop defers dropAllConnections via runInLoop, so we must let the
864+
// EventBase process that callback before destroying the acceptor.
865+
evb->runInEventBaseThreadAndWait([&]() { acceptor->forceStop(); });
866+
evb->runInEventBaseThreadAndWait([&]() { acceptor.reset(); });
867+
}
868+
869+
// Verify that zero copy is NOT applied when zeroCopyEnableThreshold is not set.
870+
TEST(ZeroCopyAcceptor, ZeroCopyNotAppliedWithoutThreshold) {
871+
auto [serverFd, clientFd] = createTcpSocketPair();
872+
873+
auto accConfig = std::make_shared<AcceptorConfiguration>();
874+
accConfig->plaintextProtocol = "http/1.1";
875+
auto handler = std::make_shared<TestHandler>();
876+
877+
folly::ScopedEventBaseThread evbThread;
878+
auto* evb = evbThread.getEventBase();
879+
880+
auto acceptor =
881+
std::make_unique<TestableHTTPCoroAcceptor>(accConfig, handler);
882+
evb->runInEventBaseThreadAndWait([&]() { acceptor->init(nullptr, evb); });
883+
// Deliberately NOT setting zeroCopyEnableThreshold
884+
885+
std::atomic<bool> zeroCopyApplied{false};
886+
887+
evb->runInEventBaseThreadAndWait([&]() {
888+
auto socket = folly::AsyncSocket::newSocket(evb, serverFd);
889+
auto* rawSocket = socket.get();
890+
891+
folly::SocketAddress addr;
892+
wangle::TransportInfo tinfo;
893+
acceptor->onNewConnection(
894+
folly::AsyncTransport::UniquePtr(socket.release()),
895+
&addr,
896+
"http/1.1",
897+
wangle::SecureTransportType::NONE,
898+
tinfo);
899+
900+
zeroCopyApplied = rawSocket->getZeroCopy();
901+
});
902+
903+
// Without zeroCopyEnableThreshold, zero copy should remain disabled.
904+
EXPECT_FALSE(zeroCopyApplied.load());
905+
906+
::close(clientFd.toFd());
907+
// forceStop defers dropAllConnections via runInLoop, so we must let the
908+
// EventBase process that callback before destroying the acceptor.
909+
evb->runInEventBaseThreadAndWait([&]() { acceptor->forceStop(); });
910+
evb->runInEventBaseThreadAndWait([&]() { acceptor.reset(); });
911+
}
912+
731913
} // namespace proxygen::coro::test

0 commit comments

Comments
 (0)