Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `opentelemetry-instrumentation-redis` Add missing entry in doc string for `def _instrument`
([#3247](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3247))
- `opentelemetry-instrumentation-asyncpg` Fix fallback for empty queries.
([#3253](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3253))

## Version 1.30.0/0.51b0 (2025-02-03)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ def _uninstrument(self, **__):

async def _do_execute(self, func, instance, args, kwargs):
exception = None
params = getattr(instance, "_params", {})
name = args[0] if args[0] else params.get("database", "postgresql")
params = getattr(instance, "_params", None)
name = (
args[0] if args[0] else getattr(params, "database", "postgresql")
)

try:
# Strip leading comments so we get the operation name.
Expand Down Expand Up @@ -185,11 +187,11 @@ async def _do_execute(self, func, instance, args, kwargs):
async def _do_cursor_execute(self, func, instance, args, kwargs):
"""Wrap cursor based functions. For every call this will generate a new span."""
exception = None
params = getattr(instance._connection, "_params", {})
params = getattr(instance._connection, "_params", None)
name = (
instance._query
if instance._query
else params.get("database", "postgresql")
else getattr(params, "database", "postgresql")
)

try:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import os
from collections import namedtuple
from unittest.mock import patch

import asyncpg

Expand All @@ -20,7 +22,26 @@ def async_call(coro):
return loop.run_until_complete(coro)


class TestFunctionalAsyncPG(TestBase):
class CheckSpanMixin:
def check_span(self, span, expected_db_name=POSTGRES_DB_NAME):
self.assertEqual(
span.attributes[SpanAttributes.DB_SYSTEM], "postgresql"
)
self.assertEqual(
span.attributes[SpanAttributes.DB_NAME], expected_db_name
)
self.assertEqual(
span.attributes[SpanAttributes.DB_USER], POSTGRES_USER
)
self.assertEqual(
span.attributes[SpanAttributes.NET_PEER_NAME], POSTGRES_HOST
)
self.assertEqual(
span.attributes[SpanAttributes.NET_PEER_PORT], POSTGRES_PORT
)


class TestFunctionalAsyncPG(TestBase, CheckSpanMixin):
def setUp(self):
super().setUp()
self._tracer = self.tracer_provider.get_tracer(__name__)
Expand All @@ -39,25 +60,54 @@ def tearDown(self):
AsyncPGInstrumentor().uninstrument()
super().tearDown()

def check_span(self, span):
self.assertEqual(
span.attributes[SpanAttributes.DB_SYSTEM], "postgresql"
)
self.assertEqual(
span.attributes[SpanAttributes.DB_NAME], POSTGRES_DB_NAME
)
self.assertEqual(
span.attributes[SpanAttributes.DB_USER], POSTGRES_USER
)
def test_instrumented_execute_method_without_arguments(self, *_, **__):
"""Should create a span for execute()."""
async_call(self._connection.execute("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.check_span(spans[0])
self.assertEqual(spans[0].name, "SELECT")
self.assertEqual(
span.attributes[SpanAttributes.NET_PEER_NAME], POSTGRES_HOST
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
)

def test_instrumented_execute_method_error(self, *_, **__):
"""Should create an error span for execute() with the database name as the span name."""
with self.assertRaises(AttributeError):
async_call(self._connection.execute(""))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.ERROR, spans[0].status.status_code)
self.check_span(spans[0])
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")

def test_instrumented_fetch_method_without_arguments(self, *_, **__):
"""Should create a span from fetch()."""
async_call(self._connection.fetch("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.check_span(spans[0])
self.assertEqual(spans[0].name, "SELECT")
self.assertEqual(
span.attributes[SpanAttributes.NET_PEER_PORT], POSTGRES_PORT
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
)

def test_instrumented_execute_method_without_arguments(self, *_, **__):
async_call(self._connection.execute("SELECT 42;"))
def test_instrumented_fetch_method_empty_query(self, *_, **__):
"""Should create an error span for fetch() with the database name as the span name."""
async_call(self._connection.fetch(""))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.check_span(spans[0])
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")

def test_instrumented_fetchval_method_without_arguments(self, *_, **__):
"""Should create a span for fetchval()."""
async_call(self._connection.fetchval("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
Expand All @@ -67,17 +117,105 @@ def test_instrumented_execute_method_without_arguments(self, *_, **__):
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
)

def test_instrumented_fetch_method_without_arguments(self, *_, **__):
async_call(self._connection.fetch("SELECT 42;"))
def test_instrumented_fetchval_method_empty_query(self, *_, **__):
"""Should create an error span for fetchval() with the database name as the span name."""
async_call(self._connection.fetchval(""))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.check_span(spans[0])
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")

def test_instrumented_fetchrow_method_without_arguments(self, *_, **__):
"""Should create a span for fetchrow()."""
async_call(self._connection.fetchrow("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.check_span(spans[0])
self.assertEqual(spans[0].name, "SELECT")
self.assertEqual(
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
)

def test_instrumented_fetchrow_method_empty_query(self, *_, **__):
"""Should create an error span for fetchrow() with the database name as the span name."""
async_call(self._connection.fetchrow(""))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.check_span(spans[0])
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")

def test_instrumented_cursor_execute_method_without_arguments(
self, *_, **__
):
"""Should create spans for the transaction as well as the cursor fetches."""

async def _cursor_execute():
async with self._connection.transaction():
async for record in self._connection.cursor(
"SELECT generate_series(0, 5);"
):
pass

async_call(_cursor_execute())
spans = self.memory_exporter.get_finished_spans()

self.check_span(spans[0])
self.assertEqual(spans[0].name, "BEGIN;")
self.assertEqual(
spans[0].attributes[SpanAttributes.DB_STATEMENT], "BEGIN;"
)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)

for span in spans[1:-1]:
self.check_span(span)
self.assertEqual(span.name, "CURSOR: SELECT")
self.assertEqual(
span.attributes[SpanAttributes.DB_STATEMENT],
"SELECT generate_series(0, 5);",
)
self.assertIs(StatusCode.UNSET, span.status.status_code)

self.check_span(spans[-1])
self.assertEqual(spans[-1].name, "COMMIT;")
self.assertEqual(
spans[-1].attributes[SpanAttributes.DB_STATEMENT], "COMMIT;"
)

def test_instrumented_cursor_execute_method_empty_query(self, *_, **__):
"""Should create spans for the transaction and cursor fetches with the database name as the span name."""

async def _cursor_execute():
async with self._connection.transaction():
async for record in self._connection.cursor(""):
pass

async_call(_cursor_execute())
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 3)

self.check_span(spans[0])
self.assertEqual(
spans[0].attributes[SpanAttributes.DB_STATEMENT], "BEGIN;"
)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)

self.check_span(spans[1])
self.assertEqual(spans[1].name, f"CURSOR: {POSTGRES_DB_NAME}")
self.assertEqual(spans[1].attributes[SpanAttributes.DB_STATEMENT], "")
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)

self.check_span(spans[2])
self.assertEqual(
spans[2].attributes[SpanAttributes.DB_STATEMENT], "COMMIT;"
)

def test_instrumented_remove_comments(self, *_, **__):
"""Should remove comments from the query and set the span name correctly."""
async_call(self._connection.fetch("/* leading comment */ SELECT 42;"))
async_call(
self._connection.fetch(
Expand All @@ -88,25 +226,30 @@ def test_instrumented_remove_comments(self, *_, **__):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 3)
self.check_span(spans[0])
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.assertEqual(spans[0].name, "SELECT")
self.assertEqual(
spans[0].attributes[SpanAttributes.DB_STATEMENT],
"/* leading comment */ SELECT 42;",
)
self.check_span(spans[1])
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)
self.assertEqual(spans[1].name, "SELECT")
self.assertEqual(
spans[1].attributes[SpanAttributes.DB_STATEMENT],
"/* leading comment */ SELECT 42; /* trailing comment */",
)
self.check_span(spans[2])
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)
self.assertEqual(spans[2].name, "SELECT")
self.assertEqual(
spans[2].attributes[SpanAttributes.DB_STATEMENT],
"SELECT 42; /* trailing comment */",
)

def test_instrumented_transaction_method(self, *_, **__):
"""Should create spans for the transaction and the inner execute()."""

async def _transaction_execute():
async with self._connection.transaction():
await self._connection.execute("SELECT 42;")
Expand Down Expand Up @@ -134,6 +277,8 @@ async def _transaction_execute():
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)

def test_instrumented_failed_transaction_method(self, *_, **__):
"""Should create spans for the transaction as well as an error span for execute()."""

async def _transaction_execute():
async with self._connection.transaction():
await self._connection.execute("SELECT 42::uuid;")
Expand Down Expand Up @@ -164,6 +309,7 @@ async def _transaction_execute():
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)

def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
"""Should not capture parameters when capture_parameters is False."""
async_call(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
Expand All @@ -174,7 +320,7 @@ def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
)


class TestFunctionalAsyncPG_CaptureParameters(TestBase):
class TestFunctionalAsyncPG_CaptureParameters(TestBase, CheckSpanMixin):
def setUp(self):
super().setUp()
self._tracer = self.tracer_provider.get_tracer(__name__)
Expand All @@ -195,24 +341,8 @@ def tearDown(self):
AsyncPGInstrumentor().uninstrument()
super().tearDown()

def check_span(self, span):
self.assertEqual(
span.attributes[SpanAttributes.DB_SYSTEM], "postgresql"
)
self.assertEqual(
span.attributes[SpanAttributes.DB_NAME], POSTGRES_DB_NAME
)
self.assertEqual(
span.attributes[SpanAttributes.DB_USER], POSTGRES_USER
)
self.assertEqual(
span.attributes[SpanAttributes.NET_PEER_NAME], POSTGRES_HOST
)
self.assertEqual(
span.attributes[SpanAttributes.NET_PEER_PORT], POSTGRES_PORT
)

def test_instrumented_execute_method_with_arguments(self, *_, **__):
"""Should create a span for execute() with captured parameters."""
async_call(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
Expand All @@ -228,6 +358,7 @@ def test_instrumented_execute_method_with_arguments(self, *_, **__):
)

def test_instrumented_fetch_method_with_arguments(self, *_, **__):
"""Should create a span for fetch() with captured parameters."""
async_call(self._connection.fetch("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
Expand All @@ -242,10 +373,11 @@ def test_instrumented_fetch_method_with_arguments(self, *_, **__):
)

def test_instrumented_executemany_method_with_arguments(self, *_, **__):
"""Should create a span for executemany with captured parameters."""
async_call(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.check_span(spans[0])
self.assertEqual(
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT $1;"
Expand All @@ -255,15 +387,41 @@ def test_instrumented_executemany_method_with_arguments(self, *_, **__):
)

def test_instrumented_execute_interface_error_method(self, *_, **__):
"""Should create an error span for execute() with captured parameters."""
with self.assertRaises(asyncpg.InterfaceError):
async_call(self._connection.execute("SELECT 42;", 1, 2, 3))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

self.assertIs(StatusCode.ERROR, spans[0].status.status_code)
self.check_span(spans[0])
self.assertEqual(
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
)
self.assertEqual(
spans[0].attributes["db.statement.parameters"], "(1, 2, 3)"
)

def test_instrumented_executemany_method_empty_query(self, *_, **__):
"""Should create a span for executemany() with captured parameters."""
async_call(self._connection.executemany("", []))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.check_span(spans[0])
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
self.assertEqual(
spans[0].attributes["db.statement.parameters"], "([],)"
)

def test_instrumented_fetch_method_broken_asyncpg(self, *_, **__):
"""Should create a span for fetch() with "postgresql" as the span name."""
with patch.object(
self._connection, "_params", namedtuple("ConnectionParams", [])
):
async_call(self._connection.fetch(""))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.assertEqual(spans[0].name, "postgresql")
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")