12 | 12 | # limitations under the License. |
13 | 13 | from __future__ import annotations |
14 | 14 |
| 15 | +from dataclasses import replace |
15 | 16 | import gzip |
16 | 17 | import logging |
17 | 18 | import zlib |
@@ -116,7 +117,29 @@ def __init__( |
116 | 117 | preferred_temporality: dict[type, AggregationTemporality] |
117 | 118 | | None = None, |
118 | 119 | preferred_aggregation: dict[type, Aggregation] | None = None, |
| 120 | + max_export_batch_size: int | None = None, |
119 | 121 | ): |
| 122 | + """OTLP HTTP metrics exporter |
| 123 | +
| 124 | + Args: |
| 125 | + endpoint: Target URL to which the exporter is going to send metrics |
| 126 | + certificate_file: Path to the CA certificate file used to verify the server's TLS certificate
| 127 | + client_key_file: Path to the client private key file to use for TLS client authentication
| 128 | + client_certificate_file: Path to the client certificate file to use for TLS client authentication
| 129 | + headers: Headers to be sent with HTTP requests at export |
| 130 | + timeout: Timeout in seconds for export |
| 131 | + compression: Compression to use; one of none, gzip, deflate |
| 132 | + session: Requests session to use at export |
| 133 | + preferred_temporality: Map of preferred temporality for each metric type. |
| 134 | + See `opentelemetry.sdk.metrics.export.MetricReader` for more details on what |
| 135 | + preferred temporality is. |
| 136 | + preferred_aggregation: Map of preferred aggregation for each metric type. |
| 137 | + See `opentelemetry.sdk.metrics.export.MetricReader` for more details on what |
| 138 | + preferred aggregation is. |
| 139 | + max_export_batch_size: Maximum number of data points to export in a single request. |
| 140 | + If not set, there is no limit to the number of data points in a request.
| 141 | + If it is set and the number of data points exceeds the max, the export will be split across multiple requests.
| 142 | + """ |
120 | 143 | self._endpoint = endpoint or environ.get( |
121 | 144 | OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, |
122 | 145 | _append_metrics_path( |
@@ -165,6 +188,7 @@ def __init__( |
165 | 188 | self._common_configuration( |
166 | 189 | preferred_temporality, preferred_aggregation |
167 | 190 | ) |
| 191 | + self._max_export_batch_size: int | None = max_export_batch_size |
168 | 192 |
169 | 193 | def _export(self, serialized_data: bytes): |
170 | 194 | data = serialized_data |
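
For reference, a minimal usage sketch of the new constructor parameter; the endpoint and the batch size of 1000 below are illustrative values, not taken from this diff:

from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

# Ask the exporter to split any export into requests of at most 1000 data points.
exporter = OTLPMetricExporter(
    endpoint="http://localhost:4318/v1/metrics",
    max_export_batch_size=1000,
)
reader = PeriodicExportingMetricReader(exporter)
provider = MeterProvider(metric_readers=[reader])
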
@@ -219,27 +243,65 @@ def export( |
219 | 243 | if delay == self._MAX_RETRY_TIMEOUT: |
220 | 244 | return MetricExportResult.FAILURE |
221 | 245 |
222 | | - resp = self._export(serialized_data.SerializeToString()) |
223 | | - # pylint: disable=no-else-return |
224 | | - if resp.ok: |
225 | | - return MetricExportResult.SUCCESS |
226 | | - elif self._retryable(resp): |
227 | | - _logger.warning( |
228 | | - "Transient error %s encountered while exporting metric batch, retrying in %ss.", |
229 | | - resp.reason, |
230 | | - delay, |
231 | | - ) |
232 | | - sleep(delay) |
233 | | - continue |
| 246 | + if self._max_export_batch_size is None: |
| 247 | + resp = self._export(serialized_data.SerializeToString()) |
| 248 | + # pylint: disable=no-else-return |
| 249 | + if resp.ok: |
| 250 | + return MetricExportResult.SUCCESS |
| 251 | + elif self._retryable(resp): |
| 252 | + _logger.warning( |
| 253 | + "Transient error %s encountered while exporting metric batch, retrying in %ss.", |
| 254 | + resp.reason, |
| 255 | + delay, |
| 256 | + ) |
| 257 | + sleep(delay) |
| 258 | + continue |
| 259 | + else: |
| 260 | + _logger.error( |
| 261 | + "Failed to export batch code: %s, reason: %s", |
| 262 | + resp.status_code, |
| 263 | + resp.text, |
| 264 | + ) |
| 265 | + return MetricExportResult.FAILURE |
| 266 | + |
| 267 | + # Else, attempt export in batches for this retry |
234 | 268 | else: |
235 | | - _logger.error( |
236 | | - "Failed to export batch code: %s, reason: %s", |
237 | | - resp.status_code, |
238 | | - resp.text, |
239 | | - ) |
240 | | - return MetricExportResult.FAILURE |
| 269 | + export_result = MetricExportResult.SUCCESS
| 270 | + retry_batches = False
| 271 | + for split_metrics_data in self._split_metrics_data(serialized_data):
| 272 | + split_resp = self._export(
| 273 | + split_metrics_data.SerializeToString()
| 274 | + )
| 275 | + if split_resp.ok:
| 276 | + continue
| 277 | + if self._retryable(split_resp):
| 278 | + _logger.warning(
| 279 | + "Transient error %s encountered while exporting metric batch, retrying in %ss.",
| 280 | + split_resp.reason,
| 281 | + delay,
| 282 | + )
| 283 | + retry_batches = True  # retry the whole export with the next backoff delay
| 284 | + break
| 285 | + _logger.error(
| 286 | + "Failed to export batch code: %s, reason: %s",
| 287 | + split_resp.status_code,
| 288 | + split_resp.text,
| 289 | + )
| 290 | + export_result = MetricExportResult.FAILURE
| 291 | + if retry_batches:
| 292 | + sleep(delay)
| 293 | + continue
| 294 | + return export_result  # result after all batches are attempted
| 295 | + |
241 | 296 | return MetricExportResult.FAILURE |
242 | 297 |
| 298 | + def _split_metrics_data( |
| 299 | + self, |
| 300 | + metrics_data: MetricsData, |
| 301 | + ) -> Iterable[MetricsData]: |
| 302 | + # TODO: split into chunks of at most self._max_export_batch_size data points
| 303 | + yield metrics_data
| 304 | + |
243 | 305 | def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: |
244 | 306 | pass |
245 | 307 |
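
For context on the `_split_metrics_data` TODO above: one way to implement the splitting is to walk the resource/scope/metric tree, count data points, and rebuild `MetricsData` chunks with `dataclasses.replace` (which is what the `replace` import added at the top of this diff suggests, and is similar to the approach the gRPC OTLP metric exporter takes). The standalone sketch below assumes that strategy and the SDK dataclasses exported from opentelemetry.sdk.metrics.export; it is an illustration, not the code proposed in this PR:

from dataclasses import replace
from typing import Iterable, List

from opentelemetry.sdk.metrics.export import (
    Metric,
    MetricsData,
    ResourceMetrics,
    ScopeMetrics,
)


def split_metrics_data(
    metrics_data: MetricsData, max_export_batch_size: int
) -> Iterable[MetricsData]:
    """Yield copies of metrics_data holding at most max_export_batch_size data points each."""
    batch_size = 0
    split_resource_metrics: List[ResourceMetrics] = []

    for resource_metrics in metrics_data.resource_metrics:
        split_scope_metrics: List[ScopeMetrics] = []
        # The replace() copies share the list objects built here, so appending
        # to those lists keeps filling the chunk under construction.
        split_resource_metrics.append(
            replace(resource_metrics, scope_metrics=split_scope_metrics)
        )
        for scope_metrics in resource_metrics.scope_metrics:
            split_metrics: List[Metric] = []
            split_scope_metrics.append(
                replace(scope_metrics, metrics=split_metrics)
            )
            for metric in scope_metrics.metrics:
                split_data_points: List = []
                split_metrics.append(
                    replace(
                        metric,
                        data=replace(metric.data, data_points=split_data_points),
                    )
                )
                for data_point in metric.data.data_points:
                    split_data_points.append(data_point)
                    batch_size += 1
                    if batch_size >= max_export_batch_size:
                        yield MetricsData(resource_metrics=split_resource_metrics)
                        # Start a new chunk that continues from the current
                        # resource/scope/metric position.
                        batch_size = 0
                        split_data_points = []
                        split_metrics = [
                            replace(
                                metric,
                                data=replace(
                                    metric.data, data_points=split_data_points
                                ),
                            )
                        ]
                        split_scope_metrics = [
                            replace(scope_metrics, metrics=split_metrics)
                        ]
                        split_resource_metrics = [
                            replace(
                                resource_metrics,
                                scope_metrics=split_scope_metrics,
                            )
                        ]

    if batch_size > 0:
        yield MetricsData(resource_metrics=split_resource_metrics)

Splitting at the data-point level keeps each HTTP request under the configured size even when a single metric carries many points; the trade-off is that one metric's points may be spread across several requests.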