diff --git a/elasticapm/transport/base.py b/elasticapm/transport/base.py index 24911c395..b81960907 100644 --- a/elasticapm/transport/base.py +++ b/elasticapm/transport/base.py @@ -250,6 +250,7 @@ def _flush(self, buffer, forced_flush=False) -> None: """ if not self.state.should_try(): logger.error("dropping flushed data due to transport failure back-off") + buffer.close() else: fileobj = buffer.fileobj # get a reference to the fileobj before closing the gzip file buffer.close() @@ -261,6 +262,8 @@ def _flush(self, buffer, forced_flush=False) -> None: except Exception as e: self.handle_transport_fail(e) + data.release() + def start_thread(self, pid=None) -> None: super(Transport, self).start_thread(pid=pid) if (not self._thread or self.pid != self._thread.pid) and not self._closed: diff --git a/elasticapm/utils/__init__.py b/elasticapm/utils/__init__.py index 0f7b52c0d..4403f5abd 100644 --- a/elasticapm/utils/__init__.py +++ b/elasticapm/utils/__init__.py @@ -78,6 +78,8 @@ def get_name_from_func(func: FunctionType) -> str: return "partial({})".format(get_name_from_func(func.func)) elif hasattr(func, "_partialmethod") and hasattr(func._partialmethod, "func"): return "partial({})".format(get_name_from_func(func._partialmethod.func)) + elif hasattr(func, "__partialmethod__") and hasattr(func.__partialmethod__, "func"): + return "partial({})".format(get_name_from_func(func.__partialmethod__.func)) module = func.__module__ diff --git a/tests/transports/test_base.py b/tests/transports/test_base.py index 1aa1a8941..2f77c3e95 100644 --- a/tests/transports/test_base.py +++ b/tests/transports/test_base.py @@ -107,18 +107,25 @@ def test_empty_queue_flush(mock_send, elasticapm_client): transport.close() -@mock.patch("elasticapm.transport.base.Transport.send") +@mock.patch("elasticapm.transport.base.Transport._flush") @pytest.mark.parametrize("elasticapm_client", [{"api_request_time": "5s"}], indirect=True) -def test_metadata_prepended(mock_send, elasticapm_client): +def test_metadata_prepended(mock_flush, elasticapm_client): transport = Transport(client=elasticapm_client, compress_level=0) transport.start_thread() transport.queue("error", {}, flush=True) transport.close() - assert mock_send.call_count == 1 - args, kwargs = mock_send.call_args - data = gzip.decompress(args[0]) + assert mock_flush.call_count == 1 + args, kwargs = mock_flush.call_args + buffer = args[0] + # this test used to mock send but after we fixed a leak for not releasing the memoryview containing + # the gzipped data we cannot read it anymore. So reimplement _flush and read the data ourselves + fileobj = buffer.fileobj + buffer.close() + compressed_data = fileobj.getbuffer() + data = gzip.decompress(compressed_data) data = data.decode("utf-8").split("\n") assert "metadata" in data[0] + compressed_data.release() @mock.patch("elasticapm.transport.base.Transport.send")