Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
478d1f3
Initial commit to add timeout as a parm to export, make retries encom…
DylanRussell Apr 30, 2025
ccdd224
Fix lint issues
DylanRussell Apr 30, 2025
5bc8894
Fix a bunch of failing style/lint/spellcheck checks
DylanRussell May 1, 2025
ba92c5a
Remove timeout param from the export calls.
DylanRussell May 2, 2025
29144a1
Fix flaky windows test ?
DylanRussell May 2, 2025
838d7d9
Merge branch 'main' into retry2
DylanRussell May 6, 2025
66a4ebe
Merge branch 'main' into retry2
DylanRussell May 8, 2025
95ccfea
Respond to review comments..
DylanRussell May 9, 2025
d5ca894
Merge branch 'main' of github.com:DylanRussell/opentelemetry-python i…
DylanRussell May 12, 2025
8770e15
Delete exponential backoff code that is now unused
DylanRussell May 13, 2025
4c74411
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 13, 2025
f373caa
Add changelog and remove some unused imports..
DylanRussell May 13, 2025
d1e04e1
fix typo and unit test flaking on windows
DylanRussell May 13, 2025
f42ecd3
Refactor tests, HTTP exporters a bit
DylanRussell May 22, 2025
096b9f8
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 22, 2025
8673b45
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 22, 2025
46e15f1
Remove unneeded test reqs
DylanRussell May 22, 2025
dcba91a
Remove gRPC retry config
DylanRussell Jun 5, 2025
d506d54
Merge remote-tracking branch 'origin' into retry2
DylanRussell Jun 5, 2025
71b77e1
Tweak backoff calculation
DylanRussell Jun 5, 2025
2ae79bb
Lint and precommit
DylanRussell Jun 5, 2025
553ea3e
Empty commit
DylanRussell Jun 5, 2025
28b9399
Another empty commit
DylanRussell Jun 5, 2025
b4df54a
Calculate backoff in 1 place instead of 2
DylanRussell Jun 5, 2025
9e1ba28
Update changelog
DylanRussell Jun 5, 2025
0b54090
Update changelog
DylanRussell Jun 5, 2025
bc3110a
Make new _common directory in the http exporter for shared code
DylanRussell Jun 5, 2025
4bbecf8
precommit
DylanRussell Jun 5, 2025
ec3fe9c
Respond to comments on PR
DylanRussell Jun 9, 2025
d82b183
Merge branch 'main' into retry2
DylanRussell Jun 9, 2025
267b9a9
Fix broken test, execute precommit
DylanRussell Jun 9, 2025
ec69083
Skip some tests on windows
DylanRussell Jun 9, 2025
49d402e
Explain why test is skipped
DylanRussell Jun 9, 2025
f917af6
Update exporter/opentelemetry-exporter-otlp-proto-grpc/src/openteleme…
DylanRussell Jun 10, 2025
e7a8356
Revert change to start respecting timeout passed into metric exporter
DylanRussell Jun 10, 2025
2ae79b5
Merge branch 'main' into retry2
DylanRussell Jun 11, 2025
e444263
Merge branch 'main' into retry2
xrmx Jun 13, 2025
7a76d39
Merge branch 'main' into retry2
xrmx Jun 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more
clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/)
and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)).
- Update OTLP gRPC/HTTP exporters: the export timeout is now inclusive of all retries and backoffs,
and an unnecessary 32 second sleep that occurred after all retries had completed/failed was removed
([#4564](https://github.com/open-telemetry/opentelemetry-python/pull/4564)).


## Version 1.33.0/0.54b0 (2025-05-09)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@

import logging
from collections.abc import Sequence
from itertools import count
from typing import (
Any,
Callable,
Dict,
Iterator,
List,
Mapping,
Optional,
Expand Down Expand Up @@ -177,38 +175,3 @@ def _get_resource_data(
)
)
return resource_data


def _create_exp_backoff_generator(max_value: int = 0) -> Iterator[int]:
"""
Generates an infinite sequence of exponential backoff values. The sequence starts
from 1 (2^0) and doubles each time (2^1, 2^2, 2^3, ...). If a max_value is specified
and non-zero, the generated values will not exceed this maximum, capping at max_value
instead of growing indefinitely.

Parameters:
- max_value (int, optional): The maximum value to yield. If 0 or not provided, the
sequence grows without bound.

Returns:
Iterator[int]: An iterator that yields the exponential backoff values, either uncapped or
capped at max_value.

Example:
```
gen = _create_exp_backoff_generator(max_value=10)
for _ in range(5):
print(next(gen))
```
This will print:
1
2
4
8
10

Note: this functionality used to be handled by the 'backoff' package.
"""
for i in count(0):
out = 2**i
yield min(out, max_value) if max_value else out

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
if insecure is None:
Expand All @@ -79,7 +79,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

"""OTLP Exporter"""

import json
import threading
from abc import ABC, abstractmethod
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from time import sleep
from typing import ( # noqa: F401
Any,
Callable,
Expand All @@ -35,7 +35,6 @@
from urllib.parse import urlparse

from deprecated import deprecated
from google.rpc.error_details_pb2 import RetryInfo

from grpc import (
ChannelCredentials,
Expand All @@ -47,7 +46,6 @@
ssl_channel_credentials,
)
from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
_get_resource_data,
)
from opentelemetry.exporter.otlp.proto.grpc import (
Expand All @@ -74,6 +72,34 @@
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.util.re import parse_env_headers

_JSON_CONFIG = json.dumps(
{
"methodConfig": [
{
"name": [dict()],
"retryPolicy": {
# 5 is the maximum allowable attempts allowed by grpc retry policy.
# This policy results in backoffs of 1s, 2s, 4s, and then 8s after the initial failed attempt.
# Timeout set on the RPC call encompasses the retry backoffs AND time spent waiting
# for a response. DEADLINE_EXCEEDED is returned if all the attempts cannot complete within the
# timeout, and all fail. See https://grpc.io/docs/guides/retry/ for more details.
"maxAttempts": 5,
"initialBackoff": "1s",
"maxBackoff": "9s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE",
"CANCELLED",
"RESOURCE_EXHAUSTED",
"ABORTED",
"OUT_OF_RANGE",
"DATA_LOSS",
],
},
}
]
}
)
logger = getLogger(__name__)
SDKDataT = TypeVar("SDKDataT")
ResourceDataT = TypeVar("ResourceDataT")
Expand Down Expand Up @@ -197,7 +223,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
super().__init__()
Expand Down Expand Up @@ -234,7 +260,7 @@ def __init__(
else:
self._headers = tuple(self._headers) + tuple(_OTLP_GRPC_HEADERS)

self._timeout = timeout or int(
self._timeout = timeout or float(
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10)
)
self._collector_kwargs = None
Expand All @@ -247,7 +273,11 @@ def __init__(

if insecure:
self._channel = insecure_channel(
self._endpoint, compression=compression
self._endpoint,
compression=compression,
options=[
("grpc.service_config", _JSON_CONFIG),
],
)
else:
credentials = _get_credentials(
Expand All @@ -257,7 +287,12 @@ def __init__(
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
)
self._channel = secure_channel(
self._endpoint, credentials, compression=compression
self._endpoint,
credentials,
compression=compression,
options=[
("grpc.service_config", _JSON_CONFIG),
],
)
self._client = self._stub(self._channel)

Expand All @@ -271,91 +306,34 @@ def _translate_data(
pass

def _export(
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
self,
data: Union[TypingSequence[ReadableSpan], MetricsData],
) -> ExportResultT:
# After the call to shutdown, subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE

# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
# TracesData and use the code below instead.
# logger.warning(
# "Transient error %s encountered while exporting %s, retrying in %ss.",
# error.code(),
# data.__class__.__name__,
# delay,
# )
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _create_exp_backoff_generator(
max_value=self._MAX_RETRY_TIMEOUT
):
if delay == self._MAX_RETRY_TIMEOUT or self._shutdown:
with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)
return self._result.SUCCESS
except RpcError as error:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)
return self._result.FAILURE

with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)

return self._result.SUCCESS

except RpcError as error:
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s to %s, retrying in %ss."
),
error.code(),
self._exporting,
self._endpoint,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)

if error.code() == StatusCode.OK:
return self._result.SUCCESS

return self._result.FAILURE

return self._result.FAILURE

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(
credentials: ChannelCredentials | None = None,
headers: Union[TypingSequence[Tuple[str, str]], dict[str, str], str]
| None = None,
timeout: int | None = None,
timeout: float | None = None,
compression: Compression | None = None,
preferred_temporality: dict[type, AggregationTemporality]
| None = None,
Expand All @@ -124,7 +124,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand Down Expand Up @@ -172,7 +172,6 @@ def export(

if split_export_result is MetricExportResult.FAILURE:
export_result = MetricExportResult.FAILURE

return export_result

def _split_metrics_data(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(
headers: Optional[
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
] = None,
timeout: Optional[int] = None,
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
):
if insecure is None:
Expand All @@ -112,7 +112,7 @@ def __init__(

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
float(environ_timeout) if environ_timeout is not None else None
)

compression = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ asgiref==3.7.2
Deprecated==1.2.14
googleapis-common-protos==1.63.2
grpcio==1.66.2
grpcio-status==1.66.0
importlib-metadata==6.11.0
iniconfig==2.0.0
packaging==24.0
Expand Down
Loading
Loading