Skip to content

Commit 5a6bcfd

Browse files
authored
[Geneva Exporter] Add support for exporting metrics to different combinations of account and namespace (#280)
1 parent 711814b commit 5a6bcfd

File tree

5 files changed

+263
-120
lines changed

5 files changed

+263
-120
lines changed

exporters/geneva/include/opentelemetry/exporters/geneva/metrics/exporter.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ constexpr uint64_t kSecondsToUnixTime =
3333
// 1601-01-01T00:00:00Z and UNIX/Linux epoch
3434
// (1970-01-01T00:00:00Z)
3535

36+
const std::string kAttributeNamespaceKey = "_microsoft_metrics_namespace";
37+
const std::string kAttributeAccountKey = "_microsoft_metrics_account";
38+
3639
using ValueType = nostd::variant<int64_t, double>;
3740

3841
/**
@@ -65,13 +68,8 @@ class Exporter final : public opentelemetry::sdk::metrics::PushMetricExporter {
6568
std::unique_ptr<DataTransport> data_transport_;
6669

6770
// metrics storage
68-
char buffer_non_histogram_[kBufferSize];
69-
char buffer_histogram_[kBufferSize];
70-
uint64_t buffer_index_non_histogram_;
71-
uint64_t buffer_index_histogram_;
71+
char buffer_[kBufferSize];
7272

73-
size_t InitializeBufferForNonHistogramData();
74-
size_t InitiaizeBufferForHistogramData();
7573
size_t SerializeNonHistogramMetrics(sdk::metrics::AggregationType,
7674
MetricsEventType,
7775
const sdk::metrics::ValueType &,

exporters/geneva/src/exporter.cc

Lines changed: 127 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ namespace geneva {
1919
namespace metrics {
2020
Exporter::Exporter(const ExporterOptions &options)
2121
: options_(options), connection_string_parser_(options_.connection_string),
22-
data_transport_{nullptr}, buffer_index_histogram_(0),
23-
buffer_index_non_histogram_(0) {
22+
data_transport_{nullptr} {
2423
if (connection_string_parser_.IsValid()) {
2524
if (connection_string_parser_.transport_protocol_ ==
2625
TransportProtocol::kUNIX) {
@@ -43,11 +42,6 @@ Exporter::Exporter(const ExporterOptions &options)
4342
is_shutdown_ = true;
4443
return;
4544
}
46-
// Initialize non-histogram buffer
47-
buffer_index_non_histogram_ = InitializeBufferForNonHistogramData();
48-
49-
// Initialize histogram buffer
50-
buffer_index_histogram_ = InitiaizeBufferForHistogramData();
5145
}
5246

5347
sdk::metrics::AggregationTemporality Exporter::GetAggregationTemporality(
@@ -106,7 +100,7 @@ opentelemetry::sdk::common::ExportResult Exporter::Export(
106100
sdk::metrics::AggregationType::kSum, event_type, new_value,
107101
metric_data.end_ts, metric_data.instrument_descriptor.name_,
108102
point_data_with_attributes.attributes);
109-
data_transport_->Send(event_type, buffer_non_histogram_,
103+
data_transport_->Send(event_type, buffer_,
110104
body_length + kBinaryHeaderSize);
111105

112106
} else if (nostd::holds_alternative<sdk::metrics::LastValuePointData>(
@@ -127,7 +121,7 @@ opentelemetry::sdk::common::ExportResult Exporter::Export(
127121
sdk::metrics::AggregationType::kLastValue, event_type, new_value,
128122
metric_data.end_ts, metric_data.instrument_descriptor.name_,
129123
point_data_with_attributes.attributes);
130-
data_transport_->Send(event_type, buffer_non_histogram_,
124+
data_transport_->Send(event_type, buffer_,
131125
body_length + kBinaryHeaderSize);
132126
} else if (nostd::holds_alternative<sdk::metrics::HistogramPointData>(
133127
point_data_with_attributes.point_data)) {
@@ -156,7 +150,7 @@ opentelemetry::sdk::common::ExportResult Exporter::Export(
156150
.counts_,
157151
metric_data.end_ts, metric_data.instrument_descriptor.name_,
158152
point_data_with_attributes.attributes);
159-
data_transport_->Send(event_type, buffer_histogram_,
153+
data_transport_->Send(event_type, buffer_,
160154
body_length + kBinaryHeaderSize);
161155
}
162156
}
@@ -175,57 +169,75 @@ bool Exporter::Shutdown(std::chrono::microseconds timeout) noexcept {
175169
return true;
176170
}
177171

178-
size_t Exporter::InitializeBufferForNonHistogramData() {
172+
size_t Exporter::SerializeNonHistogramMetrics(
173+
sdk::metrics::AggregationType agg_type, MetricsEventType event_type,
174+
const sdk::metrics::ValueType &value, common::SystemTimestamp ts,
175+
const std::string &metric_name,
176+
const sdk::metrics::PointAttributes &attributes) {
177+
179178
// The buffer format is as follows:
180179
// -- BinaryHeader
181180
// -- MetricPayload
182181
// -- Variable length content
183182

184183
// Leave enough space for the header and fixed payload
185184
auto bufferIndex = kBinaryHeaderSize + kMetricPayloadSize;
186-
SerializeString(buffer_non_histogram_, bufferIndex,
187-
connection_string_parser_.account_);
188-
SerializeString(buffer_non_histogram_, bufferIndex,
189-
connection_string_parser_.namespace_);
190-
return bufferIndex;
191-
}
192185

193-
size_t Exporter::InitiaizeBufferForHistogramData() {
194-
// The buffer format is as follows:
195-
// -- BinaryHeader
196-
// -- ExternalPayload
197-
// -- Variable length content
186+
auto account_name = connection_string_parser_.account_;
187+
auto account_namespace = connection_string_parser_.namespace_;
198188

199-
// Leave enough space for the header and fixed payload
200-
auto bufferIndex = kBinaryHeaderSize + kExternalPayloadSize;
201-
SerializeString(buffer_histogram_, bufferIndex,
202-
connection_string_parser_.account_);
203-
SerializeString(buffer_histogram_, bufferIndex,
204-
connection_string_parser_.namespace_);
205-
return bufferIndex;
206-
}
189+
// try reading namespace and/or account from attributes
190+
// TBD = This can be avoided by migrating to the
191+
// TLV binary format
192+
for (const auto &kv : attributes) {
193+
if (kv.first == kAttributeAccountKey){
194+
account_name = AttributeValueToString(kv.second);
195+
}
196+
else if (kv.first == kAttributeNamespaceKey) {
197+
account_namespace = AttributeValueToString(kv.second);
198+
}
199+
}
207200

208-
size_t Exporter::SerializeNonHistogramMetrics(
209-
sdk::metrics::AggregationType agg_type, MetricsEventType event_type,
210-
const sdk::metrics::ValueType &value, common::SystemTimestamp ts,
211-
const std::string &metric_name,
212-
const sdk::metrics::PointAttributes &attributes) {
213-
auto bufferIndex = buffer_index_non_histogram_;
214-
SerializeString(buffer_non_histogram_, bufferIndex, metric_name);
201+
// account name
202+
SerializeString(buffer_, bufferIndex, account_name);
203+
// namespace
204+
SerializeString(buffer_, bufferIndex, account_namespace);
205+
// metric name
206+
SerializeString(buffer_, bufferIndex, metric_name);
207+
208+
uint16_t attributes_size = 0;
215209
for (const auto &kv : attributes) {
216210
if (kv.first.size() > kMaxDimensionNameSize) {
217-
LOG_WARN("Dimension name limit overflow: %s Limit %d", kv.first.c_str(),
211+
LOG_WARN("Dimension name limit overflow: %s Limit: %d", kv.first.c_str(),
218212
kMaxDimensionNameSize);
219213
continue;
220214
}
221-
SerializeString(buffer_non_histogram_, bufferIndex, kv.first);
215+
if (kv.first == kAttributeAccountKey ||
216+
kv.first == kAttributeNamespaceKey)
217+
{
218+
// custom namespace and account name should't be exported
219+
continue;
220+
}
221+
attributes_size++;
222+
SerializeString(buffer_, bufferIndex, kv.first);
222223
}
223224
for (const auto &kv : attributes) {
225+
if (kv.first.size() > kMaxDimensionNameSize) {
226+
LOG_WARN("Dimension name limit overflow: %s Limit: %d", kv.first.c_str(),
227+
kMaxDimensionNameSize);
228+
continue;
229+
}
230+
if (kv.first == kAttributeAccountKey ||
231+
kv.first == kAttributeNamespaceKey)
232+
{
233+
// custom namespace and account name should't be exported
234+
continue;
235+
}
224236
auto attr_value = AttributeValueToString(kv.second);
225-
SerializeString(buffer_non_histogram_, bufferIndex, attr_value);
237+
SerializeString(buffer_, bufferIndex, attr_value);
226238
}
227239
// length zero for auto-pilot
228-
SerializeInt<uint16_t>(buffer_non_histogram_, bufferIndex, 0);
240+
SerializeInt<uint16_t>(buffer_, bufferIndex, 0);
229241

230242
// get final size of payload to be added in front of buffer
231243
uint16_t body_length = bufferIndex - kBinaryHeaderSize;
@@ -234,36 +246,36 @@ size_t Exporter::SerializeNonHistogramMetrics(
234246
bufferIndex = 0;
235247

236248
// event_type
237-
SerializeInt<uint16_t>(buffer_non_histogram_, bufferIndex,
249+
SerializeInt<uint16_t>(buffer_, bufferIndex,
238250
static_cast<uint16_t>(event_type));
239251

240252
// body length
241-
SerializeInt<uint16_t>(buffer_non_histogram_, bufferIndex,
253+
SerializeInt<uint16_t>(buffer_, bufferIndex,
242254
static_cast<uint16_t>(body_length));
243255

244256
// count of dimensions.
245-
SerializeInt<uint16_t>(buffer_non_histogram_, bufferIndex,
246-
static_cast<uint16_t>(attributes.size()));
257+
SerializeInt<uint16_t>(buffer_, bufferIndex,
258+
static_cast<uint16_t>(attributes_size));
247259

248260
// reserverd word (2 bytes)
249-
SerializeInt<uint16_t>(buffer_non_histogram_, bufferIndex, 0);
261+
SerializeInt<uint16_t>(buffer_, bufferIndex, 0);
250262

251263
// reserved word (4 bytes)
252-
SerializeInt<uint32_t>(buffer_non_histogram_, bufferIndex, 0);
264+
SerializeInt<uint32_t>(buffer_, bufferIndex, 0);
253265

254266
// timestamp utc (8 bytes)
255267
auto windows_ticks = UnixTimeToWindowsTicks(
256268
std::chrono::duration_cast<std::chrono::duration<std::uint64_t>>(
257269
ts.time_since_epoch())
258270
.count());
259271

260-
SerializeInt<uint64_t>(buffer_non_histogram_, bufferIndex, windows_ticks);
272+
SerializeInt<uint64_t>(buffer_, bufferIndex, windows_ticks);
261273
if (event_type == MetricsEventType::Uint64Metric) {
262-
SerializeInt<uint64_t>(buffer_non_histogram_, bufferIndex,
274+
SerializeInt<uint64_t>(buffer_, bufferIndex,
263275
static_cast<uint64_t>(nostd::get<int64_t>(value)));
264276
} else if (event_type == MetricsEventType::DoubleMetric) {
265277
SerializeInt<uint64_t>(
266-
buffer_non_histogram_, bufferIndex,
278+
buffer_, bufferIndex,
267279
*(reinterpret_cast<const uint64_t *>(&(nostd::get<double>(value)))));
268280
} else {
269281
// Won't reach here.
@@ -279,40 +291,84 @@ size_t Exporter::SerializeHistogramMetrics(
279291
common::SystemTimestamp ts, const std::string &metric_name,
280292
const sdk::metrics::PointAttributes &attributes) {
281293

282-
auto bufferIndex = buffer_index_histogram_;
294+
// The buffer format is as follows:
295+
// -- BinaryHeader
296+
// -- ExternalPayload
297+
// -- Variable length content
298+
299+
// Leave enough space for the header and fixed payload
300+
auto bufferIndex = kBinaryHeaderSize + kExternalPayloadSize;
301+
302+
auto account_name = connection_string_parser_.account_;
303+
auto account_namespace = connection_string_parser_.namespace_;
304+
305+
// try reading namespace and/or account from attributes
306+
// TODO: This can be avoided by migrating to the
307+
// TLV binary format
308+
for (const auto &kv : attributes) {
309+
if (kv.first == kAttributeAccountKey){
310+
account_name = AttributeValueToString(kv.second);
311+
}
312+
else if (kv.first == kAttributeNamespaceKey) {
313+
account_namespace = AttributeValueToString(kv.second);
314+
}
315+
}
316+
317+
// account name
318+
SerializeString(buffer_, bufferIndex, account_name);
319+
// namespace
320+
SerializeString(buffer_, bufferIndex, account_namespace);
283321
// metric name
284-
SerializeString(buffer_histogram_, bufferIndex, metric_name);
322+
SerializeString(buffer_, bufferIndex, metric_name);
285323

324+
uint16_t attributes_size = 0;
286325
// dimentions - name
287326
for (const auto &kv : attributes) {
288327
if (kv.first.size() > kMaxDimensionNameSize) {
289328
LOG_WARN("Dimension name limit overflow: %s Limit: %d", kv.first.c_str(),
290329
kMaxDimensionNameSize);
291330
continue;
292331
}
293-
SerializeString(buffer_histogram_, bufferIndex, kv.first);
332+
if (kv.first == kAttributeAccountKey ||
333+
kv.first == kAttributeNamespaceKey)
334+
{
335+
// custom namespace and account name should't be exported
336+
continue;
337+
}
338+
attributes_size++;
339+
SerializeString(buffer_, bufferIndex, kv.first);
294340
}
295341

296342
// dimentions - value
297343
for (const auto &kv : attributes) {
344+
if (kv.first.size() > kMaxDimensionNameSize) {
345+
// warning is already logged earlier, no logging again
346+
continue;
347+
}
348+
if (kv.first == kAttributeAccountKey ||
349+
kv.first == kAttributeNamespaceKey)
350+
{
351+
// custom namespace and account name should't be exported
352+
continue;
353+
}
298354
auto attr_value = AttributeValueToString(kv.second);
299-
SerializeString(buffer_histogram_, bufferIndex, attr_value);
355+
SerializeString(buffer_, bufferIndex, attr_value);
300356
}
301357

302358
// two bytes padding for auto-pilot
303-
SerializeInt<uint16_t>(buffer_histogram_, bufferIndex, 0);
359+
SerializeInt<uint16_t>(buffer_, bufferIndex, 0);
304360

305361
// version - set as 0
306-
SerializeInt<uint8_t>(buffer_histogram_, bufferIndex, 0);
362+
SerializeInt<uint8_t>(buffer_, bufferIndex, 0);
307363

308364
// Meta-data
309365
// Value-count pairs is associated with the constant value of 2 in the
310366
// distribution_type enum.
311-
SerializeInt<uint8_t>(buffer_histogram_, bufferIndex, 2);
367+
SerializeInt<uint8_t>(buffer_, bufferIndex, 2);
312368

313369
// Keep a position to record how many buckets are added
314370
auto itemsWrittenIndex = bufferIndex;
315-
SerializeInt<uint16_t>(buffer_histogram_, bufferIndex, 0);
371+
SerializeInt<uint16_t>(buffer_, bufferIndex, 0);
316372

317373
// bucket values
318374
size_t index = 0;
@@ -321,9 +377,9 @@ size_t Exporter::SerializeHistogramMetrics(
321377
MetricsEventType::ExternallyAggregatedUlongDistributionMetric) {
322378
for (auto boundary : boundaries) {
323379
if (counts[index] > 0) {
324-
SerializeInt<uint64_t>(buffer_histogram_, bufferIndex,
380+
SerializeInt<uint64_t>(buffer_, bufferIndex,
325381
static_cast<uint64_t>(boundary));
326-
SerializeInt<uint32_t>(buffer_histogram_, bufferIndex,
382+
SerializeInt<uint32_t>(buffer_, bufferIndex,
327383
(uint32_t)(counts[index]));
328384
bucket_count++;
329385
}
@@ -332,7 +388,7 @@ size_t Exporter::SerializeHistogramMetrics(
332388
}
333389

334390
// write bucket count to previous preserved index
335-
SerializeInt<uint16_t>(buffer_histogram_, itemsWrittenIndex, bucket_count);
391+
SerializeInt<uint16_t>(buffer_, itemsWrittenIndex, bucket_count);
336392

337393
// get final size of payload to be added in front of buffer
338394
uint16_t body_length = bufferIndex - kBinaryHeaderSize;
@@ -341,42 +397,42 @@ size_t Exporter::SerializeHistogramMetrics(
341397
bufferIndex = 0;
342398

343399
// event_type
344-
SerializeInt<uint16_t>(buffer_histogram_, bufferIndex,
400+
SerializeInt<uint16_t>(buffer_, bufferIndex,
345401
static_cast<uint16_t>(event_type));
346402

347403
// body length
348-
SerializeInt<uint16_t>(buffer_histogram_, bufferIndex,
404+
SerializeInt<uint16_t>(buffer_, bufferIndex,
349405
static_cast<uint16_t>(body_length));
350406

351407
// count of dimensions.
352-
SerializeInt<uint16_t>(buffer_histogram_, bufferIndex,
353-
static_cast<uint16_t>(attributes.size()));
408+
SerializeInt<uint16_t>(buffer_, bufferIndex,
409+
static_cast<uint16_t>(attributes_size));
354410

355411
// reserverd word (2 bytes)
356-
SerializeInt<uint16_t>(buffer_histogram_, bufferIndex, 0);
412+
SerializeInt<uint16_t>(buffer_, bufferIndex, 0);
357413

358414
// count of events
359-
SerializeInt<uint32_t>(buffer_histogram_, bufferIndex, count);
415+
SerializeInt<uint32_t>(buffer_, bufferIndex, count);
360416

361417
// timestamp utc (8 bytes)
362418
auto windows_ticks = UnixTimeToWindowsTicks(
363419
std::chrono::duration_cast<std::chrono::duration<std::uint64_t>>(
364420
ts.time_since_epoch())
365421
.count());
366-
SerializeInt<uint64_t>(buffer_histogram_, bufferIndex, windows_ticks);
422+
SerializeInt<uint64_t>(buffer_, bufferIndex, windows_ticks);
367423

368424
// sum, min, max
369425

370426
if (event_type ==
371427
MetricsEventType::ExternallyAggregatedUlongDistributionMetric) {
372428
// sum
373-
SerializeInt<uint64_t>(buffer_histogram_, bufferIndex,
429+
SerializeInt<uint64_t>(buffer_, bufferIndex,
374430
static_cast<uint64_t>(nostd::get<int64_t>(sum)));
375431
// min
376-
SerializeInt<uint64_t>(buffer_histogram_, bufferIndex,
432+
SerializeInt<uint64_t>(buffer_, bufferIndex,
377433
static_cast<uint64_t>(nostd::get<int64_t>(min)));
378434
// max
379-
SerializeInt<uint64_t>(buffer_histogram_, bufferIndex,
435+
SerializeInt<uint64_t>(buffer_, bufferIndex,
380436
static_cast<uint64_t>(nostd::get<int64_t>(max)));
381437
} else {
382438
// won't reach here.

0 commit comments

Comments
 (0)