Commit 9b65c62

HTTP MetricExporter: _split_metrics_data based on metric type via HasField
1 parent 15f16a0 commit 9b65c62

File tree (1 file changed: +297 −4 lines)

  • exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py

Lines changed: 297 additions & 4 deletions
@@ -297,10 +297,303 @@ def export(
 
     def _split_metrics_data(
         self,
-        metrics_data: MetricsData,
-    ) -> Iterable[MetricsData]:
-        # TODO
-        return []
+        metrics_data: pb2.MetricsData,
+    ) -> Iterable[pb2.MetricsData]:
+        """Splits metrics data into several smaller metrics data
+
+        Args:
+            metrics_data: metrics object based on the HTTP protocol buffer definition
+        """
+        batch_size: int = 0
+        split_resource_metrics: List[pb2.ResourceMetrics] = []
+
+        for resource_metrics in metrics_data.resource_metrics:
+            split_scope_metrics: List[pb2.ScopeMetrics] = []
+            split_resource_metrics.append(
+                pb2.ResourceMetrics(
+                    resource=resource_metrics.resource,
+                    scope_metrics=split_scope_metrics,
+                    schema_url=resource_metrics.schema_url,
+                )
+            )
+            for scope_metrics in resource_metrics.scope_metrics:
+                split_metrics: List[pb2.Metric] = []
+                split_scope_metrics.append(
+                    pb2.ScopeMetrics(
+                        scope=scope_metrics.scope,
+                        metrics=split_metrics,
+                        schema_url=scope_metrics.schema_url,
+                    )
+                )
+                for metric in scope_metrics.metrics:
+                    # protobuf requires the specific metric types (e.g. Sum,
+                    # Histogram); there is no DataPointT definition as in gRPC
+
+                    if metric.HasField("sum"):
+                        split_data_points = []
+                        split_metrics.append(
+                            pb2.Metric(
+                                name=metric.name,
+                                description=metric.description,
+                                unit=metric.unit,
+                                sum=pb2.Sum(
+                                    data_points=split_data_points,
+                                    aggregation_temporality=metric.sum.aggregation_temporality,
+                                    is_monotonic=metric.sum.is_monotonic,
+                                ),
+                            )
+                        )
+                        for data_point in metric.sum.data_points:
+                            split_data_points.append(data_point)
+                            batch_size += 1
+
+                            if batch_size >= self._max_export_batch_size:
+                                yield pb2.MetricsData(
+                                    resource_metrics=split_resource_metrics
+                                )
+                                # Reset all the variables
+                                batch_size = 0
+                                split_data_points = []
+                                split_metrics = [
+                                    pb2.Metric(
+                                        name=metric.name,
+                                        description=metric.description,
+                                        unit=metric.unit,
+                                        sum=pb2.Sum(
+                                            data_points=split_data_points,
+                                            aggregation_temporality=metric.sum.aggregation_temporality,
+                                            is_monotonic=metric.sum.is_monotonic,
+                                        ),
+                                    )
+                                ]
+                                split_scope_metrics = [
+                                    pb2.ScopeMetrics(
+                                        scope=scope_metrics.scope,
+                                        metrics=split_metrics,
+                                        schema_url=scope_metrics.schema_url,
+                                    )
+                                ]
+                                split_resource_metrics = [
+                                    pb2.ResourceMetrics(
+                                        resource=resource_metrics.resource,
+                                        scope_metrics=split_scope_metrics,
+                                        schema_url=resource_metrics.schema_url,
+                                    )
+                                ]
+
+                    elif metric.HasField("histogram"):
+                        split_data_points = []
+                        split_metrics.append(
+                            pb2.Metric(
+                                name=metric.name,
+                                description=metric.description,
+                                unit=metric.unit,
+                                histogram=pb2.Histogram(
+                                    data_points=split_data_points,
+                                    aggregation_temporality=metric.histogram.aggregation_temporality,
+                                ),
+                            )
+                        )
+                        for data_point in metric.histogram.data_points:
+                            split_data_points.append(data_point)
+                            batch_size += 1
+
+                            if batch_size >= self._max_export_batch_size:
+                                yield pb2.MetricsData(
+                                    resource_metrics=split_resource_metrics
+                                )
+                                # Reset all the variables
+                                batch_size = 0
+                                split_data_points = []
+                                split_metrics = [
+                                    pb2.Metric(
+                                        name=metric.name,
+                                        description=metric.description,
+                                        unit=metric.unit,
+                                        histogram=pb2.Histogram(
+                                            data_points=split_data_points,
+                                            aggregation_temporality=metric.histogram.aggregation_temporality,
+                                        ),
+                                    )
+                                ]
+                                split_scope_metrics = [
+                                    pb2.ScopeMetrics(
+                                        scope=scope_metrics.scope,
+                                        metrics=split_metrics,
+                                        schema_url=scope_metrics.schema_url,
+                                    )
+                                ]
+                                split_resource_metrics = [
+                                    pb2.ResourceMetrics(
+                                        resource=resource_metrics.resource,
+                                        scope_metrics=split_scope_metrics,
+                                        schema_url=resource_metrics.schema_url,
+                                    )
+                                ]
+
+                    elif metric.HasField("exponential_histogram"):
+                        split_data_points = []
+                        split_metrics.append(
+                            pb2.Metric(
+                                name=metric.name,
+                                description=metric.description,
+                                unit=metric.unit,
+                                exponential_histogram=pb2.ExponentialHistogram(
+                                    data_points=split_data_points,
+                                    aggregation_temporality=metric.exponential_histogram.aggregation_temporality,
+                                ),
+                            )
+                        )
+                        for data_point in metric.exponential_histogram.data_points:
+                            split_data_points.append(data_point)
+                            batch_size += 1
+
+                            if batch_size >= self._max_export_batch_size:
+                                yield pb2.MetricsData(
+                                    resource_metrics=split_resource_metrics
+                                )
+                                # Reset all the variables
+                                batch_size = 0
+                                split_data_points = []
+                                split_metrics = [
+                                    pb2.Metric(
+                                        name=metric.name,
+                                        description=metric.description,
+                                        unit=metric.unit,
+                                        exponential_histogram=pb2.ExponentialHistogram(
+                                            data_points=split_data_points,
+                                            aggregation_temporality=metric.exponential_histogram.aggregation_temporality,
+                                        ),
+                                    )
+                                ]
+                                split_scope_metrics = [
+                                    pb2.ScopeMetrics(
+                                        scope=scope_metrics.scope,
+                                        metrics=split_metrics,
+                                        schema_url=scope_metrics.schema_url,
+                                    )
+                                ]
+                                split_resource_metrics = [
+                                    pb2.ResourceMetrics(
+                                        resource=resource_metrics.resource,
+                                        scope_metrics=split_scope_metrics,
+                                        schema_url=resource_metrics.schema_url,
+                                    )
+                                ]
+
+                    elif metric.HasField("gauge"):
+                        split_data_points = []
+                        split_metrics.append(
+                            pb2.Metric(
+                                name=metric.name,
+                                description=metric.description,
+                                unit=metric.unit,
+                                gauge=pb2.Gauge(
+                                    data_points=split_data_points,
+                                ),
+                            )
+                        )
+                        for data_point in metric.gauge.data_points:
+                            split_data_points.append(data_point)
+                            batch_size += 1
+
+                            if batch_size >= self._max_export_batch_size:
+                                yield pb2.MetricsData(
+                                    resource_metrics=split_resource_metrics
+                                )
+                                # Reset all the variables
+                                batch_size = 0
+                                split_data_points = []
+                                split_metrics = [
+                                    pb2.Metric(
+                                        name=metric.name,
+                                        description=metric.description,
+                                        unit=metric.unit,
+                                        gauge=pb2.Gauge(
+                                            data_points=split_data_points,
+                                        ),
+                                    )
+                                ]
+                                split_scope_metrics = [
+                                    pb2.ScopeMetrics(
+                                        scope=scope_metrics.scope,
+                                        metrics=split_metrics,
+                                        schema_url=scope_metrics.schema_url,
+                                    )
+                                ]
+                                split_resource_metrics = [
+                                    pb2.ResourceMetrics(
+                                        resource=resource_metrics.resource,
+                                        scope_metrics=split_scope_metrics,
+                                        schema_url=resource_metrics.schema_url,
+                                    )
+                                ]
+
+                    elif metric.HasField("summary"):
+                        split_data_points = []
+                        split_metrics.append(
+                            pb2.Metric(
+                                name=metric.name,
+                                description=metric.description,
+                                unit=metric.unit,
+                                summary=pb2.Summary(
+                                    data_points=split_data_points,
+                                ),
+                            )
+                        )
+                        for data_point in metric.summary.data_points:
+                            split_data_points.append(data_point)
+                            batch_size += 1
+
+                            if batch_size >= self._max_export_batch_size:
+                                yield pb2.MetricsData(
+                                    resource_metrics=split_resource_metrics
+                                )
+                                # Reset all the variables
+                                batch_size = 0
+                                split_data_points = []
+                                split_metrics = [
+                                    pb2.Metric(
+                                        name=metric.name,
+                                        description=metric.description,
+                                        unit=metric.unit,
+                                        summary=pb2.Summary(
+                                            data_points=split_data_points,
+                                        ),
+                                    )
+                                ]
+                                split_scope_metrics = [
+                                    pb2.ScopeMetrics(
+                                        scope=scope_metrics.scope,
+                                        metrics=split_metrics,
+                                        schema_url=scope_metrics.schema_url,
+                                    )
+                                ]
+                                split_resource_metrics = [
+                                    pb2.ResourceMetrics(
+                                        resource=resource_metrics.resource,
+                                        scope_metrics=split_scope_metrics,
+                                        schema_url=resource_metrics.schema_url,
+                                    )
+                                ]
+
+                    else:
+                        _logger.warning("Tried to split and export an unsupported metric type.")
+
+                    if not split_data_points:
+                        # If data_points is empty remove the whole metric
+                        split_metrics.pop()
+
+                if not split_metrics:
+                    # If metrics is empty remove the whole scope_metrics
+                    split_scope_metrics.pop()
+
+            if not split_scope_metrics:
+                # If scope_metrics is empty remove the whole resource_metrics
+                split_resource_metrics.pop()
+
+        if batch_size > 0:
+            yield pb2.MetricsData(resource_metrics=split_resource_metrics)
 
     def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        pass
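
For reference, a minimal sketch (not part of this commit) of how the new generator could be exercised. It assumes pb2 is opentelemetry.proto.metrics.v1.metrics_pb2 and a hypothetical exporter instance whose _max_export_batch_size is 2; the exporter name and batch size here are illustrative, not the commit's configuration.

    # Build a MetricsData message with one Sum metric carrying 5 data points.
    from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2

    metrics_data = pb2.MetricsData(
        resource_metrics=[
            pb2.ResourceMetrics(
                scope_metrics=[
                    pb2.ScopeMetrics(
                        metrics=[
                            pb2.Metric(
                                name="requests",
                                unit="1",
                                sum=pb2.Sum(
                                    aggregation_temporality=pb2.AGGREGATION_TEMPORALITY_CUMULATIVE,
                                    is_monotonic=True,
                                    data_points=[
                                        pb2.NumberDataPoint(as_int=value)
                                        for value in range(5)
                                    ],
                                ),
                            )
                        ]
                    )
                ]
            )
        ]
    )

    # With _max_export_batch_size = 2 (hypothetical), the generator yields
    # three MetricsData messages holding 2, 2, and 1 data points respectively:
    # for batch in exporter._split_metrics_data(metrics_data):
    #     ...  # each batch can be serialized and sent as a separate request

Because every yielded batch is wrapped in freshly built ResourceMetrics and ScopeMetrics messages that copy the current resource, scope, and schema_url, those attributes are preserved across batches even when a split happens mid-metric.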
