Skip to content

Commit 1c3566d

Browse files
fix(kafka): allow producer/consumer initialized with unpacked config [APMS-15618] [backport 2.21] (#13316)
Backport 02da078 from #13303 to 2.21. Fixes an issue where Kafka connection initialization failed when passing in an unpacked config into Producer or Consumer. confluent-kafka supports both unpacked config and dict config. ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) Co-authored-by: Quinna Halim <[email protected]>
1 parent 6743428 commit 1c3566d

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

ddtrace/contrib/internal/kafka/patch.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ def get_version():
6262

6363

6464
class TracedProducerMixin:
65-
def __init__(self, config, *args, **kwargs):
65+
def __init__(self, config=None, *args, **kwargs):
66+
if not config:
67+
config = kwargs
6668
super(TracedProducerMixin, self).__init__(config, *args, **kwargs)
6769
self._dd_bootstrap_servers = (
6870
config.get("bootstrap.servers")
@@ -79,7 +81,9 @@ def __bool__(self):
7981

8082

8183
class TracedConsumerMixin:
82-
def __init__(self, config, *args, **kwargs):
84+
def __init__(self, config=None, *args, **kwargs):
85+
if not config:
86+
config = kwargs
8387
super(TracedConsumerMixin, self).__init__(config, *args, **kwargs)
8488
self._group_id = config.get("group.id", "")
8589
self._auto_commit = asbool(config.get("enable.auto.commit", True))
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
fixes:
3+
- |
4+
kafka: Fixes an issue where a producer or consumer initialized with an unpacked config resulted in TypeError, causing a failed connection.
5+
confluent-kafka supports both unpacked and packed config; this change allows initialization with either.

tests/contrib/kafka/test_kafka.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import pytest
1313

1414
from ddtrace.contrib.internal.kafka.patch import TracedConsumer
15+
from ddtrace.contrib.internal.kafka.patch import TracedProducer
1516
from ddtrace.contrib.internal.kafka.patch import patch
1617
from ddtrace.contrib.internal.kafka.patch import unpatch
1718
import ddtrace.internal.datastreams # noqa: F401 - used as part of mock patching
@@ -211,6 +212,19 @@ def test_consumer_created_with_logger_does_not_raise(tracer):
211212
consumer.close()
212213

213214

215+
def test_consumer_initialized_with_unpacked_config(tracer):
216+
"""Test that a Consumer initialized with an unpacked config does not raise any errors."""
217+
consumer = confluent_kafka.Consumer(
218+
**{
219+
"bootstrap.servers": BOOTSTRAP_SERVERS,
220+
"group.id": GROUP_ID,
221+
"auto.offset.reset": "earliest",
222+
},
223+
)
224+
assert isinstance(consumer, TracedConsumer)
225+
consumer.close()
226+
227+
214228
def test_empty_list_from_consume_does_not_raise():
215229
# https://github.com/DataDog/dd-trace-py/issues/8846
216230
patch()
@@ -247,6 +261,23 @@ def test_producer_bootstrap_servers(config, expect_servers, tracer):
247261
assert producer._dd_bootstrap_servers is None
248262

249263

264+
@pytest.mark.parametrize(
265+
"config,expect_servers",
266+
[
267+
({"bootstrap.servers": BOOTSTRAP_SERVERS}, BOOTSTRAP_SERVERS),
268+
({"metadata.broker.list": BOOTSTRAP_SERVERS}, BOOTSTRAP_SERVERS),
269+
({}, None),
270+
],
271+
)
272+
def test_producer_initialized_unpacked_config(config, expect_servers, tracer):
273+
producer = confluent_kafka.Producer(**config)
274+
assert isinstance(producer, TracedProducer)
275+
if expect_servers is not None:
276+
assert producer._dd_bootstrap_servers == expect_servers
277+
else:
278+
assert producer._dd_bootstrap_servers is None
279+
280+
250281
def test_produce_single_server(dummy_tracer, producer, kafka_topic):
251282
Pin.override(producer, tracer=dummy_tracer)
252283
producer.produce(kafka_topic, PAYLOAD, key=KEY)

0 commit comments

Comments (0)