Skip to content

Commit 6fb4918

Browse files
kelseymaxrmx
andauthored
Update OpAMP client to have additional callback methods (open-telemetry#4322)
* Update callbacks to use format similar to existing java and go clients * Add changelog entry * Update Callbacks to use ABC, add MessageData class and ReportFullState flag handling * Fix docs build * Check error_response before processing other ServerToAgent fields per OpAMP spec --------- Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
1 parent 3384f74 commit 6fb4918

File tree

10 files changed

+463
-83
lines changed

10 files changed

+463
-83
lines changed

docs/conf.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@
160160
"py:class",
161161
"opamp_pb2.EffectiveConfig",
162162
),
163+
(
164+
"py:class",
165+
"opamp_pb2.AgentRemoteConfig",
166+
),
163167
]
164168

165169
cfg = ConfigParser()

opamp/opentelemetry-opamp-client/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
- Initial implementation
1111
([#3635](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3635))
12+
- Update client to have additional callback methods
13+
([#4322](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4322))

opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/__init__.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
3636
Since OpAMP APIs, config options or environment variables are not standardizes the distros are required
3737
to provide code doing so.
38-
OTel Python distros would need to provide their own message handler callback that implements the actual
38+
OTel Python distros would need to provide their own Callbacks subclass that implements the actual
3939
change of whatever configuration their backends sends.
4040
4141
Please note that the API is not finalized yet and so the name is called ``_opamp`` with the underscore.
@@ -48,15 +48,18 @@
4848
import os
4949
5050
from opentelemetry._opamp.agent import OpAMPAgent
51+
from opentelemetry._opamp.callbacks import Callbacks
5152
from opentelemetry._opamp.client import OpAMPClient
52-
from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2
5353
from opentelemetry.sdk._configuration import _OTelSDKConfigurator
5454
from opentelemetry.sdk.resources import OTELResourceDetector
5555
5656
57-
def opamp_handler(agent: OpAMPAgent, client: OpAMPClient, message: opamp_pb2.ServerToAgent):
58-
for config_filename, config in message.remote_config.config.config_map.items():
59-
print("do something")
57+
class MyCallbacks(Callbacks):
58+
def on_message(self, agent, client, message):
59+
if message.remote_config is None:
60+
return
61+
for config_filename, config in message.remote_config.config.config_map.items():
62+
print("do something")
6063
6164
6265
class MyOpenTelemetryConfigurator(_OTelSDKConfigurator):
@@ -79,7 +82,7 @@ def _configure(self, **kwargs):
7982
)
8083
opamp_agent = OpAMPAgent(
8184
interval=30,
82-
message_handler=opamp_handler,
85+
callbacks=MyCallbacks(),
8386
client=opamp_client,
8487
)
8588
opamp_agent.start()
@@ -90,6 +93,7 @@ def _configure(self, **kwargs):
9093
"""
9194

9295
from opentelemetry._opamp.agent import OpAMPAgent
96+
from opentelemetry._opamp.callbacks import Callbacks, MessageData
9397
from opentelemetry._opamp.client import OpAMPClient
9498

95-
__all__ = ["OpAMPAgent", "OpAMPClient"]
99+
__all__ = ["Callbacks", "MessageData", "OpAMPAgent", "OpAMPClient"]

opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/agent.py

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,24 @@
2121
import threading
2222
from typing import Any, Callable
2323

24+
from opentelemetry._opamp.callbacks import Callbacks, MessageData
2425
from opentelemetry._opamp.client import OpAMPClient
2526
from opentelemetry._opamp.proto import opamp_pb2
2627

2728
logger = logging.getLogger(__name__)
2829

2930

31+
def _safe_invoke(function: Callable[..., Any], *args: Any) -> None:
32+
function_name = "<unknown>"
33+
try:
34+
function_name = function.__name__
35+
function(*args)
36+
except Exception as exc: # pylint: disable=broad-exception-caught
37+
logger.error(
38+
"Error when invoking function '%s'", function_name, exc_info=exc
39+
)
40+
41+
3042
class _Job:
3143
"""
3244
Represents a single request job, with retry/backoff metadata.
@@ -73,24 +85,22 @@ def __init__(
7385
self,
7486
*,
7587
interval: float = 30,
76-
message_handler: Callable[
77-
["OpAMPAgent", OpAMPClient, opamp_pb2.ServerToAgent], None
78-
],
88+
callbacks: Callbacks,
7989
max_retries: int = 10,
8090
heartbeat_max_retries: int = 1,
8191
initial_backoff: float = 1.0,
8292
client: OpAMPClient,
8393
):
8494
"""
8595
:param interval: seconds between heartbeat calls
86-
:param message_handler: user provided function that takes the received ServerToAgent message
96+
:param callbacks: Callbacks instance for receiving client events
8797
:param max_retries: how many times to retry a failed job for ad-hoc messages
8898
:param heartbeat_max_retries: how many times to retry an heartbeat failed job
8999
:param initial_backoff: base seconds for exponential backoff
90100
:param client: an OpAMPClient instance
91101
"""
92102
self._interval = interval
93-
self._handler = message_handler
103+
self._callbacks = callbacks
94104
self._max_retries = max_retries
95105
self._heartbeat_max_retries = heartbeat_max_retries
96106
self._initial_backoff = initial_backoff
@@ -186,23 +196,31 @@ def _run_worker(self) -> None:
186196
while job.should_retry() and not self._stop.is_set():
187197
try:
188198
message = self._client.send(job.payload)
199+
_safe_invoke(
200+
self._callbacks.on_connect, self, self._client
201+
)
189202
logger.debug("Job succeeded: %r", job.payload)
190203
break
191204
except Exception as exc:
192205
job.attempt += 1
206+
_safe_invoke(
207+
self._callbacks.on_connect_failed,
208+
self,
209+
self._client,
210+
exc,
211+
)
193212
logger.warning(
194213
"Job %r failed attempt %d/%d: %s",
195214
job.payload,
196215
job.attempt,
197-
job.max_retries,
216+
job.max_retries + 1,
198217
exc,
199218
)
200219

201220
if not job.should_retry():
202221
logger.error(
203222
"Job %r dropped after max retries", job.payload
204223
)
205-
logger.exception(exc)
206224
break
207225

208226
# exponential backoff with +/- 20% jitter, interruptible by stop event
@@ -216,14 +234,7 @@ def _run_worker(self) -> None:
216234
break
217235

218236
if message is not None:
219-
# we can't do much if the handler fails other than logging
220-
try:
221-
self._handler(self, self._client, message)
222-
logger.debug("Called Job message handler for: %r", message)
223-
except Exception as exc:
224-
logger.warning(
225-
"Job %r handler failed with: %s", job.payload, exc
226-
)
237+
self._process_message(message)
227238

228239
try:
229240
if job.callback is not None:
@@ -233,6 +244,29 @@ def _run_worker(self) -> None:
233244
finally:
234245
self._queue.task_done()
235246

247+
def _process_message(self, message: opamp_pb2.ServerToAgent) -> None:
248+
if message.HasField("error_response"):
249+
_safe_invoke(
250+
self._callbacks.on_error,
251+
self,
252+
self._client,
253+
message.error_response,
254+
)
255+
return
256+
257+
if message.flags & opamp_pb2.ServerToAgentFlags_ReportFullState:
258+
logger.debug("Server requested full state report")
259+
payload = self._client.build_full_state_message()
260+
self.send(payload)
261+
262+
msg_data = MessageData.from_server_message(message)
263+
_safe_invoke(
264+
self._callbacks.on_message,
265+
self,
266+
self._client,
267+
msg_data,
268+
)
269+
236270
def stop(self, timeout: float | None = None) -> None:
237271
"""
238272
Signal server we are disconnecting and then threads to exit
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from abc import ABC
18+
from dataclasses import dataclass
19+
from typing import TYPE_CHECKING
20+
21+
from opentelemetry._opamp.proto import opamp_pb2
22+
23+
if TYPE_CHECKING:
24+
from opentelemetry._opamp.agent import OpAMPAgent
25+
from opentelemetry._opamp.client import OpAMPClient
26+
27+
28+
@dataclass
29+
class MessageData:
30+
"""Structured view of a ServerToAgent message for callback consumption.
31+
32+
Only fields the agent is expected to act on are exposed. Flags and
33+
error_response are handled internally by the client before this
34+
object reaches the callback.
35+
"""
36+
37+
remote_config: opamp_pb2.AgentRemoteConfig | None = None
38+
39+
@classmethod
40+
def from_server_message(
41+
cls, message: opamp_pb2.ServerToAgent
42+
) -> MessageData:
43+
return cls(
44+
remote_config=message.remote_config
45+
if message.HasField("remote_config")
46+
else None,
47+
)
48+
49+
50+
class Callbacks(ABC):
51+
"""OpAMP client callbacks with no-op defaults.
52+
53+
All methods have no-op defaults so that subclasses only need to
54+
override the callbacks they care about. New callbacks can be added
55+
in the future without breaking existing subclasses.
56+
"""
57+
58+
def on_connect(self, agent: OpAMPAgent, client: OpAMPClient) -> None:
59+
"""Called when the connection is successfully established to the
60+
Server. For HTTP clients this is called for any request if the
61+
response status is OK.
62+
"""
63+
64+
def on_connect_failed(
65+
self,
66+
agent: OpAMPAgent,
67+
client: OpAMPClient,
68+
error: Exception,
69+
) -> None:
70+
"""Called when the connection to the Server cannot be established.
71+
May also be called if the connection is lost and reconnection
72+
attempt fails.
73+
"""
74+
75+
def on_error(
76+
self,
77+
agent: OpAMPAgent,
78+
client: OpAMPClient,
79+
error_response: opamp_pb2.ServerErrorResponse,
80+
) -> None:
81+
"""Called when the Server reports an error in response to a
82+
previously sent request. Useful for logging purposes. The Agent
83+
should not attempt to process the error by reconnecting or
84+
retrying previous operations. The client handles the UNAVAILABLE
85+
case internally by performing retries as necessary.
86+
"""
87+
88+
def on_message(
89+
self,
90+
agent: OpAMPAgent,
91+
client: OpAMPClient,
92+
message: MessageData,
93+
) -> None:
94+
"""Called when the Agent receives a message that needs processing."""

opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/transport/requests.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ def send(
5959
)
6060
response.raise_for_status()
6161
except Exception as exc:
62-
logger.error(str(exc))
63-
raise OpAMPException
62+
raise OpAMPException(str(exc)) from exc
6463

6564
message = messages.decode_message(response.content)
6665

opamp/opentelemetry-opamp-client/tests/opamp/cassettes/test_connection_remote_config_status_heartbeat_disconnection.yaml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ interactions:
5656
body:
5757
string: !!binary |
5858
ChABnL5b5k5046/FazbAJ7r4GioKBgoECgASABIg47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZ
59-
G3hSuFUwAVIA
59+
G3hSuFVSAA==
6060
headers:
6161
Content-Length:
62-
- '66'
62+
- '64'
6363
Content-Type:
6464
- application/x-protobuf
6565
Date:
@@ -89,10 +89,10 @@ interactions:
8989
body:
9090
string: !!binary |
9191
ChABnL5b5k5046/FazbAJ7r4GioKBgoECgASABIg47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZ
92-
G3hSuFUwAVIA
92+
G3hSuFVSAA==
9393
headers:
9494
Content-Length:
95-
- '66'
95+
- '64'
9696
Content-Type:
9797
- application/x-protobuf
9898
Date:
@@ -122,10 +122,10 @@ interactions:
122122
body:
123123
string: !!binary |
124124
ChABnL5b5k5046/FazbAJ7r4GioKBgoECgASABIg47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZ
125-
G3hSuFUwAVIA
125+
G3hSuFVSAA==
126126
headers:
127127
Content-Length:
128-
- '66'
128+
- '64'
129129
Content-Type:
130130
- application/x-protobuf
131131
Date:

0 commit comments

Comments
 (0)