Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit e577df9

Browse files
authored
Configurable queue capacity for azure exporters (#949)
1 parent e1d3c3b commit e577df9

File tree

9 files changed

+65
-6
lines changed

9 files changed

+65
-6
lines changed

contrib/opencensus-ext-azure/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
([#936](https://github.com/census-instrumentation/opencensus-python/pull/936))
1111
- Fix attach rate metrics for VM to only ping data service on retry
1212
([#946](https://github.com/census-instrumentation/opencensus-python/pull/946))
13+
- Added queue capacity configuration for exporters
14+
([#949](https://github.com/census-instrumentation/opencensus-python/pull/949))
1315

1416
## 1.0.4
1517
Released 2020-06-29

contrib/opencensus-ext-azure/opencensus/ext/azure/common/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def process_options(options):
5656
TEMPDIR_PREFIX + TEMPDIR_SUFFIX
5757
)
5858

59+
# proxies
5960
if options.proxies is None:
6061
options.proxies = '{}'
6162

@@ -109,6 +110,7 @@ def __init__(self, *args, **kwargs):
109110
max_batch_size=100,
110111
minimum_retry_interval=60, # minimum retry interval in seconds
111112
proxies=None, # string maps url schemes to the url of the proxies
113+
queue_capacity=8192,
112114
storage_maintenance_period=60,
113115
storage_max_size=50*1024*1024, # 50MiB
114116
storage_path=None,

contrib/opencensus-ext-azure/opencensus/ext/azure/common/exporter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def __init__(self, **options):
2828
self.max_batch_size = options.max_batch_size
2929
# TODO: queue should be moved to tracer
3030
# too much refactor work, leave to the next PR
31-
self._queue = Queue(capacity=8192) # TODO: make this configurable
31+
self._queue = Queue(capacity=options.queue_capacity)
3232
# TODO: worker should not be created in the base exporter
3333
self._worker = Worker(self._queue, self)
3434
self._worker.start()

contrib/opencensus-ext-azure/opencensus/ext/azure/common/storage.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,9 @@ def gets(self):
133133
if path.endswith('.tmp'):
134134
if name < timeout_deadline:
135135
try:
136-
os.remove(path) # TODO: log data loss
136+
os.remove(path)
137+
logger.warning(
138+
'File write exceeded timeout. Dropping telemetry')
137139
except Exception:
138140
pass # keep silent
139141
if path.endswith('.lock'):
@@ -148,7 +150,10 @@ def gets(self):
148150
if path.endswith('.blob'):
149151
if name < retention_deadline:
150152
try:
151-
os.remove(path) # TODO: log data loss
153+
os.remove(path)
154+
logger.warning(
155+
'File write exceeded retention.' +
156+
'Dropping telemetry')
152157
except Exception:
153158
pass # keep silent
154159
else:

contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ def _transmit(self, envelopes):
7979
logger.info('Transmission succeeded: %s.', text)
8080
return 0
8181
if response.status_code == 206: # Partial Content
82-
# TODO: store the unsent data
8382
if data:
8483
try:
8584
resend_envelopes = []

contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def __init__(self, **options):
5757
)
5858
self._telemetry_processors = []
5959
self.addFilter(SamplingFilter(self.options.logging_sampling_rate))
60-
self._queue = Queue(capacity=8192) # TODO: make this configurable
60+
self._queue = Queue(capacity=self.options.queue_capacity)
6161
self._worker = Worker(self._queue, self)
6262
self._worker.start()
6363
heartbeat_metrics.enable_heartbeat_metrics(

contrib/opencensus-ext-azure/tests/test_azure_log_exporter.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,22 @@ def test_init_handler_with_proxies(self):
9898
'{"https":"https://test-proxy.com"}',
9999
)
100100

101+
def test_init_handler_with_queue_capacity(self):
102+
handler = log_exporter.AzureLogHandler(
103+
instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
104+
queue_capacity=500,
105+
)
106+
107+
self.assertEqual(
108+
handler.options.queue_capacity,
109+
500
110+
)
111+
112+
self.assertEqual(
113+
handler._worker._src._queue.maxsize,
114+
500
115+
)
116+
101117
@mock.patch('requests.post', return_value=mock.Mock())
102118
def test_exception(self, requests_mock):
103119
logger = logging.getLogger(self.id())
@@ -289,6 +305,22 @@ def test_init_handler_with_proxies(self):
289305
'{"https":"https://test-proxy.com"}',
290306
)
291307

308+
def test_init_handler_with_queue_capacity(self):
309+
handler = log_exporter.AzureEventHandler(
310+
instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
311+
queue_capacity=500,
312+
)
313+
314+
self.assertEqual(
315+
handler.options.queue_capacity,
316+
500
317+
)
318+
# pylint: disable=protected-access
319+
self.assertEqual(
320+
handler._worker._src._queue.maxsize,
321+
500
322+
)
323+
292324
@mock.patch('requests.post', return_value=mock.Mock())
293325
def test_exception(self, requests_mock):
294326
logger = logging.getLogger(self.id())

contrib/opencensus-ext-azure/tests/test_azure_trace_exporter.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,22 @@ def test_init_exporter_with_proxies(self):
5858
'{"https":"https://test-proxy.com"}',
5959
)
6060

61+
def test_init_exporter_with_queue_capacity(self):
62+
exporter = trace_exporter.AzureExporter(
63+
instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
64+
queue_capacity=500,
65+
)
66+
67+
self.assertEqual(
68+
exporter.options.queue_capacity,
69+
500
70+
)
71+
# pylint: disable=protected-access
72+
self.assertEqual(
73+
exporter._worker.src._queue.maxsize,
74+
500
75+
)
76+
6177
@mock.patch('requests.post', return_value=mock.Mock())
6278
def test_emit_empty(self, request_mock):
6379
exporter = trace_exporter.AzureExporter(

opencensus/common/schedule/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414

1515
from six.moves import queue
1616

17+
import logging
1718
import threading
1819
import time
1920

21+
logger = logging.getLogger(__name__)
22+
2023

2124
class PeriodicTask(threading.Thread):
2225
"""Thread that periodically calls a given function.
@@ -128,7 +131,7 @@ def put(self, item, block=True, timeout=None):
128131
try:
129132
self._queue.put(item, block, timeout)
130133
except queue.Full:
131-
pass # TODO: log data loss
134+
logger.warning('Queue is full. Dropping telemetry.')
132135

133136
def puts(self, items, block=True, timeout=None):
134137
if block and timeout is not None:

0 commit comments

Comments
 (0)