diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fb5449698..faa873c64f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ Increment the: * [TEST] Remove workaround for metrics cardinality limit test [#3663](https://github.com/open-telemetry/opentelemetry-cpp/pull/3663) +* [METRICS] Allow registering one callback for multiple instruments + [#3667](https://github.com/open-telemetry/opentelemetry-cpp/pull/3667) * [SDK] Fix typo in hashmap method GetEnteries [#3680](https://github.com/open-telemetry/opentelemetry-cpp/pull/3680) diff --git a/api/include/opentelemetry/metrics/meter.h b/api/include/opentelemetry/metrics/meter.h index 59cfb87822..390bf006e6 100644 --- a/api/include/opentelemetry/metrics/meter.h +++ b/api/include/opentelemetry/metrics/meter.h @@ -3,7 +3,10 @@ #pragma once +#include + #include "opentelemetry/nostd/shared_ptr.h" +#include "opentelemetry/nostd/span.h" #include "opentelemetry/nostd/string_view.h" #include "opentelemetry/nostd/unique_ptr.h" #include "opentelemetry/version.h" @@ -25,6 +28,8 @@ template class Gauge; class ObservableInstrument; +class MultiObserverResult; +using MultiObservableCallbackPtr = void (*)(MultiObserverResult &, void *); /** * Handles instrument creation and provides a facility for batch recording. @@ -169,6 +174,32 @@ class Meter nostd::string_view name, nostd::string_view description = "", nostd::string_view unit = "") noexcept = 0; + +#if OPENTELEMETRY_ABI_VERSION_NO >= 2 + + /** + * Registers a callback to be invoked when metrics are collected by this meter. The callback will + * be passed a MultiObserverResult which it can use to make observations for any or all of the + * instruments provided in this registration. Any measurements recorded for instruments _not_ in + * the initial RegisterCallback call will be discarded. + * + * @param callback the callback to be invoked. + * @param state the state to be passed to the callback. + * @param instruments the instruments to be observed. + * @return a unique identifier for the registered callback, which can be used to unregister the + * callback in DeregisterCallback. + */ + virtual uintptr_t RegisterCallback(MultiObservableCallbackPtr callback, + void *state, + nostd::span instruments) noexcept = 0; + + /** + * Unregisters a callback previously registered with RegisterCallback. + * + * @param callback_id the unique identifier returned by RegisterCallback. + */ + virtual void DeregisterCallback(uintptr_t callback_id) noexcept = 0; +#endif }; } // namespace metrics OPENTELEMETRY_END_NAMESPACE diff --git a/api/include/opentelemetry/metrics/multi_observer_result.h b/api/include/opentelemetry/metrics/multi_observer_result.h new file mode 100644 index 0000000000..e3c6256225 --- /dev/null +++ b/api/include/opentelemetry/metrics/multi_observer_result.h @@ -0,0 +1,57 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include + +#include "opentelemetry/metrics/async_instruments.h" +#include "opentelemetry/metrics/observer_result.h" +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace metrics +{ + +class MultiObserverResult +{ + +public: + virtual ~MultiObserverResult() = default; + + /** + * Obtain an ObserverResultT for the given instrument, that can be used to record + * a measurement on said instrument from a multi-observer callback registered with + * Meter::RegisterCallback. The instrument _must_ have been included in the original + * call to Meter::RegisterCallback; any data points set on other instruments will be + * discarded. + * + * @param instrument The instrument for which to obtain an ObserverResult. + * @return An ObserverResultT for the given instrument. + */ + template + ObserverResultT &ForInstrument(const ObservableInstrument *instrument) = delete; + +protected: + // You can't have a virtual template, and you can't overload on return type, so we need to + // enumerate the options for the observer result type as separate methods to override. + virtual ObserverResultT &ForInstrumentDouble(const ObservableInstrument *instrument) = 0; + virtual ObserverResultT &ForInstrumentInt64(const ObservableInstrument *instrument) = 0; +}; + +template <> +inline ObserverResultT &MultiObserverResult::ForInstrument( + const ObservableInstrument *instrument) +{ + return ForInstrumentDouble(instrument); +} + +template <> +inline ObserverResultT &MultiObserverResult::ForInstrument( + const ObservableInstrument *instrument) +{ + return ForInstrumentInt64(instrument); +} + +} // namespace metrics +OPENTELEMETRY_END_NAMESPACE diff --git a/api/include/opentelemetry/metrics/noop.h b/api/include/opentelemetry/metrics/noop.h index 1d508b9387..d472f47c0f 100644 --- a/api/include/opentelemetry/metrics/noop.h +++ b/api/include/opentelemetry/metrics/noop.h @@ -229,6 +229,18 @@ class NoopMeter final : public Meter return nostd::shared_ptr( new NoopObservableInstrument(name, description, unit)); } + +#if OPENTELEMETRY_ABI_VERSION_NO >= 2 + uintptr_t RegisterCallback( + MultiObservableCallbackPtr /* callback */, + void * /* state */, + nostd::span /* instruments */) noexcept override + { + return 0; + } + + void DeregisterCallback(uintptr_t /* callback_id */) noexcept override {} +#endif }; /** diff --git a/sdk/include/opentelemetry/sdk/metrics/meter.h b/sdk/include/opentelemetry/sdk/metrics/meter.h index 9e2107fcd1..2fdc6c121c 100644 --- a/sdk/include/opentelemetry/sdk/metrics/meter.h +++ b/sdk/include/opentelemetry/sdk/metrics/meter.h @@ -131,6 +131,14 @@ class Meter final : public opentelemetry::metrics::Meter std::vector Collect(CollectorHandle *collector, opentelemetry::common::SystemTimestamp collect_ts) noexcept; +#if OPENTELEMETRY_ABI_VERSION_NO >= 2 + uintptr_t RegisterCallback( + opentelemetry::metrics::MultiObservableCallbackPtr callback, + void *state, + nostd::span instruments) noexcept override; + + void DeregisterCallback(uintptr_t callback_id) noexcept override; +#endif private: // order of declaration is important here - instrumentation scope should destroy after // meter-context. diff --git a/sdk/include/opentelemetry/sdk/metrics/multi_observer_result.h b/sdk/include/opentelemetry/sdk/metrics/multi_observer_result.h new file mode 100644 index 0000000000..11ee2e3090 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/multi_observer_result.h @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include + +#include "opentelemetry/metrics/async_instruments.h" +#include "opentelemetry/metrics/multi_observer_result.h" +#include "opentelemetry/metrics/observer_result.h" +#include "opentelemetry/nostd/function_ref.h" +#include "opentelemetry/nostd/variant.h" +#include "opentelemetry/sdk/metrics/observer_result.h" +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ +class OPENTELEMETRY_EXPORT MultiObserverResult final + : public opentelemetry::metrics::MultiObserverResult +{ +public: + void RegisterInstrument(opentelemetry::metrics::ObservableInstrument *instrument); + void DeregisterInstrument(opentelemetry::metrics::ObservableInstrument *instrument); + size_t InstrumentCount() const; + bool HasInstrument(const opentelemetry::metrics::ObservableInstrument *instrument) const; + void GetInstruments( + nostd::function_ref callback); + void Reset(); + void StoreResults(opentelemetry::common::SystemTimestamp collection_ts); + +protected: + opentelemetry::metrics::ObserverResultT &ForInstrumentDouble( + const opentelemetry::metrics::ObservableInstrument *instrument) override; + opentelemetry::metrics::ObserverResultT &ForInstrumentInt64( + const opentelemetry::metrics::ObservableInstrument *instrument) override; + +private: + // This is _different_ to opentelemetry::metrics::ObserverResult because this variant is + // a variant directly of ObserverResultT, not of _pointers_ to ObserverResultT. + // This allows us to avoid an unnescessary layer of inderection and a bunch of allocations. + using ObserverResultDirect = + nostd::variant, ObserverResultT>; + std::unordered_map + observer_results_; +}; +} // namespace metrics +} // namespace sdk + +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/metrics/state/observable_registry.h b/sdk/include/opentelemetry/sdk/metrics/state/observable_registry.h index b584cc055f..d06801a007 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/observable_registry.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/observable_registry.h @@ -3,34 +3,52 @@ #pragma once +#include #include #include #include #include "opentelemetry/common/timestamp.h" #include "opentelemetry/metrics/async_instruments.h" +#include "opentelemetry/metrics/meter.h" +#include "opentelemetry/sdk/metrics/multi_observer_result.h" #include "opentelemetry/version.h" +#if OPENTELEMETRY_ABI_VERSION_NO >= 2 +# include +# include "opentelemetry/nostd/span.h" +#endif + OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { namespace metrics { -struct ObservableCallbackRecord -{ - opentelemetry::metrics::ObservableCallbackPtr callback; - void *state; - opentelemetry::metrics::ObservableInstrument *instrument; -}; +struct ObservableCallbackRecord; class ObservableRegistry { public: + // Constructor & destructor need to be defined in the observable_registry.cc TU, rather + // than implicitly defaulted here, so that we can have a unique_ptr to an incomplete + // class as a member. + ObservableRegistry(); + ~ObservableRegistry(); + + // Add a callback of the single-instrument form void AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback, void *state, opentelemetry::metrics::ObservableInstrument *instrument); - + // Add a callback with the multi-instrument signature + uintptr_t AddCallback(opentelemetry::metrics::MultiObservableCallbackPtr callback, + void *state, + nostd::span instruments); + // Callbacks added with Meter::RegisterCallback have can be removed by passing back the handle + // returned + void RemoveCallback(uintptr_t id); + // Callbacks added with ObservableInstrument::AddCallback can be removed by passing back the + // original (callback function, state, instrument). void RemoveCallback(opentelemetry::metrics::ObservableCallbackPtr callback, void *state, opentelemetry::metrics::ObservableInstrument *instrument); @@ -40,7 +58,7 @@ class ObservableRegistry void Observe(opentelemetry::common::SystemTimestamp collection_ts); private: - std::vector> callbacks_; + std::unordered_map> callbacks_; std::mutex callbacks_m_; }; diff --git a/sdk/src/metrics/CMakeLists.txt b/sdk/src/metrics/CMakeLists.txt index 73abddc16a..c156aa6fb5 100644 --- a/sdk/src/metrics/CMakeLists.txt +++ b/sdk/src/metrics/CMakeLists.txt @@ -12,6 +12,7 @@ add_library( meter_context.cc meter_context_factory.cc metric_reader.cc + multi_observer_result.cc instrument_metadata_validator.cc export/periodic_exporting_metric_reader.cc export/periodic_exporting_metric_reader_factory.cc diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index d7974d7590..475f8a9c13 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -44,6 +44,10 @@ # include "opentelemetry/sdk/metrics/exemplar/reservoir_utils.h" #endif +#if OPENTELEMETRY_ABI_VERSION_NO >= 2 +# include "opentelemetry/metrics/meter.h" +#endif + namespace { @@ -663,6 +667,21 @@ std::vector Meter::Collect(CollectorHandle *collector, return metric_data_list; } +#if OPENTELEMETRY_ABI_VERSION_NO >= 2 +uintptr_t Meter::RegisterCallback( + opentelemetry::metrics::MultiObservableCallbackPtr callback, + void *state, + nostd::span instruments) noexcept +{ + return observable_registry_->AddCallback(callback, state, instruments); +} + +void Meter::DeregisterCallback(uintptr_t callback_id) noexcept +{ + observable_registry_->RemoveCallback(callback_id); +} +#endif + // Implementation of the log message recommended by the SDK specification for duplicate instruments. // See // https://github.com/open-telemetry/opentelemetry-specification/blob/9c8c30631b0e288de93df7452f91ed47f6fba330/specification/metrics/sdk.md?plain=1#L882 diff --git a/sdk/src/metrics/multi_observer_result.cc b/sdk/src/metrics/multi_observer_result.cc new file mode 100644 index 0000000000..a8ea0d177a --- /dev/null +++ b/sdk/src/metrics/multi_observer_result.cc @@ -0,0 +1,154 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include +#include +#include +#include +#include + +#include "opentelemetry/common/timestamp.h" +#include "opentelemetry/metrics/async_instruments.h" +#include "opentelemetry/metrics/observer_result.h" +#include "opentelemetry/nostd/function_ref.h" +#include "opentelemetry/nostd/variant.h" +#include "opentelemetry/sdk/common/global_log_handler.h" +#include "opentelemetry/sdk/metrics/async_instruments.h" +#include "opentelemetry/sdk/metrics/multi_observer_result.h" +#include "opentelemetry/sdk/metrics/observer_result.h" +#include "opentelemetry/sdk/metrics/state/metric_storage.h" +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +namespace +{ +struct StoreResultVisitor +{ + void operator()(ObserverResultT &result) const + { + storage->RecordLong(result.GetMeasurements(), collection_ts); + } + + void operator()(ObserverResultT &result) const + { + storage->RecordDouble(result.GetMeasurements(), collection_ts); + } + + void operator()(const nostd::monostate &) const {} + + AsyncWritableMetricStorage *storage; + opentelemetry::common::SystemTimestamp collection_ts; +}; +} // namespace + +void MultiObserverResult::RegisterInstrument( + opentelemetry::metrics::ObservableInstrument *instrument) +{ + observer_results_.emplace(instrument, nostd::monostate()); +} + +void MultiObserverResult::DeregisterInstrument( + opentelemetry::metrics::ObservableInstrument *instrument) +{ + observer_results_.erase(instrument); +} + +size_t MultiObserverResult::InstrumentCount() const +{ + return observer_results_.size(); +} + +bool MultiObserverResult::HasInstrument( + const opentelemetry::metrics::ObservableInstrument *instrument) const +{ + return observer_results_.find(const_cast( + instrument)) != observer_results_.end(); +} + +void MultiObserverResult::GetInstruments( + nostd::function_ref callback) +{ + for (auto &el : observer_results_) + { + callback(el.first); + } +} + +void MultiObserverResult::Reset() +{ + for (auto it = observer_results_.begin(); it != observer_results_.end(); ++it) + { + it->second = nostd::monostate(); + } +} + +void MultiObserverResult::StoreResults(opentelemetry::common::SystemTimestamp collection_ts) +{ + for (auto &el : observer_results_) + { + auto *instrument = el.first; + auto &result = el.second; + + auto storage = static_cast(instrument) + ->GetMetricStorage(); + nostd::visit(StoreResultVisitor{storage, collection_ts}, result); + } +} + +opentelemetry::metrics::ObserverResultT &MultiObserverResult::ForInstrumentDouble( + const opentelemetry::metrics::ObservableInstrument *instrument) +{ + static opentelemetry::sdk::metrics::ObserverResultT null_result; + // const_cast is appropriate here, because we're _not_ modifying the passed-in pointer; + // we just need to make it non-const to be able to look it up in our map. + auto it = observer_results_.find( + const_cast(instrument)); + if (it == observer_results_.end()) + { + OTEL_INTERNAL_LOG_ERROR("[MultiObserverResult::ForInstrumentDouble]" + << "The instrument is not registered on with callback"); + return null_result; + } + auto *inner = nostd::get_if>(&it->second); + if (inner == nullptr) + { + return it->second.emplace>(); + } + else + { + return *inner; + } +} + +opentelemetry::metrics::ObserverResultT &MultiObserverResult::ForInstrumentInt64( + const opentelemetry::metrics::ObservableInstrument *instrument) +{ + static opentelemetry::sdk::metrics::ObserverResultT null_result; + auto it = observer_results_.find( + const_cast(instrument)); + if (it == observer_results_.end()) + { + OTEL_INTERNAL_LOG_ERROR("[MultiObserverResult::ForInstrumentInt64]" + << "The instrument is not registered on with callback"); + return null_result; + } + + auto *inner = nostd::get_if>(&it->second); + if (inner == nullptr) + { + return it->second.emplace>(); + } + else + { + return *inner; + } +} + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/metrics/state/observable_registry.cc b/sdk/src/metrics/state/observable_registry.cc index d8d6afb01b..9402ee87b1 100644 --- a/sdk/src/metrics/state/observable_registry.cc +++ b/sdk/src/metrics/state/observable_registry.cc @@ -1,24 +1,24 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -#include #include #include -#include +#include #include -#include +#include "opentelemetry/common/key_value_iterable.h" #include "opentelemetry/common/timestamp.h" #include "opentelemetry/metrics/async_instruments.h" +#include "opentelemetry/metrics/meter.h" #include "opentelemetry/metrics/observer_result.h" +#include "opentelemetry/nostd/function_ref.h" #include "opentelemetry/nostd/shared_ptr.h" -#include "opentelemetry/sdk/common/global_log_handler.h" +#include "opentelemetry/nostd/span.h" +#include "opentelemetry/nostd/variant.h" #include "opentelemetry/sdk/metrics/async_instruments.h" #include "opentelemetry/sdk/metrics/instruments.h" -#include "opentelemetry/sdk/metrics/observer_result.h" -#include "opentelemetry/sdk/metrics/state/metric_storage.h" +#include "opentelemetry/sdk/metrics/multi_observer_result.h" #include "opentelemetry/sdk/metrics/state/observable_registry.h" -#include "opentelemetry/sdk/metrics/view/attributes_processor.h" #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -26,16 +26,57 @@ namespace sdk { namespace metrics { +struct ObservableCallbackRecord +{ + ObservableCallbackRecord(opentelemetry::metrics::ObservableCallbackPtr callback, + void *state, + opentelemetry::metrics::ObservableInstrument *instrument) + : callback(callback), state(state) + { + observable_result.RegisterInstrument(static_cast(instrument)); + } + + ObservableCallbackRecord(opentelemetry::metrics::MultiObservableCallbackPtr callback, + void *state, + nostd::span instruments) + : callback(callback), state(state) + { + for (auto *instrument : instruments) + { + observable_result.RegisterInstrument(static_cast(instrument)); + } + } + + nostd::variant + callback; + void *state; + MultiObserverResult observable_result; +}; + +ObservableRegistry::ObservableRegistry() = default; +ObservableRegistry::~ObservableRegistry() = default; void ObservableRegistry::AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback, void *state, opentelemetry::metrics::ObservableInstrument *instrument) { - // TBD - Check if existing - std::unique_ptr record( - new ObservableCallbackRecord{callback, state, instrument}); std::lock_guard lock_guard{callbacks_m_}; - callbacks_.push_back(std::move(record)); + auto record = std::make_unique(callback, state, instrument); + auto token = reinterpret_cast(record.get()); + callbacks_.insert({token, std::move(record)}); +} + +uintptr_t ObservableRegistry::AddCallback( + opentelemetry::metrics::MultiObservableCallbackPtr callback, + void *state, + nostd::span instruments) +{ + std::lock_guard lock_guard{callbacks_m_}; + auto record = std::make_unique(callback, state, instruments); + auto token = reinterpret_cast(record.get()); + callbacks_.insert({token, std::move(record)}); + return token; } void ObservableRegistry::RemoveCallback(opentelemetry::metrics::ObservableCallbackPtr callback, @@ -43,65 +84,143 @@ void ObservableRegistry::RemoveCallback(opentelemetry::metrics::ObservableCallba opentelemetry::metrics::ObservableInstrument *instrument) { std::lock_guard lock_guard{callbacks_m_}; - auto new_end = std::remove_if( - callbacks_.begin(), callbacks_.end(), - [callback, state, instrument](const std::unique_ptr &record) { - return record->callback == callback && record->state == state && - record->instrument == instrument; - }); - callbacks_.erase(new_end, callbacks_.end()); + for (auto it = callbacks_.begin(); it != callbacks_.end();) + { + const auto &record = it->second; + // Remove the callback if it's registered with the the single-instrument signature + auto observable_callback_ptr = + nostd::get_if(&record->callback); + if (observable_callback_ptr && *observable_callback_ptr == callback && record->state == state && + record->observable_result.HasInstrument(instrument)) + { + it = callbacks_.erase(it); + } + else + { + ++it; + } + } } -void ObservableRegistry::CleanupCallback(opentelemetry::metrics::ObservableInstrument *instrument) +void ObservableRegistry::RemoveCallback(uintptr_t id) { std::lock_guard lock_guard{callbacks_m_}; - auto iter = std::remove_if(callbacks_.begin(), callbacks_.end(), - [instrument](const std::unique_ptr &record) { - return record->instrument == instrument; - }); - callbacks_.erase(iter, callbacks_.end()); + callbacks_.erase(id); } -void ObservableRegistry::Observe(opentelemetry::common::SystemTimestamp collection_ts) +void ObservableRegistry::CleanupCallback(opentelemetry::metrics::ObservableInstrument *instrument) { - static DefaultAttributesProcessor default_attribute_processor; std::lock_guard lock_guard{callbacks_m_}; - for (auto &callback_wrap : callbacks_) + auto sdk_instrument = static_cast(instrument); + for (auto it = callbacks_.begin(); it != callbacks_.end();) { - auto value_type = - static_cast(callback_wrap->instrument) - ->GetInstrumentDescriptor() - .value_type_; - auto storage = - static_cast(callback_wrap->instrument) - ->GetMetricStorage(); - if (!storage) + // Remove the instrument from the multi-callback when the instrument is destroyed + it->second->observable_result.DeregisterInstrument(sdk_instrument); + + // If the multi-callback has no instruments left, remove it from the registry + if (it->second->observable_result.InstrumentCount() == 0) { - OTEL_INTERNAL_LOG_ERROR("[ObservableRegistry::Observe] - Error during observe." - << "The metric storage is invalid"); - return; + it = callbacks_.erase(it); } - if (value_type == InstrumentValueType::kDouble) + else { - nostd::shared_ptr> ob_res( - new opentelemetry::sdk::metrics::ObserverResultT(&default_attribute_processor)); - callback_wrap->callback(ob_res, callback_wrap->state); - storage->RecordDouble( - static_cast *>(ob_res.get()) - ->GetMeasurements(), - collection_ts); + ++it; } - else + } +} + +namespace +{ + +template +class ObserverResultTAdapter : public opentelemetry::metrics::ObserverResultT +{ +public: + ObserverResultTAdapter(opentelemetry::metrics::ObserverResultT *inner) : inner(inner) {} + + void Observe(T value) noexcept override + { + if (inner) { - nostd::shared_ptr> ob_res( - new opentelemetry::sdk::metrics::ObserverResultT(&default_attribute_processor)); - callback_wrap->callback(ob_res, callback_wrap->state); - storage->RecordLong( - static_cast *>(ob_res.get()) - ->GetMeasurements(), - collection_ts); + inner->Observe(value); } } + void Observe(T value, const opentelemetry::common::KeyValueIterable &attributes) noexcept override + { + if (inner) + { + inner->Observe(value, attributes); + } + } + opentelemetry::metrics::ObserverResultT *inner; +}; + +struct InvokeCallbackVisitor +{ + void operator()(const opentelemetry::metrics::ObservableCallbackPtr &callback) + { + record->observable_result.GetInstruments( + [&](opentelemetry::metrics::ObservableInstrument *instrument) { + auto value_type = + static_cast(instrument) + ->GetInstrumentDescriptor() + .value_type_; + if (value_type == InstrumentValueType::kDouble) + { + invoke_single_instrument_callback(callback, instrument); + } + else + { + invoke_single_instrument_callback(callback, instrument); + } + }); + } + void operator()(const opentelemetry::metrics::MultiObservableCallbackPtr &callback) + { + callback(record->observable_result, record->state); + } + + template + void invoke_single_instrument_callback( + const opentelemetry::metrics::ObservableCallbackPtr &callback, + opentelemetry::metrics::ObservableInstrument *instrument) + { + // This is all a bit strangely shaped, but it's in the name of back-compat. + // The signature of ObservableCallbackPtr is that it takes a nostd::shared_ptr to + // an abstract ObserverResultT. The new implementation of ObservableRegistry doesn't + // actually need to box the ObserverResultT's into a shared_ptr, but we need to construct + // such a shared_ptr here in order to invoke the callback with the old signature. + // MultiObserverResult::ForInstrument returns a reference to an abstract ObserverResultT, + // which is allocated inside record->observable_result; thus, we need to allocate a wrapper + // around that which can be on the heap in the shared_ptr. + ObserverResultTAdapter *adapter = + new ObserverResultTAdapter(&record->observable_result.ForInstrument(instrument)); + opentelemetry::nostd::shared_ptr> result_wrapper( + adapter); + callback(result_wrapper, record->state); + // It's possible that callback() holds the shared_ptr after we return here. The reference to + // the actual ObserverResultT inside record->observable_result might not be valid for the full + // lifetime of that shared_ptr. So, null it out. + // (yes, that means the shared_ptr will now no longer _do_ anything, but it didn't in the old + // implementation either; the only requirement here is using the shared_ptr doesn't crash). + adapter->inner = nullptr; + } + + ObservableCallbackRecord *record; +}; +} // namespace + +void ObservableRegistry::Observe(opentelemetry::common::SystemTimestamp collection_ts) +{ + std::lock_guard lock_guard{callbacks_m_}; + for (const auto &pair : callbacks_) + { + auto &cb_record = pair.second; + cb_record->observable_result.Reset(); + // Visitor will either invoke the single-instrument or multi-instrument form of the callback + nostd::visit(InvokeCallbackVisitor{cb_record.get()}, cb_record->callback); + cb_record->observable_result.StoreResults(collection_ts); + } } } // namespace metrics diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index eafb116661..280d9975f6 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -35,7 +35,8 @@ foreach( periodic_exporting_metric_reader_test instrument_metadata_validator_test metric_test_stress - instrument_descriptor_test) + instrument_descriptor_test + multi_observer_test) add_executable(${testname} "${testname}.cc") target_link_libraries( ${testname} ${GTEST_BOTH_LIBRARIES} ${GMOCK_LIB} ${CMAKE_THREAD_LIBS_INIT} diff --git a/sdk/test/metrics/multi_observer_test.cc b/sdk/test/metrics/multi_observer_test.cc new file mode 100644 index 0000000000..d4d87fe5dd --- /dev/null +++ b/sdk/test/metrics/multi_observer_test.cc @@ -0,0 +1,216 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Multi-observer functionality requires ABI version 2+ +#if OPENTELEMETRY_ABI_VERSION_NO >= 2 +# include +# include +# include +# include +# include +# include + +# include "common.h" +# include "opentelemetry/metrics/async_instruments.h" +# include "opentelemetry/metrics/meter.h" +# include "opentelemetry/metrics/meter_provider.h" +# include "opentelemetry/metrics/multi_observer_result.h" +# include "opentelemetry/metrics/observer_result.h" +# include "opentelemetry/nostd/function_ref.h" +# include "opentelemetry/nostd/shared_ptr.h" +# include "opentelemetry/nostd/span.h" +# include "opentelemetry/nostd/string_view.h" +# include "opentelemetry/sdk/metrics/data/metric_data.h" +# include "opentelemetry/sdk/metrics/export/metric_producer.h" +# include "opentelemetry/sdk/metrics/instruments.h" +# include "opentelemetry/sdk/metrics/meter_provider.h" +# include "opentelemetry/sdk/metrics/metric_reader.h" + +using namespace opentelemetry; +using namespace opentelemetry::sdk::metrics; + +namespace +{ +nostd::shared_ptr InitMeter(MetricReader **metricReaderPtr, + const std::string &meter_name = "meter_name") +{ + static std::shared_ptr provider(new MeterProvider()); + std::unique_ptr metric_reader(new MockMetricReader()); + *metricReaderPtr = metric_reader.get(); + auto p = std::static_pointer_cast(provider); + p->AddMetricReader(std::move(metric_reader)); + auto meter = provider->GetMeter(meter_name); + return meter; +} + +void basic_multi_observable_callback(metrics::MultiObserverResult &result, void *state) +{ + auto *instruments = reinterpret_cast *>(state); + + auto &counter_record = result.ForInstrument(instruments->at(0)); + counter_record.Observe(100, {{"service", "frontend"}}); + counter_record.Observe(200, {{"service", "backend"}}); + + auto &gauge_record = result.ForInstrument(instruments->at(1)); + gauge_record.Observe(95.5, {{"cpu", "core1"}}); + gauge_record.Observe(87.2, {{"cpu", "core2"}}); +} +} // namespace + +TEST(MultiObserverTest, BasicMultiObservableCallback) +{ + MetricReader *metric_reader_ptr = nullptr; + auto meter = InitMeter(&metric_reader_ptr, "multi_observer_test"); + + auto observable_counter = + meter->CreateInt64ObservableCounter("test_counter", "Test counter", "1"); + auto observable_gauge = meter->CreateDoubleObservableGauge("test_gauge", "Test gauge", "%"); + + // Create a vector of raw pointers for the span + std::vector instrument_ptrs; + instrument_ptrs.push_back(observable_counter.get()); + instrument_ptrs.push_back(observable_gauge.get()); + + // Register a single callback that observes both instruments + auto callback_id = meter->RegisterCallback( + basic_multi_observable_callback, &instrument_ptrs, + nostd::span{instrument_ptrs.data(), instrument_ptrs.size()}); + + // Collect metrics and verify both instruments have data + size_t counter_metrics = 0; + size_t gauge_metrics = 0; + + metric_reader_ptr->Collect([&](ResourceMetrics &metric_data) { + EXPECT_EQ(metric_data.scope_metric_data_.size(), 1); + + for (const auto &metric : metric_data.scope_metric_data_[0].metric_data_) + { + if (metric.instrument_descriptor.name_ == "test_counter") + { + counter_metrics++; + EXPECT_EQ(metric.point_data_attr_.size(), 2); // frontend and backend services + } + else if (metric.instrument_descriptor.name_ == "test_gauge") + { + gauge_metrics++; + EXPECT_EQ(metric.point_data_attr_.size(), 2); // core1 and core2 CPUs + } + } + return true; + }); + + EXPECT_EQ(counter_metrics, 1); + EXPECT_EQ(gauge_metrics, 1); + + // Clean up + meter->DeregisterCallback(callback_id); +} + +namespace +{ +void empty_multi_observable_callback(metrics::MultiObserverResult &, void *state) +{ + bool *was_called = static_cast(state); + *was_called = true; +} +} // namespace + +TEST(MultiObserverTest, EmptyInstrumentsList) +{ + MetricReader *metric_reader_ptr = nullptr; + auto meter = InitMeter(&metric_reader_ptr, "multi_observer_empty_test"); + std::vector empty_instruments; + + // Register callback with empty instruments list + bool was_called = false; + auto callback_id = + meter->RegisterCallback(empty_multi_observable_callback, &was_called, + nostd::span{empty_instruments}); + + metric_reader_ptr->Collect([&](ResourceMetrics &) { return true; }); + + EXPECT_TRUE(was_called); + + // Clean up + meter->DeregisterCallback(callback_id); + + // It won't be called again if we collect + was_called = false; + metric_reader_ptr->Collect([&](ResourceMetrics &) { return true; }); + EXPECT_FALSE(was_called); +} + +namespace +{ +void non_registered_instrument_callback(metrics::MultiObserverResult &result, void *state) +{ + auto *gauge = reinterpret_cast(state); + result.ForInstrument(gauge).Observe(23.5); +} +} // namespace + +TEST(MultiObserverTest, NonRegisteredInstrument) +{ + MetricReader *metric_reader_ptr = nullptr; + auto meter = InitMeter(&metric_reader_ptr, "multi_observer_non_registered_test"); + + auto observable_counter = + meter->CreateInt64ObservableCounter("test_counter", "Test counter", "1"); + auto observable_counter_ptr = observable_counter.get(); + auto observable_gauge = meter->CreateDoubleObservableGauge("test_gauge", "Test gauge", "%"); + + // Register with one instrument (the counter), but the callback will try and record a metric + // for the other instrument (the gauge) + auto callback_id = meter->RegisterCallback( + non_registered_instrument_callback, observable_gauge.get(), + nostd::span{&observable_counter_ptr, 1}); + + metric_reader_ptr->Collect([&](ResourceMetrics &metric_data) { + // It should not have recorded any metrics; the observation for the gauge should have been + // discarded. + EXPECT_EQ(0, metric_data.scope_metric_data_.size()); + return true; + }); + meter->DeregisterCallback(callback_id); +} + +namespace +{ + +struct CallbackDeregisterdWhenDestroyedState +{ + metrics::ObservableInstrument *counter = nullptr; + int times_called = 0; +}; + +void callback_deregistered_when_destroyed_callback(metrics::MultiObserverResult &result, + void *vstate) +{ + auto state = reinterpret_cast(vstate); + state->times_called++; + result.ForInstrument(state->counter).Observe(state->times_called); +} +} // namespace + +TEST(MultiObserverTest, CallbackDeregisteredWhenInstrumentDestroyed) +{ + MetricReader *metric_reader_ptr = nullptr; + auto meter = InitMeter(&metric_reader_ptr, "callback_deregistered_when_destroyed_test"); + + CallbackDeregisterdWhenDestroyedState state{}; + auto counter_owning = meter->CreateInt64ObservableCounter("counter", "Counter", "1"); + state.counter = counter_owning.get(); + meter->RegisterCallback(callback_deregistered_when_destroyed_callback, &state, + nostd::span{&state.counter, 1}); + + // If we run a collection, the callback gets called + metric_reader_ptr->Collect([&](ResourceMetrics &) { return true; }); + EXPECT_EQ(state.times_called, 1); + + // If we allow the instrument to be destroyed, the callback should now _NOT_ get called anymore + counter_owning = nullptr; + metric_reader_ptr->Collect([&](ResourceMetrics &) { return true; }); + EXPECT_EQ(state.times_called, 1); +} + +#endif