Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
478d1f3
Initial commit to add timeout as a parm to export, make retries encom…
DylanRussell Apr 30, 2025
ccdd224
Fix lint issues
DylanRussell Apr 30, 2025
5bc8894
Fix a bunch of failing style/lint/spellcheck checks
DylanRussell May 1, 2025
ba92c5a
Remove timeout param from the export calls.
DylanRussell May 2, 2025
29144a1
Fix flaky windows test ?
DylanRussell May 2, 2025
838d7d9
Merge branch 'main' into retry2
DylanRussell May 6, 2025
66a4ebe
Merge branch 'main' into retry2
DylanRussell May 8, 2025
95ccfea
Respond to review comments..
DylanRussell May 9, 2025
d5ca894
Merge branch 'main' of github.com:DylanRussell/opentelemetry-python i…
DylanRussell May 12, 2025
8770e15
Delete exponential backoff code that is now unused
DylanRussell May 13, 2025
4c74411
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 13, 2025
f373caa
Add changelog and remove some unused imports..
DylanRussell May 13, 2025
d1e04e1
fix typo and unit test flaking on windows
DylanRussell May 13, 2025
f42ecd3
Refactor tests, HTTP exporters a bit
DylanRussell May 22, 2025
096b9f8
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 22, 2025
8673b45
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 22, 2025
46e15f1
Remove unneeded test reqs
DylanRussell May 22, 2025
dcba91a
Remove gRPC retry config
DylanRussell Jun 5, 2025
d506d54
Merge remote-tracking branch 'origin' into retry2
DylanRussell Jun 5, 2025
71b77e1
Tweak backoff calculation
DylanRussell Jun 5, 2025
2ae79bb
Lint and precommit
DylanRussell Jun 5, 2025
553ea3e
Empty commit
DylanRussell Jun 5, 2025
28b9399
Another empty commit
DylanRussell Jun 5, 2025
b4df54a
Calculate backoff in 1 place instead of 2
DylanRussell Jun 5, 2025
9e1ba28
Update changelog
DylanRussell Jun 5, 2025
0b54090
Update changelog
DylanRussell Jun 5, 2025
bc3110a
Make new _common directory in the http exporter for shared code
DylanRussell Jun 5, 2025
4bbecf8
precommit
DylanRussell Jun 5, 2025
ec3fe9c
Respond to comments on PR
DylanRussell Jun 9, 2025
d82b183
Merge branch 'main' into retry2
DylanRussell Jun 9, 2025
267b9a9
Fix broken test, execute precommit
DylanRussell Jun 9, 2025
ec69083
Skip some tests on windows
DylanRussell Jun 9, 2025
49d402e
Explain why test is skipped
DylanRussell Jun 9, 2025
f917af6
Update exporter/opentelemetry-exporter-otlp-proto-grpc/src/openteleme…
DylanRussell Jun 10, 2025
e7a8356
Revert change to start respecting timeout passed into metric exporter
DylanRussell Jun 10, 2025
2ae79b5
Merge branch 'main' into retry2
DylanRussell Jun 11, 2025
e444263
Merge branch 'main' into retry2
xrmx Jun 13, 2025
7a76d39
Merge branch 'main' into retry2
xrmx Jun 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
self.host_name = host_name
self.node = utils.get_node(service_name, host_name)

# pylint: disable=arguments-differ
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
# Populate service_name from first span
# We restrict any SpanProcessor to be only associated with a single
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
if insecure is None:
Expand All @@ -79,7 +79,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand Down Expand Up @@ -107,8 +107,12 @@ def _translate_data(
) -> ExportLogsServiceRequest:
return encode_logs(data)

def export(self, batch: Sequence[LogData]) -> LogExportResult:
return self._export(batch)
def export(
self, batch: Sequence[LogData], timeout_millis: Optional[int] = None
) -> LogExportResult:
return self._export(
batch, timeout_millis / 1e3 if timeout_millis else None
)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

"""OTLP Exporter"""

import json
import threading
from abc import ABC, abstractmethod
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from time import sleep
from typing import ( # noqa: F401
Any,
Callable,
Expand All @@ -35,7 +35,6 @@
from urllib.parse import urlparse

from deprecated import deprecated
from google.rpc.error_details_pb2 import RetryInfo

from grpc import (
ChannelCredentials,
Expand All @@ -47,7 +46,6 @@
ssl_channel_credentials,
)
from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
_get_resource_data,
)
from opentelemetry.exporter.otlp.proto.grpc import (
Expand All @@ -74,6 +72,29 @@
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.util.re import parse_env_headers

_JSON_CONFIG = json.dumps(
{
"methodConfig": [
{
"name": [dict()],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "1s",
"maxBackoff": "64s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE",
"CANCELLED",
"RESOURCE_EXHAUSTED",
"ABORTED",
"OUT_OF_RANGE",
"DATA_LOSS",
],
},
}
]
}
)
logger = getLogger(__name__)
SDKDataT = TypeVar("SDKDataT")
ResourceDataT = TypeVar("ResourceDataT")
Expand Down Expand Up @@ -195,7 +216,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
super().__init__()
Expand Down Expand Up @@ -232,7 +253,7 @@ def __init__(
else:
self._headers = tuple(self._headers) + tuple(_OTLP_GRPC_HEADERS)

self._timeout = timeout or int(
self._timeout = timeout or float(
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10)
)
self._collector_kwargs = None
Expand All @@ -245,7 +266,11 @@ def __init__(

if insecure:
self._channel = insecure_channel(
self._endpoint, compression=compression
self._endpoint,
compression=compression,
options=[
("grpc.service_config", _JSON_CONFIG),
],
)
else:
credentials = _get_credentials(
Expand All @@ -255,7 +280,12 @@ def __init__(
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
)
self._channel = secure_channel(
self._endpoint, credentials, compression=compression
self._endpoint,
credentials,
compression=compression,
options=[
("grpc.service_config", _JSON_CONFIG),
],
)
self._client = self._stub(self._channel)

Expand All @@ -269,90 +299,35 @@ def _translate_data(
pass

def _export(
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
self,
data: Union[TypingSequence[ReadableSpan], MetricsData],
timeout_sec: Optional[float] = None,
) -> ExportResultT:
# After the call to shutdown, subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE

# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
# TracesData and use the code below instead.
# logger.warning(
# "Transient error %s encountered while exporting %s, retrying in %ss.",
# error.code(),
# data.__class__.__name__,
# delay,
# )
max_value = 64
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _create_exp_backoff_generator(max_value=max_value):
if delay == max_value or self._shutdown:
with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=(timeout_sec or self._timeout),
)
return self._result.SUCCESS
except RpcError as error:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)
return self._result.FAILURE

with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)

return self._result.SUCCESS

except RpcError as error:
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s to %s, retrying in %ss."
),
error.code(),
self._exporting,
self._endpoint,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)

if error.code() == StatusCode.OK:
return self._result.SUCCESS

return self._result.FAILURE

return self._result.FAILURE

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@

from __future__ import annotations

import time
from dataclasses import replace
from logging import getLogger
from os import environ
from typing import Iterable, List, Tuple, Union
from typing import Iterable, List, Optional, Tuple, Union
from typing import Sequence as TypingSequence

from grpc import ChannelCredentials, Compression
Expand Down Expand Up @@ -99,7 +100,7 @@ def __init__(
credentials: ChannelCredentials | None = None,
headers: Union[TypingSequence[Tuple[str, str]], dict[str, str], str]
| None = None,
timeout: int | None = None,
timeout: float | None = None,
compression: Compression | None = None,
preferred_temporality: dict[type, AggregationTemporality]
| None = None,
Expand All @@ -124,7 +125,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand Down Expand Up @@ -158,17 +159,22 @@ def _translate_data(
def export(
self,
metrics_data: MetricsData,
timeout_millis: float = 10_000,
timeout_millis: Optional[int] = None,
**kwargs,
) -> MetricExportResult:
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
timeout_sec = (
timeout_millis / 1e3 if timeout_millis else self._timeout # pylint: disable=protected-access
)
if self._max_export_batch_size is None:
return self._export(data=metrics_data)
return self._export(metrics_data, timeout_sec)

export_result = MetricExportResult.SUCCESS

deadline_sec = time.time() + timeout_sec
for split_metrics_data in self._split_metrics_data(metrics_data):
split_export_result = self._export(data=split_metrics_data)
time_remaining_sec = deadline_sec - time.time()
split_export_result = self._export(
split_metrics_data, time_remaining_sec
)

if split_export_result is MetricExportResult.FAILURE:
export_result = MetricExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
if insecure is None:
Expand All @@ -112,7 +112,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand All @@ -139,8 +139,14 @@ def _translate_data(
) -> ExportTraceServiceRequest:
return encode_spans(data)

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return self._export(spans)
def export(
self,
spans: Sequence[ReadableSpan],
timeout_millis: Optional[int] = None,
) -> SpanExportResult:
return self._export(
spans, timeout_millis / 1e3 if timeout_millis else None
)

def shutdown(self) -> None:
OTLPExporterMixin.shutdown(self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ asgiref==3.7.2
Deprecated==1.2.14
googleapis-common-protos==1.63.2
grpcio==1.66.2
grpcio-status==1.66.0
importlib-metadata==6.11.0
iniconfig==2.0.0
packaging==24.0
Expand Down
Loading
Loading