Skip to content

Commit 3f8fdf2

Browse files
authored
Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer (#1656)
1 parent b8d7448 commit 3f8fdf2

File tree

3 files changed

+20
-0
lines changed

3 files changed

+20
-0
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4646
([#1435](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1435))
4747
- mongo db - fix db statement capturing
4848
([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512))
49+
- Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer
50+
([#1656](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1656))
4951

5052
## Version 1.15.0/0.36b0 (2022-12-10)
5153

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
173173
def committed(self, partitions, timeout=-1):
174174
return self._consumer.committed(partitions, timeout)
175175

176+
def commit(self, *args, **kwargs):
177+
return self._consumer.commit(*args, **kwargs)
178+
176179
def consume(
177180
self, num_messages=1, *args, **kwargs
178181
): # pylint: disable=keyword-arg-before-vararg

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,18 @@ def test_instrument_api(self) -> None:
5858

5959
consumer = instrumentation.uninstrument_consumer(consumer)
6060
self.assertEqual(consumer.__class__, Consumer)
61+
62+
def test_consumer_commit_method_exists(self) -> None:
63+
instrumentation = ConfluentKafkaInstrumentor()
64+
65+
consumer = Consumer(
66+
{
67+
"bootstrap.servers": "localhost:29092",
68+
"group.id": "mygroup",
69+
"auto.offset.reset": "earliest",
70+
}
71+
)
72+
73+
consumer = instrumentation.instrument_consumer(consumer)
74+
self.assertEqual(consumer.__class__, ProxiedConsumer)
75+
self.assertTrue(hasattr(consumer, "commit"))

0 commit comments

Comments
 (0)