Skip to content

Commit e6244f2

Browse files
committed
Add in-place compression
1 parent ce73ba7 commit e6244f2

File tree

1 file changed

+96
-21
lines changed

1 file changed

+96
-21
lines changed

ext/src/http/client/curl/http_client_curl.cc

Lines changed: 96 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <curl/curl.h>
55
#include <curl/curlver.h>
6+
#include <array>
67
#include <atomic>
78
#include <chrono>
89
#include <cstddef>
@@ -57,6 +58,78 @@ nostd::shared_ptr<HttpCurlGlobalInitializer> HttpCurlGlobalInitializer::GetInsta
5758
return shared_initializer;
5859
}
5960

61+
#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
62+
int deflateInPlace(z_stream *strm, unsigned char *buf, uint32_t len, uint32_t *max)
63+
{
64+
// must be large enough to hold zlib or gzip header (if any) and one more byte -- 11 works for the
65+
// worst case here, but if gzip encoding is used and a deflateSetHeader() call is inserted in this
66+
// code after the deflateReset(), then the 11 needs to be increased to accomodate the resulting
67+
// gzip header size plus one
68+
std::array<unsigned char, 11> temp{};
69+
70+
// kick start the process with a temporary output buffer -- this allows deflate to consume a large
71+
// chunk of input data in order to make room for output data there
72+
strm->next_in = buf;
73+
strm->avail_in = len;
74+
if (*max < len)
75+
{
76+
*max = len;
77+
}
78+
strm->next_out = temp.data();
79+
strm->avail_out = std::min(static_cast<decltype(z_stream::avail_out)>(temp.size()), *max);
80+
auto ret = deflate(strm, Z_FINISH);
81+
if (ret == Z_STREAM_ERROR)
82+
{
83+
return ret;
84+
}
85+
86+
// if we can, copy the temporary output data to the consumed portion of the input buffer, and then
87+
// continue to write up to the start of the consumed input for as long as possible
88+
auto have = strm->next_out - temp.data(); // number of bytes in temp[]
89+
if (have <= (strm->avail_in ? len - strm->avail_in : *max))
90+
{
91+
std::memcpy(buf, temp.data(), have);
92+
strm->next_out = buf + have;
93+
have = 0;
94+
while (ret == Z_OK)
95+
{
96+
strm->avail_out =
97+
strm->avail_in ? strm->next_in - strm->next_out : (buf + *max) - strm->next_out;
98+
ret = deflate(strm, Z_FINISH);
99+
}
100+
if (ret != Z_BUF_ERROR || strm->avail_in == 0)
101+
{
102+
*max = strm->next_out - buf;
103+
return ret == Z_STREAM_END ? Z_OK : ret;
104+
}
105+
}
106+
107+
// the output caught up with the input due to insufficiently compressible data -- copy the
108+
// remaining input data into an allocated buffer and complete the compression from there to the
109+
// now empty input buffer (this will only occur for long incompressible streams, more than ~20 MB
110+
// for the default deflate memLevel of 8, or when *max is too small and less than the length of
111+
// the header plus one byte)
112+
auto hold = static_cast<std::remove_const_t<decltype(z_stream::next_in)>>(
113+
strm->zalloc(strm->opaque, strm->avail_in, 1)); // allocated buffer to hold input data
114+
if (hold == Z_NULL)
115+
{
116+
return Z_MEM_ERROR;
117+
}
118+
std::memcpy(hold, strm->next_in, strm->avail_in);
119+
strm->next_in = hold;
120+
if (have)
121+
{
122+
std::memcpy(buf, temp.data(), have);
123+
strm->next_out = buf + have;
124+
}
125+
strm->avail_out = (buf + *max) - strm->next_out;
126+
ret = deflate(strm, Z_FINISH);
127+
strm->zfree(strm->opaque, hold);
128+
*max = strm->next_out - buf;
129+
return ret == Z_OK ? Z_BUF_ERROR : (ret == Z_STREAM_END ? Z_OK : ret);
130+
}
131+
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW
132+
60133
void Session::SendRequest(
61134
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> callback) noexcept
62135
{
@@ -76,45 +149,47 @@ void Session::SendRequest(
76149
if (http_request_->compression_ == opentelemetry::ext::http::client::Compression::kGzip)
77150
{
78151
#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
79-
http_request_->AddHeader("Content-Encoding", "gzip");
80-
81-
opentelemetry::ext::http::client::Body compressed_body(http_request_->body_.size());
82-
z_stream zs;
83-
zs.zalloc = Z_NULL;
84-
zs.zfree = Z_NULL;
85-
zs.opaque = Z_NULL;
86-
zs.avail_in = static_cast<uInt>(http_request_->body_.size());
87-
zs.next_in = http_request_->body_.data();
88-
zs.avail_out = static_cast<uInt>(compressed_body.size());
89-
zs.next_out = compressed_body.data();
152+
z_stream zs{};
153+
zs.zalloc = Z_NULL;
154+
zs.zfree = Z_NULL;
155+
zs.opaque = Z_NULL;
90156

91157
// ZLIB: Have to maually specify 16 bits for the Gzip headers
92158
static constexpr int kWindowBits = MAX_WBITS + 16;
93159
static constexpr int kMemLevel = MAX_MEM_LEVEL;
94160

95-
int stream = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, kWindowBits, kMemLevel,
96-
Z_DEFAULT_STRATEGY);
161+
auto stream = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, kWindowBits, kMemLevel,
162+
Z_DEFAULT_STRATEGY);
97163

98164
if (stream == Z_OK)
99165
{
100-
deflate(&zs, Z_FINISH);
101-
deflateEnd(&zs);
102-
compressed_body.resize(zs.total_out);
103-
http_request_->SetBody(compressed_body);
166+
auto size = static_cast<uInt>(http_request_->body_.size());
167+
auto max = size;
168+
stream = deflateInPlace(&zs, http_request_->body_.data(), size, &max);
169+
170+
if (stream == Z_OK)
171+
{
172+
http_request_->AddHeader("Content-Encoding", "gzip");
173+
http_request_->body_.resize(max);
174+
}
104175
}
105-
else
176+
177+
if (stream != Z_OK)
106178
{
107179
if (callback)
108180
{
109-
callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed, "");
181+
callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed,
182+
zs.msg ? zs.msg : "");
110183
}
111184
is_session_active_.store(false, std::memory_order_release);
112185
}
186+
187+
deflateEnd(&zs);
113188
#else
114189
OTEL_INTERNAL_LOG_ERROR(
115190
"[HTTP Client Curl] Set WITH_OTLP_HTTP_COMPRESSION=ON to use gzip compression with the "
116191
"OTLP HTTP Exporter");
117-
#endif
192+
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW
118193
}
119194

120195
curl_operation_.reset(new HttpOperation(
@@ -224,7 +299,7 @@ HttpClient::~HttpClient()
224299
std::shared_ptr<opentelemetry::ext::http::client::Session> HttpClient::CreateSession(
225300
nostd::string_view url) noexcept
226301
{
227-
const auto& parsedUrl = common::UrlParser(std::string(url));
302+
const auto &parsedUrl = common::UrlParser(std::string(url));
228303
if (!parsedUrl.success_)
229304
{
230305
return std::make_shared<Session>(*this);

0 commit comments

Comments
 (0)