Skip to content

Commit 59633ac

Browse files
authored
Fix Arbitrary Properties of kombu Producer.publish (#1544)
* Fix arbitrary properties of kombu Producer.publish * Add regression test
1 parent 29571ee commit 59633ac

File tree

5 files changed

+31
-8
lines changed

5 files changed

+31
-8
lines changed

newrelic/hooks/messagebroker_kombu.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def bind_publish(
8282
"expiration": expiration,
8383
"timeout": timeout,
8484
"confirm_timeout": confirm_timeout,
85-
"properties": properties,
85+
**properties,
8686
}
8787

8888

tests/datastore_rediscluster/test_uninstrumented_rediscluster_methods.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@
138138
"result_callbacks",
139139
"set_default_node",
140140
"user_on_connect_func",
141+
"startup_nodes",
141142
}
142143

143144
REDIS_MODULES = {"bf", "cf", "cms", "ft", "graph", "json", "tdigest", "topk", "ts", "vset"}

tests/messagebroker_kombu/conftest.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import json
16-
import pickle
1715
import uuid
1816

1917
import kombu
@@ -109,7 +107,7 @@ def consumer_validate_dt(producer, consumer_connection, queue, consume_validate_
109107
def consume(events):
110108
def _consume(body, message):
111109
message.ack()
112-
events.append({"body": body, "routing_key": message.delivery_info["routing_key"]})
110+
events.append({"body": body, "routing_key": message.delivery_info["routing_key"], "message": message})
113111

114112
return _consume
115113

@@ -118,7 +116,7 @@ def _consume(body, message):
118116
def consume_error(events):
119117
def _consume(body, message):
120118
message.ack()
121-
events.append({"body": body, "routing_key": message.delivery_info["routing_key"]})
119+
events.append({"body": body, "routing_key": message.delivery_info["routing_key"], "message": message})
122120
raise RuntimeError("Error in consumer callback")
123121

124122
return _consume
@@ -135,7 +133,7 @@ def _consume(body, message):
135133
txn._test_request_headers = message.headers
136134

137135
message.ack()
138-
events.append({"body": body, "routing_key": message.delivery_info["routing_key"]})
136+
events.append({"body": body, "routing_key": message.delivery_info["routing_key"], "message": message})
139137

140138
return _consume
141139

tests/messagebroker_kombu/test_producer.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from newrelic.api.background_task import background_task
2323
from newrelic.api.function_trace import FunctionTrace
2424
from newrelic.common.object_names import callable_name
25+
from newrelic.common.object_wrapper import transient_function_wrapper
2526

2627

2728
def test_trace_metrics(send_producer_message, exchange):
@@ -105,3 +106,24 @@ def test():
105106
)
106107

107108
test()
109+
110+
111+
def test_custom_properties(exchange, queue, events, get_consumer_record, producer):
112+
validate_custom_properties_called = []
113+
114+
@transient_function_wrapper("kombu.messaging", "Producer._publish")
115+
def validate_custom_properties(wrapped, instance, args, kwargs):
116+
from newrelic.common.signature import bind_args
117+
118+
bound_args = bind_args(wrapped, args, kwargs)
119+
properties = bound_args["properties"]
120+
assert properties.get("custom_property", "") == "baz", "custom_property was deleted by instrumentation."
121+
validate_custom_properties_called.append(True)
122+
123+
@background_task()
124+
@validate_custom_properties
125+
def test():
126+
producer.publish({"foo": 123}, exchange=exchange, routing_key="bar", declare=[queue], custom_property="baz")
127+
128+
test()
129+
assert validate_custom_properties_called, "validate_custom_properties was not called."

tox.ini

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ envlist =
6868
{windows,windows_arm64}-cross_agent-{py313,py314}-{with,without}_extensions,
6969

7070
# Integration Tests (only run on Linux)
71-
cassandra-datastore_cassandradriver-{py38,py39,py310,py311,py312,pypy311}-cassandralatest,
71+
cassandra-datastore_cassandradriver-py38-cassandra032903,
72+
cassandra-datastore_cassandradriver-{py39,py310,py311,py312,pypy311}-cassandralatest,
7273
elasticsearchserver07-datastore_elasticsearch-{py38,py39,py310,py311,py312,py313,py314,pypy311}-elasticsearch07,
7374
elasticsearchserver08-datastore_elasticsearch-{py38,py39,py310,py311,py312,py313,py314,pypy311}-elasticsearch08,
7475
firestore-datastore_firestore-{py38,py39,py310,py311,py312,py313,py314},
@@ -283,7 +284,8 @@ deps =
283284
datastore_aiomysql: sqlalchemy<2
284285
datastore_bmemcached: python-binary-memcached
285286
datastore_cassandradriver-cassandralatest: cassandra-driver
286-
datastore_cassandradriver-cassandralatest: twisted
287+
datastore_cassandradriver-cassandra032903: cassandra-driver<3.29.3
288+
datastore_cassandradriver: twisted
287289
datastore_elasticsearch: requests
288290
datastore_elasticsearch: httpx
289291
datastore_elasticsearch-elasticsearch07: elasticsearch[async]<8.0

0 commit comments

Comments
 (0)