diff --git a/.github/workflows/statsd.yml b/.github/workflows/statsd.yml new file mode 100644 index 000000000..8509cd9af --- /dev/null +++ b/.github/workflows/statsd.yml @@ -0,0 +1,45 @@ +name: Statsd Metrics CI + +on: + push: + branches: "*" + paths: + - "exporters/statsd/**" + - ".github/workflows/statsd.yml" + pull_request: + branches: [main] + paths: + - "exporters/statsd/**" + - ".github/workflows/statsd.yml" +jobs: + cmake_linux: + name: CMake on Linux + runs-on: ubuntu-latest + steps: + - name: checkout opentelemetry-cpp-contrib + uses: actions/checkout@v3 + with: + path: opentelemetry-cpp-contrib + - name: checkout opentelemetry-cpp + uses: actions/checkout@v3 + with: + repository: "open-telemetry/opentelemetry-cpp" + ref: "v1.20.0" + path: "opentelemetry-cpp" + submodules: "recursive" + - name: setup + run: | + sudo apt update -y + sudo apt install -y --no-install-recommends --no-install-suggests \ + build-essential \ + cmake \ + ninja-build \ + ca-certificates wget git valgrind lcov + - name: run tests + run: | + sudo $GITHUB_WORKSPACE/opentelemetry-cpp/ci/setup_googletest.sh + mkdir -p "$GITHUB_WORKSPACE/opentelemetry-cpp/build" + cd "$GITHUB_WORKSPACE/opentelemetry-cpp/build" + cmake .. -G Ninja -DOPENTELEMETRY_EXTERNAL_COMPONENT_PATH=$GITHUB_WORKSPACE/opentelemetry-cpp-contrib/exporters/statsd + cmake --build . -j$(nproc) + ctest -j1 --output-on-failure diff --git a/exporters/statsd/CMakeLists.txt b/exporters/statsd/CMakeLists.txt new file mode 100644 index 000000000..b63103c83 --- /dev/null +++ b/exporters/statsd/CMakeLists.txt @@ -0,0 +1,144 @@ +# Copyright 2021, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +cmake_minimum_required(VERSION 3.12) + +# MAIN_PROJECT CHECK +## determine if statsd exporter is built as a subproject (using add_subdirectory) or if it is the main project +## +set(MAIN_PROJECT OFF) +if (CMAKE_CURRENT_SOURCE_DIR STREQUAL CMAKE_SOURCE_DIR) + project(opentelemetry-cpp-statsd) + set(MAIN_PROJECT ON) +endif() + +if (MAIN_PROJECT) + find_package(opentelemetry-cpp CONFIG QUIET) + if(opentelemetry-cpp_FOUND) + message("Using external opentelemetry-cpp") + else() + include(cmake/opentelemetry-cpp.cmake) + build_opentelemetry() + set(OPENTELEMETRY_CPP_INCLUDE_DIRS "") + set(OPENTELEMETRY_CPP_LIBRARIES "opentelemetry::libopentelemetry") + message("opentelemetry-cpp package was not found. Cloned from github") + endif() +endif() + + +include_directories(include) + +# create statsd metrics exporter +if(WIN32) + add_library( + opentelemetry_exporter_statsd_metrics + src/exporter.cc src/etw_data_transport.cc + src/socket_data_transport.cc) +else() + add_library(opentelemetry_exporter_statsd_metrics + src/exporter.cc src/socket_data_transport.cc) +endif() + +if(MAIN_PROJECT) + target_include_directories(opentelemetry_exporter_statsd_metrics + PRIVATE ${OPENTELEMETRY_CPP_INCLUDE_DIRS}) + target_link_libraries( + opentelemetry_exporter_statsd_metrics + PUBLIC ${OPENTELEMETRY_CPP_LIBRARIES} + ) + set_target_properties(opentelemetry_exporter_statsd_metrics + PROPERTIES EXPORT_NAME metrics) +else() + target_link_libraries( + opentelemetry_exporter_statsd_metrics + PUBLIC opentelemetry_trace opentelemetry_resources opentelemetry_common + ) +endif() + + +if(MAIN_PROJECT) + option(WITH_EXAMPLES "Build examples" ON) +endif() + +if (WITH_EXAMPLES) + add_subdirectory(example) +endif() + +if(OPENTELEMETRY_INSTALL) + install( + TARGETS opentelemetry_exporter_statsd_metrics + EXPORT "${PROJECT_NAME}-target" + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) + + install( + DIRECTORY include/opentelemetry/exporters/ + DESTINATION include/opentelemetry/exporters/ + FILES_MATCHING + PATTERN "*.h") +endif() + +if(BUILD_TESTING) + if(MAIN_PROJECT) + find_package(GTest CONFIG REQUIRED) + else() + if (NOT DEFINED GTEST_BOTH_LIBRARIES) + message(STATUS_FATAL, "Test is not enable.") + endif() + endif() + +endif() # BUILD_TESTING + +if (MAIN_PROJECT) + # config file for find_packages(opentelemetry-cpp-statsd CONFIG) + include(GNUInstallDirs) + include(CMakePackageConfigHelpers) + + set(OPENTELEMETRY_CPP_STATSD_VERSION "1.0.0") + set(INCLUDE_INSTALL_DIR "${CMAKE_INSTALL_INCLUDEDIR}") + configure_package_config_file( + "${CMAKE_CURRENT_LIST_DIR}/cmake/opentelemetry-cpp-statsd-config.cmake.in" + "${CMAKE_CURRENT_BINARY_DIR}/cmake/${PROJECT_NAME}/${PROJECT_NAME}-config.cmake" + INSTALL_DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}" + # PATH_VARS OPENTELEMETRY_CPP_STATSD_VERSION PROJECT_NAME INCLUDE_INSTALL_DIR + # CMAKE_INSTALL_LIBDIR + PATH_VARS PROJECT_NAME INCLUDE_INSTALL_DIR CMAKE_INSTALL_LIBDIR + NO_CHECK_REQUIRED_COMPONENTS_MACRO) + + # Write version file for find_packages(opentelemetry-cpp-statsd CONFIG) + write_basic_package_version_file( + "${CMAKE_CURRENT_BINARY_DIR}/cmake/${PROJECT_NAME}/${PROJECT_NAME}-config-version.cmake" + VERSION ${OPENTELEMETRY_CPP_STATSD_VERSION} + COMPATIBILITY ExactVersion) + + install( + FILES + "${CMAKE_CURRENT_BINARY_DIR}/cmake/${PROJECT_NAME}/${PROJECT_NAME}-config.cmake" + "${CMAKE_CURRENT_BINARY_DIR}/cmake/${PROJECT_NAME}/${PROJECT_NAME}-config-version.cmake" + DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}") + + # Export all components + export( + EXPORT "${PROJECT_NAME}-target" + NAMESPACE "${PROJECT_NAME}::" + FILE "${CMAKE_CURRENT_BINARY_DIR}/cmake/${PROJECT_NAME}/${PROJECT_NAME}-target.cmake" + ) + + install( + EXPORT "${PROJECT_NAME}-target" + NAMESPACE "${PROJECT_NAME}::" + DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}") + +endif() diff --git a/exporters/statsd/README.md b/exporters/statsd/README.md new file mode 100644 index 000000000..a0c13e1fc --- /dev/null +++ b/exporters/statsd/README.md @@ -0,0 +1,7 @@ +# statsd Exporter for OpenTelemetry C++ + +## SPEC +Check [metric-instrument-optional-refinements](https://github.com/open-telemetry/opentelemetry-specification/blob/main/oteps/metrics/0088-metric-instrument-optional-refinements.md#statsd) for what we plan to support. + +## Installation +todo \ No newline at end of file diff --git a/exporters/statsd/cmake/opentelemetry-cpp-statsd-config.cmake.in b/exporters/statsd/cmake/opentelemetry-cpp-statsd-config.cmake.in new file mode 100644 index 000000000..1e8e9c2cd --- /dev/null +++ b/exporters/statsd/cmake/opentelemetry-cpp-statsd-config.cmake.in @@ -0,0 +1,59 @@ +#.rst: +# opentelemetry-cpp-statsd.config.cmake +# -------- +# +# Find the native opentelemetry-cpp-statsd includes and library. +# +# +# Result Variables +# ^^^^^^^^^^^^^^^^ +# +# This module defines the following variables: +# +# :: +# +# OPENTELEMETRY_CPP_STATSD_INCLUDE_DIRS - Include directories of opentelemetry-cpp-statsd. +# OPENTELEMETRY_CPP_STATSD_LIBRARY_DIRS - Link directories of opentelemetry-cpp-statsd. +# OPENTELEMETRY_CPP_STATSD_LIBRARIES - List of libraries when using opentelemetry-cpp-statsd. +# OPENTELEMETRY_CPP_STATSD_FOUND - True if opentelemetry-cpp-statsd found. +# OPENTELEMETRY_CPP_STATSD_VERSION - Version of opentelemetry-cpp-statsd. +# +# :: +# opentelemetry-cpp-statsd::metrics - Imported target of oopentelemetry-cpp-statsd::metrics + +# ============================================================================= +# Copyright 2020 opentelemetry. +# +# Distributed under the Apache License (the "License"); see accompanying file +# LICENSE for details. +# ============================================================================= +set(OPENTELEMETRY_CPP_STATSD_VERSION + "@OPENTELEMETRY_CPP_STATSD_VERSION@" + CACHE STRING "opentelemetry-cpp-statsd version" FORCE) + +@PACKAGE_INIT@ + +message(OPENTELEMETRY_CPP_STATSD_VERSION) +find_package(Threads) + +set_and_check(OPENTELEMETRY_CPP_STATSD_INCLUDE_DIRS "@PACKAGE_INCLUDE_INSTALL_DIR@") +set_and_check(OPENTELEMETRY_CPP_STATSD_LIBRARY_DIRS "@PACKAGE_CMAKE_INSTALL_LIBDIR@") + +include("${CMAKE_CURRENT_LIST_DIR}/@PROJECT_NAME@-target.cmake") + +set(OPENTELEMETRY_CPP_STATSD_LIBRARIES) + +set(_OPENTELEMETRY_CPP_STATSD_LIBRARIES_TEST_TARGETS metrics) + +foreach(_TEST_TARGET IN LISTS _OPENTELEMETRY_CPP_STATSD_LIBRARIES_TEST_TARGETS) + if(TARGET opentelemetry-cpp-statsd::${_TEST_TARGET}) + list(APPEND OPENTELEMETRY_CPP_STATSD_LIBRARIES opentelemetry-cpp-statsd::${_TEST_TARGET}) + else() + message("Target not found: " ${_TEST_TARGET}) + endif() +endforeach() + +# handle the QUIETLY and REQUIRED arguments and set opentelemetry-cpp_FOUND to +# TRUE if all variables listed contain valid results, e.g. valid file paths. +include(CMakeFindDependencyMacro) +find_dependency(opentelemetry-cpp CONFIG) diff --git a/exporters/statsd/cmake/opentelemetry-cpp.cmake b/exporters/statsd/cmake/opentelemetry-cpp.cmake new file mode 100644 index 000000000..913dd5aee --- /dev/null +++ b/exporters/statsd/cmake/opentelemetry-cpp.cmake @@ -0,0 +1,70 @@ +if("${opentelemetry-cpp-tag}" STREQUAL "") + set(opentelemetry-cpp-tag "v1.20.0") +endif() +function(target_create _target _lib) + add_library(${_target} STATIC IMPORTED) + set_target_properties( + ${_target} PROPERTIES IMPORTED_LOCATION + "${opentelemetry_BINARY_DIR}/${_lib}") +endfunction() + +function(build_opentelemetry) + set(opentelemetry_SOURCE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/opentelemetry-cpp") + set(opentelemetry_BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/opentelemetry-cpp") + set(opentelemetry_cpp_targets opentelemetry_trace opentelemetry_logs) + set(opentelemetry_CMAKE_ARGS -DCMAKE_POSITION_INDEPENDENT_CODE=ON + -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} + -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS} + -DWITH_LOGS_PREVIEW=ON + -DBUILD_TESTING=OFF + -DWITH_EXAMPLES=OFF) + + set(opentelemetry_libs + ${opentelemetry_BINARY_DIR}/sdk/src/trace/libopentelemetry_trace.a + ${opentelemetry_BINARY_DIR}/sdk/src/logs/libopentelemetry_logs.a + ${opentelemetry_BINARY_DIR}/sdk/src/resource/libopentelemetry_resources.a + ${opentelemetry_BINARY_DIR}/sdk/src/common/libopentelemetry_common.a + ${CURL_LIBRARIES} + ) + + set(opentelemetry_include_dir ${opentelemetry_SOURCE_DIR}/api/include/ + ${opentelemetry_SOURCE_DIR}/ext/include/ + ${opentelemetry_SOURCE_DIR}/sdk/include/ + ) + + include_directories(SYSTEM ${opentelemetry_include_dir}) + + set(opentelemetry_deps opentelemetry_trace opentelemetry_logs opentelemetry_resources opentelemetry_common ${CURL_LIBRARIES}) + + set(make_cmd ${CMAKE_COMMAND} --build --target + ${opentelemetry_cpp_targets}) + + include(ExternalProject) + ExternalProject_Add( + opentelemetry-cpp + GIT_REPOSITORY https://github.com/open-telemetry/opentelemetry-cpp.git + GIT_TAG "${opentelemetry-cpp-tag}" + GIT_SUBMODULES "third_party/opentelemetry-proto" + SOURCE_DIR ${opentelemetry_SOURCE_DIR} + PREFIX "opentelemetry-cpp" + CMAKE_ARGS ${opentelemetry_CMAKE_ARGS} + BUILD_COMMAND ${make_cmd} + BINARY_DIR ${opentelemetry_BINARY_DIR} + INSTALL_COMMAND "" + BUILD_BYPRODUCTS ${opentelemetry_libs} + DEPENDS ${dependencies} + LOG_BUILD ON) + + target_create("opentelemetry_trace" "sdk/src/trace/libopentelemetry_trace.a") + target_create("opentelemetry_logs" "sdk/src/logs/libopentelemetry_logs.a") + target_create("opentelemetry_resources" + "sdk/src/resource/libopentelemetry_resources.a") + target_create("opentelemetry_common" + "sdk/src/common/libopentelemetry_common.a") + add_library(opentelemetry::libopentelemetry INTERFACE IMPORTED) + add_dependencies(opentelemetry::libopentelemetry opentelemetry-cpp) + set_target_properties( + opentelemetry::libopentelemetry + PROPERTIES + INTERFACE_LINK_LIBRARIES "${opentelemetry_deps}") +endfunction() diff --git a/exporters/statsd/example/CMakeLists.txt b/exporters/statsd/example/CMakeLists.txt new file mode 100644 index 000000000..e69de29bb diff --git a/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/connection_string_parser.h b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/connection_string_parser.h new file mode 100644 index 000000000..a578ede38 --- /dev/null +++ b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/connection_string_parser.h @@ -0,0 +1,95 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "opentelemetry/exporters/statsd/metrics/macros.h" +#include "opentelemetry/exporters/statsd/metrics/socket_tools.h" +#include "opentelemetry/version.h" +#include +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter { +namespace statsd { +namespace metrics { +constexpr char kSemicolon = ';'; +constexpr char kEqual = '='; +constexpr char kEndpoint[] = "Endpoint"; +constexpr char kAccount[] = "Account"; +constexpr char kNamespace[] = "Namespace"; + +enum class TransportProtocol { kETW, kTCP, kUDP, kUNIX, kUnknown }; + +class ConnectionStringParser { + +public: + ConnectionStringParser(const std::string &connection_string) + : account_(""), namespace_(""), transport_protocol_{TransportProtocol::kUnknown} { + std::string::size_type key_pos = 0; + std::string::size_type key_end; + std::string::size_type val_pos; + std::string::size_type val_end; + bool is_endpoint_found = false; + while ((key_end = connection_string.find(kEqual, key_pos)) != + std::string::npos) { + if ((val_pos = connection_string.find_first_not_of(kEqual, key_end)) == + std::string::npos) + { + break; + } + val_end = connection_string.find(kSemicolon, val_pos); + auto key = connection_string.substr(key_pos, key_end - key_pos); + auto value = connection_string.substr(val_pos, val_end - val_pos); + key_pos = val_end; + if (key_pos != std::string::npos) + { + ++key_pos; + } + if (key == kNamespace) { + namespace_ = value; + } else if (key == kAccount) { + account_ = value; + } else if (key == kEndpoint) { + is_endpoint_found = true; + size_t pos = value.find("://", 0); + if (pos != std::string::npos) + { + auto scheme = std::string(value.begin(), value.begin() + pos); + connection_string_ = value.substr(pos + strlen("://")); +#ifdef HAVE_UNIX_DOMAIN + if (scheme == "unix") { + transport_protocol_ = TransportProtocol::kUNIX; + } +#else + if (scheme == "unix") { + LOG_ERROR("Unix domain socket not supported on this platform") + } +#endif + if (scheme == "tcp") { + transport_protocol_ = TransportProtocol::kTCP; + } + if (scheme == "udp") { + transport_protocol_ = TransportProtocol::kUDP; + } + } + } + } +#ifdef _WIN32 + if (account_.size() && namespace_.size() && !is_endpoint_found) { + transport_protocol_ = TransportProtocol::kETW; + } +#endif + } + + bool IsValid() { return transport_protocol_ != TransportProtocol::kUnknown; } + + std::string account_; + std::string namespace_; + TransportProtocol transport_protocol_; + std::string connection_string_; +}; +} // namespace metrics +} // namespace statsd +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE \ No newline at end of file diff --git a/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/data_transport.h b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/data_transport.h new file mode 100644 index 000000000..81199e905 --- /dev/null +++ b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/data_transport.h @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include "opentelemetry/version.h" + +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter { +namespace statsd { +namespace metrics { + +// Metrics Type defined in Statsd Protocol +// Other statsd metric types, such as c ("c"ounter), t ("t"imer), m ("m"eters), h ("h"istograms), etc are not supported, typically because such functionality is implemented differently in MDM. +enum class MetricsEventType { + // g ("g"auge) is a standard metric with a single 64-bit integer value; is an integer number. + Gauge, + + // s ("s"caled fixed-point number) is used to represent fixed-point values (such as CPU usages, load averages, fractions, etc). + // Precision for a fixed-point metric is determined by a precision setting in per-metric setup. can be any fractional number, but note that the actual value would in Geneva Metrics be truncated to a specified precision and still stored in 64-bit container. + ScaledFixedPointNumber, + + // f ("f"loating point number) is used to represent floating point numbers which vary a lot in its magnitude and thus warrant storage in a mantissa + expontent format (such as physical quantities, results of complex calculations, etc). can be any fractional number, which would be stored as double precision IEEE 754 float. + FloatingPointNumber, + + Unknown +}; + +inline std::string MetricsEventTypeToString(MetricsEventType type) { + switch (type) { + case MetricsEventType::Gauge: + return "g"; + case MetricsEventType::ScaledFixedPointNumber: + return "s"; + case MetricsEventType::FloatingPointNumber: + return "f"; + default: + return ""; + } +} + +class DataTransport { +public: + virtual bool Connect() noexcept = 0; + virtual bool Send(MetricsEventType event_type, const char *data, + uint16_t length) noexcept = 0; + virtual bool Disconnect() noexcept = 0; + virtual ~DataTransport() = default; +}; +} // namespace metrics +} // namespace statsd +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/etw_data_transport.h b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/etw_data_transport.h new file mode 100644 index 000000000..35c3bcc00 --- /dev/null +++ b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/etw_data_transport.h @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "opentelemetry/exporters/statsd/metrics/connection_string_parser.h" +#include "opentelemetry/exporters/statsd/metrics/data_transport.h" +#include "opentelemetry/version.h" + +#include +#include +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter { +namespace statsd { +namespace metrics { +static const REGHANDLE INVALID_HANDLE = _UI64_MAX; +static const GUID kMDMProviderGUID = { + 0xedc24920, 0xe004, 0x40f6, 0xa8, 0xe1, 0x0e, 0x6e, 0x48, 0xf3, 0x9d, 0x84}; + +class ETWDataTransport : public DataTransport { +public: + ETWDataTransport(const size_t offset_to_skip_); + bool Connect() noexcept override; + bool Send(MetricsEventType event_type, const char *data, + uint16_t length) noexcept override; + bool Disconnect() noexcept override; + ~ETWDataTransport(); + +private: + REGHANDLE provider_handle_; + bool connected_{false}; + const size_t offset_to_skip_; +}; +} // namespace metrics +} // namespace statsd +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE \ No newline at end of file diff --git a/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/exporter.h b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/exporter.h new file mode 100644 index 000000000..db069efaf --- /dev/null +++ b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/exporter.h @@ -0,0 +1,110 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "opentelemetry/common/spin_lock_mutex.h" +#include "opentelemetry/common/timestamp.h" +#include "opentelemetry/exporters/statsd/metrics/connection_string_parser.h" +#include "opentelemetry/exporters/statsd/metrics/data_transport.h" +#include "opentelemetry/exporters/statsd/metrics/exporter_options.h" +#include "opentelemetry/sdk/metrics/push_metric_exporter.h" +#include "opentelemetry/sdk/metrics/data/metric_data.h" + + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter { +namespace statsd { +namespace metrics { + +constexpr size_t kBufferSize = 65360; // the maximum ETW payload (inclusive) +constexpr size_t kMaxDimensionNameSize = 256; +constexpr size_t kMaxDimensionValueSize = 1024; +constexpr size_t kBinaryHeaderSize = 4; // event_id (2) + body_length (2) +constexpr size_t kMetricPayloadSize = + 24; // count_dimension (2) + reserverd_word (2) + reserverd_dword(4) + + // timestamp_utc (8) + metric_data (8) +constexpr size_t kExternalPayloadSize = + 40; // count_dimension (2) + reserverd_word (2) + count (4) + timestamp_utc + // (8) + metric_data_sum (8) + metric_data_min(8) + metric_data_max(8) + +// time conversion constants +constexpr uint32_t kWindowsTicksPerSecond = + 10000000; // windows ticks are in 100 ns +constexpr uint64_t kSecondsToUnixTime = + 11644473600L; // number of seconds between windows epoch start + // 1601-01-01T00:00:00Z and UNIX/Linux epoch + // (1970-01-01T00:00:00Z) + +const std::string kAttributeNamespaceKey = "_microsoft_metrics_namespace"; +const std::string kAttributeAccountKey = "_microsoft_metrics_account"; + +using ValueType = nostd::variant; + +/** + * The Geneva metrics exporter exports metrics data to Geneva + */ +class Exporter final : public opentelemetry::sdk::metrics::PushMetricExporter { +public: + Exporter(const ExporterOptions &options); + + opentelemetry::sdk::common::ExportResult + Export(const opentelemetry::sdk::metrics::ResourceMetrics &data) noexcept + override; + + sdk::metrics::AggregationTemporality GetAggregationTemporality( + sdk::metrics::InstrumentType instrument_type) const noexcept override; + + bool ForceFlush(std::chrono::microseconds timeout = + (std::chrono::microseconds::max)()) noexcept override; + + bool Shutdown(std::chrono::microseconds timeout = + (std::chrono::microseconds::max)()) noexcept override; + +private: + const ExporterOptions options_; + ConnectionStringParser connection_string_parser_; + const sdk::metrics::AggregationTemporalitySelector + aggregation_temporality_selector_; + bool is_shutdown_ = false; + mutable opentelemetry::common::SpinLockMutex lock_; + std::unique_ptr data_transport_; + + void SendMetrics(std::string metric_name, MetricsEventType type, + ValueType value) noexcept; + +}; + +static std::string AttributeValueToString( + const opentelemetry::sdk::common::OwnedAttributeValue &value) { + std::string result; + if (nostd::holds_alternative(value)) { + result = nostd::get(value) ? "true" : "false"; + } else if (nostd::holds_alternative(value)) { + result = std::to_string(nostd::get(value)); + } else if (nostd::holds_alternative(value)) { + result = std::to_string(nostd::get(value)); + } else if (nostd::holds_alternative(value)) { + result = std::to_string(nostd::get(value)); + } else if (nostd::holds_alternative(value)) { + result = std::to_string(nostd::get(value)); + } else if (nostd::holds_alternative(value)) { + result = std::to_string(nostd::get(value)); + } else if (nostd::holds_alternative(value)) { + result = nostd::get(value); + } else { + LOG_WARN("[Statsd Metrics Exporter] AttributeValueToString - " + " Nested attributes not supported - ignored"); + } + return result; +} + +static uint64_t UnixTimeToWindowsTicks(uint64_t unix_epoch_secs) { + uint64_t secs_since_windows_epoch = unix_epoch_secs + kSecondsToUnixTime; + return (secs_since_windows_epoch * (uint64_t)kWindowsTicksPerSecond); +} + +} // namespace metrics +} // namespace statsd +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE \ No newline at end of file diff --git a/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/exporter_options.h b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/exporter_options.h new file mode 100644 index 000000000..7e0924aec --- /dev/null +++ b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/exporter_options.h @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include + +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter { +namespace statsd { +namespace metrics { + +// Tags are used to represent dimensions in the metric data. +// We support for different style of tags +enum class TagStyle { + // For Librato-style tags, they must be appended to the metric name with a delimiting #, + // as so: "foo#tag1=bar,tag2=baz:100|c" + Librato, + + // For Geneva-style tags, they must be serialized in JSON format, + // and appended to the metric name as a JSON object, with the metric name and account and namespace as keys, + // as so: { "Metric": "TestMetricName", "Account": "TestAccount", "Namespace": "TestNamespace", "Dims": { "Dimension1Name": "Dim1Value", "Dimension2Name": "Dim2Value" }, "TS": "2018-01-01T01:02:03.004" } + Geneva, + + // The simplest way is not to use tags at all, and just send the metric name and value. + // This is the default style. + None +}; + +struct ExporterOptions { + // clang-format off + /* + Format - + Windows: + Account={MetricAccount};NameSpace={MetricNamespace} + Linux: + Endpoint=unix://{UDS Path};Account={MetricAccount};Namespace={MetricNamespace} + */ +// clang-format off + std::string connection_string; + + const std::map prepopulated_dimensions; + + TagStyle tag_style{TagStyle::None}; +}; +} // namespace metrics +} // namespace statsd +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE \ No newline at end of file diff --git a/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/macros.h b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/macros.h new file mode 100644 index 000000000..46ef8dc7f --- /dev/null +++ b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/macros.h @@ -0,0 +1,64 @@ +// Copyright 2021, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include + +#ifndef TOKENPASTE +#define TOKENPASTE(x, y) x##y +#endif + +#ifndef TOKENPASTE2 +#define TOKENPASTE2(x, y) TOKENPASTE(x, y) +#endif + +#ifndef LOCKGUARD +#define LOCKGUARD(macro_mutex) \ + std::lock_guard TOKENPASTE2(__guard_, \ + __LINE__)(macro_mutex) +#endif + +#if defined(HAVE_CONSOLE_LOG) && !defined(LOG_DEBUG) +// Log to console if there's no standard log facility defined +#include +#ifndef LOG_DEBUG +#define LOG_DEBUG(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +#define LOG_TRACE(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +#define LOG_INFO(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +#define LOG_WARN(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +#define LOG_ERROR(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +#endif +#endif + +#ifndef LOG_DEBUG +// Don't log anything if there's no standard log facility defined +#define LOG_DEBUG(fmt_, ...) +#define LOG_TRACE(fmt_, ...) +#define LOG_INFO(fmt_, ...) +#define LOG_WARN(fmt_, ...) +#define LOG_ERROR(fmt_, ...) +#endif + +// Annex K macros +#if !defined(_MSC_VER) +#ifndef strncpy_s +#define strncpy_s(dest, destsz, src, count) \ + strncpy(dest, src, (destsz <= count) ? destsz : count) +#endif +#endif + +// SAL macro +#ifndef _Out_cap_ +#define _Out_cap_(size) +#endif diff --git a/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/socket_data_transport.h b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/socket_data_transport.h new file mode 100644 index 000000000..f12ea1ab5 --- /dev/null +++ b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/socket_data_transport.h @@ -0,0 +1,36 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "opentelemetry/exporters/statsd/metrics/connection_string_parser.h" +#include "opentelemetry/exporters/statsd/metrics/data_transport.h" +#include "opentelemetry/exporters/statsd/metrics/socket_tools.h" +#include "opentelemetry/version.h" + +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter { +namespace statsd { +namespace metrics { +class SocketDataTransport : public DataTransport { +public: + SocketDataTransport(const ConnectionStringParser &parser); + bool Connect() noexcept override; + bool Send(MetricsEventType event_type, const char *data, + uint16_t length) noexcept override; + bool Disconnect() noexcept override; + ~SocketDataTransport() = default; + +private: + // Socket connection is re-established for every batch of events + SocketTools::SocketParams socketparams_{AF_UNIX, SOCK_STREAM, 0}; + SocketTools::Socket socket_; + std::unique_ptr addr_; + bool connected_{false}; +}; +} // namespace metrics +} // namespace statsd +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE \ No newline at end of file diff --git a/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/socket_tools.h b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/socket_tools.h new file mode 100644 index 000000000..2fba8d3e0 --- /dev/null +++ b/exporters/statsd/include/opentelemetry/exporters/statsd/metrics/socket_tools.h @@ -0,0 +1,1129 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef _MSC_VER +#pragma warning(push) +// Disable: warning C4267: 'argument': conversion from `size_t` to `int`, +// possible loss of data. +// +// WinSock vs POSIX sockets use different definition of socket payload size, +// e.g. in definition of socket ::send 'len' argument: +// - WinSock: int len +// - Linux: size_t len +// - BSD: size_t len +// +// We keep C++ method signature identical and prefer `size_t`. It is expected +// that Windows client cann not physically attempt to send a buffer larger than +// a size of max int. +// +#pragma warning(disable : 4267) +#endif + +#ifdef _WIN32 +#define WIN32_LEAN_AND_MEAN +#include +#include +#include + +// sa_family_t is not defined in Windows, so typedef-ing it. +// refer - https://learn.microsoft.com/en-us/windows/win32/winsock/sockaddr-2 +typedef u_short sa_family_t; + +#ifdef min +// NOMINMAX may be a better choice. However, defining it globally may break +// other. Code that depends on macro definition in Windows SDK. +#undef min +#undef max +#endif + +// This code requires WinSock2 on Windows. +#pragma comment(lib, "ws2_32.lib") +// Workaround for libcurl redefinition of afunix.h struct : +// https://github.com/curl/curl/blob/7645324072c2f052fa662aded6f26821141ecda1/lib/config-win32.h#L721 +// Unfortunately libcurl defines a structure that should otherwise be normally +// defined by afunix.h . When that happens, we cannot build the sockets library +// with Unix domain support. +#if !defined(USE_UNIX_SOCKETS) +#ifdef __has_include +#if __has_include() +// Win 10 SDK 17063+ is necessary for Unix domain sockets support. +#include +#define HAVE_UNIX_DOMAIN +#endif +#endif +#endif + +#else +#define HAVE_UNIX_DOMAIN +#include + +#ifdef __linux__ +#include +#endif + +#if __APPLE__ +#include "TargetConditionals.h" +// Use kqueue on mac +#include +#include +#include +#endif + +// Common POSIX headers for Linux and Mac OS X +#include +#include +#include +#include +#include +#include +#include +#endif + +#if !defined(_MSC_VER) && !defined(__STDC_LIB_EXT1__) +#ifndef strncpy_s +#define strncpy_s(dest, destsz, src, count) \ + strncpy(dest, src, (destsz <= count) ? destsz : count) +#endif +#endif + +#include "opentelemetry/exporters/statsd/metrics/macros.h" + +namespace net { + +/// +/// A simple thread, derived class overloads onThread() method. +/// +struct Thread { + std::thread m_thread; + + std::atomic m_terminate{false}; + + /// + /// Thread Constructor + /// + /// Thread + Thread() {} + + /// + /// Start Thread + /// + void startThread(bool wait = true) { + m_terminate = false; + m_thread = std::thread([&]() { this->onThread(); }); + if (wait) { + waitForStart(); + } + } + + /// + /// Wait for thread start + /// + void waitForStart() { + while (!m_thread.joinable()) { + std::this_thread::yield(); + } + } + + /// + /// Join Thread + /// + void joinThread() { + m_terminate = true; + if (m_thread.joinable()) { + m_thread.join(); + } + } + + /// + /// Indicates if this thread should terminate + /// + /// + bool shouldTerminate() const { return m_terminate; } + + /// + /// Must be implemented by children + /// + virtual void onThread() = 0; + + /// + /// Thread destructor + /// + /// + virtual ~Thread() noexcept {} +}; + +}; // namespace net + +namespace SocketTools { + +#ifdef _WIN32 +// WinSocks need extra (de)initialization, solved by a global object here, +// whose constructor/destructor will be called before and after main(). +struct WsaInitializer { + WsaInitializer() { + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); + } + + ~WsaInitializer() { WSACleanup(); } +}; + +static WsaInitializer g_wsaInitializer; + +#endif + +/// +/// Encapsulation of C struct sockaddr[_in|_un] with additional helper methods. +/// Both Internet and Unix Domain sockets are suported. +/// The struct may be cast directly to sockaddr: +/// - operator sockaddr *() +/// - size_t size() const - returns proper size depending on socket type. +/// +struct SocketAddr { + // The union is only as big as necessary to hold its largest data member. + // Modern OS (both Un*x and Windows) require at least sizeof(sockaddr_un). + union { + sockaddr m_data; + sockaddr_in m_data_in; + sockaddr_in6 m_data_in6; +#ifdef HAVE_UNIX_DOMAIN + sockaddr_un m_data_un; +#endif + }; + + constexpr static u_long const Loopback = 0x7F000001; + + // Indicator that the sockaddr is sockaddr_un + bool isUnixDomain; + size_t abstract_socket_size; + + /// + /// SocketAddr constructor + /// + /// SocketAddr + SocketAddr() { + isUnixDomain = false; + abstract_socket_size = 0; +#ifdef HAVE_UNIX_DOMAIN + memset(&m_data_un, 0, sizeof(m_data_un)); +#else + memset(&m_data_in6, 0, sizeof(m_data_in6)); +#endif + } + + SocketAddr(u_long addr, int port) { + isUnixDomain = false; + sockaddr_in &inet4 = reinterpret_cast(m_data); + inet4.sin_family = AF_INET; + inet4.sin_port = htons(static_cast(port)); + inet4.sin_addr.s_addr = htonl(addr); + } + + SocketAddr(const char *addr, bool unixDomain = false) : SocketAddr() { + isUnixDomain = unixDomain; + std::string ipAddress = addr; + auto found = ipAddress.find("://"); + if (found != std::string::npos) { + // always strip scheme + ipAddress.erase(0, found + 3); + } + +#ifdef HAVE_UNIX_DOMAIN + if (isUnixDomain) { + m_data_un.sun_family = AF_UNIX; + const char *unix_domain_path = ipAddress.data(); + // Max length of Unix domain filename is up to 108 chars + strncpy_s(m_data_un.sun_path, sizeof(m_data_un.sun_path), + unix_domain_path, sizeof(m_data_un.sun_path)); + // special handling for abstract socket - they should be + // prefixed with '@' character. + if (unix_domain_path[0] == '@') { + m_data_un.sun_path[0] = '\0'; + abstract_socket_size = ipAddress.size(); + } + return; + } +#endif + + // Convert {IPv4|IPv6}:{port} string to Network address and Port. + int port = 0; + + // If numColons is more than 2, then it is IPv6 address + size_t numColons = std::count(ipAddress.begin(), ipAddress.end(), ':'); + // Find last colon, which should indicate the port number + char const *lastColon = strrchr(ipAddress.data(), ':'); + if (lastColon) { + port = atoi(lastColon + 1); + // Erase port number + ipAddress.erase(lastColon - ipAddress.data()); + } + + // If there are more than two colons, it means the input is IPv6, e.g + // [fe80::c018:4a9b:3681:4e41]:3000 + if (numColons > 1) { + sockaddr_in6 &inet6 = m_data_in6; + inet6.sin6_family = AF_INET6; + inet6.sin6_port = htons(port); + void *pAddrBuf = &inet6.sin6_addr; + size_t len = ipAddress.length(); + if ((ipAddress[0] == '[') && (ipAddress[len - 1] == ']')) { + // Remove square brackets + ipAddress = ipAddress.substr(1, ipAddress.length() - 2); + } + if (!::inet_pton(inet6.sin6_family, ipAddress.c_str(), pAddrBuf)) { + LOG_ERROR("Invalid IPv6 address: %s", addr); + } + } else { + sockaddr_in &inet = m_data_in; + inet.sin_family = AF_INET; + inet.sin_port = htons(port); + void *pAddrBuf = &inet.sin_addr; + if (!::inet_pton(inet.sin_family, ipAddress.c_str(), pAddrBuf)) { + LOG_ERROR("Invalid IPv4 address: %s", addr); + } + } + } + + SocketAddr(SocketAddr const &other) = default; + + SocketAddr &operator=(SocketAddr const &other) = default; + + operator sockaddr *() { return &m_data; } + + operator const sockaddr *() const { return &m_data; } + + size_t size() const { +#ifdef HAVE_UNIX_DOMAIN + if (isUnixDomain && abstract_socket_size) + return sizeof(sa_family_t) + abstract_socket_size; + else if(isUnixDomain) + // Unix domain struct m_data_un + return sizeof(m_data_un); +#endif + // IPv4 struct m_data_in + if (m_data.sa_family == AF_INET) + return sizeof(m_data_in); + // IPv6 struct m_data_in6 + if (m_data.sa_family == AF_INET6) + return sizeof(m_data_in6); + // RAW socket? + return sizeof(m_data); + } + + int port() const { +#ifdef HAVE_UNIX_DOMAIN + if (isUnixDomain) { + return -1; + } +#endif + + switch (m_data.sa_family) { + case AF_INET6: { + return ntohs(m_data_in6.sin6_port); + } + case AF_INET: { + return ntohs(m_data_in.sin_port); + } + default: + return -1; + } + } + + std::string toString() const { + std::ostringstream os; + +#ifdef HAVE_UNIX_DOMAIN + if (isUnixDomain) { + os << (const char *)(m_data_un.sun_path); + } else +#endif + { + switch (m_data.sa_family) { + case AF_INET6: { + char buff[NI_MAXHOST] = {0}; + inet_ntop(AF_INET6, &(m_data_in6.sin6_addr), buff, sizeof(buff)); + os << '[' << buff << ']'; + os << ':' << ntohs(m_data_in6.sin6_port); + break; + } + case AF_INET: { + u_long addr = ntohl(m_data_in.sin_addr.s_addr); + os << (addr >> 24) << '.' << ((addr >> 16) & 255) << '.' + << ((addr >> 8) & 255) << '.' << (addr & 255); + os << ':' << ntohs(m_data_in.sin_port); + break; + } + default: + os << "[?AF?" << m_data.sa_family << ']'; + } + }; + return os.str(); + } +}; // namespace SocketTools + +static const char *kSchemeUDP = "udp"; +static const char *kSchemeTCP = "tcp"; +static const char *kSchemeUnix = "unix"; +static const char *kSchemeUnk = "unknown"; + +struct SocketParams { + int af; // POSIX socket domain + int type; // POSIX socket type + int proto; // POSIX socket protocol + + /** + * @brief Determine connection scheme based on socket parameters: + * "tcp", "udp", "unix" or "unknown". + * + * @return Text representation of scheme. + */ + inline const char *scheme() { + if ((af == AF_INET) || (af == AF_INET6)) { + if (type == SOCK_DGRAM) + return kSchemeUDP; + if (type == SOCK_STREAM) + return kSchemeTCP; + } + if (af == AF_UNIX) { + return kSchemeUnix; + } + return kSchemeUnk; + } +}; + +/// +/// Encapsulation of a socket (non-exclusive ownership) +/// +struct Socket { +#ifdef _WIN32 + typedef SOCKET Type; + static constexpr Type const Invalid = INVALID_SOCKET; +#else + typedef int Type; /* POSIX m_sock type is int */ + static constexpr Type const Invalid = -1; +#endif + + Type m_sock; + + Socket(SocketParams params) : Socket(params.af, params.type, params.proto) {} + + Socket(Type sock = Invalid) : m_sock(sock) {} + + Socket(int af, int type, int proto) { m_sock = ::socket(af, type, proto); } + + ~Socket() {} + + operator Socket::Type() const { return m_sock; } + + bool operator==(Socket const &other) const { + return (m_sock == other.m_sock); + } + + bool operator!=(Socket const &other) const { + return (m_sock != other.m_sock); + } + + bool operator<(Socket const &other) const { return (m_sock < other.m_sock); } + + bool invalid() const { return (m_sock == Invalid); } + + void setNonBlocking() { + assert(m_sock != Invalid); +#ifdef _WIN32 + u_long value = 1; + ::ioctlsocket(m_sock, FIONBIO, &value); +#else + int flags = ::fcntl(m_sock, F_GETFL, 0); + ::fcntl(m_sock, F_SETFL, flags | O_NONBLOCK); +#endif + } + + bool setReuseAddr() { + assert(m_sock != Invalid); +#ifdef _WIN32 + BOOL value = TRUE; +#else + int value = 1; +#endif + return (::setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, + reinterpret_cast(&value), sizeof(value)) == 0); + } + + bool setNoDelay() { + assert(m_sock != Invalid); +#ifdef _WIN32 + BOOL value = TRUE; +#else + int value = 1; +#endif + return (::setsockopt(m_sock, IPPROTO_TCP, TCP_NODELAY, + reinterpret_cast(&value), sizeof(value)) == 0); + } + + bool connect(SocketAddr const &addr) { + assert(m_sock != Invalid); + return (::connect(m_sock, (const sockaddr *)addr, addr.size()) == 0); + } + + void close() { +#ifdef _WIN32 + ::closesocket(m_sock); +#else + ::close(m_sock); +#endif + m_sock = Invalid; + } + + int recvfrom(_Out_cap_(size) void *buffer, size_t size, int flags, + SocketAddr &clientAddr) { + assert(m_sock != Invalid); +#ifdef _WIN32 + int len = clientAddr.size(); +#else + socklen_t len = clientAddr.size(); +#endif + return static_cast(::recvfrom(m_sock, reinterpret_cast(buffer), + size, flags, clientAddr, &len)); + } + + int recv(_Out_cap_(size) void *buffer, size_t size, int flags = 0) { + assert(m_sock != Invalid); + return static_cast( + ::recv(m_sock, reinterpret_cast(buffer), size, flags)); + } + + size_t readall(char *buffer, size_t size) { + size_t total_bytes_received = 0; + int bytes_received = 0; + // Read response fully + do { + bytes_received = recv((void *)(buffer + total_bytes_received), + size - total_bytes_received); + if (bytes_received > 0) { + total_bytes_received += bytes_received; + } else if (bytes_received == 0) { +#ifndef _WIN32 + // recv() on a blocking socket may return EAGAIN in case + // if the call timed out (no data received in time period + // specified as socket timeout) + if (errno == EAGAIN) + continue; +#else + // recv() returns 0 only when 0-byte buffer is requested + // or when the other peer has gracefully disconnected. + // No data received. + break; +#endif + } else { + // recv() error occurred. + break; + } + } while ((bytes_received > 0) && (total_bytes_received < size)); + return total_bytes_received; + } + + size_t writeall(char const *buffer, size_t size) { + size_t total_bytes_sent = 0; + int bytes_sent = 0; + // Write response fully + do { + bytes_sent = + send((void *)(buffer + total_bytes_sent), size - total_bytes_sent); + if (bytes_sent > 0) { + total_bytes_sent += bytes_sent; + } else if (bytes_sent == 0) { + // No more data to send or can't send anymore. + break; + } + if (bytes_sent < 0) { + // send() error occurred. + break; + } + } while (total_bytes_sent < size); + return total_bytes_sent; + } + + int send(void const *buffer, size_t size) { + assert(m_sock != Invalid); + if ((m_sock == Invalid) || (buffer == nullptr) || (size == 0)) + return 0; + return static_cast( + ::send(m_sock, reinterpret_cast(buffer), size, 0)); + } + + int sendto(void const *buffer, size_t size, int flags, SocketAddr &destAddr) { + assert(m_sock != Invalid); + if ((m_sock == Invalid) || (buffer == nullptr) || (size == 0)) + return 0; + int len = destAddr.size(); + return static_cast(::sendto(m_sock, + reinterpret_cast(buffer), + size, flags, destAddr, len)); + } + + int bind(SocketAddr const &addr) { + assert(m_sock != Invalid); + return ::bind(m_sock, addr, addr.size()); + } + + bool getsockname(SocketAddr &addr) const { + assert(m_sock != Invalid); +#ifdef _WIN32 + int addrlen = sizeof(addr); +#else + socklen_t addrlen = sizeof(addr); +#endif + return (::getsockname(m_sock, addr, &addrlen) == 0); + } + + template int getsockopt(int level, int optname, T &optval) { +#ifdef _WIN32 + int optlen = sizeof(T); + return ::getsockopt(m_sock, level, optname, (char *)(&optval), &optlen); +#else + socklen_t optlen = sizeof(T); + return ::getsockopt(m_sock, level, optname, (void *)&optval, &optlen); +#endif + } + + bool listen(size_t backlog) { + assert(m_sock != Invalid); + return (::listen(m_sock, backlog) == 0); + } + + bool accept(Socket &csock, SocketAddr &caddr) { + assert(m_sock != Invalid); +#ifdef _WIN32 + int addrlen = sizeof(caddr); +#else + socklen_t addrlen = sizeof(caddr); +#endif + csock = ::accept(m_sock, caddr, &addrlen); + return !csock.invalid(); + } + + bool shutdown(int how) { + assert(m_sock != Invalid); + return (::shutdown(m_sock, how) == 0); + } + + int error() const { +#ifdef _WIN32 + return ::WSAGetLastError(); +#else + return errno; +#endif + } + + enum { +#ifdef _WIN32 + ErrorWouldBlock = WSAEWOULDBLOCK +#else + ErrorWouldBlock = EWOULDBLOCK +#endif + }; + + enum { +#ifdef _WIN32 + ShutdownReceive = SD_RECEIVE, + ShutdownSend = SD_SEND, + ShutdownBoth = SD_BOTH +#else + ShutdownReceive = SHUT_RD, + ShutdownSend = SHUT_WR, + ShutdownBoth = SHUT_RDWR +#endif + }; +}; + +/// +/// Socket Data +/// +struct SocketData { + Socket socket; + int flags; + + SocketData() : socket(), flags(0) {} + + bool operator==(Socket s) { return (socket == s); } +}; + +/// +/// Socket Reactor +/// +struct Reactor : protected net::Thread { + /// + /// Socket State callback + /// + class SocketCallback { + public: + virtual void onSocketReadable(Socket sock) = 0; + virtual void onSocketWritable(Socket sock) = 0; + virtual void onSocketAcceptable(Socket sock) = 0; + virtual void onSocketClosed(Socket sock) = 0; + }; + + /// + /// Socket State + /// + enum State { Readable = 1, Writable = 2, Acceptable = 4, Closed = 8 }; + + SocketCallback &m_callback; + + std::recursive_mutex m_sockets_mutex; + std::vector m_sockets; + + // Event loop is required for stream sockets + bool m_streaming{true}; + +#ifdef _WIN32 + /* use WinSock events on Windows */ + std::vector m_events{}; +#endif + +#ifdef __linux__ + /* use epoll on Linux */ + int m_epollFd; +#endif + +#ifdef TARGET_OS_MAC + /* use kqueue on Mac */ +#define KQUEUE_SIZE 32 + int kq{0}; + struct kevent m_events[KQUEUE_SIZE]; +#endif + +public: + Reactor(SocketCallback &callback) : m_callback(callback) { +#ifdef __linux__ +#ifdef ANDROID + m_epollFd = ::epoll_create(0); +#else + m_epollFd = ::epoll_create1(0); +#endif +#endif + +#ifdef TARGET_OS_MAC + bzero(&m_events[0], sizeof(m_events)); + kq = kqueue(); +#endif + } + + ~Reactor() { +#ifdef __linux__ + ::close(m_epollFd); +#endif +#ifdef TARGET_OS_MAC + ::close(kq); +#endif + } + + /// + /// Add Socket + /// + /// + /// + void addSocket(const Socket &socket, int flags) { + if (flags == 0) { + removeSocket(socket); + return; + } + + LOCKGUARD(m_sockets_mutex); + if ((flags == State::Readable) && (m_sockets.size() == 0)) { + // No listen/accept - readable UDP datagram + m_streaming = false; + LOG_TRACE("Reactor: Adding datagram socket 0x%x with flags 0x%x", + static_cast(socket), flags); + m_sockets.push_back(SocketData()); + m_sockets.back().socket = socket; + m_sockets.back().flags = 0; + return; + } + + if (m_streaming) { + auto it = std::find(m_sockets.begin(), m_sockets.end(), socket); + if (it == m_sockets.end()) { + LOG_TRACE("Reactor: Adding socket 0x%x with flags 0x%x", + static_cast(socket), flags); +#ifdef _WIN32 + m_events.push_back(::WSACreateEvent()); +#endif +#ifdef __linux__ + epoll_event event = {}; + event.data.fd = socket; + event.events = 0; + if (::epoll_ctl(m_epollFd, EPOLL_CTL_ADD, socket, &event) != 0) { + LOG_ERROR("Reactor: epoll_ctl failed! errno=%d", errno); + } +#endif +#ifdef TARGET_OS_MAC + struct kevent event; + bzero(&event, sizeof(event)); + event.ident = socket.m_sock; + EV_SET(&event, event.ident, EVFILT_READ, EV_ADD, 0, 0, NULL); + kevent(kq, &event, 1, NULL, 0, NULL); + EV_SET(&event, event.ident, EVFILT_WRITE, EV_ADD, 0, 0, NULL); + kevent(kq, &event, 1, NULL, 0, NULL); +#endif + m_sockets.push_back(SocketData()); + m_sockets.back().socket = socket; + m_sockets.back().flags = 0; + it = m_sockets.end() - 1; + } else { + LOG_TRACE("Reactor: Updating socket 0x%x with flags 0x%x", + static_cast(socket), flags); + } + + if (it->flags != flags) { + it->flags = flags; +#ifdef _WIN32 + long lNetworkEvents = 0; + if (it->flags & Readable) { + lNetworkEvents |= FD_READ; + } + if (it->flags & Writable) { + lNetworkEvents |= FD_WRITE; + } + if (it->flags & Acceptable) { + lNetworkEvents |= FD_ACCEPT; + } + if (it->flags & Closed) { + lNetworkEvents |= FD_CLOSE; + } + auto eventIt = m_events.begin() + std::distance(m_sockets.begin(), it); + ::WSAEventSelect(socket, *eventIt, lNetworkEvents); +#endif +#ifdef __linux__ + int events = 0; + if (it->flags & Readable) { + events |= EPOLLIN; + }; + if (it->flags & Writable) { + events |= EPOLLOUT; + }; + if (it->flags & Acceptable) { + events |= EPOLLIN; + }; + // if (it->flags & Closed) - always handled (EPOLLERR | EPOLLHUP) + epoll_event event = {}; + event.data.fd = socket; + event.events = events; + if (::epoll_ctl(m_epollFd, EPOLL_CTL_MOD, socket, &event) != 0) { + LOG_ERROR("Reactor: epoll_ctl failed! errno=%d", errno); + } +#endif +#ifdef TARGET_OS_MAC + // TODO: [MG] - Mac OS X socket doesn't currently support updating flags +#endif + } + } + } + + /// + /// Remove Socket + /// + /// + void removeSocket(const Socket &socket) { + LOCKGUARD(m_sockets_mutex); + LOG_TRACE("Reactor: Removing socket 0x%x", static_cast(socket)); + auto it = std::find(m_sockets.begin(), m_sockets.end(), socket); + if (it != m_sockets.end()) { + if (m_streaming) { +#ifdef _WIN32 + auto eventIt = m_events.begin() + std::distance(m_sockets.begin(), it); + ::WSAEventSelect(it->socket, *eventIt, 0); + ::WSACloseEvent(*eventIt); + m_events.erase(eventIt); +#endif +#ifdef __linux__ + if (::epoll_ctl(m_epollFd, EPOLL_CTL_DEL, socket, nullptr) != 0) { + LOG_ERROR("Reactor: epoll_ctl failed! errno=%d", errno); + }; +#endif +#ifdef TARGET_OS_MAC + struct kevent event; + bzero(&event, sizeof(event)); + event.ident = socket; + EV_SET(&event, socket, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) { + //// Already removed? + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } + EV_SET(&event, socket, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) { + //// Already removed? + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } +#endif + } + m_sockets.erase(it); + } + } + + /// + /// Start server + /// + void start() { + LOG_INFO("Reactor: Starting..."); + startThread(); + } + + /// + /// Stop server + /// + void stop() { + LOG_INFO("Reactor: Stopping..."); + // If UDP server, then force-close it to stop. + if (!m_streaming) { + LOCKGUARD(m_sockets_mutex); + if (m_sockets.size()) { + m_sockets[0].socket.close(); + } + } + joinThread(); + + // Only acquire the lock after the worker(s) have joined + LOCKGUARD(m_sockets_mutex); +#ifdef _WIN32 + for (auto &hEvent : m_events) { + ::WSACloseEvent(hEvent); + } +#else /* Linux and Mac */ + for (auto &sd : m_sockets) { +#ifdef __linux__ + if (::epoll_ctl(m_epollFd, EPOLL_CTL_DEL, sd.socket, nullptr) != 0) { + LOG_ERROR("Reactor: epoll_ctl failed! errno=%d", errno); + }; +#endif +#ifdef TARGET_OS_MAC + struct kevent event; + bzero(&event, sizeof(event)); + event.ident = sd.socket; + EV_SET(&event, sd.socket, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) { + LOG_ERROR("Reactor: cannot delete fd=0x%x from kqueue!", event.ident); + } + EV_SET(&event, sd.socket, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) { + LOG_ERROR("Reactor: cannot delete fd=0x%x from kqueue!", event.ident); + } +#endif + } +#endif + // unbind + if (m_sockets.size()) { + m_sockets[0].socket.close(); + } + m_sockets.clear(); + } + + /// + /// Thread Loop for async events processing + /// + virtual void onThread() override { + LOG_INFO("Reactor: Thread started"); + + if (!m_streaming) { + // UDP Server implementation. + // Process only one bound address at the moment, not many. + // This single-threaded implementation passes UDP buffers + // to onSocketReadable, that should decide what to do with + // the socket. Callback may implement its own thread pool. + Socket socket = m_sockets[0].socket; + LOG_TRACE("Reactor: socket 0x%x receive loop started...", + static_cast(socket)); + while (!shouldTerminate()) { + m_callback.onSocketReadable(socket); + } + m_callback.onSocketClosed(socket); + LOG_TRACE("Reactor: socket 0x%x closed.", static_cast(socket)); + return; + } + + while (!shouldTerminate()) { + // TCP and Unix Domain Server implementation. + // + // Use event-based notification with array of client + // bidirectional stream sockets concurrently processed + // by single thread. Facility differs depending on OS: + // + // - Windows: use WSA socket events + // - Linux: use epoll + // - Mac: use kqueue + // +#ifdef _WIN32 + DWORD dwResult = + ::WSAWaitForMultipleEvents(static_cast(m_events.size()), + m_events.data(), FALSE, 500, FALSE); + if (dwResult == WSA_WAIT_TIMEOUT) { + continue; + } + + if (dwResult > WSA_WAIT_EVENT_0 + m_events.size()) { + LOG_WARN("Reactor: stale event on closed socket dwResult=%d", + (int)dwResult); + continue; + } + + int index = dwResult - WSA_WAIT_EVENT_0; + + m_sockets_mutex.lock(); + Socket socket = m_sockets[index].socket; + int flags = m_sockets[index].flags; + m_sockets_mutex.unlock(); + + WSANETWORKEVENTS ne; + ::WSAEnumNetworkEvents(socket, m_events[index], &ne); + LOG_TRACE( + "Reactor: Handling socket 0x%x (index %d) with active flags 0x%x " + "(armed 0x%x)", + static_cast(socket), index, ne.lNetworkEvents, flags); + + if ((flags & Readable) && (ne.lNetworkEvents & FD_READ)) { + m_callback.onSocketReadable(socket); + } + if ((flags & Writable) && (ne.lNetworkEvents & FD_WRITE)) { + m_callback.onSocketWritable(socket); + } + if ((flags & Acceptable) && (ne.lNetworkEvents & FD_ACCEPT)) { + m_callback.onSocketAcceptable(socket); + } + if ((flags & Closed) && (ne.lNetworkEvents & FD_CLOSE)) { + m_callback.onSocketClosed(socket); + } +#endif + +#ifdef __linux__ + { + epoll_event events[4]; + int result = ::epoll_wait(m_epollFd, events, + sizeof(events) / sizeof(events[0]), 500); + if (result == 0) + continue; + if (result < 0) { + LOG_ERROR("Reactor: got errno=%d!", errno); + continue; + }; + assert(result >= 1 && static_cast(result) <= + sizeof(events) / sizeof(events[0])); + + LOCKGUARD(m_sockets_mutex); + for (int i = 0; i < result; i++) { + auto it = + std::find(m_sockets.begin(), m_sockets.end(), events[i].data.fd); + assert(it != m_sockets.end()); + Socket socket = it->socket; + int flags = it->flags; + + LOG_TRACE( + "Reactor: Handling socket 0x%x active flags 0x%x (armed 0x%x)", + static_cast(socket), events[i].events, flags); + + if ((flags & Readable) && (events[i].events & EPOLLIN)) { + m_callback.onSocketReadable(socket); + } + if ((flags & Writable) && (events[i].events & EPOLLOUT)) { + m_callback.onSocketWritable(socket); + } + if ((flags & Acceptable) && (events[i].events & EPOLLIN)) { + m_callback.onSocketAcceptable(socket); + } + if ((flags & Closed) && (events[i].events & (EPOLLHUP | EPOLLERR))) { + LOG_TRACE("Reactor: handling socket 0x%x onSocketClosed", + static_cast(socket)); + m_callback.onSocketClosed(socket); + } + } + } +#endif + +#if defined(TARGET_OS_MAC) + { + LOCKGUARD(m_sockets_mutex); + unsigned waitms = 500; // never block for more than 500ms + struct timespec timeout; + timeout.tv_sec = waitms / 1000; + timeout.tv_nsec = (waitms % 1000) * 1000 * 1000; + + int nev = kevent(kq, NULL, 0, m_events, KQUEUE_SIZE, &timeout); + for (int i = 0; i < nev; i++) { + struct kevent &event = m_events[i]; + int fd = (int)event.ident; + auto it = std::find(m_sockets.begin(), m_sockets.end(), fd); + assert(it != m_sockets.end()); + Socket socket = it->socket; + int flags = it->flags; + + LOG_TRACE("Handling socket 0x%x active flags 0x%x (armed 0x%x)", + static_cast(socket), event.flags, event.fflags); + + if (event.filter == EVFILT_READ) { + if (flags & Acceptable) { + m_callback.onSocketAcceptable(socket); + } + if (flags & Readable) { + m_callback.onSocketReadable(socket); + } + continue; + } + + if (event.filter == EVFILT_WRITE) { + if (flags & Writable) { + m_callback.onSocketWritable(socket); + } + continue; + } + + if ((event.flags & EV_EOF) || (event.flags & EV_ERROR)) { + LOG_TRACE("event.filter=%s", "EVFILT_WRITE"); + m_callback.onSocketClosed(socket); + it->flags = Closed; + struct kevent kevt; + EV_SET(&kevt, event.ident, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &kevt, 1, NULL, 0, NULL)) { + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } + EV_SET(&kevt, event.ident, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &kevt, 1, NULL, 0, NULL)) { + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } + continue; + } + LOG_ERROR("Reactor: unhandled kevent!"); + } + } +#endif + } + LOG_TRACE("Reactor: Thread done"); + } +}; + +} // namespace SocketTools + +#ifdef _MSC_VER +#pragma warning(pop) +#endif diff --git a/exporters/statsd/opentelemetry-cpp b/exporters/statsd/opentelemetry-cpp new file mode 160000 index 000000000..6175aa0b2 --- /dev/null +++ b/exporters/statsd/opentelemetry-cpp @@ -0,0 +1 @@ +Subproject commit 6175aa0b213eea053247e43b4f35b8d201fa356e diff --git a/exporters/statsd/src/etw_data_transport.cc b/exporters/statsd/src/etw_data_transport.cc new file mode 100644 index 000000000..020a71ac5 --- /dev/null +++ b/exporters/statsd/src/etw_data_transport.cc @@ -0,0 +1,85 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/exporters/statsd/metrics/etw_data_transport.h" +#include "opentelemetry/exporters/statsd/metrics/macros.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter { +namespace statsd { +namespace metrics { +#define GUID_FORMAT \ + "%08lX-%04hX-%04hX-%02hhX%02hhX-%02hhX%02hhX%02hhX%02hhX%02hhX%02hhX" +#define GUID_ARG(guid) \ + (guid).Data1, (guid).Data2, (guid).Data3, (guid).Data4[0], (guid).Data4[1], \ + (guid).Data4[2], (guid).Data4[3], (guid).Data4[4], (guid).Data4[5], \ + (guid).Data4[6], (guid).Data4[7] + +ETWDataTransport::ETWDataTransport(const size_t offset_to_skip) + : offset_to_skip_{offset_to_skip} { + auto status = + ::EventRegister(&kMDMProviderGUID, NULL, NULL, &provider_handle_); + if (status != ERROR_SUCCESS) { + LOG_ERROR("ETWDataTransport:: Failed to initialize the ETW provider. " + "Metrics will not be published, Provider ID: {" GUID_FORMAT "}", + GUID_ARG(kMDMProviderGUID)); + provider_handle_ = INVALID_HANDLE; + } +} + +bool ETWDataTransport::Connect() noexcept { + // connection is already established in constructor. Check if it is still + // valid. + if (provider_handle_ == INVALID_HANDLE) { + LOG_ERROR("ETWDataTransport:: Failed to initialize the ETW provider. " + "Metrics will not be published"); + return false; + } + return true; +} + +bool ETWDataTransport::Send(MetricsEventType event_type, const char *data, + uint16_t length) noexcept { + if (provider_handle_ == INVALID_HANDLE) { + LOG_ERROR("ETWDataTransport:: ETW Provider Handle is not valid. Metrics is " + "dropped"); + return false; + } + const unsigned int descriptorSize = 1; + EVENT_DATA_DESCRIPTOR dataDescriptor[descriptorSize]; + ::ZeroMemory(&dataDescriptor, sizeof(dataDescriptor)); + // skip the event_id and the payload length (as expected by the ETW listener) + ::EventDataDescCreate(&dataDescriptor[0], data + offset_to_skip_, + length - offset_to_skip_); + + EVENT_DESCRIPTOR evtDescriptor; + ::ZeroMemory(&evtDescriptor, sizeof(EVENT_DESCRIPTOR)); + evtDescriptor.Version = 1; + evtDescriptor.Version = 0; + evtDescriptor.Id = static_cast(event_type); + auto result = ::EventWrite(provider_handle_, &evtDescriptor, descriptorSize, + dataDescriptor); + if (result != ERROR_SUCCESS) { + LOG_ERROR("ETWDataTransport:: Failed to publish metric to ETW. Error: %d", + result); + return false; + } + return true; +} + +bool ETWDataTransport::Disconnect() noexcept { + // provider is deregistered in destructor. + return true; +} + +ETWDataTransport::~ETWDataTransport() { + if (provider_handle_ != INVALID_HANDLE) { + ::EventUnregister(provider_handle_); + provider_handle_ = INVALID_HANDLE; + } +} + +} // namespace metrics +} // namespace statsd +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE \ No newline at end of file diff --git a/exporters/statsd/src/exporter.cc b/exporters/statsd/src/exporter.cc new file mode 100644 index 000000000..0fc6729bc --- /dev/null +++ b/exporters/statsd/src/exporter.cc @@ -0,0 +1,180 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/exporters/statsd/metrics/exporter.h" +#include "opentelemetry/exporters/statsd/metrics/macros.h" +#ifdef _WIN32 +#include "opentelemetry/exporters/statsd/metrics/etw_data_transport.h" +#else +#include "opentelemetry/exporters/statsd/metrics/socket_data_transport.h" +#endif +#include "opentelemetry/sdk/metrics/export/metric_producer.h" +#include "opentelemetry/sdk_config.h" + +#include +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter { +namespace statsd { +namespace metrics { +Exporter::Exporter(const ExporterOptions &options) + : options_(options), connection_string_parser_(options_.connection_string), + data_transport_{nullptr} { + if (connection_string_parser_.IsValid()) { +#ifdef _WIN32 + if (connection_string_parser_.transport_protocol_ == + TransportProtocol::kETW) { + data_transport_ = std::unique_ptr( + new ETWDataTransport(kBinaryHeaderSize)); + } +#else + if (connection_string_parser_.transport_protocol_ == TransportProtocol::kUNIX + || connection_string_parser_.transport_protocol_ == TransportProtocol::kTCP + || connection_string_parser_.transport_protocol_ == TransportProtocol::kUDP) { + data_transport_ = + std::unique_ptr(new SocketDataTransport( + connection_string_parser_)); + } +#endif + } + // Connect transport at initialization + auto status = data_transport_->Connect(); + if (!status) { + LOG_ERROR("[Statsd Exporter] Connect failed. No data would be sent."); + is_shutdown_ = true; + return; + } +} + +sdk::metrics::AggregationTemporality Exporter::GetAggregationTemporality( + sdk::metrics::InstrumentType instrument_type) const noexcept { + if (instrument_type == sdk::metrics::InstrumentType::kUpDownCounter) + { + return sdk::metrics::AggregationTemporality::kCumulative; + } + return sdk::metrics::AggregationTemporality::kDelta; +} + +opentelemetry::sdk::common::ExportResult Exporter::Export( + const opentelemetry::sdk::metrics::ResourceMetrics &data) noexcept { + auto shutdown = false; + { + const std::lock_guard locked(lock_); + shutdown = is_shutdown_; + } + if (shutdown) { + OTEL_INTERNAL_LOG_ERROR("[Statsd Exporter] Exporting " + << data.scope_metric_data_.size() + << " metric(s) failed, exporter is shutdown"); + return sdk::common::ExportResult::kFailure; + } + + if (data.scope_metric_data_.empty()) { + return sdk::common::ExportResult::kSuccess; + } + + for (auto &record : data.scope_metric_data_) { + for (const auto &metric_data : record.metric_data_) { + MetricsEventType event_type; + if (metric_data.instrument_descriptor.type_ == + sdk::metrics::InstrumentType::kUpDownCounter) { + event_type = MetricsEventType::Gauge; + } else if (metric_data.instrument_descriptor.type_ == + sdk::metrics::InstrumentType::kObservableCounter) { + event_type = MetricsEventType::ScaledFixedPointNumber; + } else if (metric_data.instrument_descriptor.type_ == + sdk::metrics::InstrumentType::kObservableGauge) { + event_type = MetricsEventType::FloatingPointNumber; + } else { + event_type = MetricsEventType::Unknown; + } + if (event_type == MetricsEventType::Unknown) { + LOG_ERROR("[Statsd Exporter] Exporting " + << metric_data.instrument_descriptor.name_ + << " failed, unsupported metric type"); + continue; + } + for (auto &point_data_with_attributes : metric_data.point_data_attr_) { + ValueType new_value; + if (nostd::holds_alternative( + point_data_with_attributes.point_data)) { + auto value = nostd::get( + point_data_with_attributes.point_data); + new_value = value.value_; + + + if (!nostd::holds_alternative(value.value_) && !value.is_monotonic_) { + // NOTE - Potential for minor precision loss implicitly going from + // int64_t to double - + // - A 64-bit integer can hold more significant decimal digits + // than a standard + // IEEE (64-bit) double precision floating-point + // representation + new_value = static_cast(nostd::get(new_value)); + } + + } else if (nostd::holds_alternative( + point_data_with_attributes.point_data)) { + auto value = nostd::get( + point_data_with_attributes.point_data); + new_value = value.value_; + if (nostd::holds_alternative(value.value_)) { + // NOTE - Potential for minor precision loss implicitly going from + // int64_t to double - + // - A 64-bit integer can hold more significant decimal digits + // than a standard + // IEEE (64-bit) double precision floating-point representation + new_value = static_cast(nostd::get(new_value)); + } + + } else if (nostd::holds_alternative( + point_data_with_attributes.point_data)) { + auto value = nostd::get( + point_data_with_attributes.point_data); + ValueType new_sum = value.sum_; + ValueType new_min = value.min_; + ValueType new_max = value.max_; + } + + this->SendMetrics(metric_data.instrument_descriptor.name_, + event_type, new_value); + } + } + } + return opentelemetry::sdk::common::ExportResult::kSuccess; +} + +void Exporter::SendMetrics(std::string metric_name, MetricsEventType type, ValueType value) noexcept { + std::string message; + + std::string value_str; + if (nostd::holds_alternative(value)) { + value_str = std::to_string(nostd::get(value)); + } else if (nostd::holds_alternative(value)) { + value_str = std::to_string(nostd::get(value)); + } else { + LOG_ERROR("[Statsd Exporter] SendMetrics - " + "Unsupported value type"); + return; + } + + message = metric_name + ":" + value_str + "|" + + MetricsEventTypeToString(type); + data_transport_->Send(type, message.c_str(), message.size()); +} + +bool Exporter::ForceFlush(std::chrono::microseconds timeout) noexcept { + return true; +} + +bool Exporter::Shutdown(std::chrono::microseconds timeout) noexcept { + const std::lock_guard locked(lock_); + is_shutdown_ = true; + return true; +} + +} // namespace metrics +} // namespace statsd +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/statsd/src/socket_data_transport.cc b/exporters/statsd/src/socket_data_transport.cc new file mode 100644 index 000000000..85e58973f --- /dev/null +++ b/exporters/statsd/src/socket_data_transport.cc @@ -0,0 +1,91 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/exporters/statsd/metrics/connection_string_parser.h" +#include "opentelemetry/exporters/statsd/metrics/socket_data_transport.h" +#include "opentelemetry/exporters/statsd/metrics/macros.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace exporter { +namespace statsd { +namespace metrics { + +SocketDataTransport::SocketDataTransport( + const ConnectionStringParser &parser) +{ + bool is_unix_domain = false; + + if (parser.transport_protocol_ == TransportProtocol::kTCP) { + socketparams_ = {AF_INET, SOCK_STREAM, 0}; + } + else if (parser.transport_protocol_ == TransportProtocol::kUDP) { + socketparams_ = {AF_INET, SOCK_DGRAM, 0}; + } + else if (parser.transport_protocol_ == TransportProtocol::kUNIX) { + socketparams_ = {AF_UNIX, SOCK_STREAM, 0}; + is_unix_domain = true; + } + else { + LOG_ERROR("Statsd Exporter: Invalid transport protocol"); + } + addr_.reset(new SocketTools::SocketAddr(parser.connection_string_.c_str(), is_unix_domain)); +} + +bool SocketDataTransport::Connect() noexcept { + if (!connected_) { + socket_ = SocketTools::Socket(socketparams_); + connected_ = socket_.connect(*addr_); + if (!connected_) { + LOG_ERROR("Statsd Exporter: UDS::Connect failed"); + return false; + } + } + return true; +} + +bool SocketDataTransport::Send(MetricsEventType event_type, + char const *data, + uint16_t length) noexcept { + int error_code = 0; + if (connected_) { + socket_.getsockopt(SOL_SOCKET, SO_ERROR, error_code); + } else { + LOG_WARN( + "Statsd Exporter: UDS::Send Socket disconnected - Trying to connect"); + auto status = Connect(); + if (!status) { + LOG_WARN( + "Statsd Exporter: UDS::Send Socket reconnect failed. Send failed"); + } + } + if (error_code != 0) { + LOG_ERROR("Statsd Exporter: UDS::Send failed - not connected"); + connected_ = false; + return false; + } + + // try to write + size_t sent_size = socket_.writeall(data, length); + if (length != sent_size) { + Disconnect(); + LOG_ERROR("Statsd Exporter: UDS::Send failed"); + return false; + } + return true; +} + +bool SocketDataTransport::Disconnect() noexcept { + if (connected_) { + connected_ = false; + if (socket_.invalid()) { + socket_.close(); + return true; + } + } + LOG_WARN("Statsd Exporter: Already disconnected"); + return false; +} +} // namespace metrics +} // namespace statsd +} // namespace exporter +OPENTELEMETRY_END_NAMESPACE \ No newline at end of file diff --git a/exporters/statsd/test/common/socket_server.h b/exporters/statsd/test/common/socket_server.h new file mode 100644 index 000000000..8d161a7a7 --- /dev/null +++ b/exporters/statsd/test/common/socket_server.h @@ -0,0 +1,420 @@ +// Copyright 2021, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef SOCKET_SERVER_H +#define SOCKET_SERVER_H + +/** + * Socket server intended to be used for local loopback test purposes. + * Default namespace is: "testing". You can override it using: + * `#define SOCKET_SERVER_NS alternate_namespace_to_use` + */ +#ifndef SOCKET_SERVER_NS +#define SOCKET_SERVER_NS testing +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "opentelemetry/exporters/statsd/metrics/macros.h" +#include "opentelemetry/exporters/statsd/metrics/socket_tools.h" + +using namespace SocketTools; + +namespace SOCKET_SERVER_NS { + +/** + * @brief Common Server for TCP, UDP and Unix Domain. + */ +struct SocketServer : public Reactor::SocketCallback { + struct Connection { + enum State { + Idle, // No data transfer initiated + Receiving, // Receiving data + Responding, // Sending data + Closing, // Closing connection + Closed, // Closed connection + Aborted // Connection aborted + }; + + Socket socket; // Active client-server socket + SocketAddr client; // Client address + + std::string request_buffer; // Receive buffer for current event + std::string response_buffer; // Send buffer for current event + + std::set state; // Current connection state + bool keepalive{true}; // Keep connection alive (reserved for future use) + }; + + SocketAddr bind_address; // Server bind address + bool is_bound{false}; + SocketParams server_socket_params; // Server socket params + Socket server_socket; // Server listening socket + Reactor reactor; // Socket event handler + + // Custom callback when server receives data + std::function onRequest; + + // Custom callback when server sends a response + std::function onResponse; + + // Active client-server connections protected by recursive mutex + std::recursive_mutex connections_mutex; + std::map connections; + + // Macro to safely obtain TEMPORARY string buffer pointer +#define CLID(conn) conn.client.toString().c_str() + + const SocketAddr &address() const { return bind_address; }; + + /** + * @brief Route to start TCP, UDP or Unix Domain socket server. + * @param addr Address or Unix domain socket name to bind to. + * @param sock Socket type. + * @param numConnections Maximum number of connections. + */ + SocketServer(SocketAddr addr, SocketParams params, int numConnections = 10) + : bind_address(addr), server_socket_params(params), reactor(*this) { + server_socket = Socket(server_socket_params); + + // Default lambda here implements an echo server + onRequest = [this](Connection &conn) { + conn.state.insert(SocketServer::Connection::Responding); + }; + + onResponse = [this](Connection &) { + // Empty response + }; + + int rc = server_socket.bind(bind_address); + if (rc != 0) { + LOG_ERROR("Server: bind failed! result=%d", rc); + return; + } + + is_bound = true; + LOG_INFO("Server: bind successful. result=%d", rc); + server_socket.getsockname(bind_address); + if (server_socket_params.type == SOCK_STREAM) { + // In TCP and Unix Domain mode we listen and accept. + reactor.addSocket(server_socket, Reactor::Acceptable); + server_socket.listen(numConnections); + } else { + // In UDP mode we read in a loop, no need to accept. + reactor.addSocket(server_socket, Reactor::Readable); + } + + LOG_INFO("Server: Listening on %s://%s", server_socket_params.scheme(), + bind_address.toString().c_str()); + } + + /** + * @brief Start server. + */ + void Start() { reactor.start(); } + + /** + * @brief Stop server. + */ + void Stop() { reactor.stop(); } + + /** + * @brief Handle Reactor::State::Acceptable event. + * @param socket Client socket. + */ + virtual void onSocketAcceptable(Socket socket) override { + LOG_TRACE("Server: accepting socket fd=0x%llx", socket.m_sock); + + Socket csocket; + SocketAddr caddr; + if (socket.accept(csocket, caddr)) { +#ifdef HAVE_UNIX_DOMAIN + // If server is Unix domain, then the client socket is also Unix domain + if (bind_address.isUnixDomain) { + caddr.isUnixDomain = bind_address.isUnixDomain; + // Sometimes AF_UNIX does not auto-populate + // the bind address on accept. Thus, copy. + std::copy(std::begin(bind_address.m_data_un.sun_path), + std::end(bind_address.m_data_un.sun_path), + std::begin(caddr.m_data_un.sun_path)); + }; +#endif + + LOCKGUARD(connections_mutex); + csocket.setNonBlocking(); + Connection &conn = connections[csocket]; + conn.socket = csocket; + conn.state = {Connection::Idle}; + conn.client = caddr; + reactor.addSocket(csocket, Reactor::Readable | Reactor::Closed); + LOG_TRACE("Server: [%s] accepted", CLID(conn)); + } + } + + /** + * @brief Handle Reactor::State::Reasable event. + * @param socket Client socket. + */ + virtual void onSocketReadable(Socket socket) override { + LOG_TRACE("Server: reading socket fd=0x%x", + static_cast(socket.m_sock)); + int size = 0; + decltype(connections)::iterator it; + { + LOCKGUARD(connections_mutex); + it = connections.find(socket); + } + if (it != connections.end()) { + // TCP or Unix domain connection. + Connection &conn_tcp = it->second; + ReadStreamBuffer(conn_tcp); + onRequest(conn_tcp); + HandleConnection(conn_tcp); + } else { + // UDP datagram connection. + // Read the contents in one shot. + Connection conn_udp; + conn_udp.socket = socket; + conn_udp.state = {Connection::Receiving}; + ReadDatagramBuffer(conn_udp); + onRequest(conn_udp); + HandleConnection(conn_udp); + } + } + + /** + * @brief Event triggered when server may write data back to client. + * @param socket Client socket. + */ + virtual void onSocketWritable(Socket socket) override { + LOG_TRACE("Server: writing socket fd=0x%llx", socket.m_sock); + decltype(connections)::iterator it; + { + LOCKGUARD(connections_mutex); + it = connections.find(socket); + if (it == connections.end()) { + LOG_ERROR("Server: socket not found in connections map!"); + return; + } + } + Connection &conn = it->second; + conn.state.insert(Connection::Responding); + HandleConnection(conn); + } + + /** + * @brief Handle event when socket is closed. + * @param socket + */ + virtual void onSocketClosed(Socket socket) override { + LOG_TRACE("Server: closing socket fd=0x%llx", socket.m_sock); + LOCKGUARD(connections_mutex); + auto it = connections.find(socket); + if (it != connections.end()) { + Connection &conn = it->second; + conn.state.insert(Connection::Closing); + HandleConnection(conn); + return; + } + LOG_ERROR("Server: socket not found in connections map!"); + } + + /** + * @brief Read from TCP or Unix Domain connection into request_buffer. + * This function invokes `HandleConnection` to process the buffer. + * + * @param conn_tcp Connection object. + */ + virtual void ReadStreamBuffer(Connection &conn_tcp) { + conn_tcp.request_buffer.clear(); + conn_tcp.request_buffer.resize(4096, 0); + size_t size = conn_tcp.socket.readall( + const_cast(conn_tcp.request_buffer.data()), + conn_tcp.request_buffer.size()); + if (size > 0) { + LOG_TRACE("Server: [%s] stream read %zu bytes", CLID(conn_tcp), size); + conn_tcp.request_buffer.resize(size); + // Handle connection: process request_buffer + conn_tcp.state.insert(Connection::Receiving); + } else { + conn_tcp.request_buffer.resize(0); + LOG_ERROR("Server: [%s] failed to read client stream, errno=%d", + CLID(conn_tcp), errno); + conn_tcp.state.insert(Connection::Closing); + } + } + + /** + * @brief Read from UDP connection into request_buffer. + * This function invokes `HandleConnection` to process the buffer. + * + * @param conn_udp + */ + virtual void ReadDatagramBuffer(Connection &conn_udp) { + // Maximum size is 0xffff - (sizeof(IP Header) + sizeof(UDP Header)). + // Try to read the entire datagram. + conn_udp.request_buffer.resize(0xffff); + int size = conn_udp.socket.recvfrom( + (void *)(conn_udp.request_buffer.data()), 0xffff, 0, conn_udp.client); + if (size > 0) { + LOG_ERROR("Server: [%s] datagram read %d bytes", CLID(conn_udp), size); + conn_udp.request_buffer.resize(size); + // Handle connection: process request_buffer + conn_udp.state.insert(Connection::Receiving); + } else { + conn_udp.request_buffer.resize(0); + LOG_ERROR("Server: [%s] failed to read client datagram", CLID(conn_udp)); + } + } + + /** + * @brief Handle a timeslice of sending data back to client. If sending is + * blocked, but not all data has been sent, then this function returns with an + * indication that it needs to be called for the same connection again. + * + * @param conn Client-server connection. + * + * @return true if not all data has been bytes_sent. + */ + bool WriteResponseBuffer(Connection &conn) { + if (conn.response_buffer.empty()) { + LOG_TRACE("Server: [%s] response blocked, empty response buffer!", + CLID(conn)); + return false; + } + + size_t total_bytes_sent = 0; + uint32_t optval = 0; + + conn.socket.getsockopt(SOL_SOCKET, SO_TYPE, optval); + + // Handle UDP response + if (optval == SOCK_DGRAM) { + total_bytes_sent = conn.socket.sendto( + conn.response_buffer.data(), + static_cast(conn.response_buffer.size()), 0, conn.client); + LOG_TRACE("Server: [%s] datagram sent %zu bytes", CLID(conn), + total_bytes_sent); + return false; + } + + // Handle TCP and Unix Domain response + reactor.addSocket(conn.socket, SocketTools::Reactor::Writable); + total_bytes_sent = conn.socket.writeall(conn.response_buffer.data(), + conn.request_buffer.size()); + if (conn.response_buffer.size() != total_bytes_sent) { + conn.response_buffer.erase(0, total_bytes_sent); + LOG_WARN("Server: [%s] response blocked, total sent %zu bytes", + CLID(conn), total_bytes_sent); + // Need to send more + conn.state.insert(Connection::Responding); + return true; + } + + // Done sending + conn.state.erase(Connection::Responding); + conn.state.insert(Connection::Idle); + LOG_TRACE("Server: [%s] response complete, total sent %zu bytes", + CLID(conn), total_bytes_sent); + return false; + } + + /** + * @brief Handle event when connection is closed. + * @param conn + */ + void onConnectionClosed(Connection &conn) { + LOG_TRACE("Server: [%s] connection closing...", CLID(conn)); + if ((!conn.state.count(Connection::Idle)) && + (!conn.state.count(Connection::Closing))) { + conn.state = {Connection::Aborted}; + onConnectionAborted(conn); + } + + // reactor.addSocket(conn.socket, SocketTools::Reactor::Closed); + + reactor.removeSocket(conn.socket); + LOCKGUARD(connections_mutex); + auto it = connections.find(conn.socket); + conn.socket.close(); + conn.state.clear(); + conn.state.insert(Connection::Closed); + LOG_TRACE("Server: [%s] connection closed.", CLID(conn)); + if (it != connections.end()) { + connections.erase(it); + } + } + + void onConnectionAborted(Connection &conn) { + LOG_WARN("Server: [%s] connection closed unexpectedly", CLID(conn)); + } + + void CloseConnection(Connection &conn) { + LOG_TRACE("Server: [%s] closing connection...", CLID(conn)); + conn.socket.shutdown(SocketTools::Socket::ShutdownSend); + onConnectionClosed(conn); + } + + /** + * @brief Handle connection state update. + * + * Connection states: + * - Idle - start receiving. + * - Receiving - handle client request. + * - Responding - respond back to client. + * - Closing - closing connection. + * - Closed - connection closed. + * + * @param conn + */ + void HandleConnection(Connection &conn) { + + if (conn.state.count(Connection::Responding)) { + reactor.addSocket(conn.socket, Reactor::Writable | Reactor::Closed); + // Got data to send back + LOG_TRACE("Server: [%s] responding...", CLID(conn)); + // If WriteResponseBuffer returns true, then more data to send. + if (WriteResponseBuffer(conn)) { + return; + } + // No more data to send. Stop responding. + conn.state.erase(Connection::Responding); + reactor.addSocket(conn.socket, Reactor::Readable | Reactor::Closed); + } + + if (conn.state.count(Connection::Closing)) { + onConnectionClosed(conn); + return; + } + + // If we are done responding, we may need to keep the socket open + if (conn.keepalive) { + LOG_TRACE("Server: [%s] idle (keep-alive)", CLID(conn)); + reactor.addSocket(conn.socket, + SocketTools::Reactor::Readable | Reactor::Closed); + conn.state.insert(Connection::Idle); + } + } +}; +} // namespace SOCKET_SERVER_NS +#endif