Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit e0603e0

Browse files
authored
Refactor async exporter (#642)
Introduced a Queue class to common namespace. Decoupled Queue, Worker and Exporter for AzureExporter.
1 parent 0016128 commit e0603e0

File tree

6 files changed

+361
-82
lines changed

6 files changed

+361
-82
lines changed

contrib/opencensus-ext-azure/opencensus/ext/azure/common/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ class Options(Object):
2222
prototype = Object(
2323
endpoint='https://dc.services.visualstudio.com/v2/track',
2424
export_interval=15.0,
25+
grace_period=5.0,
2526
instrumentation_key=os.getenv('APPINSIGHTS_INSTRUMENTATIONKEY', None),
27+
max_batch_size=100,
2628
minimum_retry_interval=60, # minimum retry interval in seconds
2729
proxy=None,
2830
storage_maintenance_period=60,
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Copyright 2019, OpenCensus Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import atexit
16+
import threading
17+
import time
18+
19+
from opencensus.common.schedule import Queue
20+
from opencensus.common.schedule import QueueEvent
21+
from opencensus.ext.azure.common import Options
22+
23+
24+
class BaseExporter(object):
    """Base class for exporters that are fed through an internal queue.

    Construction reads ``export_interval``, ``max_batch_size`` and
    ``grace_period`` from ``Options``, creates the queue, starts a
    ``Worker`` bound to that queue and this exporter, and registers the
    worker's ``stop`` with ``atexit``. Subclasses must implement ``emit``.
    """

    def __init__(self, **options):
        config = Options(**options)
        self.export_interval = config.export_interval
        self.max_batch_size = config.max_batch_size
        # TODO: queue should be moved to tracer
        # too much refactor work, leave to the next PR
        self._queue = Queue(capacity=8192)  # TODO: make this configurable
        self.EXIT_EVENT = self._queue.EXIT_EVENT
        # TODO: worker should not be created in the base exporter
        worker = Worker(self._queue, self)
        self._worker = worker
        worker.start()
        # Give the worker up to `grace_period` to finish at interpreter exit.
        atexit.register(worker.stop, config.grace_period)

    # Ideally `emit` would not exist: the exporter would expose a single
    # public, blocking `export` method that runs inside worker threads.
    def emit(self, batch, event=None):
        """Handle one batch (and optional queue event); must be overridden."""
        raise NotImplementedError  # pragma: NO COVER

    # TODO: we shouldn't have this at the beginning
    # Tracer should own the queue, exporter shouldn't even know if the
    # source is a queue or not.
    # Tracer puts span_data into the queue.
    # Worker gets span_data from the src (here is the queue) and feed into
    # the dst (exporter).
    # Exporter defines the MTU (max_batch_size) and export_interval.
    # There can be one worker for each queue, or multiple workers for each
    # queue, or shared workers among queues (e.g. queue for traces, queue
    # for logs).
    def export(self, items):
        """Enqueue ``items`` on the internal queue without blocking."""
        self._queue.puts(items, block=False)  # pragma: NO COVER
56+
57+
58+
class Worker(threading.Thread):
    """Thread that pulls batches from ``src`` and hands them to ``dst``.

    ``src`` is used through ``gets``, ``put`` and ``EXIT_EVENT``; ``dst`` is
    used through ``emit``, ``max_batch_size`` and ``export_interval``.
    """

    daemon = True

    def __init__(self, src, dst):
        self.src = src
        self.dst = dst
        self._stopping = False
        super(Worker, self).__init__()

    def run(self):  # pragma: NO COVER
        """Forward batches to ``dst.emit`` until the exit event is seen."""
        source, sink = self.src, self.dst
        while True:
            batch = source.gets(sink.max_batch_size, sink.export_interval)
            tail = batch[-1] if batch else None
            if not isinstance(tail, QueueEvent):
                # Plain (possibly empty) batch: no event to report.
                sink.emit(batch)
                continue
            # A trailing queue event is split off and passed separately.
            sink.emit(batch[:-1], event=tail)
            if tail is source.EXIT_EVENT:
                return

    def stop(self, timeout=None):  # pragma: NO COVER
        """Ask the worker to exit and wait for the exit event.

        :param timeout: seconds to wait in total, or ``None`` for no limit.
        :return: the time taken in seconds if the exit event was set within
            the timeout, otherwise ``None``.
        """
        began = time.time()
        remaining = timeout
        if self.is_alive() and not self._stopping:
            self._stopping = True
            self.src.put(self.src.EXIT_EVENT, block=True, timeout=timeout)
            spent = time.time() - began
            # A falsy timeout (None or 0) is passed through unchanged.
            remaining = max(timeout - spent, 0) if timeout else timeout
        if self.src.EXIT_EVENT.wait(timeout=remaining):
            return time.time() - began  # time taken to stop
        return None

contrib/opencensus-ext-azure/opencensus/ext/azure/trace_exporter/__init__.py

Lines changed: 23 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,14 @@
1616
import json
1717
import requests
1818

19-
from opencensus.common.transports.async_ import AsyncTransport
20-
from opencensus.common.schedule import PeriodicTask
2119
from opencensus.ext.azure.common import Options
2220
from opencensus.ext.azure.common import utils
21+
from opencensus.ext.azure.common.exporter import BaseExporter
2322
from opencensus.ext.azure.common.protocol import Data
2423
from opencensus.ext.azure.common.protocol import Envelope
2524
from opencensus.ext.azure.common.protocol import RemoteDependency
2625
from opencensus.ext.azure.common.protocol import Request
2726
from opencensus.ext.azure.common.storage import LocalFileStorage
28-
from opencensus.trace import base_exporter
2927
from opencensus.trace import execution_context
3028
from opencensus.trace.span import SpanKind
3129

@@ -34,7 +32,7 @@
3432
__all__ = ['AzureExporter']
3533

3634

37-
class AzureExporter(base_exporter.Exporter):
35+
class AzureExporter(BaseExporter):
3836
"""An exporter that sends traces to Microsoft Azure Monitor.
3937
4038
:type options: dict
@@ -51,17 +49,7 @@ def __init__(self, **options):
5149
maintenance_period=self.options.storage_maintenance_period,
5250
retention_period=self.options.storage_retention_period,
5351
)
54-
self.transport = AsyncTransport(
55-
self,
56-
max_batch_size=100,
57-
wait_period=self.options.export_interval,
58-
)
59-
self._transmission_task = PeriodicTask(
60-
interval=self.options.storage_maintenance_period,
61-
function=self._transmission_routine,
62-
)
63-
self._transmission_task.daemon = True
64-
self._transmission_task.start()
52+
super(AzureExporter, self).__init__(**options)
6553

6654
def span_data_to_envelope(self, sd):
6755
envelope = Envelope(
@@ -124,7 +112,7 @@ def span_data_to_envelope(self, sd):
124112
# TODO: links, tracestate, tags, attrs
125113
return envelope
126114

127-
def _transmission_routine(self):
115+
def _transmit_from_storage(self):
128116
for blob in self.storage.gets():
129117
if blob.lease(self.options.timeout + 5):
130118
envelopes = blob.get() # TODO: handle error
@@ -142,8 +130,6 @@ def _transmit(self, envelopes):
142130
Return the next retry time in seconds for retryable failure.
143131
This function should never throw exception.
144132
"""
145-
if not envelopes:
146-
return 0
147133
# TODO: prevent requests being tracked
148134
blacklist_hostnames = execution_context.get_opencensus_attr(
149135
'blacklist_hostnames',
@@ -236,23 +222,23 @@ def _transmit(self, envelopes):
236222
# server side error (non-retryable)
237223
return -response.status_code
238224

239-
def emit(self, span_datas):
240-
"""
241-
:type span_datas: list of :class:
242-
`~opencensus.trace.span_data.SpanData`
243-
:param list of opencensus.trace.span_data.SpanData span_datas:
244-
SpanData tuples to emit
245-
"""
246-
envelopes = [self.span_data_to_envelope(sd) for sd in span_datas]
247-
result = self._transmit(envelopes)
248-
if result > 0:
249-
self.storage.put(envelopes, result)
225+
def emit(self, batch, event=None):
    """Transmit a batch of span data and acknowledge the queue event.

    :param batch: sequence of span data items; may be empty.
    :param event: optional queue event that accompanied the batch. It is
        always signalled via ``event.set()`` before this method returns,
        even when transmission fails, so that a caller waiting on it
        (e.g. the worker's ``stop``) is not left blocked until its timeout.

    Exceptions raised while transmitting are logged, not propagated.
    """
    try:
        if batch:
            envelopes = [self.span_data_to_envelope(sd) for sd in batch]
            result = self._transmit(envelopes)
            if result > 0:
                # Positive result: retryable failure, keep for later.
                self.storage.put(envelopes, result)
        if event:
            if event is self.EXIT_EVENT:
                self._transmit_from_storage()  # send files before exit
            return
        if len(batch) < self.options.max_batch_size:
            # Batch was not full, so use the spare time for stored data.
            self._transmit_from_storage()
    except Exception as ex:
        logger.exception('Transmission exception: %s.', ex)
    finally:
        # Signal the event on every path, including failures.
        if event:
            event.set()
250241

251-
def export(self, span_datas):
252-
"""
253-
:type span_datas: list of :class:
254-
`~opencensus.trace.span_data.SpanData`
255-
:param list of opencensus.trace.span_data.SpanData span_datas:
256-
SpanData tuples to export
257-
"""
258-
self.transport.export(span_datas)
242+
def _stop(self, timeout=None):
243+
self.storage.close()
244+
return self._worker.stop(timeout)

0 commit comments

Comments
 (0)