Skip to content

Commit 3cfa7f9

Browse files
authored
opamp: ack remote config status changes (#340)
* opamp: ack remote config status changes Start reporting ReportsRemoteConfig agent capability and sending an ack to the server when we update our config based on the remote one. This rework the opamp handler signature to also take the agent as we need to enqueue a remote config status message to ack the updated configuration.
1 parent f1e9af5 commit 3cfa7f9

File tree

10 files changed

+302
-73
lines changed

10 files changed

+302
-73
lines changed

src/elasticotel/distro/config.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import logging
1818

1919
from opentelemetry._opamp import messages
20+
from opentelemetry._opamp.agent import OpAMPAgent
2021
from opentelemetry._opamp.client import OpAMPClient
2122
from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2
2223

@@ -34,10 +35,12 @@
3435
}
3536

3637

37-
def opamp_handler(client: OpAMPClient, message: opamp_pb2.ServerToAgent):
38-
if not message.remote_config:
38+
def opamp_handler(agent: OpAMPAgent, client: OpAMPClient, message: opamp_pb2.ServerToAgent):
39+
# we check config_hash because we need to track last received config and remote_config seems to be always truthy
40+
if not message.remote_config or not message.remote_config.config_hash:
3941
return
4042

43+
error_message = ""
4144
for config_filename, config in messages._decode_remote_config(message.remote_config):
4245
# we don't have standardized config values so limit to configs coming from our backend
4346
if config_filename == "elastic":
@@ -51,7 +54,17 @@ def opamp_handler(client: OpAMPClient, message: opamp_pb2.ServerToAgent):
5154

5255
if logging_level is None:
5356
logger.warning("Logging level not handled: %s", config_logging_level)
57+
error_message = f"Logging level not handled: {config_logging_level}"
5458
else:
5559
# update upstream and distro logging levels
5660
logging.getLogger("opentelemetry").setLevel(logging_level)
5761
logging.getLogger("elasticotel").setLevel(logging_level)
62+
63+
status = opamp_pb2.RemoteConfigStatuses_FAILED if error_message else opamp_pb2.RemoteConfigStatuses_APPLIED
64+
updated_remote_config = client._update_remote_config_status(
65+
remote_config_hash=message.remote_config.config_hash, status=status, error_message=error_message
66+
)
67+
# if we changed the config send an ack to the server so we don't receive the same config at every heartbeat response
68+
if updated_remote_config is not None:
69+
payload = client._build_remote_config_status_response_message(updated_remote_config)
70+
agent.send(payload=payload)

src/opentelemetry/_opamp/agent.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def __init__(
7171
self,
7272
*,
7373
interval: float,
74-
message_handler: Callable[[OpAMPClient, opamp_pb2.ServerToAgent], None],
74+
message_handler: Callable[["OpAMPAgent", OpAMPClient, opamp_pb2.ServerToAgent], None],
7575
max_retries: int = 10,
7676
heartbeat_max_retries: int = 1,
7777
initial_backoff: float = 1.0,
@@ -178,10 +178,10 @@ def _run_worker(self) -> None:
178178
logger.debug("Stop signaled, abandoning job %r", job.payload)
179179
break
180180

181-
# we can't do much if the handler fails other than logging
182181
if message is not None:
182+
# we can't do much if the handler fails other than logging
183183
try:
184-
self._handler(self._client, message)
184+
self._handler(self, self._client, message)
185185
logger.debug("Called Job message handler for: %r", message)
186186
except Exception as exc:
187187
logger.warning("Job %r handler failed with: %s", job.payload, exc)

src/opentelemetry/_opamp/client.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsStatus
4242
| opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsHeartbeat
4343
| opamp_pb2.AgentCapabilities.AgentCapabilities_AcceptsRemoteConfig
44+
| opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsRemoteConfig
4445
)
4546

4647

@@ -67,6 +68,7 @@ def __init__(
6768
)
6869
self._sequence_num: int = 0
6970
self._instance_uid: bytes = uuid7().bytes
71+
self._remote_config_status: opamp_pb2.RemoteConfigStatus | None = None
7072

7173
def _build_connection_message(self) -> bytes:
7274
message = messages._build_presentation_message(
@@ -94,6 +96,37 @@ def _build_heartbeat_message(self) -> bytes:
9496
data = messages._encode_message(message)
9597
return data
9698

99+
def _update_remote_config_status(
100+
self, remote_config_hash: bytes, status: opamp_pb2.RemoteConfigStatuses.ValueType, error_message: str = ""
101+
) -> opamp_pb2.RemoteConfigStatus | None:
102+
status_changed = (
103+
not self._remote_config_status
104+
or self._remote_config_status.last_remote_config_hash != remote_config_hash
105+
or self._remote_config_status.status != status
106+
or self._remote_config_status.error_message != error_message
107+
)
108+
# if the status changed update we return the RemoteConfigStatus message so that we can send it to the server
109+
if status_changed:
110+
_logger.debug("Update remote config status changed for %s", remote_config_hash)
111+
self._remote_config_status = messages._build_remote_config_status_message(
112+
last_remote_config_hash=remote_config_hash,
113+
status=status,
114+
error_message=error_message,
115+
)
116+
return self._remote_config_status
117+
else:
118+
return None
119+
120+
def _build_remote_config_status_response_message(self, remote_config_status: opamp_pb2.RemoteConfigStatus) -> bytes:
121+
message = messages._build_remote_config_status_response_message(
122+
instance_uid=self._instance_uid,
123+
sequence_num=self._sequence_num,
124+
capabilities=_HANDLED_CAPABILITIES,
125+
remote_config_status=remote_config_status,
126+
)
127+
data = messages._encode_message(message)
128+
return data
129+
97130
def _send(self, data: bytes):
98131
try:
99132
response = self._transport.send(

src/opentelemetry/_opamp/messages.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,28 @@ def _build_agent_disconnect_message(
9393
return command
9494

9595

96+
def _build_remote_config_status_message(
97+
last_remote_config_hash: bytes, status: opamp_pb2.RemoteConfigStatuses.ValueType, error_message: str = ""
98+
) -> opamp_pb2.RemoteConfigStatus:
99+
return opamp_pb2.RemoteConfigStatus(
100+
last_remote_config_hash=last_remote_config_hash,
101+
status=status,
102+
error_message=error_message,
103+
)
104+
105+
106+
def _build_remote_config_status_response_message(
107+
instance_uid: bytes, sequence_num: int, capabilities: int, remote_config_status: opamp_pb2.RemoteConfigStatus
108+
) -> opamp_pb2.AgentToServer:
109+
command = opamp_pb2.AgentToServer(
110+
instance_uid=instance_uid,
111+
sequence_num=sequence_num,
112+
remote_config_status=remote_config_status,
113+
capabilities=capabilities,
114+
)
115+
return command
116+
117+
96118
def _encode_message(data: opamp_pb2.AgentToServer) -> bytes:
97119
return data.SerializeToString()
98120

tests/distro/test_distro.py

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -217,48 +217,68 @@ def test_configurator_ignores_opamp_without_endpoint(self, client_mock, agent_mo
217217

218218
class TestOpAMPHandler(TestCase):
219219
@mock.patch.object(logging, "getLogger")
220-
def test_does_not_nothing_without_remote_config(self, get_logger_mock):
220+
def test_does_nothing_without_remote_config(self, get_logger_mock):
221221
message = opamp_pb2.ServerToAgent()
222+
agent = mock.Mock()
222223
client = mock.Mock()
223-
opamp_handler(client, message)
224+
opamp_handler(agent, client, message)
224225

225226
get_logger_mock.assert_not_called()
226227

227228
@mock.patch.object(logging, "getLogger")
228229
def test_ignores_non_elastic_filename(self, get_logger_mock):
230+
agent = mock.Mock()
229231
client = mock.Mock()
230232
config = opamp_pb2.AgentConfigMap()
231233
config.config_map["non-elastic"].body = json.dumps({"logging_level": "trace"}).encode()
232234
config.config_map["non-elastic"].content_type = "application/json"
233-
remote_config = opamp_pb2.AgentRemoteConfig(config=config)
235+
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
234236
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
235-
opamp_handler(client, message)
237+
opamp_handler(agent, client, message)
236238

237239
get_logger_mock.assert_not_called()
238240

241+
client._update_remote_config_status.assert_called_once_with(
242+
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message=""
243+
)
244+
client._build_remote_config_status_response_message.assert_called_once_with(
245+
client._update_remote_config_status()
246+
)
247+
agent.send.assert_called_once_with(payload=mock.ANY)
248+
239249
@mock.patch.object(logging, "getLogger")
240250
def test_sets_matching_logging_level(self, get_logger_mock):
251+
agent = mock.Mock()
241252
client = mock.Mock()
242253
config = opamp_pb2.AgentConfigMap()
243254
config.config_map["elastic"].body = json.dumps({"logging_level": "trace"}).encode()
244255
config.config_map["elastic"].content_type = "application/json"
245-
remote_config = opamp_pb2.AgentRemoteConfig(config=config)
256+
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
246257
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
247-
opamp_handler(client, message)
258+
opamp_handler(agent, client, message)
248259

249260
get_logger_mock.assert_has_calls(
250261
[mock.call("opentelemetry"), mock.call().setLevel(5), mock.call("elasticotel"), mock.call().setLevel(5)]
251262
)
252263

264+
client._update_remote_config_status.assert_called_once_with(
265+
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message=""
266+
)
267+
client._build_remote_config_status_response_message.assert_called_once_with(
268+
client._update_remote_config_status()
269+
)
270+
agent.send.assert_called_once_with(payload=mock.ANY)
271+
253272
@mock.patch.object(logging, "getLogger")
254273
def test_sets_logging_to_default_info_without_logging_level_entry_in_config(self, get_logger_mock):
274+
agent = mock.Mock()
255275
client = mock.Mock()
256276
config = opamp_pb2.AgentConfigMap()
257277
config.config_map["elastic"].body = json.dumps({}).encode()
258278
config.config_map["elastic"].content_type = "application/json"
259-
remote_config = opamp_pb2.AgentRemoteConfig(config=config)
279+
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
260280
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
261-
opamp_handler(client, message)
281+
opamp_handler(agent, client, message)
262282

263283
get_logger_mock.assert_has_calls(
264284
[
@@ -269,14 +289,28 @@ def test_sets_logging_to_default_info_without_logging_level_entry_in_config(self
269289
]
270290
)
271291

292+
client._update_remote_config_status.assert_called_once_with(
293+
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message=""
294+
)
295+
client._build_remote_config_status_response_message.assert_called_once_with(
296+
client._update_remote_config_status()
297+
)
298+
agent.send.assert_called_once_with(payload=mock.ANY)
299+
272300
@mock.patch.object(logging, "getLogger")
273301
def test_warns_if_logging_level_does_not_match_our_map(self, get_logger_mock):
302+
agent = mock.Mock()
274303
client = mock.Mock()
275304
config = opamp_pb2.AgentConfigMap()
276305
config.config_map["elastic"].body = json.dumps({"logging_level": "unexpected"}).encode()
277306
config.config_map["elastic"].content_type = "application/json"
278-
remote_config = opamp_pb2.AgentRemoteConfig(config=config)
307+
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
279308
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
280309

281310
with self.assertLogs(config_logger, logging.WARNING):
282-
opamp_handler(client, message)
311+
opamp_handler(agent, client, message)
312+
313+
client._build_remote_config_status_response_message.assert_called_once_with(
314+
client._update_remote_config_status()
315+
)
316+
agent.send.assert_called_once_with(payload=mock.ANY)

tests/opamp/cassettes/test_connection_heartbeat_disconnection.yaml renamed to tests/opamp/cassettes/test_connection_remote_config_status_heartbeat_disconnection.yaml

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
interactions:
22
- request:
33
body: !!binary |
4-
ChABl44aq8F1QqCWeJJQNmiKGj0KFQoMc2VydmljZS5uYW1lEgUKA2ZvbwokChtkZXBsb3ltZW50
5-
LmVudmlyb25tZW50Lm5hbWUSBQoDZm9vIINA
4+
ChABl89ktxVxE7d60nbzJJzzGj0KFQoMc2VydmljZS5uYW1lEgUKA2ZvbwokChtkZXBsb3ltZW50
5+
LmVudmlyb25tZW50Lm5hbWUSBQoDZm9vIINg
66
headers:
77
Accept:
88
- '*/*'
@@ -21,22 +21,23 @@ interactions:
2121
response:
2222
body:
2323
string: !!binary |
24-
ChABl44aq8F1QqCWeJJQNmiKGmYKOgo4CgdlbGFzdGljEi0KGXsibG9nZ2luZ19sZXZlbCI6ImRl
25-
YnVnIn0SEGFwcGxpY2F0aW9uL2pzb24SKGJmOThhY2M2NDNhMzJjNTcyZWFhYmVhNGIwZWU5ZTEw
26-
NmFkYWRiZDY4AlIA
24+
ChABl89ktxVxE7d60nbzJJzzGmYKOgo4CgdlbGFzdGljEi0KGXsibG9nZ2luZ19sZXZlbCI6ImRl
25+
YnVnIn0SEGFwcGxpY2F0aW9uL2pzb24SKGY3MzA5M2VjZDEyNjkzZGMxNDUxYWQ2MjdlZDA2MWJl
26+
ZWM5ZjU1OWM4AlIA
2727
headers:
2828
Content-Length:
2929
- '126'
3030
Content-Type:
3131
- application/x-protobuf
3232
Date:
33-
- Fri, 20 Jun 2025 16:10:02 GMT
33+
- Thu, 03 Jul 2025 08:26:13 GMT
3434
status:
3535
code: 200
3636
message: OK
3737
- request:
3838
body: !!binary |
39-
ChABl44aq8F1QqCWeJJQNmiKEAE=
39+
ChABl89ktxVxE7d60nbzJJzzEAEgg2A6LAooZjczMDkzZWNkMTI2OTNkYzE0NTFhZDYyN2VkMDYx
40+
YmVlYzlmNTU5YxAB
4041
headers:
4142
Accept:
4243
- '*/*'
@@ -45,7 +46,7 @@ interactions:
4546
Connection:
4647
- keep-alive
4748
Content-Length:
48-
- '20'
49+
- '69'
4950
Content-Type:
5051
- application/x-protobuf
5152
User-Agent:
@@ -55,48 +56,78 @@ interactions:
5556
response:
5657
body:
5758
string: !!binary |
58-
ChABl44aq8F1QqCWeJJQNmiKGmYKOgo4CgdlbGFzdGljEi0KGXsibG9nZ2luZ19sZXZlbCI6ImRl
59-
YnVnIn0SEGFwcGxpY2F0aW9uL2pzb24SKGJmOThhY2M2NDNhMzJjNTcyZWFhYmVhNGIwZWU5ZTEw
60-
NmFkYWRiZDY4AlIA
59+
ChABl89ktxVxE7d60nbzJJzzOAJSAA==
6160
headers:
6261
Content-Length:
63-
- '126'
62+
- '22'
6463
Content-Type:
6564
- application/x-protobuf
6665
Date:
67-
- Fri, 20 Jun 2025 16:10:03 GMT
66+
- Thu, 03 Jul 2025 08:26:13 GMT
6867
status:
6968
code: 200
7069
message: OK
7170
- request:
7271
body: !!binary |
73-
ChABl44aq8F1QqCWeJJQNmiKEAJKAA==
72+
ChABl89ktxVxE7d60nbzJJzzEAIgg2A=
7473
headers:
7574
Accept:
7675
- '*/*'
7776
Accept-Encoding:
7877
- gzip, deflate
7978
Connection:
8079
- keep-alive
80+
Content-Length:
81+
- '23'
82+
Content-Type:
83+
- application/x-protobuf
84+
User-Agent:
85+
- OTel-OpAMP-Python/0.0.1
86+
method: POST
87+
uri: http://localhost:4320/v1/opamp
88+
response:
89+
body:
90+
string: !!binary |
91+
ChABl89ktxVxE7d60nbzJJzzOAJSAA==
92+
headers:
8193
Content-Length:
8294
- '22'
8395
Content-Type:
8496
- application/x-protobuf
97+
Date:
98+
- Thu, 03 Jul 2025 08:26:14 GMT
99+
status:
100+
code: 200
101+
message: OK
102+
- request:
103+
body: !!binary |
104+
ChABl89ktxVxE7d60nbzJJzzEAMgg2BKAA==
105+
headers:
106+
Accept:
107+
- '*/*'
108+
Accept-Encoding:
109+
- gzip, deflate
110+
Connection:
111+
- keep-alive
112+
Content-Length:
113+
- '25'
114+
Content-Type:
115+
- application/x-protobuf
85116
User-Agent:
86117
- OTel-OpAMP-Python/0.0.1
87118
method: POST
88119
uri: http://localhost:4320/v1/opamp
89120
response:
90121
body:
91122
string: !!binary |
92-
ChABl44aq8F1QqCWeJJQNmiKOAJSAA==
123+
ChABl89ktxVxE7d60nbzJJzzOAJSAA==
93124
headers:
94125
Content-Length:
95126
- '22'
96127
Content-Type:
97128
- application/x-protobuf
98129
Date:
99-
- Fri, 20 Jun 2025 16:10:03 GMT
130+
- Thu, 03 Jul 2025 08:26:15 GMT
100131
status:
101132
code: 200
102133
message: OK

0 commit comments

Comments
 (0)