Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `opentelemetry-instrumentation-asyncio` Fix duplicate instrumentation.
([[#3383](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3383)])

### Added

### Fixed
Expand Down Expand Up @@ -39,9 +42,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3113](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3113))
- `opentelemetry-instrumentation-grpc` Fix error when using gprc versions <= 1.50.0 with unix sockets.
([[#3393](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3393)])
- `opentelemetry-instrumentation-asyncio` Fix duplicate instrumentation.
([[#3383](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3383)])
- `opentelemetry-instrumentation-aiokafka` Fix send_and_wait method no headers kwargs error.
([[#3332](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3332)])


## Version 1.31.0/0.52b0 (2025-03-12)

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def func():

from wrapt import wrap_function_wrapper as _wrap

from opentelemetry.instrumentation.asyncio.instrumentation_state import (
_is_instrumented,
)
from opentelemetry.instrumentation.asyncio.package import _instruments
from opentelemetry.instrumentation.asyncio.utils import (
get_coros_to_trace,
Expand Down Expand Up @@ -237,7 +240,12 @@ def wrap_taskgroup_create_task(method, instance, args, kwargs) -> None:
)

def trace_to_thread(self, func: callable):
"""Trace a function."""
"""
Trace a function, but if already instrumented, skip double-wrapping.
"""
if _is_instrumented(func):
return func

start = default_timer()
func_name = getattr(func, "__name__", None)
if func_name is None and isinstance(func, functools.partial):
Expand Down Expand Up @@ -270,6 +278,13 @@ def trace_item(self, coro_or_future):
return coro_or_future

async def trace_coroutine(self, coro):
"""
Wrap a coroutine so that we measure its duration, metrics, etc.
If already instrumented, simply 'await coro' to preserve call behavior.
"""
if _is_instrumented(coro):
return await coro

if not hasattr(coro, "__name__"):
return await coro
start = default_timer()
Expand Down Expand Up @@ -303,6 +318,12 @@ async def trace_coroutine(self, coro):
self.record_process(start, attr, span, exception)

def trace_future(self, future):
"""
Wrap a Future's done callback. If already instrumented, skip re-wrapping.
"""
if _is_instrumented(future):
return future

start = default_timer()
span = (
self._tracer.start_span(f"{ASYNCIO_PREFIX} future")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Instrumentation State Tracker

This module provides helper functions to safely track whether a coroutine,
Future, or function has already been instrumented by the OpenTelemetry
asyncio instrumentation layer.

Some Python objects (like coroutines or functions) may not support setting
custom attributes or weak references. To avoid memory leaks and runtime
errors, this module uses a WeakKeyDictionary to safely track instrumented
objects.

If an object cannot be weak-referenced, it is silently skipped.

Usage:
if not _is_instrumented(obj):
_mark_instrumented(obj)
# instrument the object...
"""

import weakref
from typing import Any

# A global WeakSet to track instrumented objects.
# Entries are automatically removed when the objects are garbage collected.
_instrumented_tasks = weakref.WeakSet()


def _is_instrumented(obj: Any) -> bool:
"""
Check whether the object has already been instrumented.
If not, mark it as instrumented (only if weakref is supported).

Args:
obj: A coroutine, function, or Future.

Returns:
True if the object was already instrumented.
False if the object is not trackable (no weakref support), or just marked now.

Note:
In Python 3.12+, some internal types like `async_generator_asend`
raise TypeError when weakref is attempted.
"""
try:
if obj in _instrumented_tasks:
return True
_instrumented_tasks.add(obj)
return False
except TypeError:
# Object doesn't support weak references → can't track instrumentation
return False
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
A general test verifying that when the same Future objects (or coroutines) are
repeatedly instrumented (for example, via `trace_future`), callback references
do not leak. In this example, we mimic a typical scenario where a small set of
Futures might be reused throughout an application's lifecycle.
"""

import asyncio

from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
from opentelemetry.test.test_base import TestBase


class TestAsyncioDuplicateInstrument(TestBase):
"""
Tests whether repeated instrumentation of the same Futures leads to
exponential callback growth (potential memory leak).
"""

def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

self.instrumentor = AsyncioInstrumentor()
self.instrumentor.instrument()

def tearDown(self):
self.instrumentor.uninstrument()
self.loop.close()
asyncio.set_event_loop(None)
super().tearDown()

def test_duplicate_instrumentation_of_futures(self):
"""
If instrumentor.trace_future is called multiple times on the same Future,
we should NOT see an unbounded accumulation of callbacks.
"""
fut1 = asyncio.Future()
fut2 = asyncio.Future()

num_iterations = 10
for _ in range(num_iterations):
self.instrumentor.trace_future(fut1)
self.instrumentor.trace_future(fut2)

self.assertLessEqual(
len(fut1._callbacks),
1,
f"fut1 has {len(fut1._callbacks)} callbacks. Potential leak!",
)
self.assertLessEqual(
len(fut2._callbacks),
1,
f"fut2 has {len(fut2._callbacks)} callbacks. Potential leak!",
)