Skip to content

Commit a32cc29

Browse files
umaannamalailrafeei
authored and committed
Remove kafka metric instrumentation. (#626)
1 parent 6d71a66 commit a32cc29

File tree

3 files changed

+2
-192
lines changed

3 files changed

+2
-192
lines changed

newrelic/hooks/messagebroker_kafkapython.py

Lines changed: 0 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,14 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import logging
15-
import math
1614
import sys
17-
import threading
1815

19-
from kafka.metrics.metrics_reporter import AbstractMetricsReporter
20-
21-
import newrelic.core.agent
2216
from newrelic.api.application import application_instance
2317
from newrelic.api.message_trace import MessageTrace
2418
from newrelic.api.message_transaction import MessageTransaction
2519
from newrelic.api.time_trace import notice_error
2620
from newrelic.api.transaction import current_transaction
2721
from newrelic.common.object_wrapper import wrap_function_wrapper
28-
from newrelic.packages import six
29-
from newrelic.samplers.decorators import data_source_factory
30-
31-
_logger = logging.getLogger(__name__)
3222

3323
HEARTBEAT_POLL = "MessageBroker/Kafka/Heartbeat/Poll"
3424
HEARTBEAT_SENT = "MessageBroker/Kafka/Heartbeat/Sent"
@@ -187,141 +177,11 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
187177
return record
188178

189179

190-
class KafkaMetricsDataSource(object):
191-
_instance = None
192-
193-
def __init__(self):
194-
self.reporters = []
195-
196-
@classmethod
197-
@data_source_factory(name="Kafka Metrics Reporter")
198-
def factory(cls, settings=None, environ=None):
199-
return cls.singleton()
200-
201-
@classmethod
202-
def singleton(cls, register=True):
203-
# If already initialized, exit early
204-
if cls._instance:
205-
return cls._instance
206-
207-
# Init and register instance on class
208-
instance = cls()
209-
cls._instance = instance
210-
211-
# register_data_source takes a callable so let it rerun singleton to retrieve the instance
212-
if register:
213-
try:
214-
_logger.debug("Registering kafka metrics data source.")
215-
newrelic.core.agent.agent_instance().register_data_source(cls.factory)
216-
except Exception:
217-
_logger.exception(
218-
"Attempt to register kafka metrics data source has failed. Data source will be skipped."
219-
)
220-
221-
return instance
222-
223-
def register(self, reporter):
224-
self.reporters.append(reporter)
225-
226-
def unregister(self, reporter):
227-
if reporter in self.reporters:
228-
self.reporters.remove(reporter)
229-
230-
def start(self):
231-
return
232-
233-
def stop(self):
234-
# Clear references to reporters to prevent them from participating in a reference cycle.
235-
self.reporters = []
236-
237-
def __call__(self):
238-
for reporter in self.reporters:
239-
for name, metric in six.iteritems(reporter.snapshot()):
240-
yield name, metric
241-
242-
243-
class NewRelicMetricsReporter(AbstractMetricsReporter):
244-
def __init__(self, *args, **kwargs):
245-
super(NewRelicMetricsReporter, self).__init__(*args, **kwargs)
246-
247-
# Register with data source for harvesting
248-
self.data_source = KafkaMetricsDataSource.singleton()
249-
self.data_source.register(self)
250-
251-
self._metrics = {}
252-
self._lock = threading.Lock()
253-
254-
def close(self, *args, **kwargs):
255-
self.data_source.unregister(self)
256-
with self._lock:
257-
self._metrics = {}
258-
259-
def init(self, metrics):
260-
for metric in metrics:
261-
self.metric_change(metric)
262-
263-
@staticmethod
264-
def invalid_metric_value(metric):
265-
name, value = metric
266-
return not any((math.isinf(value), math.isnan(value), value == 0))
267-
268-
def snapshot(self):
269-
with self._lock:
270-
# metric.value can only be called once, so care must be taken when filtering
271-
metrics = ((name, metric.value()) for name, metric in six.iteritems(self._metrics))
272-
return {
273-
"MessageBroker/Kafka/Internal/%s" % name: {"count": value}
274-
for name, value in filter(self.invalid_metric_value, metrics)
275-
}
276-
277-
def get_metric_name(self, metric):
278-
metric_name = metric.metric_name # Get MetricName object to work with
279-
280-
name = metric_name.name
281-
group = metric_name.group
282-
283-
if "topic" in metric_name.tags:
284-
topic = metric_name.tags["topic"]
285-
return "/".join((group, topic, name))
286-
else:
287-
return "/".join((group, name))
288-
289-
def metric_change(self, metric):
290-
name = self.get_metric_name(metric)
291-
with self._lock:
292-
self._metrics[name] = metric
293-
294-
def metric_removal(self, metric):
295-
name = self.get_metric_name(metric)
296-
with self._lock:
297-
if name in self._metrics:
298-
self._metrics.pop(name)
299-
300-
def configure(self, configs):
301-
return
302-
303-
304-
def wrap_KafkaProducerConsumer_init(wrapped, instance, args, kwargs):
305-
try:
306-
if "metric_reporters" in kwargs:
307-
metric_reporters = list(kwargs.get("metric_reporters", []))
308-
metric_reporters.append(NewRelicMetricsReporter)
309-
kwargs["metric_reporters"] = [metric_reporters]
310-
else:
311-
kwargs["metric_reporters"] = [NewRelicMetricsReporter]
312-
except Exception:
313-
pass
314-
315-
return wrapped(*args, **kwargs)
316-
317-
318180
def instrument_kafka_producer(module):
319181
if hasattr(module, "KafkaProducer"):
320-
wrap_function_wrapper(module, "KafkaProducer.__init__", wrap_KafkaProducerConsumer_init)
321182
wrap_function_wrapper(module, "KafkaProducer.send", wrap_KafkaProducer_send)
322183

323184

324185
def instrument_kafka_consumer_group(module):
325186
if hasattr(module, "KafkaConsumer"):
326-
wrap_function_wrapper(module, "KafkaConsumer.__init__", wrap_KafkaProducerConsumer_init)
327187
wrap_function_wrapper(module.KafkaConsumer, "__next__", wrap_kafkaconsumer_next)

tests/messagebroker_kafkapython/conftest.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
from newrelic.api.transaction import current_transaction
2828
from newrelic.common.object_wrapper import transient_function_wrapper
29-
from newrelic.hooks.messagebroker_kafkapython import KafkaMetricsDataSource
3029

3130
DB_SETTINGS = kafka_settings()[0]
3231

@@ -55,7 +54,7 @@
5554

5655

5756
@pytest.fixture(scope="function")
58-
def producer(topic, data_source):
57+
def producer(topic):
5958
producer = kafka.KafkaProducer(
6059
bootstrap_servers=BROKER, api_version=(2, 0, 2), value_serializer=lambda v: json.dumps(v).encode("utf-8")
6160
)
@@ -64,7 +63,7 @@ def producer(topic, data_source):
6463

6564

6665
@pytest.fixture(scope="function")
67-
def consumer(topic, data_source, producer):
66+
def consumer(topic, producer):
6867
consumer = kafka.KafkaConsumer(
6968
topic,
7069
bootstrap_servers=BROKER,
@@ -99,17 +98,6 @@ def topic():
9998
# admin.delete_topics([topic])
10099

101100

102-
@pytest.fixture(scope="session")
103-
def data_source():
104-
"""
105-
Must be required by consumer and producer fixtures, or the first one of them to be
106-
instantiated will create and register the singleton. We rely on the singleton to
107-
not be registered to properly test the output of it without interference from the
108-
harvest thread.
109-
"""
110-
return KafkaMetricsDataSource.singleton(register=False)
111-
112-
113101
@transient_function_wrapper(kafka.producer.kafka, "KafkaProducer.send.__wrapped__")
114102
# Place transient wrapper underneath instrumentation
115103
def cache_kafka_producer_headers(wrapped, instance, args, kwargs):

tests/messagebroker_kafkapython/test_metrics.py

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments (0)