Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions gvm/protocols/gmp/_gmpnext.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later

from typing import Mapping, Optional, Sequence
from typing import Any, Mapping, Optional, Sequence

from gvm.protocols.gmp.requests import EntityID

Expand Down Expand Up @@ -114,28 +114,23 @@ def modify_agents(
agent_ids: list[EntityID],
*,
authorized: Optional[bool] = None,
min_interval: Optional[int] = None,
heartbeat_interval: Optional[int] = None,
schedule: Optional[str] = None,
config: Optional[Mapping[str, Any]] = None,
comment: Optional[str] = None,
) -> T:
"""Modify multiple agents
"""
Modify multiple agents.

Args:
agent_ids: List of agent UUIDs to modify
authorized: Whether the agent is authorized
min_interval: Minimum scan interval
heartbeat_interval: Interval for sending heartbeats
schedule: Cron-style schedule for agent
comment: Comment for the agents
agent_ids: List of agent UUIDs to modify.
authorized: Whether the agent is authorized.
config: Nested config for Agent Controller.
comment: Optional comment for the change.
"""
return self._send_request_and_transform_response(
Agents.modify_agents(
agent_ids=agent_ids,
authorized=authorized,
min_interval=min_interval,
heartbeat_interval=heartbeat_interval,
schedule=schedule,
config=config,
comment=comment,
)
)
Expand All @@ -150,6 +145,25 @@ def delete_agents(self, agent_ids: list[EntityID]) -> T:
Agents.delete_agents(agent_ids=agent_ids)
)

def modify_agent_control_scan_config(
self,
agent_control_id: EntityID,
config: Mapping[str, Any],
) -> T:
"""
Modify agent control scan config.

Args:
agent_control_id: The agent control UUID.
config: Nested config for Agent Controller.
"""
return self._send_request_and_transform_response(
Agents.modify_agent_control_scan_config(
agent_control_id=agent_control_id,
config=config,
)
)

def get_agent_groups(
self,
*,
Expand Down
254 changes: 237 additions & 17 deletions gvm/protocols/gmp/requests/next/_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later

from typing import Optional
from typing import Any, Mapping, Optional, Sequence

from gvm.errors import RequiredArgument
from gvm.protocols.core import Request
Expand All @@ -12,6 +12,153 @@


class Agents:

@staticmethod
def _add_element(element, name: str, value: Any) -> None:
"""
Helper to add a sub-element with a value if the value is not None.

Args:
element: The XML parent element to which the new element is added.
name: Name of the sub-element to create.
value: Value to set as the text of the sub-element. If None, the
element will not be created.
"""
if value is not None:
element.add_element(name, str(value))

@classmethod
def _validate_agent_config(
cls, config: Mapping[str, Any], *, caller: str
) -> None:
"""Ensure all required fields exist, are well-shaped, and non-empty."""

def valid_map(d: Any, key: str, path: str) -> Mapping[str, Any]:
if not isinstance(d, Mapping):
raise RequiredArgument(
function=caller, argument=f"config.{path.rstrip('.')}"
)
v = d.get(key)
if not isinstance(v, Mapping):
raise RequiredArgument(
function=caller, argument=f"config.{path}{key}"
)
return v

def valid_value(d: Mapping[str, Any], key: str, path: str) -> Any:
v = d.get(key)
if v is None or (isinstance(v, str) and v.strip() == ""):
raise RequiredArgument(
function=caller, argument=f"config.{path}{key}"
)
return v

# agent_control.retry
ac = valid_map(config, "agent_control", "")
retry = valid_map(ac, "retry", "agent_control.")
valid_value(retry, "attempts", "agent_control.retry.")
valid_value(retry, "delay_in_seconds", "agent_control.retry.")
valid_value(retry, "max_jitter_in_seconds", "agent_control.retry.")

# agent_script_executor
se = valid_map(config, "agent_script_executor", "")
valid_value(se, "bulk_size", "agent_script_executor.")
valid_value(se, "bulk_throttle_time_in_ms", "agent_script_executor.")
valid_value(se, "indexer_dir_depth", "agent_script_executor.")

sched = se.get("scheduler_cron_time")
if isinstance(sched, Sequence) and not isinstance(sched, (str, bytes)):
items = [str(x) for x in sched]
else:
items = []
if not items or any(not str(x).strip() for x in items):
raise RequiredArgument(
function=caller,
argument="config.agent_script_executor.scheduler_cron_time",
)

# heartbeat
hb = valid_map(config, "heartbeat", "")
valid_value(hb, "interval_in_seconds", "heartbeat.")
valid_value(hb, "miss_until_inactive", "heartbeat.")

@classmethod
def _append_agent_config(cls, parent, config: Mapping[str, Any]) -> None:
"""
Append an agent configuration block to the given XML parent element.

Expected config structure::

{
"agent_control": {
"retry": {
"attempts": 6,
"delay_in_seconds": 60,
"max_jitter_in_seconds": 10
}
},
"agent_script_executor": {
"bulk_size": 2,
"bulk_throttle_time_in_ms": 300,
"indexer_dir_depth": 100,
"scheduler_cron_time": ["0 */12 * * *"]
},
"heartbeat": {
"interval_in_seconds": 300,
"miss_until_inactive": 1
}
}

Args:
parent: The XML parent element to which the `<config>` element
should be appended.
config: Mapping containing the agent configuration fields to
serialize.
"""
xml_config = parent.add_element("config")

# agent_control.retry
ac = config["agent_control"]
retry = ac["retry"]
xml_ac = xml_config.add_element("agent_control")
xml_retry = xml_ac.add_element("retry")
cls._add_element(xml_retry, "attempts", retry.get("attempts"))
cls._add_element(
xml_retry, "delay_in_seconds", retry.get("delay_in_seconds")
)
cls._add_element(
xml_retry,
"max_jitter_in_seconds",
retry.get("max_jitter_in_seconds"),
)

# agent_script_executor
se = config["agent_script_executor"]
xml_se = xml_config.add_element("agent_script_executor")
cls._add_element(xml_se, "bulk_size", se.get("bulk_size"))
cls._add_element(
xml_se,
"bulk_throttle_time_in_ms",
se.get("bulk_throttle_time_in_ms"),
)
cls._add_element(
xml_se, "indexer_dir_depth", se.get("indexer_dir_depth")
)
sched = se.get("scheduler_cron_time")
xml_sched = xml_se.add_element("scheduler_cron_time")
for item in sched:
xml_sched.add_element("item", str(item))

# heartbeat
hb = config["heartbeat"]
xml_hb = xml_config.add_element("heartbeat")
cls._add_element(
xml_hb, "interval_in_seconds", hb.get("interval_in_seconds")
)
cls._add_element(
xml_hb, "miss_until_inactive", hb.get("miss_until_inactive")
)

@classmethod
def get_agents(
cls,
Expand Down Expand Up @@ -41,20 +188,36 @@ def modify_agents(
agent_ids: list[EntityID],
*,
authorized: Optional[bool] = None,
min_interval: Optional[int] = None,
heartbeat_interval: Optional[int] = None,
schedule: Optional[str] = None,
config: Optional[Mapping[str, Any]] = None,
comment: Optional[str] = None,
) -> Request:
"""Modify multiple agents
"""
Modify multiple agents.

Args:
agent_ids: List of agent UUIDs to modify
authorized: Whether the agent is authorized
min_interval: Minimum scan interval
heartbeat_interval: Interval for sending heartbeats
schedule: Cron-style schedule for agent
comment: Comment for the agents
agent_ids: List of agent UUIDs to modify.
authorized: Whether the agent is authorized.
config: Nested config, e.g.:
{
"agent_control": {
"retry": {
"attempts": 6,
"delay_in_seconds": 60,
"max_jitter_in_seconds": 10,
}
},
"agent_script_executor": {
"bulk_size": 2,
"bulk_throttle_time_in_ms": 300,
"indexer_dir_depth": 100,
"scheduler_cron_time": ["0 */12 * * *"], # str or list[str]
},
"heartbeat": {
"interval_in_seconds": 300,
"miss_until_inactive": 1,
},
}
comment: Optional comment for the change.
"""
if not agent_ids:
raise RequiredArgument(
Expand All @@ -69,12 +232,13 @@ def modify_agents(

if authorized is not None:
cmd.add_element("authorized", to_bool(authorized))
if min_interval is not None:
cmd.add_element("min_interval", str(min_interval))
if heartbeat_interval is not None:
cmd.add_element("heartbeat_interval", str(heartbeat_interval))
if schedule:
cmd.add_element("schedule", schedule)

if config is not None:
cls._validate_agent_config(
config, caller=cls.modify_agents.__name__
)
cls._append_agent_config(cmd, config)

if comment:
cmd.add_element("comment", comment)

Expand All @@ -99,3 +263,59 @@ def delete_agents(cls, agent_ids: list[EntityID]) -> Request:
xml_agents.add_element("agent", attrs={"id": agent_id})

return cmd

@classmethod
def modify_agent_control_scan_config(
cls,
agent_control_id: EntityID,
config: Mapping[str, Any],
) -> Request:
"""
Modify agent control scan config.

Args:
agent_control_id: The agent control UUID.
config: Nested config, e.g.:
{
"agent_control": {
"retry": {
"attempts": 6,
"delay_in_seconds": 60,
"max_jitter_in_seconds": 10,
}
},
"agent_script_executor": {
"bulk_size": 2,
"bulk_throttle_time_in_ms": 300,
"indexer_dir_depth": 100,
"scheduler_cron_time": ["0 */12 * * *"], # str or list[str]
},
"heartbeat": {
"interval_in_seconds": 300,
"miss_until_inactive": 1,
},
}
"""
if not agent_control_id:
raise RequiredArgument(
function=cls.modify_agent_control_scan_config.__name__,
argument="agent_control_id",
)
if not config:
raise RequiredArgument(
function=cls.modify_agent_control_scan_config.__name__,
argument="config",
)

cls._validate_agent_config(
config, caller=cls.modify_agent_control_scan_config.__name__
)

cmd = XmlCommand(
"modify_agent_control_scan_config",
)
cmd.set_attribute("agent_control_id", str(agent_control_id))

cls._append_agent_config(cmd, config)

return cmd
Loading