Skip to content

Commit 5bfdf96

Browse files
author
赵浩彬
committed
Replace kafka-python patch __next__ with poll
1 parent 74536f1 commit 5bfdf96

File tree

4 files changed

+62
-58
lines changed

4 files changed

+62
-58
lines changed

instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def process_msg(message):
9595
_instruments_kafka_python,
9696
_instruments_kafka_python_ng,
9797
)
98-
from opentelemetry.instrumentation.kafka.utils import _wrap_next, _wrap_send
98+
from opentelemetry.instrumentation.kafka.utils import _wrap_poll, _wrap_send
9999
from opentelemetry.instrumentation.kafka.version import __version__
100100
from opentelemetry.instrumentation.utils import unwrap
101101

@@ -150,10 +150,10 @@ def _instrument(self, **kwargs):
150150
)
151151
wrap_function_wrapper(
152152
kafka.KafkaConsumer,
153-
"__next__",
154-
_wrap_next(tracer, consume_hook),
153+
"poll",
154+
_wrap_poll(tracer, consume_hook),
155155
)
156156

157157
def _uninstrument(self, **kwargs):
158158
unwrap(kafka.KafkaProducer, "send")
159-
unwrap(kafka.KafkaConsumer, "__next__")
159+
unwrap(kafka.KafkaConsumer, "poll")

instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -199,30 +199,30 @@ def _create_consumer_span(
199199
context.detach(token)
200200

201201

202-
def _wrap_next(
203-
tracer: Tracer,
204-
consume_hook: ConsumeHookT,
205-
) -> Callable:
206-
def _traced_next(func, instance, args, kwargs):
207-
record = func(*args, **kwargs)
208-
209-
if record:
210-
bootstrap_servers = (
211-
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
212-
)
213-
214-
extracted_context = propagate.extract(
215-
record.headers, getter=_kafka_getter
216-
)
217-
_create_consumer_span(
218-
tracer,
219-
consume_hook,
220-
record,
221-
extracted_context,
222-
bootstrap_servers,
223-
args,
224-
kwargs,
225-
)
226-
return record
227-
228-
return _traced_next
202+
def _wrap_poll(tracer: Tracer, consume_hook: ConsumeHookT) -> Callable:
203+
def _traced_poll(func, instance, args, kwargs):
204+
records = func(*args, **kwargs)
205+
206+
for items in records.values():
207+
for record in items:
208+
if record:
209+
bootstrap_servers = (
210+
KafkaPropertiesExtractor.extract_bootstrap_servers(
211+
instance)
212+
)
213+
214+
extracted_context = propagate.extract(
215+
record.headers, getter=_kafka_getter
216+
)
217+
_create_consumer_span(
218+
tracer,
219+
consume_hook,
220+
record,
221+
extracted_context,
222+
bootstrap_servers,
223+
args,
224+
kwargs,
225+
)
226+
return records
227+
228+
return _traced_poll

instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_instrumentation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ def test_instrument_api(self) -> None:
3232
instrumentation.instrument()
3333
self.assertTrue(isinstance(KafkaProducer.send, BoundFunctionWrapper))
3434
self.assertTrue(
35-
isinstance(KafkaConsumer.__next__, BoundFunctionWrapper)
35+
isinstance(KafkaConsumer.poll, BoundFunctionWrapper)
3636
)
3737

3838
instrumentation.uninstrument()
3939
self.assertFalse(isinstance(KafkaProducer.send, BoundFunctionWrapper))
4040
self.assertFalse(
41-
isinstance(KafkaConsumer.__next__, BoundFunctionWrapper)
41+
isinstance(KafkaConsumer.poll, BoundFunctionWrapper)
4242
)
4343

4444
@patch("opentelemetry.instrumentation.kafka.distribution")

instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
_get_span_name,
2222
_kafka_getter,
2323
_kafka_setter,
24-
_wrap_next,
24+
_wrap_poll,
2525
_wrap_send,
2626
)
2727
from opentelemetry.trace import SpanKind
@@ -142,42 +142,46 @@ def wrap_send_helper(
142142
@mock.patch(
143143
"opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_bootstrap_servers"
144144
)
145-
def test_wrap_next(
146-
self,
147-
extract_bootstrap_servers: mock.MagicMock,
148-
_create_consumer_span: mock.MagicMock,
149-
extract: mock.MagicMock,
145+
def test_wrap_poll(
146+
self,
147+
extract_bootstrap_servers: mock.MagicMock,
148+
_create_consumer_span: mock.MagicMock,
149+
extract: mock.MagicMock,
150150
) -> None:
151+
self.args = []
152+
self.kwargs = {"timeout_ms": 1000}
151153
tracer = mock.MagicMock()
152154
consume_hook = mock.MagicMock()
153-
original_next_callback = mock.MagicMock()
155+
original_poll_callback = mock.MagicMock()
154156
kafka_consumer = mock.MagicMock()
155157

156-
wrapped_next = _wrap_next(tracer, consume_hook)
157-
record = wrapped_next(
158-
original_next_callback, kafka_consumer, self.args, self.kwargs
158+
wrapped_poll = _wrap_poll(tracer, consume_hook)
159+
records = wrapped_poll(
160+
original_poll_callback, kafka_consumer, self.args, self.kwargs
159161
)
160162

161163
extract_bootstrap_servers.assert_called_once_with(kafka_consumer)
162164
bootstrap_servers = extract_bootstrap_servers.return_value
163165

164-
original_next_callback.assert_called_once_with(
166+
original_poll_callback.assert_called_once_with(
165167
*self.args, **self.kwargs
166168
)
167-
self.assertEqual(record, original_next_callback.return_value)
168-
169-
extract.assert_called_once_with(record.headers, getter=_kafka_getter)
170-
context = extract.return_value
171-
172-
_create_consumer_span.assert_called_once_with(
173-
tracer,
174-
consume_hook,
175-
record,
176-
context,
177-
bootstrap_servers,
178-
self.args,
179-
self.kwargs,
180-
)
169+
self.assertEqual(records, original_poll_callback.return_value)
170+
171+
for items in records.values():
172+
for record in items:
173+
extract.assert_called_once_with(record.headers, getter=_kafka_getter)
174+
context = extract.return_value
175+
176+
_create_consumer_span.assert_called_once_with(
177+
tracer,
178+
consume_hook,
179+
record,
180+
context,
181+
bootstrap_servers,
182+
self.args,
183+
self.kwargs,
184+
)
181185

182186
@mock.patch("opentelemetry.trace.set_span_in_context")
183187
@mock.patch("opentelemetry.context.attach")

0 commit comments

Comments
 (0)