Skip to content

Commit df6b365

Browse files
Move grpc split_metrics_data to common
1 parent e46db88 commit df6b365

File tree

3 files changed

+406
-79
lines changed
  • exporter
    • opentelemetry-exporter-otlp-proto-common
    • opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter

3 files changed

+406
-79
lines changed

exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414
from __future__ import annotations
1515

16+
from dataclasses import replace
1617
import logging
1718
from os import environ
1819
from typing import Dict, List
@@ -171,6 +172,88 @@ def _get_aggregation(
171172

172173
return instrument_class_aggregation
173174

175+
def split_metrics_data(
176+
metrics_data: MetricsData,
177+
max_export_batch_size: int,
178+
) -> Iterable[MetricsData]:
179+
batch_size: int = 0
180+
split_resource_metrics: List[ResourceMetrics] = []
181+
182+
for resource_metrics in metrics_data.resource_metrics:
183+
split_scope_metrics: List[ScopeMetrics] = []
184+
split_resource_metrics.append(
185+
replace(
186+
resource_metrics,
187+
scope_metrics=split_scope_metrics,
188+
)
189+
)
190+
for scope_metrics in resource_metrics.scope_metrics:
191+
split_metrics: List[Metric] = []
192+
split_scope_metrics.append(
193+
replace(
194+
scope_metrics,
195+
metrics=split_metrics,
196+
)
197+
)
198+
for metric in scope_metrics.metrics:
199+
split_data_points: List[DataPointT] = []
200+
split_metrics.append(
201+
replace(
202+
metric,
203+
data=replace(
204+
metric.data,
205+
data_points=split_data_points,
206+
),
207+
)
208+
)
209+
210+
for data_point in metric.data.data_points:
211+
split_data_points.append(data_point)
212+
batch_size += 1
213+
214+
if batch_size >= max_export_batch_size:
215+
yield MetricsData(
216+
resource_metrics=split_resource_metrics
217+
)
218+
# Reset all the variables
219+
batch_size = 0
220+
split_data_points = []
221+
split_metrics = [
222+
replace(
223+
metric,
224+
data=replace(
225+
metric.data,
226+
data_points=split_data_points,
227+
),
228+
)
229+
]
230+
split_scope_metrics = [
231+
replace(
232+
scope_metrics,
233+
metrics=split_metrics,
234+
)
235+
]
236+
split_resource_metrics = [
237+
replace(
238+
resource_metrics,
239+
scope_metrics=split_scope_metrics,
240+
)
241+
]
242+
243+
if not split_data_points:
244+
# If data_points is empty remove the whole metric
245+
split_metrics.pop()
246+
247+
if not split_metrics:
248+
# If metrics is empty remove the whole scope_metrics
249+
split_scope_metrics.pop()
250+
251+
if not split_scope_metrics:
252+
# If scope_metrics is empty remove the whole resource_metrics
253+
split_resource_metrics.pop()
254+
255+
if batch_size > 0:
256+
yield MetricsData(resource_metrics=split_resource_metrics)
174257

175258
class EncodingException(Exception):
176259
"""

0 commit comments

Comments
 (0)