Skip to content

Commit 4cf66d1

Browse files
authored
config: add handling of ReportFullState ServerToAgent flag (#388)
* config: add handling of ReportFullState ServerToAgent flag This also introduces a very simplified handling of effective config * Add some notes on how to record e2e tests * Add missing cassette * Apply suggestions from code review
1 parent 09a02c5 commit 4cf66d1

File tree

8 files changed

+646
-35
lines changed

8 files changed

+646
-35
lines changed

src/elasticotel/distro/config.py

Lines changed: 76 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17+
from __future__ import annotations
18+
1719
import logging
20+
from dataclasses import dataclass
1821

1922
from opentelemetry import trace
2023

@@ -27,7 +30,7 @@
2730

2831
logger = logging.getLogger(__name__)
2932

30-
_LOG_LEVELS_MAP = {
33+
_LOG_LEVELS_MAP: dict[str, int] = {
3134
"trace": 5,
3235
"debug": logging.DEBUG,
3336
"info": logging.INFO,
@@ -38,16 +41,40 @@
3841
}
3942

4043
DEFAULT_SAMPLING_RATE = 1.0
44+
DEFAULT_LOGGING_LEVEL = "info"
45+
46+
LOGGING_LEVEL_CONFIG_KEY = "logging_level"
47+
SAMPLING_RATE_CONFIG_KEY = "sampling_rate"
48+
49+
50+
@dataclass
51+
class ConfigItem:
52+
value: str
53+
54+
55+
@dataclass
56+
class ConfigUpdate:
57+
error_message: str = ""
58+
59+
60+
# TODO: this should grow into a proper configuration store initialized from env vars and so on
61+
@dataclass
62+
class Config:
63+
sampling_rate = ConfigItem(value=str(DEFAULT_SAMPLING_RATE))
64+
logging_level = ConfigItem(value=DEFAULT_LOGGING_LEVEL)
65+
66+
def to_dict(self):
67+
return {LOGGING_LEVEL_CONFIG_KEY: self.logging_level.value, SAMPLING_RATE_CONFIG_KEY: self.sampling_rate.value}
68+
4169

70+
_config = Config()
4271

43-
def _handle_logging_level(config) -> str:
44-
error_message = ""
72+
73+
def _handle_logging_level(remote_config) -> ConfigUpdate:
74+
_config = _get_config()
4575
# when config option has default value you don't get it so need to handle the default
46-
config_logging_level = config.get("logging_level")
47-
if config_logging_level is not None:
48-
logging_level = _LOG_LEVELS_MAP.get(config_logging_level) # type: ignore[reportArgumentType]
49-
else:
50-
logging_level = logging.INFO
76+
config_logging_level = remote_config.get(LOGGING_LEVEL_CONFIG_KEY, DEFAULT_LOGGING_LEVEL)
77+
logging_level = _LOG_LEVELS_MAP.get(config_logging_level)
5178

5279
if logging_level is None:
5380
logger.error("Logging level not handled: %s", config_logging_level)
@@ -56,11 +83,14 @@ def _handle_logging_level(config) -> str:
5683
# update upstream and distro logging levels
5784
logging.getLogger("opentelemetry").setLevel(logging_level)
5885
logging.getLogger("elasticotel").setLevel(logging_level)
59-
return error_message
86+
_config.logging_level = ConfigItem(value=config_logging_level)
87+
error_message = ""
88+
return ConfigUpdate(error_message=error_message)
6089

6190

62-
def _handle_sampling_rate(config) -> str:
63-
config_sampling_rate = config.get("sampling_rate")
91+
def _handle_sampling_rate(remote_config) -> ConfigUpdate:
92+
_config = _get_config()
93+
config_sampling_rate = remote_config.get(SAMPLING_RATE_CONFIG_KEY, str(DEFAULT_SAMPLING_RATE))
6494
sampling_rate = DEFAULT_SAMPLING_RATE
6595
if config_sampling_rate is not None:
6696
try:
@@ -69,17 +99,17 @@ def _handle_sampling_rate(config) -> str:
6999
raise ValueError()
70100
except ValueError:
71101
logger.error("Invalid `sampling_rate` from config `%s`", config_sampling_rate)
72-
return f"Invalid sampling_rate {config_sampling_rate}"
102+
return ConfigUpdate(error_message=f"Invalid sampling_rate {config_sampling_rate}")
73103

74104
sampler = getattr(trace.get_tracer_provider(), "sampler", None)
75105
if sampler is None:
76106
logger.debug("Cannot get sampler from tracer provider.")
77-
return ""
107+
return ConfigUpdate()
78108

79109
# FIXME: this needs to be updated for the consistent probability samplers
80110
if not isinstance(sampler, ParentBasedTraceIdRatio):
81111
logger.warning("Sampler %s is not supported, not applying sampling_rate.", type(sampler))
82-
return ""
112+
return ConfigUpdate()
83113

84114
# since sampler is parent based we need to update its root sampler
85115
root_sampler = sampler._root # type: ignore[reportAttributeAccessIssue]
@@ -88,32 +118,55 @@ def _handle_sampling_rate(config) -> str:
88118
root_sampler._rate = sampling_rate # type: ignore[reportAttributeAccessIssue]
89119
root_sampler._bound = root_sampler.get_bound_for_rate(root_sampler._rate) # type: ignore[reportAttributeAccessIssue]
90120
logger.debug("Updated sampler rate to %s", sampling_rate)
91-
return ""
121+
_config.sampling_rate = ConfigItem(value=config_sampling_rate)
122+
return ConfigUpdate()
123+
124+
125+
def _report_full_state(message: opamp_pb2.ServerToAgent):
126+
return message.flags & opamp_pb2.ServerToAgentFlags_ReportFullState
127+
128+
129+
def _get_config():
130+
global _config
131+
return _config
92132

93133

94134
def opamp_handler(agent: OpAMPAgent, client: OpAMPClient, message: opamp_pb2.ServerToAgent):
135+
# server wants us to report full state as it cannot recognize us as agent because
136+
# e.g it may have been restarted and lost state.
137+
if _report_full_state(message):
138+
# here we're not returning explicitly but usually we don't get a remote config when we get the flag set
139+
payload = client._build_full_state_message()
140+
agent.send(payload=payload)
141+
95142
# we check config_hash because we need to track last received config and remote_config seems to be always truthy
96143
if not message.remote_config or not message.remote_config.config_hash:
97144
return
98145

146+
_config = _get_config()
99147
error_messages = []
100-
for config_filename, config in messages._decode_remote_config(message.remote_config):
148+
for config_filename, remote_config in messages._decode_remote_config(message.remote_config):
101149
# we don't have standardized config values so limit to configs coming from our backend
102150
if config_filename == "elastic":
103-
logger.debug("Config %s: %s", config_filename, config)
104-
error_message = _handle_logging_level(config)
105-
if error_message:
106-
error_messages.append(error_message)
151+
logger.debug("Config %s: %s", config_filename, remote_config)
152+
config_update = _handle_logging_level(remote_config)
153+
if config_update.error_message:
154+
error_messages.append(config_update.error_message)
107155

108-
error_message = _handle_sampling_rate(config)
109-
if error_message:
110-
error_messages.append(error_message)
156+
config_update = _handle_sampling_rate(remote_config)
157+
if config_update.error_message:
158+
error_messages.append(config_update.error_message)
111159

112160
error_message = "\n".join(error_messages)
113161
status = opamp_pb2.RemoteConfigStatuses_FAILED if error_message else opamp_pb2.RemoteConfigStatuses_APPLIED
114162
updated_remote_config = client._update_remote_config_status(
115163
remote_config_hash=message.remote_config.config_hash, status=status, error_message=error_message
116164
)
165+
166+
# update the cached effective config with what we updated
167+
effective_config = {"elastic": _config.to_dict()}
168+
client._update_effective_config(effective_config)
169+
117170
# if we changed the config send an ack to the server so we don't receive the same config at every heartbeat response
118171
if updated_remote_config is not None:
119172
payload = client._build_remote_config_status_response_message(updated_remote_config)

src/opentelemetry/_opamp/client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
| opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsHeartbeat
4848
| opamp_pb2.AgentCapabilities.AgentCapabilities_AcceptsRemoteConfig
4949
| opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsRemoteConfig
50+
| opamp_pb2.AgentCapabilities.AgentCapabilities_ReportsEffectiveConfig
5051
)
5152

5253

@@ -74,6 +75,7 @@ def __init__(
7475
self._sequence_num: int = 0
7576
self._instance_uid: bytes = uuid7().bytes
7677
self._remote_config_status: opamp_pb2.RemoteConfigStatus | None = None
78+
self._effective_config: opamp_pb2.EffectiveConfig | None = None
7779

7880
def _build_connection_message(self) -> bytes:
7981
message = messages._build_presentation_message(
@@ -101,6 +103,10 @@ def _build_heartbeat_message(self) -> bytes:
101103
data = messages._encode_message(message)
102104
return data
103105

106+
def _update_effective_config(self, effective_config: dict[str, dict[str, str]]) -> opamp_pb2.EffectiveConfig:
107+
self._effective_config = messages._build_effective_config_message(effective_config)
108+
return self._effective_config
109+
104110
def _update_remote_config_status(
105111
self, remote_config_hash: bytes, status: opamp_pb2.RemoteConfigStatuses.ValueType, error_message: str = ""
106112
) -> opamp_pb2.RemoteConfigStatus | None:
@@ -132,6 +138,18 @@ def _build_remote_config_status_response_message(self, remote_config_status: opa
132138
data = messages._encode_message(message)
133139
return data
134140

141+
def _build_full_state_message(self) -> bytes:
142+
message = messages._build_full_state_message(
143+
instance_uid=self._instance_uid,
144+
agent_description=self._agent_description,
145+
remote_config_status=self._remote_config_status,
146+
sequence_num=self._sequence_num,
147+
effective_config=self._effective_config,
148+
capabilities=_HANDLED_CAPABILITIES,
149+
)
150+
data = messages._encode_message(message)
151+
return data
152+
135153
def _send(self, data: bytes):
136154
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
137155
try:

src/opentelemetry/_opamp/messages.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,36 @@ def _build_remote_config_status_response_message(
115115
return command
116116

117117

118+
def _build_effective_config_message(config: dict[str, dict[str, str]]):
119+
agent_config_map = opamp_pb2.AgentConfigMap()
120+
for filename, value in config.items():
121+
body = json.dumps(value)
122+
agent_config_map.config_map[filename].body = body.encode("utf-8")
123+
agent_config_map.config_map[filename].content_type = "application/json"
124+
return opamp_pb2.EffectiveConfig(
125+
config_map=agent_config_map,
126+
)
127+
128+
129+
def _build_full_state_message(
130+
instance_uid: bytes,
131+
sequence_num: int,
132+
agent_description: opamp_pb2.AgentDescription,
133+
capabilities: int,
134+
remote_config_status: opamp_pb2.RemoteConfigStatus | None,
135+
effective_config: opamp_pb2.EffectiveConfig | None,
136+
) -> opamp_pb2.AgentToServer:
137+
command = opamp_pb2.AgentToServer(
138+
instance_uid=instance_uid,
139+
sequence_num=sequence_num,
140+
agent_description=agent_description,
141+
remote_config_status=remote_config_status,
142+
effective_config=effective_config,
143+
capabilities=capabilities,
144+
)
145+
return command
146+
147+
118148
def _encode_message(data: opamp_pb2.AgentToServer) -> bytes:
119149
return data.SerializeToString()
120150

0 commit comments

Comments
 (0)