Skip to content

Commit 5aa8237

Browse files
authored
Use regular queue, use flush thread per process, delay worker init (#11)
* Use regular queue, use flush thread per process * Small changes, fix tests * Changes, tests
1 parent b06b096 commit 5aa8237

File tree

2 files changed

+27
-13
lines changed

2 files changed

+27
-13
lines changed

logtail/handler.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,30 +32,33 @@ def __init__(self,
3232
self.source_token = source_token
3333
self.host = host
3434
self.context = context
35-
self.pipe = multiprocessing.JoinableQueue(maxsize=buffer_capacity)
35+
self.pipe = queue.Queue(maxsize=buffer_capacity)
3636
self.uploader = Uploader(self.source_token, self.host)
3737
self.drop_extra_events = drop_extra_events
3838
self.include_extra_attributes = include_extra_attributes
3939
self.buffer_capacity = buffer_capacity
4040
self.flush_interval = flush_interval
4141
self.raise_exceptions = raise_exceptions
4242
self.dropcount = 0
43+
# Do not initialize the flush thread yet because it causes issues on Render.
44+
self.flush_thread = None
45+
46+
def ensure_flush_thread_alive(self):
47+
if self.flush_thread and self.flush_thread.is_alive():
48+
return
49+
4350
self.flush_thread = FlushWorker(
4451
self.uploader,
4552
self.pipe,
4653
self.buffer_capacity,
4754
self.flush_interval
4855
)
49-
if self._is_main_process():
50-
self.flush_thread.start()
51-
52-
def _is_main_process(self):
53-
return multiprocessing.current_process()._parent_pid == None
56+
self.flush_thread.start()
5457

5558
def emit(self, record):
5659
try:
57-
if self._is_main_process() and not self.flush_thread.is_alive():
58-
self.flush_thread.start()
60+
self.ensure_flush_thread_alive()
61+
5962
message = self.format(record)
6063
frame = create_frame(record, message, self.context, include_extra_attributes=self.include_extra_attributes)
6164
try:

tests/test_handler.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,32 +28,43 @@ def test_handler_creates_pipe_from_args(self, MockWorker):
2828
buffer_capacity=buffer_capacity,
2929
flush_interval=flush_interval
3030
)
31-
self.assertEqual(handler.pipe._maxsize, buffer_capacity)
31+
self.assertTrue(handler.pipe.empty())
3232

3333
@mock.patch('logtail.handler.FlushWorker')
34-
def test_handler_creates_and_starts_worker_from_args(self, MockWorker):
34+
def test_handler_creates_and_starts_worker_from_args_after_first_log(self, MockWorker):
3535
buffer_capacity = 9
3636
flush_interval = 9
3737
handler = LogtailHandler(source_token=self.source_token, buffer_capacity=buffer_capacity, flush_interval=flush_interval)
38+
39+
self.assertFalse(MockWorker.called)
40+
41+
logger = logging.getLogger(__name__)
42+
logger.handlers = []
43+
logger.addHandler(handler)
44+
logger.critical('hello')
45+
3846
MockWorker.assert_called_with(
3947
handler.uploader,
4048
handler.pipe,
4149
buffer_capacity,
4250
flush_interval
4351
)
44-
self.assertTrue(handler.flush_thread.start.called)
52+
self.assertEqual(handler.flush_thread.start.call_count, 1)
4553

4654
@mock.patch('logtail.handler.FlushWorker')
4755
def test_emit_starts_thread_if_not_alive(self, MockWorker):
4856
handler = LogtailHandler(source_token=self.source_token)
49-
self.assertTrue(handler.flush_thread.start.call_count, 1)
50-
handler.flush_thread.is_alive = mock.Mock(return_value=False)
5157

5258
logger = logging.getLogger(__name__)
5359
logger.handlers = []
5460
logger.addHandler(handler)
5561
logger.critical('hello')
5662

63+
self.assertEqual(handler.flush_thread.start.call_count, 1)
64+
handler.flush_thread.is_alive = mock.Mock(return_value=False)
65+
66+
logger.critical('hello')
67+
5768
self.assertEqual(handler.flush_thread.start.call_count, 2)
5869

5970
@mock.patch('logtail.handler.FlushWorker')

0 commit comments

Comments
 (0)