Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ Increment the:
* [SDK] Do not frequently create and destroy http client threads
[#3198](https://github.com/open-telemetry/opentelemetry-cpp/pull/3198)

* [SDK] Fix instrumentation scope attributes evaluated in equal method
[#3214](https://github.com/open-telemetry/opentelemetry-cpp/pull/3214)

## [1.18 2024-11-25]

* [EXPORTER] Fix crash in ElasticsearchLogRecordExporter
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ OtlpHttpClient::createSession(
// Parse uri and store it to cache
if (http_uri_.empty())
{
auto parse_url = opentelemetry::ext::http::common::UrlParser(std::string(options_.url));
const auto parse_url = opentelemetry::ext::http::common::UrlParser(options_.url);
if (!parse_url.success_)
{
std::string error_message = "[OTLP HTTP Client] Export failed, invalid url: " + options_.url;
Expand Down
128 changes: 104 additions & 24 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@

#include <curl/curl.h>
#include <curl/curlver.h>
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <list>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
Expand Down Expand Up @@ -57,11 +60,85 @@ nostd::shared_ptr<HttpCurlGlobalInitializer> HttpCurlGlobalInitializer::GetInsta
return shared_initializer;
}

#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
// Original source:
// https://stackoverflow.com/questions/12398377/is-it-possible-to-have-zlib-read-from-and-write-to-the-same-memory-buffer/12412863#12412863
int deflateInPlace(z_stream *strm, unsigned char *buf, uint32_t len, uint32_t *max_len)
{
// must be large enough to hold zlib or gzip header (if any) and one more byte -- 11 works for the
// worst case here, but if gzip encoding is used and a deflateSetHeader() call is inserted in this
// code after the deflateReset(), then the 11 needs to be increased to accommodate the resulting
// gzip header size plus one
std::array<unsigned char, 11> temp{};

// kick start the process with a temporary output buffer -- this allows deflate to consume a large
// chunk of input data in order to make room for output data there
strm->next_in = buf;
strm->avail_in = len;
if (*max_len < len)
{
*max_len = len;
}
strm->next_out = temp.data();
strm->avail_out = (std::min)(static_cast<decltype(z_stream::avail_out)>(temp.size()), *max_len);
auto ret = deflate(strm, Z_FINISH);
if (ret == Z_STREAM_ERROR)
{
return ret;
}

// if we can, copy the temporary output data to the consumed portion of the input buffer, and then
// continue to write up to the start of the consumed input for as long as possible
auto have = strm->next_out - temp.data(); // number of bytes in temp[]
if (have <= static_cast<decltype(have)>(strm->avail_in ? len - strm->avail_in : *max_len))
{
std::memcpy(buf, temp.data(), have);
strm->next_out = buf + have;
have = 0;
while (ret == Z_OK)
{
strm->avail_out =
strm->avail_in ? strm->next_in - strm->next_out : (buf + *max_len) - strm->next_out;
ret = deflate(strm, Z_FINISH);
}
if (ret != Z_BUF_ERROR || strm->avail_in == 0)
{
*max_len = strm->next_out - buf;
return ret == Z_STREAM_END ? Z_OK : ret;
}
}

// the output caught up with the input due to insufficiently compressible data -- copy the
// remaining input data into an allocated buffer and complete the compression from there to the
// now empty input buffer (this will only occur for long incompressible streams, more than ~20 MB
// for the default deflate memLevel of 8, or when *max_len is too small and less than the length
// of the header plus one byte)
auto hold = static_cast<std::remove_const_t<decltype(z_stream::next_in)>>(
strm->zalloc(strm->opaque, strm->avail_in, 1)); // allocated buffer to hold input data
if (hold == Z_NULL)
{
return Z_MEM_ERROR;
}
std::memcpy(hold, strm->next_in, strm->avail_in);
strm->next_in = hold;
if (have)
{
std::memcpy(buf, temp.data(), have);
strm->next_out = buf + have;
}
strm->avail_out = (buf + *max_len) - strm->next_out;
ret = deflate(strm, Z_FINISH);
strm->zfree(strm->opaque, hold);
*max_len = strm->next_out - buf;
return ret == Z_OK ? Z_BUF_ERROR : (ret == Z_STREAM_END ? Z_OK : ret);
}
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW

void Session::SendRequest(
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> callback) noexcept
{
is_session_active_.store(true, std::memory_order_release);
std::string url = host_ + std::string(http_request_->uri_);
const auto &url = host_ + http_request_->uri_;
auto callback_ptr = callback.get();
bool reuse_connection = false;

Expand All @@ -76,44 +153,47 @@ void Session::SendRequest(
if (http_request_->compression_ == opentelemetry::ext::http::client::Compression::kGzip)
{
#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
http_request_->AddHeader("Content-Encoding", "gzip");

opentelemetry::ext::http::client::Body compressed_body(http_request_->body_.size());
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
zs.avail_in = static_cast<uInt>(http_request_->body_.size());
zs.next_in = http_request_->body_.data();
zs.avail_out = static_cast<uInt>(compressed_body.size());
zs.next_out = compressed_body.data();
z_stream zs{};
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;

// ZLIB: Have to maually specify 16 bits for the Gzip headers
const int window_bits = 15 + 16;
static constexpr int kWindowBits = MAX_WBITS + 16;
static constexpr int kMemLevel = MAX_MEM_LEVEL;

int stream =
deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 8, Z_DEFAULT_STRATEGY);
auto stream = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, kWindowBits, kMemLevel,
Z_DEFAULT_STRATEGY);

if (stream == Z_OK)
{
deflate(&zs, Z_FINISH);
deflateEnd(&zs);
compressed_body.resize(zs.total_out);
http_request_->SetBody(compressed_body);
auto size = static_cast<uInt>(http_request_->body_.size());
auto max_size = size;
stream = deflateInPlace(&zs, http_request_->body_.data(), size, &max_size);

if (stream == Z_OK)
{
http_request_->AddHeader("Content-Encoding", "gzip");
http_request_->body_.resize(max_size);
}
}
else

if (stream != Z_OK)
{
if (callback)
{
callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed, "");
callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed,
zs.msg ? zs.msg : "");
}
is_session_active_.store(false, std::memory_order_release);
}

deflateEnd(&zs);
#else
OTEL_INTERNAL_LOG_ERROR(
"[HTTP Client Curl] Set WITH_OTLP_HTTP_COMPRESSION=ON to use gzip compression with the "
"OTLP HTTP Exporter");
#endif
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW
}

curl_operation_.reset(new HttpOperation(
Expand Down Expand Up @@ -226,7 +306,7 @@ HttpClient::~HttpClient()
std::shared_ptr<opentelemetry::ext::http::client::Session> HttpClient::CreateSession(
nostd::string_view url) noexcept
{
auto parsedUrl = common::UrlParser(std::string(url));
const auto parsedUrl = common::UrlParser(std::string(url));
if (!parsedUrl.success_)
{
return std::make_shared<Session>(*this);
Expand Down
4 changes: 1 addition & 3 deletions ext/src/http/client/curl/http_operation_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,7 @@ HttpOperation::HttpOperation(opentelemetry::ext::http::client::Method method,
{
for (auto &kv : this->request_headers_)
{
std::string header = std::string(kv.first);
header += ": ";
header += std::string(kv.second);
const auto header = std::string(kv.first).append(": ").append(kv.second);
curl_resource_.headers_chunk =
curl_slist_append(curl_resource_.headers_chunk, header.c_str());
}
Expand Down
127 changes: 126 additions & 1 deletion ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <string>
#include <thread>
#include <utility>
Expand Down Expand Up @@ -558,7 +559,6 @@ TEST_F(BasicCurlHttpTests, ElegantQuitQuick)
ASSERT_TRUE(handler->is_called_);
ASSERT_TRUE(handler->got_response_);
}

TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
{
{
Expand All @@ -581,3 +581,128 @@ TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
}
}

#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
struct GzipEventHandler : public CustomEventHandler
{
~GzipEventHandler() override = default;

void OnResponse(http_client::Response & /* response */) noexcept override {}

void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override
{
is_called_ = true;
state_ = state;
reason_ = std::string{reason};
}

bool is_called_ = false;
http_client::SessionState state_ = static_cast<http_client::SessionState>(-1);
std::string reason_;
};

TEST_F(BasicCurlHttpTests, GzipCompressibleData)
{
received_requests_.clear();
auto session_manager = http_client::HttpClientFactory::Create();
EXPECT_TRUE(session_manager != nullptr);

auto session = session_manager->CreateSession("http://127.0.0.1:19000");
auto request = session->CreateRequest();
request->SetUri("post/");
request->SetMethod(http_client::Method::Post);

const auto original_size = 500UL;
http_client::Body body(original_size);
std::iota(body.begin(), body.end(), 0);
request->SetBody(body);
request->AddHeader("Content-Type", "text/plain");
request->SetCompression(opentelemetry::ext::http::client::Compression::kGzip);
auto handler = std::make_shared<GzipEventHandler>();
session->SendRequest(handler);
ASSERT_TRUE(waitForRequests(30, 1));
session->FinishSession();
ASSERT_TRUE(handler->is_called_);
ASSERT_EQ(handler->state_, http_client::SessionState::Response);
ASSERT_TRUE(handler->reason_.empty());

auto http_request =
dynamic_cast<opentelemetry::ext::http::client::curl::Request *>(request.get());
ASSERT_TRUE(http_request != nullptr);
ASSERT_LT(http_request->body_.size(), original_size);

session_manager->CancelAllSessions();
session_manager->FinishAllSessions();
}

TEST_F(BasicCurlHttpTests, GzipIncompressibleData)
{
received_requests_.clear();
auto session_manager = http_client::HttpClientFactory::Create();
EXPECT_TRUE(session_manager != nullptr);

auto session = session_manager->CreateSession("http://127.0.0.1:19000");
auto request = session->CreateRequest();
request->SetUri("post/");
request->SetMethod(http_client::Method::Post);

// Random data generated using code snippet below.
// const auto original_size = 500UL;
// http_client::Body body(original_size);
// std::random_device rd;
// std::mt19937 gen(rd());
// std::uniform_int_distribution<> uid(1, 255);
// std::generate(body.begin(), body.end(), [&]() { return uid(gen); });

// The input values are fixed to make the test repeatable in the event that some distributions
// might yield results that are, in fact, compressible.
http_client::Body body = {
140, 198, 12, 56, 165, 185, 173, 20, 13, 83, 127, 223, 77, 38, 224, 43, 236, 10, 178,
75, 169, 157, 136, 199, 74, 30, 148, 195, 51, 30, 225, 21, 121, 219, 7, 155, 198, 121,
205, 102, 80, 38, 132, 202, 45, 229, 206, 90, 150, 202, 53, 221, 54, 37, 172, 90, 238,
248, 191, 240, 109, 227, 248, 41, 251, 121, 35, 226, 107, 122, 15, 242, 203, 45, 64, 195,
186, 23, 1, 158, 61, 196, 182, 26, 201, 47, 211, 241, 251, 209, 255, 170, 181, 192, 89,
133, 176, 60, 178, 97, 168, 223, 152, 9, 118, 98, 169, 240, 170, 15, 13, 161, 24, 57,
123, 117, 230, 30, 244, 117, 238, 255, 198, 232, 95, 148, 37, 61, 67, 103, 31, 240, 52,
21, 145, 175, 201, 86, 19, 61, 228, 76, 131, 185, 111, 149, 203, 143, 16, 142, 95, 173,
42, 106, 39, 203, 116, 235, 20, 162, 112, 173, 112, 70, 126, 191, 210, 219, 90, 145, 126,
118, 43, 241, 101, 66, 175, 179, 5, 233, 208, 164, 180, 83, 214, 194, 173, 29, 179, 149,
75, 202, 17, 152, 139, 130, 94, 247, 142, 249, 159, 224, 205, 131, 93, 82, 186, 226, 210,
84, 17, 212, 155, 61, 226, 103, 152, 37, 3, 193, 216, 219, 203, 101, 99, 33, 59, 38,
106, 62, 232, 127, 44, 125, 90, 169, 148, 238, 34, 106, 12, 221, 90, 173, 67, 122, 232,
161, 89, 198, 43, 241, 195, 248, 219, 35, 47, 200, 11, 227, 168, 246, 243, 103, 38, 17,
203, 237, 203, 158, 204, 89, 231, 19, 24, 25, 199, 160, 233, 43, 117, 144, 196, 117, 152,
42, 121, 189, 217, 202, 221, 250, 157, 237, 47, 29, 64, 32, 10, 32, 243, 28, 114, 158,
228, 102, 36, 191, 139, 217, 161, 162, 186, 19, 141, 212, 49, 1, 239, 153, 107, 249, 31,
235, 138, 73, 80, 58, 152, 15, 149, 50, 42, 84, 75, 95, 82, 56, 86, 143, 45, 214,
11, 184, 164, 181, 249, 74, 184, 26, 207, 165, 162, 240, 154, 90, 56, 175, 72, 4, 166,
188, 78, 232, 87, 243, 50, 59, 62, 175, 213, 210, 182, 31, 123, 91, 118, 98, 249, 23,
170, 240, 228, 236, 121, 87, 132, 129, 250, 41, 227, 204, 250, 147, 145, 109, 149, 210, 21,
174, 165, 127, 234, 64, 211, 52, 93, 126, 117, 231, 216, 210, 15, 16, 2, 167, 215, 178,
104, 245, 119, 211, 235, 120, 135, 202, 117, 150, 101, 94, 201, 136, 179, 205, 167, 212, 236,
7, 178, 132, 228, 65, 230, 90, 171, 109, 31, 83, 31, 210, 123, 136, 76, 186, 81, 205,
63, 35, 21, 121, 152, 22, 242, 199, 106, 217, 199, 211, 206, 165, 88, 77, 112, 108, 193,
122, 8, 193, 74, 91, 50, 6, 156, 185, 165, 15, 92, 116, 3, 18, 244, 165, 191, 2,
183, 9, 164, 116, 75, 127};
const auto original_size = body.size();

request->SetBody(body);
request->AddHeader("Content-Type", "text/plain");
request->SetCompression(opentelemetry::ext::http::client::Compression::kGzip);
auto handler = std::make_shared<GzipEventHandler>();
session->SendRequest(handler);
ASSERT_TRUE(waitForRequests(30, 1));
session->FinishSession();
ASSERT_TRUE(handler->is_called_);
ASSERT_EQ(handler->state_, http_client::SessionState::Response);
ASSERT_TRUE(handler->reason_.empty());

auto http_request =
dynamic_cast<opentelemetry::ext::http::client::curl::Request *>(request.get());
ASSERT_TRUE(http_request != nullptr);
ASSERT_EQ(http_request->body_.size(), original_size);

session_manager->CancelAllSessions();
session_manager->FinishAllSessions();
}
#endif // ENABLE_OTLP_COMPRESSION_PREVIEW
Loading
Loading