Skip to content
14 changes: 8 additions & 6 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,10 @@ def _process_configuration(section):
_process_setting(section, "azure_operator.enabled", "getboolean", None)
_process_setting(section, "package_reporting.enabled", "getboolean", None)
_process_setting(section, "instrumentation.graphql.capture_introspection_queries", "getboolean", None)
_process_setting(
section, "instrumentation.kombu.ignored_exchanges", "get", newrelic.core.config.parse_space_separated_into_list
)
_process_setting(section, "instrumentation.kombu.consumer.enabled", "getboolean", None)


# Loading of configuration from specified file and for specified
Expand Down Expand Up @@ -2847,12 +2851,10 @@ def _process_module_builtin_defaults():
_process_module_definition(
"kafka.coordinator.heartbeat", "newrelic.hooks.messagebroker_kafkapython", "instrument_kafka_heartbeat"
)
# Kombu instrumentation is causing crashes so until we figure out the root cause
# comment it out.
# _process_module_definition("kombu.messaging", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_messaging")
# _process_module_definition(
# "kombu.serialization", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_serializaion"
# )
_process_module_definition("kombu.messaging", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_messaging")
_process_module_definition(
"kombu.serialization", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_serializaion"
)
_process_module_definition("logging", "newrelic.hooks.logger_logging", "instrument_logging")

_process_module_definition("loguru", "newrelic.hooks.logger_loguru", "instrument_loguru")
Expand Down
53 changes: 52 additions & 1 deletion newrelic/hooks/messagebroker_kombu.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,64 @@
}


def bind_publish(
body,
routing_key=None,
delivery_mode=None,
mandatory=False,
immediate=False,
priority=0,
content_type=None,
content_encoding=None,
serializer=None,
headers=None,
compression=None,
exchange=None,
retry=False,
retry_policy=None,
declare=None,
expiration=None,
timeout=None,
confirm_timeout=None,
**properties,
):
return {
"body": body,
"routing_key": routing_key,
"delivery_mode": delivery_mode,
"mandatory": mandatory,
"immediate": immediate,
"priority": priority,
"content_type": content_type,
"content_encoding": content_encoding,
"serializer": serializer,
"headers": headers,
"compression": compression,
"exchange": exchange,
"retry": retry,
"retry_policy": retry_policy,
"declare": declare,
"expiration": expiration,
"timeout": timeout,
"confirm_timeout": confirm_timeout,
"properties": properties,
}


def wrap_Producer_publish(wrapped, instance, args, kwargs):
transaction = current_transaction()

if transaction is None:
return wrapped(*args, **kwargs)

bound_args = bind_args(wrapped, args, kwargs)
try:
bound_args = bind_publish(*args, **kwargs)
except Exception:
_logger.debug(
"Unable to bind arguments for kombu.messaging.Producer.publish. Report this issue to New Relic support.", record_exception=True
)
return wrapped(*args, **kwargs)

headers = bound_args["headers"]
headers = headers if headers else {}
value = bound_args["body"]
Expand Down
12 changes: 12 additions & 0 deletions tests/messagebroker_kombu/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,15 @@ def test():
producer.publish({"foo": object()}, exchange=exchange, routing_key="bar", declare=[queue])

test()


def test_producer_tries_to_parse_args(exchange, producer, queue, monkeypatch):
@validate_transaction_errors([callable_name(EncodeError)])
@background_task()
def test():
with pytest.raises(TypeError):
producer.publish(
{"foo": object()}, body={"foo": object()}, exchange=exchange, routing_key="bar", declare=[queue]
)

test()
4 changes: 2 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ envlist =
python-template_jinja2-py37-jinja2030103,
python-template_mako-{py37,py38,py39,py310,py311,py312,py313},
rabbitmq-messagebroker_pika-{py37,py38,py39,py310,py311,py312,py313,pypy310}-pikalatest,
;; Comment out Kombu until we can root cause the crash.
; rabbitmq-messagebroker_kombu-{py38,py39,py310,py311,py312,py313,pypy310}-kombulatest,
rabbitmq-messagebroker_kombu-{py38,py39,py310,py311,py312,py313,pypy310}-{kombulatest,kombu050204},
redis-datastore_redis-{py37,py311,pypy310}-redis04,
redis-datastore_redis-{py37,py38,py39,py310,py311,py312,py313,pypy310}-redislatest,
rediscluster-datastore_rediscluster-{py37,py312,py313,pypy310}-redislatest,
Expand Down Expand Up @@ -418,6 +417,7 @@ deps =
messagebroker_confluentkafka-confluentkafka0106: confluent-kafka<1.7
messagebroker_kafkapython-kafkapythonnglatest: kafka-python-ng
messagebroker_kombu-kombulatest: kombu
messagebroker_kombu-kombu050204: kombu<5.3.0
# TODO: Pinned to 2.0 for now, fix tests later
messagebroker_kafkapython-kafkapythonlatest: kafka-python<2.1
template_genshi-genshilatest: genshi
Expand Down
Loading