Skip to content

Commit 4a02ee5

Browse files
committed
fix threading bug with send buffer and threaded workers (#335)
When using threaded workers (e.g. Django runserver), writes to the send buffer need to be guarded by a threading lock. Otherwise we end up with garbled content in the buffer, which then later on leads to parsing errors in the APM Server. fixes #334
1 parent 999e913 commit 4a02ee5

File tree

3 files changed

+14
-13
lines changed

3 files changed

+14
-13
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
[Check the diff](https://github.com/elastic/apm-agent-python/compare/v4.0.0...v4.0.1)
55

66
* fixed an issue with instrumenting redis-py 3.0+
7-
7+
* fixed a multithreading issue that occurs when using threaded workers (#335)
8+
89
## v4.0.0
910
[Check the diff](https://github.com/elastic/apm-agent-python/compare/v3.0.2...v4.0.0)
1011

elasticapm/transport/base.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,13 @@ def __init__(
5353
self._max_flush_time = max_flush_time
5454
self._max_buffer_size = max_buffer_size
5555
self._queued_data = None
56-
self._flush_lock = threading.Lock()
56+
self._queue_lock = threading.Lock()
5757
self._last_flush = timeit.default_timer()
5858
self._flush_timer = None
5959

6060
def queue(self, event_type, data, flush=False):
61-
self._queue(self.queued_data, {event_type: data})
61+
with self._queue_lock:
62+
self.queued_data.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8"))
6263
since_last_flush = timeit.default_timer() - self._last_flush
6364
queue_size = self.queued_data_size
6465
if flush:
@@ -77,27 +78,26 @@ def queue(self, event_type, data, flush=False):
7778
)
7879
self.flush()
7980
elif not self._flush_timer:
80-
with self._flush_lock:
81+
with self._queue_lock:
8182
self._start_flush_timer()
8283

83-
def _queue(self, queue, data):
84-
queue.write((self._json_serializer(data) + "\n").encode("utf-8"))
85-
8684
@property
8785
def queued_data(self):
8886
if self._queued_data is None:
8987
if self._compress_level:
9088
self._queued_data = gzip.GzipFile(fileobj=BytesIO(), mode="w", compresslevel=self._compress_level)
9189
else:
9290
self._queued_data = BytesIO()
93-
self._queue(self._queued_data, {"metadata": self._metadata})
91+
self._queued_data.write((self._json_serializer({"metadata": self._metadata}) + "\n").encode("utf-8"))
9492
return self._queued_data
9593

9694
@property
9795
def queued_data_size(self):
98-
f = self.queued_data
99-
# return size of the underlying BytesIO object if it is compressed
100-
return f.fileobj.tell() if hasattr(f, "fileobj") else f.tell()
96+
f = self._queued_data
97+
if f:
98+
# return size of the underlying BytesIO object if it is compressed
99+
return f.fileobj.tell() if hasattr(f, "fileobj") else f.tell()
100+
return 0
101101

102102
def flush(self, sync=False, start_flush_timer=True):
103103
"""
@@ -106,7 +106,7 @@ def flush(self, sync=False, start_flush_timer=True):
106106
:param start_flush_timer: set to True if the flush timer thread should be restarted at the end of the flush
107107
:return: None
108108
"""
109-
with self._flush_lock:
109+
with self._queue_lock:
110110
self._stop_flush_timer()
111111
queued_data, self._queued_data = self._queued_data, None
112112
if queued_data and not self.state.should_try():

elasticapm/transport/http.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def send(self, data):
5959
else:
6060
message = "HTTP %s: " % response.status
6161
print_trace = True
62-
message += body.decode("utf8")
62+
message += body.decode("utf8", errors="replace")
6363
raise TransportException(message, data, print_trace=print_trace)
6464
return response.getheader("Location")
6565
finally:

0 commit comments

Comments
 (0)