Skip to content

Commit 16c3d0f

Browse files
committed
modify testcase
1 parent e9546b2 commit 16c3d0f

File tree

2 files changed

+47
-48
lines changed

2 files changed

+47
-48
lines changed

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,3 +318,43 @@ def _compare_spans(
318318
self.assertEqual(
319319
expected_span["attributes"], dict(span.attributes)
320320
)
321+
322+
async def test_send_and_wait(self) -> None:
323+
AIOKafkaInstrumentor().uninstrument()
324+
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)
325+
326+
producer = await self.producer_factory()
327+
add_message_mock: mock.AsyncMock = (
328+
producer._message_accumulator.add_message
329+
)
330+
add_message_mock.side_effect = [mock.AsyncMock()(), mock.AsyncMock()()]
331+
332+
tracer = self.tracer_provider.get_tracer(__name__)
333+
with tracer.start_as_current_span("test_span") as span:
334+
await producer.send_and_wait("topic_1", b"value_1")
335+
336+
add_message_mock.assert_awaited_with(
337+
TopicPartition(topic="topic_1", partition=1),
338+
None,
339+
b"value_1",
340+
40.0,
341+
timestamp_ms=None,
342+
headers=[("traceparent", mock.ANY)],
343+
)
344+
assert (
345+
add_message_mock.call_args_list[0]
346+
.kwargs["headers"][0][1]
347+
.startswith(
348+
f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
349+
)
350+
)
351+
352+
await producer.send_and_wait("topic_2", b"value_2")
353+
add_message_mock.assert_awaited_with(
354+
TopicPartition(topic="topic_2", partition=1),
355+
None,
356+
b"value_2",
357+
40.0,
358+
timestamp_ms=None,
359+
headers=[("traceparent", mock.ANY)],
360+
)

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py

Lines changed: 7 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,6 @@
2828
)
2929
from opentelemetry.trace import SpanKind
3030

31-
SEND_RETURN_VALUE = None
32-
33-
34-
async def original_send(
35-
topic,
36-
value=None,
37-
key=None,
38-
partition=None,
39-
timestamp_ms=None,
40-
headers=None,
41-
):
42-
return SEND_RETURN_VALUE
43-
4431

4532
class TestUtils(IsolatedAsyncioTestCase):
4633
def setUp(self) -> None:
@@ -122,36 +109,6 @@ async def test_wrap_send_with_topic_as_kwarg(
122109
extract_bootstrap_servers,
123110
)
124111

125-
@mock.patch(
126-
"opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
127-
)
128-
@mock.patch(
129-
"opentelemetry.instrumentation.aiokafka.utils._extract_send_partition"
130-
)
131-
@mock.patch(
132-
"opentelemetry.instrumentation.aiokafka.utils._enrich_send_span"
133-
)
134-
@mock.patch("opentelemetry.trace.set_span_in_context")
135-
@mock.patch("opentelemetry.propagate.inject")
136-
async def test_wrap_send_with_headers_as_args(
137-
self,
138-
inject: mock.MagicMock,
139-
set_span_in_context: mock.MagicMock,
140-
enrich_span: mock.MagicMock,
141-
extract_send_partition: mock.AsyncMock,
142-
extract_bootstrap_servers: mock.MagicMock,
143-
) -> None:
144-
# like send_and_wait
145-
self.args = [self.topic_name, None, None, None, None, None]
146-
self.kwargs = {}
147-
await self.wrap_send_helper(
148-
inject,
149-
set_span_in_context,
150-
enrich_span,
151-
extract_send_partition,
152-
extract_bootstrap_servers,
153-
)
154-
155112
async def wrap_send_helper(
156113
self,
157114
inject: mock.MagicMock,
@@ -163,8 +120,6 @@ async def wrap_send_helper(
163120
tracer = mock.MagicMock()
164121
produce_hook = mock.AsyncMock()
165122
original_send_callback = mock.AsyncMock()
166-
original_send_callback.side_effect = original_send
167-
original_send_callback.return_value = SEND_RETURN_VALUE
168123
kafka_producer = mock.MagicMock()
169124
expected_span_name = _get_span_name("send", self.topic_name)
170125

@@ -176,7 +131,9 @@ async def wrap_send_helper(
176131
extract_bootstrap_servers.assert_called_once_with(
177132
kafka_producer.client
178133
)
179-
extract_send_partition.assert_awaited_once()
134+
extract_send_partition.assert_awaited_once_with(
135+
kafka_producer, self.args, self.kwargs
136+
)
180137
tracer.start_as_current_span.assert_called_once_with(
181138
expected_span_name, kind=SpanKind.PRODUCER
182139
)
@@ -197,9 +154,11 @@ async def wrap_send_helper(
197154
self.headers, context=context, setter=_aiokafka_setter
198155
)
199156

200-
produce_hook.assert_awaited_once()
157+
produce_hook.assert_awaited_once_with(span, self.args, self.kwargs)
201158

202-
original_send_callback.assert_awaited_once()
159+
original_send_callback.assert_awaited_once_with(
160+
*self.args, **self.kwargs
161+
)
203162
self.assertEqual(retval, original_send_callback.return_value)
204163

205164
@mock.patch("opentelemetry.propagate.extract")

0 commit comments

Comments
 (0)