Skip to content

Commit 97bf034

Browse files
authored
OpAMP improvements (#401)
* opamp: log transport send exception * distro: message handler should handle remote config decode exceptions So that we don't go into a loop if we don't ackwnoledge the message to the server.
1 parent 9d1ff4b commit 97bf034

File tree

4 files changed

+81
-14
lines changed

4 files changed

+81
-14
lines changed

src/elasticotel/distro/config.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
from opentelemetry._opamp import messages
2525
from opentelemetry._opamp.agent import OpAMPAgent
2626
from opentelemetry._opamp.client import OpAMPClient
27+
from opentelemetry._opamp.exceptions import (
28+
OpAMPRemoteConfigDecodeException,
29+
OpAMPRemoteConfigParseException,
30+
)
2731
from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2
2832
from opentelemetry.sdk.environment_variables import OTEL_LOG_LEVEL, OTEL_TRACES_SAMPLER_ARG
2933
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
@@ -191,17 +195,21 @@ def opamp_handler(agent: OpAMPAgent, client: OpAMPClient, message: opamp_pb2.Ser
191195

192196
_config = _get_config()
193197
error_messages = []
194-
for config_filename, remote_config in messages._decode_remote_config(message.remote_config):
195-
# we don't have standardized config values so limit to configs coming from our backend
196-
if config_filename == "elastic":
197-
logger.debug("Config %s: %s", config_filename, remote_config)
198-
config_update = _handle_logging_level(remote_config)
199-
if config_update.error_message:
200-
error_messages.append(config_update.error_message)
201-
202-
config_update = _handle_sampling_rate(remote_config)
203-
if config_update.error_message:
204-
error_messages.append(config_update.error_message)
198+
try:
199+
for config_filename, remote_config in messages._decode_remote_config(message.remote_config):
200+
# we don't have standardized config values so limit to configs coming from our backend
201+
if config_filename == "elastic":
202+
logger.debug("Config %s: %s", config_filename, remote_config)
203+
config_update = _handle_logging_level(remote_config)
204+
if config_update.error_message:
205+
error_messages.append(config_update.error_message)
206+
207+
config_update = _handle_sampling_rate(remote_config)
208+
if config_update.error_message:
209+
error_messages.append(config_update.error_message)
210+
except (OpAMPRemoteConfigParseException, OpAMPRemoteConfigDecodeException) as exc:
211+
logger.error(str(exc))
212+
error_messages.append(str(exc))
205213

206214
error_message = "\n".join(error_messages)
207215
status = opamp_pb2.RemoteConfigStatuses_FAILED if error_message else opamp_pb2.RemoteConfigStatuses_APPLIED

src/opentelemetry/_opamp/messages.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,8 @@ def _decode_remote_config(remote_config: opamp_pb2.AgentRemoteConfig) -> Generat
157157
config_data = json.loads(body)
158158
except (UnicodeDecodeError, json.JSONDecodeError) as exc:
159159
raise OpAMPRemoteConfigDecodeException(
160-
f"Failed to decode {config_file} with content type {config_file.content_type}: {exc}"
160+
f"Failed to decode {config_file_name} with content type {config_file.content_type}: {exc}"
161161
)
162-
continue
163162

164163
yield config_file_name, config_data
165164
else:

src/opentelemetry/_opamp/transport/requests.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17+
import logging
1718
from typing import Mapping
1819

1920
import requests
@@ -22,6 +23,8 @@
2223
from opentelemetry._opamp.transport.exceptions import OpAMPException
2324
from opentelemetry._opamp.transport.base import HttpTransport, base_headers
2425

26+
logger = logging.getLogger(__name__)
27+
2528

2629
class RequestsTransport(HttpTransport):
2730
# TODO: move some stuff here instead of send?
@@ -35,7 +38,8 @@ def send(self, url: str, headers: Mapping[str, str], data: bytes, timeout_millis
3538
try:
3639
response = self.session.post(url, headers=headers, data=data, timeout=timeout)
3740
response.raise_for_status()
38-
except Exception:
41+
except Exception as exc:
42+
logger.error(str(exc))
3943
raise OpAMPException
4044

4145
message = messages._decode_message(response.content)

tests/distro/test_distro.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,62 @@ def test_ignores_non_elastic_filename(self, get_logger_mock, handle_logging_mock
309309
agent.send.assert_called_once_with(payload=mock.ANY)
310310
client._build_full_state_message.assert_not_called()
311311

312+
@mock.patch("elasticotel.distro.config._get_config")
313+
@mock.patch("elasticotel.distro.config.logger")
314+
def test_fails_if_cannot_decode_elastic_config_json(self, logger_mock, get_config_mock):
315+
get_config_mock.return_value = Config()
316+
agent = mock.Mock()
317+
client = mock.Mock()
318+
config = opamp_pb2.AgentConfigMap()
319+
config.config_map["elastic"].body = b"{"
320+
config.config_map["elastic"].content_type = "application/json"
321+
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
322+
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
323+
opamp_handler(agent, client, message)
324+
325+
error_message = "Failed to decode elastic with content type application/json: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)"
326+
logger_mock.error.assert_called_once_with(error_message)
327+
328+
client._update_remote_config_status.assert_called_once_with(
329+
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_FAILED, error_message=error_message
330+
)
331+
client._update_effective_config.assert_called_once_with(
332+
{"elastic": {"logging_level": "info", "sampling_rate": "1.0"}}
333+
)
334+
client._build_remote_config_status_response_message.assert_called_once_with(
335+
client._update_remote_config_status()
336+
)
337+
agent.send.assert_called_once_with(payload=mock.ANY)
338+
client._build_full_state_message.assert_not_called()
339+
340+
@mock.patch("elasticotel.distro.config._get_config")
341+
@mock.patch("elasticotel.distro.config.logger")
342+
def test_fails_if_elastic_config_is_not_json(self, logger_mock, get_config_mock):
343+
get_config_mock.return_value = Config()
344+
agent = mock.Mock()
345+
client = mock.Mock()
346+
config = opamp_pb2.AgentConfigMap()
347+
config.config_map["elastic"].body = b"not-json"
348+
config.config_map["elastic"].content_type = "not/json"
349+
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
350+
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
351+
opamp_handler(agent, client, message)
352+
353+
error_message = "Cannot parse elastic with content type not/json"
354+
logger_mock.error.assert_called_once_with(error_message)
355+
356+
client._update_remote_config_status.assert_called_once_with(
357+
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_FAILED, error_message=error_message
358+
)
359+
client._update_effective_config.assert_called_once_with(
360+
{"elastic": {"logging_level": "info", "sampling_rate": "1.0"}}
361+
)
362+
client._build_remote_config_status_response_message.assert_called_once_with(
363+
client._update_remote_config_status()
364+
)
365+
agent.send.assert_called_once_with(payload=mock.ANY)
366+
client._build_full_state_message.assert_not_called()
367+
312368
@mock.patch("elasticotel.distro.config._get_config")
313369
@mock.patch.object(logging, "getLogger")
314370
def test_sets_matching_logging_level(self, get_logger_mock, get_config_mock):

0 commit comments

Comments
 (0)