Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit 4cd1f9d

Browse files
mayurkale22 and c24t
authored and committed
Fix: StackDriver exporter fails with "One or more Timeseries could not be written" (#374)
Fix multiple bugs in the StackdriverStatsExporter, including changing its behavior so that it emits multiple timeseries each with a single point instead of a single timeseries with multiple points.
1 parent b3c298f commit 4cd1f9d

File tree

12 files changed

+301
-112
lines changed

12 files changed

+301
-112
lines changed

examples/stats/exporter/stackdriver.py

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,15 @@
2121
from opencensus.stats import measure as measure_module
2222
from opencensus.stats import stats as stats_module
2323
from opencensus.stats import view as view_module
24-
from opencensus.stats.exporters import stackdriver_exporter as stackdriver
25-
from opencensus.tags import tag_key as tag_key_module
24+
from opencensus.stats.exporters import stackdriver_exporter
2625
from opencensus.tags import tag_map as tag_map_module
27-
from opencensus.tags import tag_value as tag_value_module
2826

29-
MiB = 1 << 20
30-
FRONTEND_KEY = tag_key_module.TagKey("my.org/keys/frontend")
31-
VIDEO_SIZE_MEASURE = measure_module.MeasureInt(
32-
"my.org/measure/video_size_test2", "size of processed videos", "By")
33-
VIDEO_SIZE_VIEW_NAME = "my.org/views/video_size_test2"
34-
VIDEO_SIZE_DISTRIBUTION = aggregation_module.DistributionAggregation(
35-
[0.0, 16.0 * MiB, 256.0 * MiB])
36-
VIDEO_SIZE_VIEW = view_module.View(
37-
VIDEO_SIZE_VIEW_NAME, "processed video size over time", [FRONTEND_KEY],
38-
VIDEO_SIZE_MEASURE, VIDEO_SIZE_DISTRIBUTION)
27+
# Create the measures
28+
# The latency in milliseconds
29+
m_latency_ms = measure_module.MeasureFloat(
30+
"task_latency", "The task latency in milliseconds", "ms")
3931

32+
# The stats recorder
4033
stats = stats_module.Stats()
4134
view_manager = stats.view_manager
4235
stats_recorder = stats.stats_recorder
@@ -47,22 +40,35 @@
4740
raise ValueError("Couldn't find Google Cloud credentials, set the "
4841
"project ID with 'gcloud set project'")
4942

50-
exporter = stackdriver.new_stats_exporter(
51-
stackdriver.Options(project_id=project_id))
52-
view_manager.register_exporter(exporter)
43+
latency_view = view_module.View(
44+
"task_latency_distribution",
45+
"The distribution of the task latencies",
46+
[],
47+
m_latency_ms,
48+
# Latency in buckets: [>=0ms, >=100ms, >=200ms, >=400ms, >=1s, >=2s, >=4s]
49+
aggregation_module.DistributionAggregation(
50+
[100.0, 200.0, 400.0, 1000.0, 2000.0, 4000.0]))
5351

54-
# Register view.
55-
view_manager.register_view(VIDEO_SIZE_VIEW)
5652

57-
# Sleep for [0, 10] milliseconds to fake work.
58-
time.sleep(random.randint(1, 10) / 1000.0)
53+
def main():
54+
# Enable metrics
55+
exporter = stackdriver_exporter.new_stats_exporter(
56+
stackdriver_exporter.Options(project_id=project_id))
57+
view_manager.register_exporter(exporter)
5958

60-
# Process video.
61-
# Record the processed video size.
62-
tag_value = tag_value_module.TagValue(str(1200))
63-
tag_map = tag_map_module.TagMap()
64-
tag_map.insert(FRONTEND_KEY, tag_value)
65-
measure_map = stats_recorder.new_measurement_map()
66-
measure_map.measure_int_put(VIDEO_SIZE_MEASURE, 25 * MiB)
59+
view_manager.register_view(latency_view)
60+
mmap = stats_recorder.new_measurement_map()
61+
tmap = tag_map_module.TagMap()
6762

68-
measure_map.record(tag_map)
63+
for i in range(100):
64+
ms = random.random() * 5 * 1000
65+
print("Latency {0}:{1}".format(i, ms))
66+
mmap.measure_float_put(m_latency_ms, ms)
67+
mmap.record(tmap)
68+
time.sleep(1)
69+
70+
print("Done recording metrics")
71+
72+
73+
if __name__ == "__main__":
74+
main()
Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2017, OpenCensus Authors
1+
# Copyright 2018, OpenCensus Authors
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -72,3 +72,17 @@ def timestamp_to_microseconds(timestamp):
7272
epoch_time_secs = calendar.timegm(timestamp_str.timetuple())
7373
epoch_time_mus = epoch_time_secs * 1e6 + timestamp_str.microsecond
7474
return epoch_time_mus
75+
76+
77+
def iuniq(ible):
78+
"""Get an iterator over unique items of `ible`."""
79+
items = set()
80+
for item in ible:
81+
if item not in items:
82+
items.add(item)
83+
yield item
84+
85+
86+
def uniq(ible):
87+
"""Get a list of unique items of `ible`."""
88+
return list(iuniq(ible))

opencensus/stats/exporters/stackdriver_exporter.py

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import logging
1516
import os
1617
import platform
1718
import re
@@ -21,6 +22,7 @@
2122
from google.cloud import monitoring_v3
2223

2324
from opencensus.__version__ import __version__
25+
from opencensus.common import utils
2426
from opencensus.common.monitored_resource_util.monitored_resource_util \
2527
import MonitoredResourceUtil
2628
from opencensus.common.transports import async_
@@ -29,6 +31,7 @@
2931
from opencensus.stats.exporters import base
3032

3133
MAX_TIME_SERIES_PER_UPLOAD = 200
34+
OPENCENSUS_TASK = "opencensus_task"
3235
OPENCENSUS_TASK_DESCRIPTION = "Opencensus task identifier"
3336
DEFAULT_DISPLAY_NAME_PREFIX = "OpenCensus"
3437
ERROR_BLANK_PROJECT_ID = "expecting a non-blank ProjectID"
@@ -107,8 +110,8 @@ def default_monitoring_labels(self):
107110

108111

109112
class StackdriverStatsExporter(base.StatsExporter):
110-
""" StackdriverStatsExporter exports stats
111-
to the Stackdriver Monitoring."""
113+
"""Stats exporter for the Stackdriver Monitoring backend."""
114+
112115
def __init__(self,
113116
options=Options(),
114117
client=None,
@@ -164,7 +167,8 @@ def upload_stats(self, view_data):
164167
""" It receives an array of view_data object
165168
and create time series for each value
166169
"""
167-
requests = self.make_request(view_data, MAX_TIME_SERIES_PER_UPLOAD)
170+
view_data_set = utils.uniq(view_data)
171+
requests = self.make_request(view_data_set, MAX_TIME_SERIES_PER_UPLOAD)
168172
for request in requests:
169173
self.client.create_time_series(request[CONS_NAME],
170174
request[CONS_TIME_SERIES])
@@ -181,7 +185,7 @@ def make_request(self, view_data, limit):
181185
for v_data in view_data:
182186
series = self.create_time_series_list(v_data, resource,
183187
metric_prefix)
184-
time_series.append(series)
188+
time_series.extend(series)
185189

186190
project_id = self.options.project_id
187191
request = {}
@@ -197,16 +201,19 @@ def create_time_series_list(self, v_data, option_resource_type,
197201
metric_prefix):
198202
""" Create the TimeSeries object based on the view data
199203
"""
200-
series = monitoring_v3.types.TimeSeries()
201-
series.metric.type = namespaced_view_name(v_data.view.name,
202-
metric_prefix)
203-
set_monitored_resource(series, option_resource_type)
204-
204+
time_series_list = []
205205
tag_agg = v_data.tag_value_aggregation_data_map
206206
for tag_value, agg in tag_agg.items():
207+
series = monitoring_v3.types.TimeSeries()
208+
series.metric.type = namespaced_view_name(v_data.view.name,
209+
metric_prefix)
210+
set_metric_labels(series, v_data.view, tag_value)
211+
set_monitored_resource(series, option_resource_type)
212+
207213
point = series.points.add()
208-
if type(agg) is \
209-
aggregation.aggregation_data.DistributionAggregationData:
214+
if isinstance(
215+
agg,
216+
aggregation.aggregation_data.DistributionAggregationData):
210217
agg_data = tag_agg.get(tag_value)
211218
dist_value = point.value.distribution_value
212219
dist_value.count = agg_data.count_data
@@ -228,12 +235,21 @@ def create_time_series_list(self, v_data, option_resource_type,
228235
buckets.extend([0])
229236
bounds.extend(list(map(float, agg_data.bounds)))
230237
buckets.extend(list(map(int, agg_data.counts_per_bucket)))
238+
elif isinstance(agg,
239+
aggregation.aggregation_data.CountAggregationData):
240+
point.value.int64_value = agg.count_data
241+
elif isinstance(
242+
agg, aggregation.aggregation_data.SumAggregationDataFloat):
243+
point.value.double_value = agg.sum_data
244+
elif not isinstance(
245+
agg, aggregation.aggregation_data
246+
.LastValueAggregationData): # pragma: NO COVER
247+
if isinstance(v_data.view.measure, measure.MeasureInt):
248+
point.value.int64_value = int(agg.value)
249+
elif isinstance(v_data.view.measure, measure.MeasureFloat):
250+
point.value.double_value = float(agg.value)
231251
else:
232-
convFloat, isFloat = as_float(tag_value[0])
233-
if isFloat: # pragma: NO COVER
234-
point.value.double_value = convFloat
235-
else: # pragma: NO COVER
236-
point.value.string_value = str(tag_value[0])
252+
point.value.string_value = str(tag_value[0])
237253

238254
start = datetime.strptime(v_data.start_time, EPOCH_PATTERN)
239255
end = datetime.strptime(v_data.end_time, EPOCH_PATTERN)
@@ -244,7 +260,7 @@ def create_time_series_list(self, v_data, option_resource_type,
244260
point.interval.end_time.seconds = int(timestamp_end)
245261

246262
secs = point.interval.end_time.seconds
247-
point.interval.end_time.nanos = int((timestamp_end-secs)*10**9)
263+
point.interval.end_time.nanos = int((timestamp_end - secs) * 10**9)
248264

249265
if type(agg) is not aggregation.aggregation_data.\
250266
LastValueAggregationData: # pragma: NO COVER
@@ -256,7 +272,10 @@ def create_time_series_list(self, v_data, option_resource_type,
256272
start_time.seconds = int(timestamp_start)
257273
start_secs = start_time.seconds
258274
start_time.nanos = int((timestamp_start - start_secs) * 1e9)
259-
return series
275+
276+
time_series_list.append(series)
277+
278+
return time_series_list
260279

261280
def create_metric_descriptor(self, view):
262281
""" it creates a MetricDescriptor
@@ -300,8 +319,8 @@ def create_metric_descriptor(self, view):
300319
if isinstance(view_measure, measure.MeasureFloat):
301320
value_type = metric_desc.ValueType.DOUBLE
302321
else:
303-
raise Exception("unsupported aggregation type: %s"
304-
% type(view_aggregation))
322+
raise Exception(
323+
"unsupported aggregation type: %s" % type(view_aggregation))
305324

306325
display_name_prefix = DEFAULT_DISPLAY_NAME_PREFIX
307326
if self.options.metric_prefix != "":
@@ -406,11 +425,6 @@ def new_stats_exporter(options):
406425

407426
if options.default_monitoring_labels is not None:
408427
exporter.set_default_labels(options.default_monitoring_labels)
409-
else:
410-
label = {}
411-
key = remove_non_alphanumeric(get_task_value())
412-
label[key] = OPENCENSUS_TASK_DESCRIPTION
413-
exporter.set_default_labels(label)
414428
return exporter
415429

416430

@@ -446,9 +460,24 @@ def new_label_descriptors(defaults, keys):
446460
label = {}
447461
label["key"] = remove_non_alphanumeric(tag_key)
448462
label_descriptors.append(label)
463+
label_descriptors.append({"key": OPENCENSUS_TASK,
464+
"description": OPENCENSUS_TASK_DESCRIPTION})
449465
return label_descriptors
450466

451467

468+
def set_metric_labels(series, view, tag_values):
469+
if len(view.columns) != len(tag_values):
470+
logging.warning(
471+
"TagKeys and TagValues don't have same size."
472+
) # pragma: NO COVER
473+
474+
for ii, tag_value in enumerate(tag_values):
475+
if tag_value is not None:
476+
metric_label = remove_non_alphanumeric(view.columns[ii])
477+
series.metric.labels[metric_label] = tag_value
478+
series.metric.labels[OPENCENSUS_TASK] = get_task_value()
479+
480+
452481
def remove_non_alphanumeric(text):
453482
""" Remove characters not accepted in labels key
454483
"""

opencensus/trace/attributes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414
import six
1515

16-
from opencensus.trace import utils
16+
from opencensus.common import utils
1717

1818

1919
def _format_attribute_value(value):

opencensus/trace/exporters/jaeger_exporter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
from thrift.protocol import TBinaryProtocol, TCompactProtocol
2121
from thrift.transport import THttpClient, TTransport
2222

23+
from opencensus.common.utils import timestamp_to_microseconds
2324
from opencensus.trace import link as link_module
2425
from opencensus.trace.exporters import base
2526
from opencensus.trace.exporters.gen.jaeger import agent, jaeger
2627
from opencensus.trace.exporters.transports import sync
27-
from opencensus.trace.utils import timestamp_to_microseconds
2828

2929
DEFAULT_HOST_NAME = 'localhost'
3030
DEFAULT_AGENT_PORT = 6831

opencensus/trace/exporters/zipkin_exporter.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919

2020
import requests
2121

22+
from opencensus.common.utils import check_str_length
23+
from opencensus.common.utils import timestamp_to_microseconds
2224
from opencensus.trace.exporters import base
2325
from opencensus.trace.exporters.transports import sync
24-
from opencensus.trace.utils import check_str_length, timestamp_to_microseconds
2526

2627
DEFAULT_ENDPOINT = '/api/v2/spans'
2728
DEFAULT_HOST_NAME = 'localhost'

opencensus/trace/span.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from datetime import datetime
1616
from itertools import chain
1717

18+
from opencensus.common.utils import _get_truncatable_str
1819
from opencensus.trace import attributes
1920
from opencensus.trace import base_span
2021
from opencensus.trace import link as link_module
@@ -23,7 +24,6 @@
2324
from opencensus.trace import time_event as time_event_module
2425
from opencensus.trace.span_context import generate_span_id
2526
from opencensus.trace.tracers import base
26-
from opencensus.trace.utils import _get_truncatable_str
2727

2828

2929
class SpanKind(object):

opencensus/trace/span_data.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
# limitations under the License.
1414

1515
import collections
16-
from opencensus.trace import utils
16+
17+
from opencensus.common import utils
1718
from opencensus.trace import attributes
1819

1920
_SpanData = collections.namedtuple(

opencensus/trace/stack_trace.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import random
1818
import traceback
1919

20-
from opencensus.trace.utils import _get_truncatable_str
20+
from opencensus.common.utils import _get_truncatable_str
2121

2222
MAX_FRAMES = 128
2323

opencensus/trace/time_event.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from opencensus.trace.utils import _get_truncatable_str
15+
from opencensus.common.utils import _get_truncatable_str
1616

1717

1818
class Type(object):

0 commit comments

Comments
 (0)