Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit c59a087

Browse files
authored
Make wait_period configurable in async transport (#618)
Make wait_period configurable for AsyncTransport. Clean up the inline docs. Switch Azure exporter to AsyncTransport.
1 parent 3b60af7 commit c59a087

File tree

4 files changed

+45
-45
lines changed

4 files changed

+45
-45
lines changed

contrib/opencensus-ext-azure/opencensus/ext/azure/trace_exporter/__init__.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import json
1616
import requests
1717

18-
from opencensus.common.transports import sync
18+
from opencensus.common.transports.async_ import AsyncTransport
1919
from opencensus.ext.azure.common import Options
2020
from opencensus.ext.azure.common import utils
2121
from opencensus.ext.azure.common.protocol import Data
@@ -26,24 +26,19 @@
2626
from opencensus.trace import execution_context
2727
from opencensus.trace.span import SpanKind
2828

29+
__all__ = ['AzureExporter']
30+
2931

3032
class AzureExporter(base_exporter.Exporter):
3133
"""An exporter that sends traces to Microsoft Azure Monitor.
3234
3335
:type options: dict
3436
:param options: Options for the exporter. Defaults to None.
35-
36-
:type transport: :class:`type`
37-
:param transport: Class for creating new transport objects. It should
38-
extend from the base_exporter :class:`.Transport` type
39-
and implement :meth:`.Transport.export`. Defaults to
40-
:class:`.SyncTransport`. The other option is
41-
:class:`.AsyncTransport`.
4237
"""
4338

44-
def __init__(self, options=None, transport=sync.SyncTransport):
39+
def __init__(self, options=None):
4540
self.options = options or Options()
46-
self.transport = transport(self)
41+
self.transport = AsyncTransport(self, max_batch_size=100)
4742

4843
def span_data_to_envelope(self, sd):
4944
# print('[AzMon]', sd)

contrib/opencensus-ext-azure/tests/test_azure_trace_exporter.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919

2020
class TestAzureExporter(unittest.TestCase):
2121
def test_export(self):
22-
exporter = trace_exporter.AzureExporter(transport=MockTransport)
22+
exporter = trace_exporter.AzureExporter()
23+
exporter.transport = MockTransport()
2324
exporter.export(None)
2425
self.assertTrue(exporter.transport.export_called)
2526

2627
@mock.patch('requests.post', return_value=mock.Mock())
2728
def test_emit(self, request_mock):
28-
exporter = trace_exporter.AzureExporter(transport=MockTransport)
29+
exporter = trace_exporter.AzureExporter()
30+
exporter.transport = MockTransport()
2931
exporter.emit([])
3032

3133
def test_span_data_to_envelope(self):

opencensus/common/transports/async_.py

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
_DEFAULT_GRACE_PERIOD = 5.0 # Seconds
2525
_DEFAULT_MAX_BATCH_SIZE = 600
26-
_WAIT_PERIOD = 60.0 # Seconds
26+
_DEFAULT_WAIT_PERIOD = 60.0 # Seconds
2727
_WORKER_THREAD_NAME = 'opencensus.common.Worker'
2828
_WORKER_TERMINATOR = object()
2929

@@ -33,10 +33,7 @@ class _Worker(object):
3333
3434
:type exporter: :class:`~opencensus.trace.base_exporter.Exporter` or
3535
:class:`~opencensus.stats.base_exporter.StatsExporter`
36-
:param exporter: Instances of Exporter objects. Defaults to
37-
:class:`.PrintExporter`. The rest options are
38-
:class:`.ZipkinExporter`, :class:`.StackdriverExporter`,
39-
:class:`.LoggingExporter`, :class:`.FileExporter`.
36+
:param exporter: Instance of Exporter object.
4037
4138
:type grace_period: float
4239
:param grace_period: The amount of time to wait for pending data to
@@ -45,12 +42,19 @@ class _Worker(object):
4542
:type max_batch_size: int
4643
:param max_batch_size: The maximum number of items to send at a time
4744
in the background thread.
45+
46+
:type wait_period: int
47+
:param wait_period: The amount of time to wait before sending the next
48+
batch of data.
4849
"""
49-
def __init__(self, exporter, grace_period=_DEFAULT_GRACE_PERIOD,
50-
max_batch_size=_DEFAULT_MAX_BATCH_SIZE):
50+
def __init__(self, exporter,
51+
grace_period=_DEFAULT_GRACE_PERIOD,
52+
max_batch_size=_DEFAULT_MAX_BATCH_SIZE,
53+
wait_period=_DEFAULT_WAIT_PERIOD):
5154
self.exporter = exporter
5255
self._grace_period = grace_period
5356
self._max_batch_size = max_batch_size
57+
self._wait_period = wait_period
5458
self._queue = queue.Queue(0)
5559
self._lock = threading.Lock()
5660
self._event = threading.Event()
@@ -116,8 +120,8 @@ def _thread_main(self):
116120

117121
# self._event is set at exit, at which point we start draining the
118122
# queue immediately. If self._event is unset, block for
119-
# _WAIT_PERIOD between each batch of exports.
120-
self._event.wait(_WAIT_PERIOD)
123+
# self.wait_period between each batch of exports.
124+
self._event.wait(self._wait_period)
121125

122126
if quit_:
123127
break
@@ -185,10 +189,7 @@ class AsyncTransport(base.Transport):
185189
186190
:type exporter: :class:`~opencensus.trace.base_exporter.Exporter` or
187191
:class:`~opencensus.stats.base_exporter.StatsExporter`
188-
:param exporter: Instances of Exporter objects. Defaults to
189-
:class:`.PrintExporter`. The rest options are
190-
:class:`.ZipkinExporter`, :class:`.StackdriverExporter`,
191-
:class:`.LoggingExporter`, :class:`.FileExporter`.
192+
:param exporter: Instance of Exporter object.
192193
193194
:type grace_period: float
194195
:param grace_period: The amount of time to wait for pending data to
@@ -197,12 +198,23 @@ class AsyncTransport(base.Transport):
197198
:type max_batch_size: int
198199
:param max_batch_size: The maximum number of items to send at a time
199200
in the background thread.
201+
202+
:type wait_period: int
203+
:param wait_period: The amount of time to wait before sending the next
204+
batch of data.
200205
"""
201206

202-
def __init__(self, exporter, grace_period=_DEFAULT_GRACE_PERIOD,
203-
max_batch_size=_DEFAULT_MAX_BATCH_SIZE):
207+
def __init__(self, exporter,
208+
grace_period=_DEFAULT_GRACE_PERIOD,
209+
max_batch_size=_DEFAULT_MAX_BATCH_SIZE,
210+
wait_period=_DEFAULT_WAIT_PERIOD):
204211
self.exporter = exporter
205-
self.worker = _Worker(exporter, grace_period, max_batch_size)
212+
self.worker = _Worker(
213+
exporter,
214+
grace_period,
215+
max_batch_size,
216+
wait_period,
217+
)
206218
self.worker.start()
207219

208220
def export(self, data):

tests/unit/common/transports/test_async.py

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@
1919
from opencensus.common.transports import async_
2020

2121

22-
# Don't let workers wait between exports in testing
23-
wait_period_patch = mock.patch(
24-
'opencensus.common.transports.async_._WAIT_PERIOD', 0)
25-
26-
2722
class Test_Worker(unittest.TestCase):
2823

2924
def _start_worker(self, worker):
@@ -118,7 +113,7 @@ def test__export_pending_data_did_not_join(self):
118113

119114
def test__thread_main(self):
120115
exporter = mock.Mock()
121-
worker = async_._Worker(exporter)
116+
worker = async_._Worker(exporter, wait_period=0)
122117

123118
trace1 = {
124119
'traceId': 'test1',
@@ -133,15 +128,14 @@ def test__thread_main(self):
133128
worker.enqueue(trace2)
134129
worker._queue.put_nowait(async_._WORKER_TERMINATOR)
135130

136-
with wait_period_patch:
137-
worker._thread_main()
131+
worker._thread_main()
138132

139133
self.assertTrue(worker.exporter.emit.called)
140134
self.assertEqual(worker._queue.qsize(), 0)
141135

142136
def test__thread_main_batches(self):
143137
exporter = mock.Mock()
144-
worker = async_._Worker(exporter, max_batch_size=2)
138+
worker = async_._Worker(exporter, max_batch_size=2, wait_period=0)
145139

146140
# Enqueue three records and the termination signal. This should be
147141
# enough to perform two separate batches and a third loop with just
@@ -169,8 +163,7 @@ def test__thread_main_batches(self):
169163

170164
worker._queue.put_nowait(async_._WORKER_TERMINATOR)
171165

172-
with wait_period_patch:
173-
worker._thread_main()
166+
worker._thread_main()
174167

175168
self.assertEqual(worker._queue.qsize(), 0)
176169

@@ -184,7 +177,7 @@ def emit(self, span):
184177
self.exported.append(span)
185178

186179
exporter = Exporter()
187-
worker = async_._Worker(exporter, max_batch_size=2)
180+
worker = async_._Worker(exporter, max_batch_size=2, wait_period=0)
188181

189182
# Enqueue three records and the termination signal. This should be
190183
# enough to perform two separate batches and a third loop with just
@@ -199,8 +192,7 @@ def emit(self, span):
199192
worker.enqueue(span_data1)
200193
worker.enqueue(span_data2)
201194

202-
with wait_period_patch:
203-
worker._thread_main()
195+
worker._thread_main()
204196

205197
self.assertEqual(exporter.exported, [span_data1])
206198

@@ -221,7 +213,7 @@ def emit(self, span):
221213
raise Exception("This exporter is broken !")
222214

223215
exporter = Exporter()
224-
worker = async_._Worker(exporter, max_batch_size=2)
216+
worker = async_._Worker(exporter, max_batch_size=2, wait_period=0)
225217

226218
span_data0 = [mock.Mock()]
227219
span_data1 = [mock.Mock()]
@@ -232,8 +224,7 @@ def emit(self, span):
232224
worker.enqueue(span_data2)
233225
worker.enqueue(async_._WORKER_TERMINATOR)
234226

235-
with wait_period_patch:
236-
worker._thread_main()
227+
worker._thread_main()
237228

238229
# Span 2 should throw an exception, only span 0 and 1 are left
239230
self.assertEqual(exporter.exported, span_data0 + span_data1)

0 commit comments

Comments
 (0)