Skip to content
This repository was archived by the owner on Jul 31, 2023. It is now read-only.

Commit 7877337

Browse files
author
Alex Amato
authored
Update ViewData to store a separate start_timestamp for each tag map. (#444)
Update the stackdriver_exporter to use a separate timestamp for each timeseries/tag map.
1 parent 0ec2c1b commit 7877337

File tree

11 files changed

+192
-29
lines changed

11 files changed

+192
-29
lines changed

opencensus/exporters/stats/stackdriver/internal/stackdriver_utils.cc

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ template <typename DataValueT>
132132
std::vector<google::monitoring::v3::TimeSeries> DataToTimeSeries(
133133
const opencensus::stats::ViewDescriptor& view_descriptor,
134134
const opencensus::stats::ViewData::DataMap<DataValueT>& data,
135+
const opencensus::stats::ViewData::DataMap<absl::Time>& start_times,
135136
const google::monitoring::v3::TimeSeries& base_time_series) {
136137
const google::api::MetricDescriptor::ValueType type =
137138
GetValueType(view_descriptor);
@@ -147,6 +148,17 @@ std::vector<google::monitoring::v3::TimeSeries> DataToTimeSeries(
147148
// The point is already created in the base_time_series to set the times.
148149
SetTypedValue(row.second, type,
149150
time_series.mutable_points(0)->mutable_value());
151+
152+
// Stackdriver doesn't like start_time and end_time being different for
153+
// GAUGE metrics. Don't set the start time for GAUGE.
154+
if (view_descriptor.aggregation().type() !=
155+
opencensus::stats::Aggregation::Type::kLastValue) {
156+
// Use the start time stored for the specific row's tags.
157+
auto* interval = time_series.mutable_points(0)->mutable_interval();
158+
absl::Time start_time = start_times.at(row.first);
159+
opencensus::common::SetTimestamp(start_time,
160+
interval->mutable_start_time());
161+
}
150162
}
151163
return vector;
152164
}
@@ -223,13 +235,6 @@ std::vector<google::monitoring::v3::TimeSeries> MakeTimeSeries(
223235
*base_time_series.mutable_resource() = *monitored_resource_for_view;
224236
}
225237
auto* interval = base_time_series.add_points()->mutable_interval();
226-
// Stackdriver doesn't like start_time and end_time being different for GAUGE
227-
// metrics.
228-
if (view_descriptor.aggregation().type() !=
229-
opencensus::stats::Aggregation::Type::kLastValue) {
230-
opencensus::common::SetTimestamp(data.start_time(),
231-
interval->mutable_start_time());
232-
}
233238
opencensus::common::SetTimestamp(data.end_time(),
234239
interval->mutable_end_time());
235240
if (add_task_label) {
@@ -239,13 +244,13 @@ std::vector<google::monitoring::v3::TimeSeries> MakeTimeSeries(
239244
switch (data.type()) {
240245
case opencensus::stats::ViewData::Type::kDouble:
241246
return DataToTimeSeries(view_descriptor, data.double_data(),
242-
base_time_series);
247+
data.start_times(), base_time_series);
243248
case opencensus::stats::ViewData::Type::kInt64:
244249
return DataToTimeSeries(view_descriptor, data.int_data(),
245-
base_time_series);
250+
data.start_times(), base_time_series);
246251
case opencensus::stats::ViewData::Type::kDistribution:
247252
return DataToTimeSeries(view_descriptor, data.distribution_data(),
248-
base_time_series);
253+
data.start_times(), base_time_series);
249254
}
250255
ABSL_ASSERT(false && "Bad ViewData.type().");
251256
return {};

opencensus/exporters/stats/stackdriver/internal/stackdriver_utils_test.cc

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "opencensus/stats/testing/test_utils.h"
3232

3333
using opencensus::stats::testing::TestUtils;
34+
using opencensus::stats::testing::TestViewValue;
3435

3536
namespace opencensus {
3637
namespace exporters {
@@ -67,6 +68,19 @@ DefaultPerMetricResource() {
6768
return std::unordered_map<std::string, google::api::MonitoredResource>();
6869
}
6970

71+
std::vector<std::string> ToDataMapKeys(
72+
const std::initializer_list<std::string> label_names,
73+
const google::monitoring::v3::TimeSeries& time_series) {
74+
// label_names defines the key order and is needed because
75+
// time_series.metric().labels() does not maintain the original order.
76+
std::vector<std::string> data_map_keys;
77+
const auto& label_map = time_series.metric().labels();
78+
for (const auto& label_name : label_names) {
79+
data_map_keys.push_back(label_map.at(label_name));
80+
}
81+
return data_map_keys;
82+
}
83+
7084
// TODO: MonitoredResourceForView
7185
TEST(StackdriverUtilsTest, MonitoredResourceForViewIsDefault) {
7286
const auto view_descriptor =
@@ -368,6 +382,9 @@ TEST(StackdriverUtilsTest, MakeTimeSeriesSumDoubleAndTypes) {
368382
for (const auto& ts : time_series) {
369383
EXPECT_EQ("custom.googleapis.com/test/test_view", ts.metric().type());
370384
ASSERT_EQ(1, ts.points_size());
385+
const auto& tags = ToDataMapKeys({"foo", "bar"}, ts);
386+
EXPECT_EQ(absl::ToUnixSeconds(data.start_times().at(tags)),
387+
ts.points(0).interval().start_time().seconds());
371388
EXPECT_EQ(absl::ToUnixSeconds(data.start_time()),
372389
ts.points(0).interval().start_time().seconds());
373390
EXPECT_EQ(absl::ToUnixSeconds(data.end_time()),
@@ -400,19 +417,42 @@ TEST(StackdriverUtilsTest, MakeTimeSeriesSumInt) {
400417
.set_aggregation(opencensus::stats::Aggregation::Sum())
401418
.add_column(tag_key_1)
402419
.add_column(tag_key_2);
403-
const opencensus::stats::ViewData data = TestUtils::MakeViewData(
404-
view_descriptor, {{{"v1", "v1"}, 1.0}, {{"v1", "v2"}, 2.0}});
420+
absl::Time t1 = absl::UnixEpoch();
421+
absl::Time t2 = t1 + absl::Seconds(15);
422+
std::map<std::vector<std::string>, absl::Time> start_times{
423+
{{"v1", "v1"}, t1},
424+
{{"v1", "v2"}, t2},
425+
};
426+
427+
std::vector<TestViewValue> view_values;
428+
TestViewValue view_value1;
429+
view_value1.tag_values = {"v1", "v1"};
430+
view_value1.value = 1.0;
431+
view_value1.start_time = t1;
432+
view_values.push_back(view_value1);
433+
434+
TestViewValue view_value2;
435+
view_value2.tag_values = {"v1", "v2"};
436+
view_value2.value = 2.0;
437+
view_value2.start_time = t2;
438+
view_values.push_back(view_value2);
439+
440+
const opencensus::stats::ViewData data =
441+
TestUtils::MakeViewDataWithStartTimes(view_descriptor, view_values);
405442
const std::vector<google::monitoring::v3::TimeSeries> time_series =
406443
MakeTimeSeries(kMetricNamePrefix, kDefaultResource, view_descriptor, data,
407444
kAddTaskLabel, task);
408445

409446
for (const auto& ts : time_series) {
410447
ASSERT_EQ(1, ts.points_size());
411-
EXPECT_EQ(absl::ToUnixSeconds(data.start_time()),
448+
const auto& tags = ToDataMapKeys({"foo", "bar"}, ts);
449+
EXPECT_EQ(absl::ToUnixSeconds(start_times[tags]),
412450
ts.points(0).interval().start_time().seconds());
413451
EXPECT_EQ(absl::ToUnixSeconds(data.end_time()),
414452
ts.points(0).interval().end_time().seconds());
415453
}
454+
EXPECT_EQ(absl::ToUnixSeconds(data.start_time()),
455+
absl::ToUnixSeconds(absl::UnixEpoch()));
416456

417457
EXPECT_THAT(time_series,
418458
::testing::UnorderedElementsAre(
@@ -449,6 +489,9 @@ TEST(StackdriverUtilsTest, MakeTimeSeriesCountDouble) {
449489

450490
for (const auto& ts : time_series) {
451491
ASSERT_EQ(1, ts.points_size());
492+
const auto& tags = ToDataMapKeys({"foo", "bar"}, ts);
493+
EXPECT_EQ(absl::ToUnixSeconds(data.start_times().at(tags)),
494+
ts.points(0).interval().start_time().seconds());
452495
EXPECT_EQ(absl::ToUnixSeconds(data.start_time()),
453496
ts.points(0).interval().start_time().seconds());
454497
EXPECT_EQ(absl::ToUnixSeconds(data.end_time()),
@@ -493,6 +536,9 @@ TEST(StackdriverUtilsTest, MakeTimeSeriesDistributionDouble) {
493536

494537
for (const auto& ts : time_series) {
495538
ASSERT_EQ(1, ts.points_size());
539+
const auto& tags = ToDataMapKeys({"foo", "bar"}, ts);
540+
EXPECT_EQ(absl::ToUnixSeconds(data.start_times().at(tags)),
541+
ts.points(0).interval().start_time().seconds());
496542
EXPECT_EQ(absl::ToUnixSeconds(data.start_time()),
497543
ts.points(0).interval().start_time().seconds());
498544
EXPECT_EQ(absl::ToUnixSeconds(data.end_time()),
@@ -540,6 +586,9 @@ TEST(StackdriverUtilsTest, MakeTimeSeriesLastValueInt) {
540586

541587
for (const auto& ts : time_series) {
542588
ASSERT_EQ(1, ts.points_size());
589+
const auto& tags = ToDataMapKeys({"foo", "bar"}, ts);
590+
EXPECT_EQ(absl::ToUnixSeconds(data.start_times().at(tags)),
591+
ts.points(0).interval().start_time().seconds());
543592
EXPECT_FALSE(ts.points(0).interval().has_start_time());
544593
EXPECT_EQ(absl::ToUnixSeconds(data.end_time()),
545594
ts.points(0).interval().end_time().seconds());

opencensus/exporters/stats/stdout/internal/stdout_exporter.cc

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ class Handler : public opencensus::stats::StatsExporter::Handler {
6161
template <typename DataValueT>
6262
void ExportViewDataImpl(
6363
const opencensus::stats::ViewDescriptor& descriptor,
64-
absl::Time start_time, absl::Time end_time,
64+
const ::opencensus::stats::ViewData::DataMap<absl::Time>& start_times,
65+
absl::Time end_time,
6566
const opencensus::stats::ViewData::DataMap<DataValueT>& data);
6667

6768
std::ostream* stream_;
@@ -74,15 +75,15 @@ void Handler::ExportViewData(
7475
const auto& view_data = datum.second;
7576
switch (view_data.type()) {
7677
case opencensus::stats::ViewData::Type::kDouble:
77-
ExportViewDataImpl(datum.first, view_data.start_time(),
78+
ExportViewDataImpl(datum.first, view_data.start_times(),
7879
view_data.end_time(), view_data.double_data());
7980
break;
8081
case opencensus::stats::ViewData::Type::kInt64:
81-
ExportViewDataImpl(datum.first, view_data.start_time(),
82+
ExportViewDataImpl(datum.first, view_data.start_times(),
8283
view_data.end_time(), view_data.int_data());
8384
break;
8485
case opencensus::stats::ViewData::Type::kDistribution:
85-
ExportViewDataImpl(datum.first, view_data.start_time(),
86+
ExportViewDataImpl(datum.first, view_data.start_times(),
8687
view_data.end_time(), view_data.distribution_data());
8788
break;
8889
}
@@ -92,20 +93,23 @@ void Handler::ExportViewData(
9293

9394
template <typename DataValueT>
9495
void Handler::ExportViewDataImpl(
95-
const opencensus::stats::ViewDescriptor& descriptor, absl::Time start_time,
96+
const opencensus::stats::ViewDescriptor& descriptor,
97+
const ::opencensus::stats::ViewData::DataMap<absl::Time>& start_times,
9698
absl::Time end_time,
9799
const opencensus::stats::ViewData::DataMap<DataValueT>& data) {
98100
if (data.empty()) {
99-
*stream_ << absl::StrCat("No data for view \"", descriptor.name(),
100-
"\" from ", absl::FormatTime(start_time), ".\n\n");
101+
*stream_ << absl::StrCat("No data for view \"", descriptor.name(), "\".\n");
101102
return;
102103
}
103104
// Build a string so we can write it in one shot to minimize crosstalk if
104105
// multiple threads write to stream_ simultaneously.
105-
std::string output = absl::StrCat("Data for view \"", descriptor.name(),
106-
"\" from ", absl::FormatTime(start_time),
107-
" to ", absl::FormatTime(end_time), ":\n");
106+
std::string output =
107+
absl::StrCat("Data for view \"", descriptor.name(), "\":\n");
108+
108109
for (const auto& row : data) {
110+
auto start_time = start_times.at(row.first);
111+
absl::StrAppend(&output, "Row data from ", absl::FormatTime(start_time),
112+
" to ", absl::FormatTime(end_time), ":\n");
109113
absl::StrAppend(&output, " ");
110114
for (int i = 0; i < descriptor.columns().size(); ++i) {
111115
absl::StrAppend(&output, descriptor.columns()[i].name(), "=",

opencensus/stats/examples/exporter_example.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,12 @@ class ExampleExporter : public opencensus::stats::StatsExporter::Handler {
6060
return;
6161
}
6262
std::string output;
63-
absl::StrAppend(&output, "\nData for view \"", descriptor.name(),
64-
"\" from ", absl::FormatTime(view_data.start_time()),
65-
" to ", absl::FormatTime(view_data.end_time()), ":\n");
63+
absl::StrAppend(&output, "\nData for view \"", descriptor.name(), "\n");
6664
for (const auto& row : view_data.double_data()) {
65+
auto start_time = view_data.start_times().at(row.first);
66+
absl::StrAppend(&output, "\nRow data from ",
67+
absl::FormatTime(start_time), " to ",
68+
absl::FormatTime(view_data.end_time()), ":\n");
6769
for (int i = 0; i < descriptor.columns().size(); ++i) {
6870
absl::StrAppend(&output, descriptor.columns()[i].name(), ":",
6971
row.first[i], ", ");

opencensus/stats/internal/view_data.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ const ViewData::DataMap<Distribution>& ViewData::distribution_data() const {
8181
}
8282

8383
absl::Time ViewData::start_time() const { return impl_->start_time(); }
84+
85+
const ViewData::DataMap<absl::Time>& ViewData::start_times() const {
86+
return impl_->start_times();
87+
}
88+
8489
absl::Time ViewData::end_time() const { return end_time_; }
8590

8691
ViewData::ViewData(const ViewData& other)

opencensus/stats/internal/view_data_impl.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ ViewDataImpl::ViewDataImpl(absl::Time start_time,
6464
: aggregation_(descriptor.aggregation()),
6565
aggregation_window_(descriptor.aggregation_window_),
6666
type_(TypeForDescriptor(descriptor)),
67+
start_times_(),
6768
start_time_(start_time) {
6869
switch (type_) {
6970
case Type::kDouble: {
@@ -91,9 +92,18 @@ ViewDataImpl::ViewDataImpl(const ViewDataImpl& other, absl::Time now)
9192
type_(other.aggregation().type() == Aggregation::Type::kDistribution
9293
? Type::kDistribution
9394
: Type::kDouble),
95+
start_times_(),
9496
start_time_(std::max(other.start_time(),
9597
now - other.aggregation_window().duration())) {
98+
// Intentionally reset the source with a new start time.
99+
for (const auto& it : other.start_times_) {
100+
auto new_start_time =
101+
std::max(it.second, now - other.aggregation_window().duration());
102+
start_times_[it.first] = new_start_time;
103+
}
104+
96105
ABSL_ASSERT(aggregation_window_.type() == AggregationWindow::Type::kInterval);
106+
97107
switch (aggregation_.type()) {
98108
case Aggregation::Type::kSum:
99109
case Aggregation::Type::kCount: {
@@ -156,6 +166,7 @@ ViewDataImpl::ViewDataImpl(const ViewDataImpl& other)
156166
: aggregation_(other.aggregation_),
157167
aggregation_window_(other.aggregation_window_),
158168
type_(other.type()),
169+
start_times_(other.start_times_),
159170
start_time_(other.start_time_) {
160171
switch (type_) {
161172
case Type::kDouble: {
@@ -182,6 +193,8 @@ ViewDataImpl::ViewDataImpl(const ViewDataImpl& other)
182193

183194
void ViewDataImpl::Merge(const std::vector<std::string>& tag_values,
184195
const MeasureData& data, absl::Time now) {
196+
// A value is set here. Set a start time if it is unset.
197+
SetStartTimeIfUnset(tag_values, now);
185198
switch (type_) {
186199
case Type::kDouble: {
187200
if (aggregation_.type() == Aggregation::Type::kSum) {
@@ -256,6 +269,7 @@ ViewDataImpl::ViewDataImpl(ViewDataImpl* source, absl::Time now)
256269
: aggregation_(source->aggregation_),
257270
aggregation_window_(source->aggregation_window_),
258271
type_(source->type_),
272+
start_times_(source->start_times_),
259273
start_time_(source->start_time_) {
260274
switch (type_) {
261275
case Type::kDouble: {
@@ -280,7 +294,12 @@ ViewDataImpl::ViewDataImpl(ViewDataImpl* source, absl::Time now)
280294
break;
281295
}
282296
}
297+
// Intentionally reset the source with new start times.
283298
source->start_time_ = now;
299+
300+
for (const auto& it : source->start_times_) {
301+
source->start_times_[it.first] = now;
302+
}
284303
}
285304

286305
} // namespace stats

opencensus/stats/internal/view_data_impl.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ class ViewDataImpl {
103103
return interval_data_;
104104
}
105105

106+
// Returns a start time for each timeseries/tag map.
107+
const DataMap<absl::Time>& start_times() const { return start_times_; }
108+
109+
// DEPRECATED: Legacy start_time_ for the entire view.
110+
// This should be deleted if custom exporters are updated to
111+
// use start_times_ and stop depending on this field.
106112
absl::Time start_time() const { return start_time_; }
107113

108114
// Merges bulk data for the given tag values at 'now'. tag_values must be
@@ -120,6 +126,14 @@ class ViewDataImpl {
120126

121127
Type TypeForDescriptor(const ViewDescriptor& descriptor);
122128

129+
void SetStartTimeIfUnset(const std::vector<std::string>& tag_values,
130+
absl::Time now) {
131+
// If the time is not set.
132+
if (start_times_.find(tag_values) == start_times_.end()) {
133+
start_times_[tag_values] = now;
134+
}
135+
}
136+
123137
const Aggregation aggregation_;
124138
const AggregationWindow aggregation_window_;
125139
const Type type_;
@@ -129,6 +143,13 @@ class ViewDataImpl {
129143
DataMap<Distribution> distribution_data_;
130144
DataMap<IntervalStatsObject> interval_data_;
131145
};
146+
147+
// A start time for each timeseries/tag map.
148+
DataMap<absl::Time> start_times_;
149+
150+
// DEPRECATED: Legacy start_time_ for the entire view.
151+
// This should be deleted if custom exporters are updated to
152+
// use start_times_ and stop depending on this field
132153
absl::Time start_time_;
133154
};
134155

0 commit comments

Comments
 (0)