Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit de44767

Browse files
authored
Implement exceptionType and statusCode for network statsbeat (#1138)
1 parent 00a817c commit de44767

File tree

6 files changed

+239
-123
lines changed

6 files changed

+239
-123
lines changed

contrib/opencensus-ext-azure/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## Unreleased
44

5+
- Add statusCode and exceptionType to network statsbeat
6+
([#1138](https://github.com/census-instrumentation/opencensus-python/pull/1138))
7+
58
## 1.1.5
69
Released 2022-07-05
710

contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py

Lines changed: 63 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,16 @@
3636
_requests_lock = threading.Lock()
3737
_requests_map = {}
3838
_REACHED_INGESTION_STATUS_CODES = (200, 206, 402, 408, 429, 439, 500)
39+
REDIRECT_STATUS_CODES = (307, 308)
40+
RETRYABLE_STATUS_CODES = (
41+
401, # Unauthorized
42+
403, # Forbidden
43+
408, # Request Timeout
44+
429, # Too many requests
45+
500, # Internal server error
46+
503, # Service unavailable
47+
)
48+
THROTTLE_STATUS_CODES = (402, 439)
3949

4050

4151
class TransportStatusCode:
@@ -82,6 +92,7 @@ def _transmit(self, envelopes):
8292
if not envelopes:
8393
return 0
8494
status = None
95+
exception = None
8596
try:
8697
start_time = time.time()
8798
headers = {
@@ -93,9 +104,6 @@ def _transmit(self, envelopes):
93104
token = self.options.credential.get_token(_MONITOR_OAUTH_SCOPE)
94105
headers["Authorization"] = "Bearer {}".format(token.token)
95106
endpoint += '/v2.1/track'
96-
if self._check_stats_collection():
97-
with _requests_lock:
98-
_requests_map['count'] = _requests_map.get('count', 0) + 1 # noqa: E501
99107
response = requests.post(
100108
url=endpoint,
101109
data=json.dumps(envelopes),
@@ -104,52 +112,52 @@ def _transmit(self, envelopes):
104112
proxies=json.loads(self.options.proxies),
105113
allow_redirects=False,
106114
)
107-
except requests.Timeout:
115+
except requests.Timeout as ex:
108116
if not self._is_stats_exporter():
109117
logger.warning(
110118
'Request time out. Ingestion may be backed up. Retrying.')
111119
status = TransportStatusCode.RETRY
120+
exception = ex
112121
except requests.RequestException as ex:
113122
if not self._is_stats_exporter():
114123
logger.warning(
115124
'Retrying due to transient client side error %s.', ex)
116-
# client side error (retryable)
117125
status = TransportStatusCode.RETRY
126+
exception = ex
118127
except CredentialUnavailableError as ex:
119128
if not self._is_stats_exporter():
120129
logger.warning('Credential error. %s. Dropping telemetry.', ex)
121130
status = TransportStatusCode.DROP
131+
exception = ex
122132
except ClientAuthenticationError as ex:
123133
if not self._is_stats_exporter():
124134
logger.warning('Authentication error %s', ex)
125135
status = TransportStatusCode.RETRY
136+
exception = ex
126137
except Exception as ex:
127138
if not self._is_stats_exporter():
128139
logger.warning(
129140
'Error when sending request %s. Dropping telemetry.', ex)
130-
# Extraneous error (non-retryable)
131141
status = TransportStatusCode.DROP
142+
exception = ex
132143
finally:
144+
if self._check_stats_collection():
145+
_update_requests_map('count')
133146
end_time = time.time()
134147
if self._check_stats_collection():
135-
with _requests_lock:
136-
duration = _requests_map.get('duration', 0)
137-
_requests_map['duration'] = duration + (end_time - start_time) # noqa: E501
138-
if status is not None:
148+
_update_requests_map('duration', value=end_time-start_time)
149+
150+
if status is not None and exception is not None:
139151
if self._check_stats_collection():
140-
with _requests_lock:
141-
if status is TransportStatusCode.RETRY:
142-
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
143-
else:
144-
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
152+
_update_requests_map('exception', value=exception.__class__.__name__) # noqa: E501
153+
return status
145154
if self._is_stats_exporter() and \
146155
not state.get_statsbeat_shutdown() and \
147156
not state.get_statsbeat_initial_success():
148157
# If ingestion threshold during statsbeat initialization is
149158
# reached, return back code to shut it down
150159
if _statsbeat_failure_reached_threshold():
151160
return TransportStatusCode.STATSBEAT_SHUTDOWN
152-
return status
153161

154162
text = 'N/A'
155163
status_code = 0
@@ -160,7 +168,7 @@ def _transmit(self, envelopes):
160168
if not self._is_stats_exporter():
161169
logger.warning('Error while reading response body %s.', ex)
162170
if self._check_stats_collection():
163-
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
171+
_update_requests_map('exception', value=ex.__class__.__name__)
164172
return TransportStatusCode.DROP
165173

166174
if self._is_stats_exporter() and \
@@ -178,8 +186,7 @@ def _transmit(self, envelopes):
178186
if status_code == 200: # Success
179187
self._consecutive_redirects = 0
180188
if self._check_stats_collection():
181-
with _requests_lock:
182-
_requests_map['success'] = _requests_map.get('success', 0) + 1 # noqa: E501
189+
_update_requests_map('success')
183190
return TransportStatusCode.SUCCESS
184191
elif _status_code_is_redirect(status_code): # Redirect
185192
# for statsbeat, these are not tracked as success nor failures
@@ -206,21 +213,25 @@ def _transmit(self, envelopes):
206213
)
207214
# If redirect but did not return, exception occured
208215
if self._check_stats_collection():
209-
with _requests_lock:
210-
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
216+
_update_requests_map('exception', value="Circular Redirect")
211217
return TransportStatusCode.DROP
212218
elif _status_code_is_throttle(status_code): # Throttle
213219
if self._check_stats_collection():
214220
# 402: Monthly Quota Exceeded (new SDK)
215221
# 439: Monthly Quota Exceeded (old SDK) <- Currently OC SDK
216-
with _requests_lock:
217-
_requests_map['throttle'] = _requests_map.get('throttle', 0) + 1 # noqa: E501
222+
_update_requests_map('throttle', value=status_code)
223+
if not self._is_stats_exporter():
224+
logger.warning(
225+
'Telemetry was throttled %s: %s.',
226+
status_code,
227+
text,
228+
)
218229
return TransportStatusCode.DROP
219230
elif _status_code_is_retryable(status_code): # Retry
220231
if not self._is_stats_exporter():
221232
if status_code == 401: # Authentication error
222233
logger.warning(
223-
'Authentication error %s: %s.',
234+
'Authentication error %s: %s. Retrying.',
224235
status_code,
225236
text,
226237
)
@@ -229,7 +240,7 @@ def _transmit(self, envelopes):
229240
# Can occur when v2 endpoint is used while AI resource is configured # noqa: E501
230241
# with disableLocalAuth
231242
logger.warning(
232-
'Forbidden error %s: %s.',
243+
'Forbidden error %s: %s. Retrying.',
233244
status_code,
234245
text,
235246
)
@@ -240,8 +251,7 @@ def _transmit(self, envelopes):
240251
text,
241252
)
242253
if self._check_stats_collection():
243-
with _requests_lock:
244-
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
254+
_update_requests_map('retry', value=status_code)
245255
return TransportStatusCode.RETRY
246256
elif status_code == 206: # Partial Content
247257
data = None
@@ -251,7 +261,7 @@ def _transmit(self, envelopes):
251261
if not self._is_stats_exporter():
252262
logger.warning('Error while reading response body %s for partial content.', ex) # noqa: E501
253263
if self._check_stats_collection():
254-
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
264+
_update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501
255265
return TransportStatusCode.DROP
256266
if data:
257267
try:
@@ -260,8 +270,7 @@ def _transmit(self, envelopes):
260270
if _status_code_is_retryable(error['statusCode']):
261271
resend_envelopes.append(envelopes[error['index']])
262272
if self._check_stats_collection():
263-
with _requests_lock:
264-
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
273+
_update_requests_map('retry', value=error['statusCode']) # noqa: E501
265274
else:
266275
logger.error(
267276
'Data drop %s: %s %s.',
@@ -280,16 +289,15 @@ def _transmit(self, envelopes):
280289
ex,
281290
)
282291
if self._check_stats_collection():
283-
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
292+
_update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501
284293
return TransportStatusCode.DROP
285294
# cannot parse response body, fallback to retry
286295
else:
287296
# 400 and 404 will be tracked as failure count
288297
# 400 - Invalid - The server cannot or will not process the request due to the invalid telemetry (invalid data, iKey) # noqa: E501
289298
# 404 - Ingestion is allowed only from stamp specific endpoint - must update connection string # noqa: E501
290299
if self._check_stats_collection():
291-
with _requests_lock:
292-
_requests_map['failure'] = _requests_map.get('failure', 0) + 1 # noqa: E501
300+
_update_requests_map('failure', value=status_code)
293301
# Other, server side error (non-retryable)
294302
if not self._is_stats_exporter():
295303
logger.error(
@@ -301,21 +309,15 @@ def _transmit(self, envelopes):
301309

302310

303311
def _status_code_is_redirect(status_code):
304-
return status_code in (307, 308)
312+
return status_code in REDIRECT_STATUS_CODES
305313

306314

307315
def _status_code_is_throttle(status_code):
308-
return status_code in (402, 439)
316+
return status_code in THROTTLE_STATUS_CODES
309317

310318

311319
def _status_code_is_retryable(status_code):
312-
return status_code in (
313-
401, # Unauthorized
314-
403, # Forbidden
315-
429, # Too many requests
316-
500, # Internal server error
317-
503, # Service unavailable
318-
)
320+
return status_code in RETRYABLE_STATUS_CODES
319321

320322

321323
def _reached_ingestion_status_code(status_code):
@@ -326,3 +328,22 @@ def _statsbeat_failure_reached_threshold():
326328
# increment failure counter for sending statsbeat if in initialization
327329
state.increment_statsbeat_initial_failure_count()
328330
return state.get_statsbeat_initial_failure_count() >= 3
331+
332+
333+
def _update_requests_map(type, value=None):
334+
if value is None:
335+
value = 0 # error state
336+
with _requests_lock:
337+
if type == "count":
338+
_requests_map['count'] = _requests_map.get('count', 0) + 1 # noqa: E501
339+
elif type == "duration":
340+
_requests_map['duration'] = _requests_map.get('duration', 0) + value # noqa: E501
341+
elif type == "success":
342+
_requests_map['success'] = _requests_map.get('success', 0) + 1 # noqa: E501
343+
else:
344+
prev = 0
345+
if _requests_map.get(type):
346+
prev = _requests_map.get(type).get(value, 0)
347+
else:
348+
_requests_map[type] = {}
349+
_requests_map[type][value] = prev + 1

contrib/opencensus-ext-azure/opencensus/ext/azure/metrics_exporter/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def export_metrics(self, metrics):
7575
batch = self.apply_telemetry_processors(batch)
7676
result = self._transmit(batch)
7777
# If statsbeat exporter and received signal to shutdown
78-
if self._is_stats_exporter() and result is \
78+
if self._is_stats and result is \
7979
TransportStatusCode.STATSBEAT_SHUTDOWN:
8080
from opencensus.ext.azure.statsbeat import statsbeat
8181
statsbeat.shutdown_statsbeat_metrics()
@@ -102,6 +102,9 @@ def metric_to_envelopes(self, metric):
102102
# point which contains the aggregated value
103103
data_point = self._create_data_points(
104104
time_series, md)[0]
105+
# if statsbeat exporter, ignore points with 0 value
106+
if self._is_stats and data_point.value == 0:
107+
continue
105108
# The timestamp is when the metric was recorded
106109
timestamp = time_series.points[0].timestamp
107110
# Get the properties using label keys from metric

0 commit comments

Comments
 (0)