Skip to content

Commit 79d3810

Browse files
tcwalther authored and Jon Wayne Parrott committed
Add max_latency to BackgroundThreadTransport (#4762)
1 parent 3461b9a commit 79d3810

File tree

2 files changed

+117
-9
lines changed

2 files changed

+117
-9
lines changed

google/cloud/logging/handlers/transports/background_thread.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import atexit
2323
import logging
2424
import threading
25+
import time
2526

2627
from six.moves import range
2728
from six.moves import queue
@@ -30,12 +31,13 @@
3031

3132
_DEFAULT_GRACE_PERIOD = 5.0 # Seconds
3233
_DEFAULT_MAX_BATCH_SIZE = 10
34+
_DEFAULT_MAX_LATENCY = 0 # Seconds
3335
_WORKER_THREAD_NAME = 'google.cloud.logging.Worker'
3436
_WORKER_TERMINATOR = object()
3537
_LOGGER = logging.getLogger(__name__)
3638

3739

38-
def _get_many(queue_, max_items=None):
40+
def _get_many(queue_, max_items=None, max_latency=0):
3941
"""Get multiple items from a Queue.
4042
4143
Gets at least one (blocking) and at most ``max_items`` items
@@ -48,14 +50,22 @@ def _get_many(queue_, max_items=None):
4850
:param max_items: The maximum number of items to get. If ``None``, then all
4951
available items in the queue are returned.
5052
53+
:type max_latency: float
54+
:param max_latency: The maximum number of seconds to wait for more than one
55+
item from a queue. This number includes the time required to retrieve
56+
the first item.
57+
5158
:rtype: Sequence
5259
:returns: A sequence of items retrieved from the queue.
5360
"""
61+
start = time.time()
5462
# Always return at least one item.
5563
items = [queue_.get()]
5664
while max_items is None or len(items) < max_items:
5765
try:
58-
items.append(queue_.get_nowait())
66+
elapsed = time.time() - start
67+
timeout = max(0, max_latency - elapsed)
68+
items.append(queue_.get(timeout=timeout))
5969
except queue.Empty:
6070
break
6171
return items
@@ -74,13 +84,22 @@ class _Worker(object):
7484
:type max_batch_size: int
7585
:param max_batch_size: The maximum number of items to send at a time
7686
in the background thread.
87+
88+
:type max_latency: float
89+
:param max_latency: The amount of time to wait for new logs before
90+
sending a new batch. It is strongly recommended to keep this smaller
91+
than the grace_period. This means this is effectively the longest
92+
amount of time the background thread will hold onto log entries
93+
before sending them to the server.
7794
"""
7895

7996
def __init__(self, cloud_logger, grace_period=_DEFAULT_GRACE_PERIOD,
80-
max_batch_size=_DEFAULT_MAX_BATCH_SIZE):
97+
max_batch_size=_DEFAULT_MAX_BATCH_SIZE,
98+
max_latency=_DEFAULT_MAX_LATENCY):
8199
self._cloud_logger = cloud_logger
82100
self._grace_period = grace_period
83101
self._max_batch_size = max_batch_size
102+
self._max_latency = max_latency
84103
self._queue = queue.Queue(0)
85104
self._operational_lock = threading.Lock()
86105
self._thread = None
@@ -112,7 +131,9 @@ def _thread_main(self):
112131
quit_ = False
113132
while True:
114133
batch = self._cloud_logger.batch()
115-
items = _get_many(self._queue, max_items=self._max_batch_size)
134+
items = _get_many(
135+
self._queue, max_items=self._max_batch_size,
136+
max_latency=self._max_latency)
116137

117138
for item in items:
118139
if item is _WORKER_TERMINATOR:
@@ -249,15 +270,24 @@ class BackgroundThreadTransport(Transport):
249270
:type batch_size: int
250271
:param batch_size: The maximum number of items to send at a time in the
251272
background thread.
273+
274+
:type max_latency: float
275+
:param max_latency: The amount of time to wait for new logs before
276+
sending a new batch. It is strongly recommended to keep this smaller
277+
than the grace_period. This means this is effectively the longest
278+
amount of time the background thread will hold onto log entries
279+
before sending them to the server.
252280
"""
253281

254282
def __init__(self, client, name, grace_period=_DEFAULT_GRACE_PERIOD,
255-
batch_size=_DEFAULT_MAX_BATCH_SIZE):
283+
batch_size=_DEFAULT_MAX_BATCH_SIZE,
284+
max_latency=_DEFAULT_MAX_LATENCY):
256285
self.client = client
257286
logger = self.client.logger(name)
258287
self.worker = _Worker(logger,
259288
grace_period=grace_period,
260-
max_batch_size=batch_size)
289+
max_batch_size=batch_size,
290+
max_latency=max_latency)
261291
self.worker.start()
262292

263293
def send(self, record, message, resource=None, labels=None):

tests/unit/handlers/transports/test_background_thread.py

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,19 +78,24 @@ def test_flush(self):
7878

7979
def test_worker(self):
8080
client = _Client(self.PROJECT)
81-
name = 'python_logger'
81+
name = 'python_logger'
8282
batch_size = 30
8383
grace_period = 20.
84+
max_latency = 0.1
8485
transport, worker = self._make_one(client,
8586
name,
8687
grace_period=grace_period,
87-
batch_size=batch_size)
88+
batch_size=batch_size,
89+
max_latency=max_latency)
8890
worker_grace_period = worker.call_args[1]['grace_period'] # **kwargs.
8991
worker_batch_size = worker.call_args[1]['max_batch_size']
92+
worker_max_latency = worker.call_args[1]['max_latency']
9093
self.assertEqual(worker_grace_period,
9194
grace_period)
9295
self.assertEqual(worker_batch_size,
9396
batch_size)
97+
self.assertEqual(worker_max_latency,
98+
max_latency)
9499

95100

96101
class Test_Worker(unittest.TestCase):
@@ -115,13 +120,16 @@ def test_constructor(self):
115120
logger = _Logger(self.NAME)
116121
grace_period = 50
117122
max_batch_size = 50
123+
max_latency = 0.1
118124

119125
worker = self._make_one(
120-
logger, grace_period=grace_period, max_batch_size=max_batch_size)
126+
logger, grace_period=grace_period, max_batch_size=max_batch_size,
127+
max_latency=max_latency)
121128

122129
self.assertEqual(worker._cloud_logger, logger)
123130
self.assertEqual(worker._grace_period, grace_period)
124131
self.assertEqual(worker._max_batch_size, max_batch_size)
132+
self.assertEqual(worker._max_latency, max_latency)
125133
self.assertFalse(worker.is_alive)
126134
self.assertIsNone(worker._thread)
127135

@@ -264,6 +272,74 @@ def test__thread_main_batches(self):
264272
self.assertFalse(worker._cloud_logger._batch.commit_called)
265273
self.assertEqual(worker._queue.qsize(), 0)
266274

275+
@mock.patch('time.time', autospec=True, return_value=1)
276+
def test__thread_main_max_latency(self, time):
277+
# Note: this test is a bit brittle as it assumes the operation of
278+
# _get_many invokes queue.get() followed by queue._get(). It fails
279+
# the "change detector" test in that way. However, this is still a
280+
# useful test to verify the queue timeout is appropriately calculated.
281+
from six.moves import queue
282+
from google.cloud.logging.handlers.transports import background_thread
283+
284+
# Use monotonically increasing time.
285+
time.side_effect = range(1, 6)
286+
287+
worker = self._make_one(
288+
_Logger(self.NAME), max_latency=2, max_batch_size=10)
289+
worker._queue = mock.create_autospec(queue.Queue, instance=True)
290+
291+
worker._queue.get.side_effect = [
292+
{'info': {'message': '1'}}, # Single record.
293+
queue.Empty(), # Emulate a queue.get() timeout.
294+
{'info': {'message': '1'}}, # Second record.
295+
background_thread._WORKER_TERMINATOR, # Stop the thread.
296+
queue.Empty(), # Emulate a queue.get() timeout.
297+
]
298+
299+
worker._thread_main()
300+
301+
self.assertEqual(worker._cloud_logger._num_batches, 2)
302+
self.assertTrue(worker._cloud_logger._batch.commit_called)
303+
self.assertEqual(worker._cloud_logger._batch.commit_count, 1)
304+
305+
# Time should have been called five times.
306+
#
307+
# For the first batch, it should have been called:
308+
# * Once to get the start time. (1)
309+
# * Once to get the elapsed time while grabbing the second item.
310+
# (2)
311+
#
312+
# For the second batch, it should have been called:
313+
# * Once to get start time. (3)
314+
# * Once to get the elapsed time while grabbing the second item.
315+
# (4)
316+
# * Once to get the elapsed time while grabbing the final
317+
# item. (5)
318+
# * Once final time to get the elapsed time while receiving
319+
# the empty queue.
320+
#
321+
self.assertEqual(time.call_count, 5)
322+
323+
# Queue.get should've been called 5 times as well, but with different
324+
# timeouts due to the monotonically increasing time.
325+
#
326+
# For the first batch, it will be called once without a timeout
327+
# (for the first item) and then with timeout=1, as start will be
328+
# 1 and now will be 2.
329+
#
330+
# For the second batch, it will be called once without a timeout
331+
# (for the first item) and then with timeout=1, as start will be
332+
# 3 and now will be 4, and finally with timeout=0 as start will be 3
333+
# and now will be 5.
334+
#
335+
worker._queue.get.assert_has_calls([
336+
mock.call(),
337+
mock.call(timeout=1),
338+
mock.call(),
339+
mock.call(timeout=1),
340+
mock.call(timeout=0)
341+
])
342+
267343
def test_flush(self):
268344
worker = self._make_one(_Logger(self.NAME))
269345
worker._queue = mock.Mock(spec=queue.Queue)
@@ -331,9 +407,11 @@ def __init__(self, name):
331407
self.name = name
332408
self._batch_cls = _Batch
333409
self._batch = None
410+
self._num_batches = 0
334411

335412
def batch(self):
336413
self._batch = self._batch_cls()
414+
self._num_batches += 1
337415
return self._batch
338416

339417

0 commit comments

Comments
 (0)