Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit e693d50

Browse files
authored
[Stats] Implement network stats - requests success count (#1059)
1 parent 932ca62 commit e693d50

File tree

7 files changed

+308
-89
lines changed

7 files changed

+308
-89
lines changed

contrib/opencensus-ext-azure/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
([#1021](https://github.com/census-instrumentation/opencensus-python/pull/1021))
77
- Implement attach rate metrics via Statsbeat
88
([#1053](https://github.com/census-instrumentation/opencensus-python/pull/1053))
9+
- Implement network metrics via Statsbeat - Success count
10+
([#1059](https://github.com/census-instrumentation/opencensus-python/pull/1059))
911

1012
## 1.0.8
1113
Released 2021-05-13

contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,23 @@
1414

1515
import json
1616
import logging
17+
import threading
1718

1819
import requests
1920
from azure.core.exceptions import ClientAuthenticationError
2021
from azure.identity._exceptions import CredentialUnavailableError
2122

2223
logger = logging.getLogger(__name__)
2324
_MONITOR_OAUTH_SCOPE = "https://monitor.azure.com//.default"
25+
_requests_lock = threading.Lock()
26+
_requests_map = {}
2427

2528

2629
class TransportMixin(object):
30+
def _check_stats_collection(self):
31+
return self.options.enable_stats_metrics and \
32+
(not hasattr(self, '_is_stats') or not self._is_stats)
33+
2734
def _transmit_from_storage(self):
2835
if self.storage:
2936
for blob in self.storage.gets():
@@ -100,6 +107,9 @@ def _transmit(self, envelopes):
100107
except Exception:
101108
pass
102109
if response.status_code == 200:
110+
if self._check_stats_collection():
111+
with _requests_lock:
112+
_requests_map['success'] = _requests_map.get('success', 0) + 1 # noqa: E501
103113
return 0
104114
if response.status_code == 206: # Partial Content
105115
if data:

contrib/opencensus-ext-azure/opencensus/ext/azure/metrics_exporter/statsbeat_metrics/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from opencensus.ext.azure.metrics_exporter import MetricsExporter
1818
from opencensus.ext.azure.metrics_exporter.statsbeat_metrics.statsbeat import (
1919
_STATS_CONNECTION_STRING,
20-
_STATS_LONG_EXPORT_INTERVAL,
20+
_STATS_SHORT_EXPORT_INTERVAL,
2121
_StatsbeatMetrics,
2222
)
2323
from opencensus.metrics import transport
@@ -36,7 +36,7 @@ def collect_statsbeat_metrics(ikey):
3636
is_stats=True,
3737
connection_string=_STATS_CONNECTION_STRING,
3838
enable_standard_metrics=False,
39-
export_interval=_STATS_LONG_EXPORT_INTERVAL, # 24h by default
39+
export_interval=_STATS_SHORT_EXPORT_INTERVAL, # 15m by default
4040
)
4141
# The user's ikey is the one being tracked
4242
producer = _AzureStatsbeatMetricsProducer(

contrib/opencensus-ext-azure/opencensus/ext/azure/metrics_exporter/statsbeat_metrics/statsbeat.py

Lines changed: 113 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414

1515
import datetime
1616
import json
17+
import logging
1718
import os
1819
import platform
20+
import threading
1921

2022
import requests
2123

24+
from opencensus.ext.azure.common.transport import _requests_lock, _requests_map
2225
from opencensus.ext.azure.common.version import __version__ as ext_version
23-
from opencensus.metrics.export.gauge import LongGauge
26+
from opencensus.metrics.export.gauge import DerivedLongGauge, LongGauge
2427
from opencensus.metrics.label_key import LabelKey
2528
from opencensus.metrics.label_value import LabelValue
2629

@@ -33,9 +36,12 @@
3336
_DEFAULT_STATS_LONG_EXPORT_INTERVAL = 86400 # 24 hours
3437

3538
_ATTACH_METRIC_NAME = "Attach"
39+
_REQ_SUC_COUNT_NAME = "Request Success Count"
3640

3741
_RP_NAMES = ["appsvc", "function", "vm", "unknown"]
3842

43+
_logger = logging.getLogger(__name__)
44+
3945

4046
def _get_stats_connection_string():
4147
cs_env = os.environ.get("APPLICATION_INSIGHTS_STATS_CONNECTION_STRING")
@@ -48,29 +54,29 @@ def _get_stats_connection_string():
4854
def _get_stats_short_export_interval():
4955
ei_env = os.environ.get("APPLICATION_INSIGHTS_STATS_SHORT_EXPORT_INTERVAL")
5056
if ei_env:
51-
return ei_env
57+
return int(ei_env)
5258
else:
5359
return _DEFAULT_STATS_SHORT_EXPORT_INTERVAL
5460

5561

5662
def _get_stats_long_export_interval():
5763
ei_env = os.environ.get("APPLICATION_INSIGHTS_STATS_LONG_EXPORT_INTERVAL")
5864
if ei_env:
59-
return ei_env
65+
return int(ei_env)
6066
else:
6167
return _DEFAULT_STATS_LONG_EXPORT_INTERVAL
6268

6369

6470
_STATS_CONNECTION_STRING = _get_stats_connection_string()
6571
_STATS_SHORT_EXPORT_INTERVAL = _get_stats_short_export_interval()
6672
_STATS_LONG_EXPORT_INTERVAL = _get_stats_long_export_interval()
73+
_STATS_LONG_INTERVAL_THRESHOLD = _STATS_LONG_EXPORT_INTERVAL / _STATS_SHORT_EXPORT_INTERVAL # noqa: E501
6774

6875

69-
def _get_attach_properties():
76+
def _get_common_properties():
7077
properties = []
7178
properties.append(
7279
LabelKey("rp", 'name of the rp, e.g. appsvc, vm, function, aks, etc.'))
73-
properties.append(LabelKey("rpid", 'unique id of rp'))
7480
properties.append(LabelKey("attach", 'codeless or sdk'))
7581
properties.append(LabelKey("cikey", 'customer ikey'))
7682
properties.append(LabelKey("runtimeVersion", 'Python version'))
@@ -80,78 +86,142 @@ def _get_attach_properties():
8086
return properties
8187

8288

89+
def _get_attach_properties():
90+
properties = _get_common_properties()
91+
properties.insert(1, LabelKey("rpid", 'unique id of rp'))
92+
return properties
93+
94+
95+
def _get_network_properties():
96+
properties = _get_common_properties()
97+
return properties
98+
99+
100+
def _get_success_count_value():
101+
with _requests_lock:
102+
interval_count = _requests_map.get('success', 0) \
103+
- _requests_map.get('last_success', 0)
104+
_requests_map['last_success'] = _requests_map.get('success', 0)
105+
return interval_count
106+
107+
83108
class _StatsbeatMetrics:
84109

85110
def __init__(self, instrumentation_key):
86111
self._instrumentation_key = instrumentation_key
87-
self.vm_data = {}
88-
self.vm_retry = True
112+
self._stats_lock = threading.Lock()
113+
self._vm_data = {}
114+
self._vm_retry = True
115+
self._rp = _RP_NAMES[3]
116+
self._os_type = platform.system()
89117
# Attach metrics - metrics related to rp (resource provider)
90118
self._attach_metric = LongGauge(
91119
_ATTACH_METRIC_NAME,
92120
'Statsbeat metric related to rp integrations',
93121
'count',
94122
_get_attach_properties(),
95123
)
124+
# Keep track of how many iterations until long export
125+
self._long_threshold_count = 0
126+
# Network metrics - metrics related to request calls to Breeze
127+
self._network_metrics = {}
128+
# Map of gauge function -> metric
129+
# Gauge function is the callback used to populate the metric value
130+
self._network_metrics[_get_success_count_value] = DerivedLongGauge(
131+
_REQ_SUC_COUNT_NAME,
132+
'Request success count',
133+
'count',
134+
_get_network_properties(),
135+
)
96136

137+
# Metrics that are sent on application start
97138
def get_initial_metrics(self):
98139
stats_metrics = []
99140
if self._attach_metric:
100-
attach_metric = self._get_attach_metric(self._attach_metric)
141+
attach_metric = self._get_attach_metric()
101142
if attach_metric:
102143
stats_metrics.append(attach_metric)
103144
return stats_metrics
104145

146+
# Metrics sent every statsbeat interval
105147
def get_metrics(self):
106-
stats_metrics = self.get_initial_metrics()
107-
108-
return stats_metrics
109-
110-
def _get_attach_metric(self, metric):
148+
metrics = []
149+
try:
150+
# Initial metrics use the long export interval
151+
# Only export once long count hits threshold
152+
with self._stats_lock:
153+
self._long_threshold_count = self._long_threshold_count + 1
154+
if self._long_threshold_count >= _STATS_LONG_INTERVAL_THRESHOLD: # noqa: E501
155+
metrics.extend(self.get_initial_metrics())
156+
self._long_threshold_count = 0
157+
network_metrics = self._get_network_metrics()
158+
metrics.extend(network_metrics)
159+
except Exception as ex:
160+
_logger.warning('Error while exporting stats metrics %s.', ex)
161+
162+
return metrics
163+
164+
def _get_network_metrics(self):
165+
properties = self._get_common_properties()
166+
metrics = []
167+
for fn, metric in self._network_metrics.items():
168+
# NOTE: A time series is a set of unique label values
169+
# If the label values ever change, a separate time series will be
170+
# created, however, `_get_common_properties()` should never change
171+
metric.create_time_series(properties, fn)
172+
stats_metric = metric.get_metric(datetime.datetime.utcnow())
173+
# Don't export if value is 0
174+
if stats_metric.time_series[0].points[0].value.value != 0:
175+
metrics.append(stats_metric)
176+
return metrics
177+
178+
def _get_attach_metric(self):
111179
properties = []
112-
vm_os_type = ''
113-
# rpId
180+
rp = ''
181+
rpId = ''
182+
# rp, rpId
114183
if os.environ.get("WEBSITE_SITE_NAME") is not None:
115184
# Web apps
116-
properties.append(LabelValue(_RP_NAMES[0]))
117-
properties.append(
118-
LabelValue(
119-
'{}/{}'.format(
185+
rp = _RP_NAMES[0]
186+
rpId = '{}/{}'.format(
120187
os.environ.get("WEBSITE_SITE_NAME"),
121-
os.environ.get("WEBSITE_HOME_STAMPNAME", '')),
122-
)
188+
os.environ.get("WEBSITE_HOME_STAMPNAME", '')
123189
)
124190
elif os.environ.get("FUNCTIONS_WORKER_RUNTIME") is not None:
125191
# Function apps
126-
properties.append(LabelValue(_RP_NAMES[1]))
127-
properties.append(LabelValue(os.environ.get("WEBSITE_HOSTNAME")))
128-
elif self.vm_retry and self._get_azure_compute_metadata():
192+
rp = _RP_NAMES[1]
193+
rpId = os.environ.get("WEBSITE_HOSTNAME")
194+
elif self._vm_retry and self._get_azure_compute_metadata():
129195
# VM
130-
properties.append(LabelValue(_RP_NAMES[2]))
131-
properties.append(
132-
LabelValue(
133-
'{}//{}'.format(
134-
self.vm_data.get("vmId", ''),
135-
self.vm_data.get("subscriptionId", '')),
136-
)
137-
)
138-
vm_os_type = self.vm_data.get("osType", '')
196+
rp = _RP_NAMES[2]
197+
rpId = '{}//{}'.format(
198+
self._vm_data.get("vmId", ''),
199+
self._vm_data.get("subscriptionId", ''))
200+
self._os_type = self._vm_data.get("osType", '')
139201
else:
140202
# Not in any rp or VM metadata failed
141-
properties.append(LabelValue(_RP_NAMES[3]))
142-
properties.append(LabelValue(_RP_NAMES[3]))
203+
rp = _RP_NAMES[3]
204+
rpId = _RP_NAMES[3]
143205

206+
self._rp = rp
207+
properties.extend(self._get_common_properties())
208+
properties.insert(1, LabelValue(rpId)) # rpid
209+
self._attach_metric.get_or_create_time_series(properties)
210+
return self._attach_metric.get_metric(datetime.datetime.utcnow())
211+
212+
def _get_common_properties(self):
213+
properties = []
214+
properties.append(LabelValue(self._rp)) # rp
144215
properties.append(LabelValue("sdk")) # attach type
145216
properties.append(LabelValue(self._instrumentation_key)) # cikey
146217
# runtimeVersion
147218
properties.append(LabelValue(platform.python_version()))
148-
properties.append(LabelValue(vm_os_type or platform.system())) # os
219+
properties.append(LabelValue(self._os_type or platform.system())) # os
149220
properties.append(LabelValue("python")) # language
150221
# version
151222
properties.append(
152223
LabelValue(ext_version))
153-
metric.get_or_create_time_series(properties)
154-
return metric.get_metric(datetime.datetime.utcnow())
224+
return properties
155225

156226
def _get_azure_compute_metadata(self):
157227
try:
@@ -161,20 +231,20 @@ def _get_azure_compute_metadata(self):
161231
request_url, headers={"MetaData": "True"}, timeout=5.0)
162232
except (requests.exceptions.ConnectionError, requests.Timeout):
163233
# Not in VM
164-
self.vm_retry = False
234+
self._vm_retry = False
165235
return False
166236
except requests.exceptions.RequestException:
167-
self.vm_retry = True # retry
237+
self._vm_retry = True # retry
168238
return False
169239

170240
try:
171241
text = response.text
172-
self.vm_data = json.loads(text)
242+
self._vm_data = json.loads(text)
173243
except Exception: # pylint: disable=broad-except
174244
# Error in reading response body, retry
175-
self.vm_retry = True
245+
self._vm_retry = True
176246
return False
177247

178248
# VM data is perpetually updated
179-
self.vm_retry = True
249+
self._vm_retry = True
180250
return True

0 commit comments

Comments
 (0)