Skip to content

Commit 6b91fcb

Browse files
WIP rm split_*, add direct new_* checks and updates
1 parent e4b26ed commit 6b91fcb

File tree

1 file changed

+47
-45
lines changed
  • exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter

1 file changed

+47
-45
lines changed

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py

Lines changed: 47 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -305,35 +305,32 @@ def _split_metrics_data(
305305
metrics_data: metrics object based on HTTP protocol buffer definition
306306
"""
307307
batch_size: int = 0
308-
split_resource_metrics: List[pb2.ResourceMetrics] = []
308+
# TODO: Account for multiple ResourceMetrics in original MetricsData
309309

310310
for resource_metrics in metrics_data.resource_metrics:
311-
split_scope_metrics: List[pb2.ScopeMetrics] = []
312311
new_resource_metrics = pb2.ResourceMetrics(
313312
resource=resource_metrics.resource,
314313
scope_metrics=[],
315314
schema_url=resource_metrics.schema_url,
316315
)
317-
split_resource_metrics.append(new_resource_metrics)
318316

319317
for scope_metrics in resource_metrics.scope_metrics:
320-
split_metrics: List[pb2.Metric] = []
321318
new_scope_metrics = pb2.ScopeMetrics(
322319
scope=scope_metrics.scope,
323320
metrics=[],
324321
schema_url=scope_metrics.schema_url,
325322
)
326-
split_scope_metrics.append(new_scope_metrics)
323+
added_to_resource = False
327324

328325
for metric in scope_metrics.metrics:
329326
# protobuf specifies metrics types (e.g. Sum, Histogram)
330327
# without definition of DataPointT like gRPC
331-
metric_data_points = []
328+
current_data_points = []
332329
new_metric = None
333-
split_data_points = []
330+
added_to_scope = False
334331

335332
if metric.HasField("sum"):
336-
metric_data_points = metric.sum.data_points
333+
current_data_points = metric.sum.data_points
337334
new_metric = pb2.Metric(
338335
name=metric.name,
339336
description=metric.description,
@@ -345,7 +342,7 @@ def _split_metrics_data(
345342
)
346343
)
347344
elif metric.HasField("histogram"):
348-
metric_data_points = metric.histogram.data_points
345+
current_data_points = metric.histogram.data_points
349346
new_metric = pb2.Metric(
350347
name=metric.name,
351348
description=metric.description,
@@ -356,7 +353,7 @@ def _split_metrics_data(
356353
),
357354
)
358355
elif metric.HasField("exponential_histogram"):
359-
metric_data_points = metric.exponential_histogram.data_points
356+
current_data_points = metric.exponential_histogram.data_points
360357
new_metric = pb2.Metric(
361358
name=metric.name,
362359
description=metric.description,
@@ -367,7 +364,7 @@ def _split_metrics_data(
367364
),
368365
)
369366
elif metric.HasField("gauge"):
370-
metric_data_points = metric.gauge.data_points
367+
current_data_points = metric.gauge.data_points
371368
new_metric = pb2.Metric(
372369
name=metric.name,
373370
description=metric.description,
@@ -377,7 +374,7 @@ def _split_metrics_data(
377374
)
378375
)
379376
elif metric.HasField("summary"):
380-
metric_data_points = metric.summary.data_points
377+
current_data_points = metric.summary.data_points
381378
new_metric = pb2.Metric(
382379
name=metric.name,
383380
description=metric.description,
@@ -389,12 +386,7 @@ def _split_metrics_data(
389386
else:
390387
_logger.warning("Tried to split and export an unsupported metric type.")
391388

392-
if new_metric is not None:
393-
split_metrics.append(new_metric)
394-
395-
for data_point in metric_data_points:
396-
split_data_points.append(data_point)
397-
389+
for data_point in current_data_points:
398390
if metric.HasField("sum"):
399391
new_metric.sum.data_points.append(data_point)
400392
elif metric.HasField("histogram"):
@@ -412,16 +404,18 @@ def _split_metrics_data(
412404
# Update scope metrics, resource metrics after all data_points (so far) added to metric
413405
# and yield this batch
414406
new_scope_metrics.metrics.append(new_metric)
407+
added_to_scope = True
415408
new_resource_metrics.scope_metrics.append(new_scope_metrics)
409+
added_to_resource = True
416410

417411
yield pb2.MetricsData(
418-
resource_metrics=split_resource_metrics
412+
resource_metrics=[new_resource_metrics]
419413
)
420414

421415
# Reset all the variables with current metrics_data position
422-
# minus yielded datapoints
416+
# minus yielded data_points. Need to clear data_points and keep metric
417+
# to avoid duplicate data_point export
423418
batch_size = 0
424-
split_data_points = []
425419

426420
new_metric = None
427421
if metric.HasField("sum"):
@@ -476,45 +470,53 @@ def _split_metrics_data(
476470
else:
477471
_logger.warning("Tried to split and export an unsupported metric type.")
478472

479-
if new_metric is not None:
480-
split_metrics = [new_metric]
481-
else:
482-
split_metrics = []
483-
484473
new_scope_metrics = pb2.ScopeMetrics(
485474
scope=scope_metrics.scope,
486-
metrics=split_metrics,
475+
metrics=[new_metric],
487476
schema_url=scope_metrics.schema_url,
488477
)
489-
split_scope_metrics = [new_scope_metrics]
490-
491478
new_resource_metrics = pb2.ResourceMetrics(
492479
resource=resource_metrics.resource,
493-
scope_metrics=split_scope_metrics,
480+
scope_metrics=[new_scope_metrics],
494481
schema_url=resource_metrics.schema_url,
495482
)
496-
split_resource_metrics = [new_resource_metrics]
497483

498-
if not split_data_points:
499-
# If data_points is empty remove the whole metric
500-
split_metrics.pop()
501-
else:
502-
# Update scope metrics after all data_points added to metric
484+
has_data_points = False
485+
if new_metric.HasField("sum"):
486+
if new_metric.sum.data_points:
487+
has_data_points = True
488+
elif new_metric.HasField("histogram"):
489+
if new_metric.histogram.data_points:
490+
has_data_points = True
491+
elif new_metric.HasField("exponential_histogram"):
492+
if new_metric.exponential_histogram.data_points:
493+
has_data_points = True
494+
elif new_metric.HasField("gauge"):
495+
if new_metric.gauge.data_points:
496+
has_data_points = True
497+
elif new_metric.HasField("summary"):
498+
if new_metric.summary.data_points:
499+
has_data_points = True
500+
501+
# If already added as part of previous batch
502+
# but data_points empty, remove from scope and resource
503+
if added_to_scope and not has_data_points:
504+
del new_scope_metrics.metrics[-1]
505+
if added_to_resource and not has_data_points:
506+
del new_resource_metrics.scope_metrics[-1]
507+
508+
# If not added to scope and has data_points, add
509+
if not added_to_scope and has_data_points:
503510
new_scope_metrics.metrics.append(new_metric)
504511

505-
if not split_metrics:
506-
# If metrics is empty remove the whole scope_metrics
507-
split_scope_metrics.pop()
508-
else:
509-
# Update resource_metrics after all scope_metrics updated with metrics, data_points
512+
if new_scope_metrics.metrics:
510513
new_resource_metrics.scope_metrics.append(new_scope_metrics)
511514

512-
if not split_scope_metrics:
513-
# If scope_metrics is empty remove the whole resource_metrics
514-
split_resource_metrics.pop()
515+
# TODO: Account for multiple ResourceMetrics in original MetricsData
515516

516517
if batch_size > 0:
517-
yield pb2.MetricsData(resource_metrics=split_resource_metrics)
518+
# TODO: Account for multiple ResourceMetrics in original MetricsData
519+
yield pb2.MetricsData(resource_metrics=[new_resource_metrics])
518520

519521
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
520522
pass

0 commit comments

Comments
 (0)