diff --git a/gvm/protocols/gmp/_gmpnext.py b/gvm/protocols/gmp/_gmpnext.py index eb57fb99..dec6654c 100644 --- a/gvm/protocols/gmp/_gmpnext.py +++ b/gvm/protocols/gmp/_gmpnext.py @@ -2,13 +2,21 @@ # # SPDX-License-Identifier: GPL-3.0-or-later -from typing import Optional +from typing import Mapping, Optional, Sequence from gvm.protocols.gmp.requests import EntityID +from ...utils import SupportsStr from .._protocol import T from ._gmp227 import GMPv227 -from .requests.next import AgentGroups, AgentInstallers, Agents, OCIImageTargets +from .requests.next import ( + AgentGroups, + AgentInstallers, + Agents, + OCIImageTargets, + Tasks, +) +from .requests.v224 import HostsOrdering class GMPNext(GMPv227[T]): @@ -392,3 +400,281 @@ def get_oci_image_targets( tasks=tasks, ) ) + + def clone_task(self, task_id: EntityID) -> T: + """Clone an existing task + + Args: + task_id: UUID of existing task to clone from + """ + return self._send_request_and_transform_response( + Tasks.clone_task(task_id) + ) + + def create_agent_group_task( + self, + name: str, + agent_group_id: EntityID, + scanner_id: EntityID, + *, + comment: Optional[str] = None, + alterable: Optional[bool] = None, + schedule_id: Optional[EntityID] = None, + alert_ids: Optional[Sequence[EntityID]] = None, + schedule_periods: Optional[int] = None, + observers: Optional[Sequence[str]] = None, + preferences: Optional[Mapping[str, SupportsStr]] = None, + ) -> T: + """Create a new scan task using an agent group. + + Args: + name: Name of the new task. + agent_group_id: UUID of the agent group to be scanned. + scanner_id: UUID of scanner to use for scanning the agents. + comment: Optional comment for the task. + alterable: Whether the task should be alterable. + alert_ids: List of UUIDs for alerts to be applied to the task. + schedule_id: UUID of a schedule when the task should be run. + schedule_periods: Limit to number of scheduled runs, 0 for unlimited. + observers: List of usernames or IDs allowed to observe the task. + preferences: Scanner preferences as name/value pairs. + """ + return self._send_request_and_transform_response( + Tasks.create_agent_group_task( + name=name, + agent_group_id=agent_group_id, + scanner_id=scanner_id, + comment=comment, + alterable=alterable, + schedule_id=schedule_id, + alert_ids=alert_ids, + schedule_periods=schedule_periods, + observers=observers, + preferences=preferences, + ) + ) + + def create_container_task( + self, name: str, *, comment: Optional[str] = None + ) -> T: + """Create a new container task + + A container task is a "meta" task to import and view reports from other + systems. + + Args: + name: Name of the task + comment: Comment for the task + """ + return self._send_request_and_transform_response( + Tasks.create_container_task(name=name, comment=comment) + ) + + def create_task( + self, + name: str, + config_id: EntityID, + target_id: EntityID, + scanner_id: EntityID, + *, + alterable: Optional[bool] = None, + hosts_ordering: Optional[HostsOrdering] = None, + schedule_id: Optional[EntityID] = None, + alert_ids: Optional[Sequence[EntityID]] = None, + comment: Optional[str] = None, + schedule_periods: Optional[int] = None, + observers: Optional[Sequence[str]] = None, + preferences: Optional[Mapping[str, SupportsStr]] = None, + ) -> T: + """Create a new scan task + + Args: + name: Name of the new task + config_id: UUID of config to use by the task + target_id: UUID of target to be scanned + scanner_id: UUID of scanner to use for scanning the target + comment: Comment for the task + alterable: Whether the task should be alterable + alert_ids: List of UUIDs for alerts to be applied to the task + hosts_ordering: The order hosts are scanned in + schedule_id: UUID of a schedule when the task should be run. + schedule_periods: A limit to the number of times the task will be + scheduled, or 0 for no limit + observers: List of names or ids of users which should be allowed to + observe this task + preferences: Name/Value pairs of scanner preferences. + """ + return self._send_request_and_transform_response( + Tasks.create_task( + name=name, + config_id=config_id, + target_id=target_id, + scanner_id=scanner_id, + alterable=alterable, + hosts_ordering=hosts_ordering, + schedule_id=schedule_id, + alert_ids=alert_ids, + comment=comment, + schedule_periods=schedule_periods, + observers=observers, + preferences=preferences, + ) + ) + + def delete_task( + self, task_id: EntityID, *, ultimate: Optional[bool] = False + ) -> T: + """Deletes an existing task + + Args: + task_id: UUID of the task to be deleted. + ultimate: Whether to remove entirely, or to the trashcan. + """ + return self._send_request_and_transform_response( + Tasks.delete_task(task_id=task_id, ultimate=ultimate) + ) + + def get_tasks( + self, + *, + filter_string: Optional[str] = None, + filter_id: Optional[EntityID] = None, + trash: Optional[bool] = None, + details: Optional[bool] = None, + schedules_only: Optional[bool] = None, + ignore_pagination: Optional[bool] = None, + ) -> T: + """Request a list of tasks + + Args: + filter_string: Filter term to use for the query + filter_id: UUID of an existing filter to use for the query + trash: Whether to get the trashcan tasks instead + details: Whether to include full task details + schedules_only: Whether to only include id, name and schedule + details + ignore_pagination: Whether to ignore pagination settings (filter + terms "first" and "rows"). Default is False. + """ + return self._send_request_and_transform_response( + Tasks.get_tasks( + filter_string=filter_string, + filter_id=filter_id, + trash=trash, + details=details, + schedules_only=schedules_only, + ignore_pagination=ignore_pagination, + ) + ) + + def get_task(self, task_id: EntityID) -> T: + """Request a single task + + Args: + task_id: UUID of an existing task + """ + return self._send_request_and_transform_response( + Tasks.get_task(task_id=task_id) + ) + + def modify_task( + self, + task_id: EntityID, + *, + name: Optional[str] = None, + config_id: Optional[EntityID] = None, + target_id: Optional[EntityID] = None, + scanner_id: Optional[EntityID] = None, + agent_group_id: Optional[EntityID] = None, + alterable: Optional[bool] = None, + hosts_ordering: Optional[HostsOrdering] = None, + schedule_id: Optional[EntityID] = None, + schedule_periods: Optional[int] = None, + comment: Optional[str] = None, + alert_ids: Optional[Sequence[EntityID]] = None, + observers: Optional[Sequence[str]] = None, + preferences: Optional[Mapping[str, SupportsStr]] = None, + ) -> T: + """Modifies an existing task. + + Args: + task_id: UUID of task to modify. + name: The name of the task. + config_id: UUID of scan config to use by the task + target_id: UUID of target to be scanned + scanner_id: UUID of scanner to use for scanning the target + agent_group_id: UUID of agent group to use for scanning + comment: The comment on the task. + alert_ids: List of UUIDs for alerts to be applied to the task + hosts_ordering: The order hosts are scanned in + schedule_id: UUID of a schedule when the task should be run. + schedule_periods: A limit to the number of times the task will be + scheduled, or 0 for no limit. + observers: List of names or ids of users which should be allowed to + observe this task + preferences: Name/Value pairs of scanner preferences. + """ + return self._send_request_and_transform_response( + Tasks.modify_task( + task_id=task_id, + name=name, + config_id=config_id, + target_id=target_id, + scanner_id=scanner_id, + agent_group_id=agent_group_id, + alterable=alterable, + hosts_ordering=hosts_ordering, + schedule_id=schedule_id, + alert_ids=alert_ids, + comment=comment, + schedule_periods=schedule_periods, + observers=observers, + preferences=preferences, + ) + ) + + def move_task( + self, task_id: EntityID, *, slave_id: Optional[EntityID] = None + ) -> T: + """Move an existing task to another GMP slave scanner or the master + + Args: + task_id: UUID of the task to be moved + slave_id: UUID of the sensor to reassign the task to, empty for master. + """ + return self._send_request_and_transform_response( + Tasks.move_task( + task_id=task_id, + slave_id=slave_id, + ) + ) + + def start_task(self, task_id: EntityID) -> T: + """Start an existing task + + Args: + task_id: UUID of the task to be started + """ + return self._send_request_and_transform_response( + Tasks.start_task(task_id=task_id) + ) + + def resume_task(self, task_id: EntityID) -> T: + """Resume an existing stopped task + + Args: + task_id: UUID of the task to be resumed + """ + return self._send_request_and_transform_response( + Tasks.resume_task(task_id=task_id) + ) + + def stop_task(self, task_id: EntityID) -> T: + """Stop an existing running task + + Args: + task_id: UUID of the task to be stopped + """ + return self._send_request_and_transform_response( + Tasks.stop_task(task_id=task_id) + ) diff --git a/gvm/protocols/gmp/requests/next/__init__.py b/gvm/protocols/gmp/requests/next/__init__.py index 2d1e7184..109199a8 100644 --- a/gvm/protocols/gmp/requests/next/__init__.py +++ b/gvm/protocols/gmp/requests/next/__init__.py @@ -6,6 +6,7 @@ from gvm.protocols.gmp.requests.next._agent_installers import AgentInstallers from gvm.protocols.gmp.requests.next._agents import Agents from gvm.protocols.gmp.requests.next._oci_image_targets import OCIImageTargets +from gvm.protocols.gmp.requests.next._tasks import Tasks from .._entity_id import EntityID from .._version import Version @@ -68,7 +69,6 @@ SystemReports, Tags, Targets, - Tasks, Tickets, TicketStatus, TLSCertificates, diff --git a/gvm/protocols/gmp/requests/next/_tasks.py b/gvm/protocols/gmp/requests/next/_tasks.py new file mode 100644 index 00000000..6e81e1ee --- /dev/null +++ b/gvm/protocols/gmp/requests/next/_tasks.py @@ -0,0 +1,524 @@ +# SPDX-FileCopyrightText: 2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from numbers import Integral +from typing import Mapping, Optional, Sequence + +from gvm.errors import InvalidArgument, RequiredArgument +from gvm.protocols.core import Request +from gvm.utils import SupportsStr, to_bool, to_comma_list +from gvm.xml import XmlCommand + +from .._entity_id import EntityID +from ..v224 import HostsOrdering + + +class Tasks: + + @classmethod + def clone_task(cls, task_id: EntityID) -> Request: + """Clone an existing task + + Args: + task_id: UUID of existing task to clone from + """ + if not task_id: + raise RequiredArgument( + function=cls.clone_task.__name__, argument="task_id" + ) + + cmd = XmlCommand("create_task") + cmd.add_element("copy", str(task_id)) + return cmd + + @classmethod + def create_agent_group_task( + cls, + name: str, + agent_group_id: EntityID, + scanner_id: EntityID, + *, + comment: Optional[str] = None, + alterable: Optional[bool] = None, + schedule_id: Optional[EntityID] = None, + alert_ids: Optional[Sequence[EntityID]] = None, + schedule_periods: Optional[int] = None, + observers: Optional[Sequence[str]] = None, + preferences: Optional[Mapping[str, SupportsStr]] = None, + ) -> Request: + """Create a new scan task using an agent group. + + Args: + name: Name of the new task. + agent_group_id: UUID of the agent group to be scanned. + scanner_id: UUID of scanner to use for scanning the agents. + comment: Optional comment for the task. + alterable: Whether the task should be alterable. + alert_ids: List of UUIDs for alerts to be applied to the task. + schedule_id: UUID of a schedule when the task should be run. + schedule_periods: Limit to number of scheduled runs, 0 for unlimited. + observers: List of usernames or IDs allowed to observe the task. + preferences: Scanner preferences as name/value pairs. + """ + if not name: + raise RequiredArgument( + function=cls.create_agent_group_task.__name__, argument="name" + ) + + if not agent_group_id: + raise RequiredArgument( + function=cls.create_agent_group_task.__name__, + argument="agent_group_id", + ) + + if not scanner_id: + raise RequiredArgument( + function=cls.create_agent_group_task.__name__, + argument="scanner_id", + ) + + cmd = XmlCommand("create_task") + cmd.add_element("name", name) + cmd.add_element("usage_type", "scan") + cmd.add_element("agent_group", attrs={"id": str(agent_group_id)}) + cmd.add_element("scanner", attrs={"id": str(scanner_id)}) + + if comment: + cmd.add_element("comment", comment) + + if alterable is not None: + cmd.add_element("alterable", to_bool(alterable)) + + if alert_ids: + for alert in alert_ids: + cmd.add_element("alert", attrs={"id": str(alert)}) + + if schedule_id: + cmd.add_element("schedule", attrs={"id": str(schedule_id)}) + + if schedule_periods is not None: + if ( + not isinstance(schedule_periods, Integral) + or schedule_periods < 0 + ): + raise InvalidArgument( + "schedule_periods must be an integer greater or equal than 0" + ) + cmd.add_element("schedule_periods", str(schedule_periods)) + + if observers: + cmd.add_element("observers", to_comma_list(observers)) + + if preferences is not None: + xml_prefs = cmd.add_element("preferences") + for pref_name, pref_value in preferences.items(): + xml_pref = xml_prefs.add_element("preference") + xml_pref.add_element("scanner_name", pref_name) + xml_pref.add_element("value", str(pref_value)) + + return cmd + + @classmethod + def create_container_task( + cls, name: str, *, comment: Optional[str] = None + ) -> Request: + """Create a new container task + + A container task is a "meta" task to import and view reports from other + systems. + + Args: + name: Name of the task + comment: Comment for the task + """ + if not name: + raise RequiredArgument( + function=cls.create_container_task.__name__, argument="name" + ) + + cmd = XmlCommand("create_task") + cmd.add_element("name", name) + cmd.add_element("target", attrs={"id": "0"}) + + if comment: + cmd.add_element("comment", comment) + + return cmd + + @classmethod + def create_task( + cls, + name: str, + config_id: EntityID, + target_id: EntityID, + scanner_id: EntityID, + *, + alterable: Optional[bool] = None, + hosts_ordering: Optional[HostsOrdering] = None, + schedule_id: Optional[EntityID] = None, + alert_ids: Optional[Sequence[EntityID]] = None, + comment: Optional[str] = None, + schedule_periods: Optional[int] = None, + observers: Optional[Sequence[str]] = None, + preferences: Optional[Mapping[str, SupportsStr]] = None, + ) -> Request: + """Create a new scan task + + Args: + name: Name of the new task + config_id: UUID of config to use by the task + target_id: UUID of target to be scanned + scanner_id: UUID of scanner to use for scanning the target + comment: Comment for the task + alterable: Whether the task should be alterable + alert_ids: List of UUIDs for alerts to be applied to the task + hosts_ordering: The order hosts are scanned in + schedule_id: UUID of a schedule when the task should be run. + schedule_periods: A limit to the number of times the task will be + scheduled, or 0 for no limit + observers: List of names or ids of users which should be allowed to + observe this task + preferences: Name/Value pairs of scanner preferences. + """ + if not name: + raise RequiredArgument( + function=cls.create_task.__name__, argument="name" + ) + + if not config_id: + raise RequiredArgument( + function=cls.create_task.__name__, argument="config_id" + ) + + if not target_id: + raise RequiredArgument( + function=cls.create_task.__name__, argument="target_id" + ) + + if not scanner_id: + raise RequiredArgument( + function=cls.create_task.__name__, argument="scanner_id" + ) + + # don't allow to create a container task with create_task + if target_id == "0": + raise InvalidArgument( + function=cls.create_task.__name__, argument="target_id" + ) + + cmd = XmlCommand("create_task") + cmd.add_element("name", name) + cmd.add_element("usage_type", "scan") + cmd.add_element("config", attrs={"id": str(config_id)}) + cmd.add_element("target", attrs={"id": str(target_id)}) + cmd.add_element("scanner", attrs={"id": str(scanner_id)}) + + if comment: + cmd.add_element("comment", comment) + + if alterable is not None: + cmd.add_element("alterable", to_bool(alterable)) + + if hosts_ordering: + if not isinstance(hosts_ordering, HostsOrdering): + hosts_ordering = HostsOrdering(hosts_ordering) + cmd.add_element("hosts_ordering", hosts_ordering.value) + + if alert_ids: + for alert in alert_ids: + cmd.add_element("alert", attrs={"id": str(alert)}) + + if schedule_id: + cmd.add_element("schedule", attrs={"id": str(schedule_id)}) + + if schedule_periods is not None: + if ( + not isinstance(schedule_periods, Integral) + or schedule_periods < 0 + ): + raise InvalidArgument( + "schedule_periods must be an integer greater or equal " + "than 0" + ) + cmd.add_element("schedule_periods", str(schedule_periods)) + + if observers: + # gvmd splits by comma and space + # gvmd tries to lookup each value as user name and afterwards as + # user id. So both user name and user id are possible + cmd.add_element("observers", to_comma_list(observers)) + + if preferences is not None: + xml_prefs = cmd.add_element("preferences") + for pref_name, pref_value in preferences.items(): + xml_pref = xml_prefs.add_element("preference") + xml_pref.add_element("scanner_name", pref_name) + xml_pref.add_element("value", str(pref_value)) + + return cmd + + @classmethod + def delete_task( + cls, task_id: EntityID, *, ultimate: Optional[bool] = False + ) -> Request: + """Deletes an existing task + + Args: + task_id: UUID of the task to be deleted. + ultimate: Whether to remove entirely, or to the trashcan. + """ + if not task_id: + raise RequiredArgument( + function=cls.delete_task.__name__, argument="task_id" + ) + + cmd = XmlCommand("delete_task") + cmd.set_attribute("task_id", str(task_id)) + cmd.set_attribute("ultimate", to_bool(ultimate)) + + return cmd + + @staticmethod + def get_tasks( + *, + filter_string: Optional[str] = None, + filter_id: Optional[EntityID] = None, + trash: Optional[bool] = None, + details: Optional[bool] = None, + schedules_only: Optional[bool] = None, + ignore_pagination: Optional[bool] = None, + ) -> Request: + """Request a list of tasks + + Args: + filter_string: Filter term to use for the query + filter_id: UUID of an existing filter to use for the query + trash: Whether to get the trashcan tasks instead + details: Whether to include full task details + schedules_only: Whether to only include id, name and schedule + details + ignore_pagination: Whether to ignore pagination settings (filter + terms "first" and "rows"). Default is False. + """ + cmd = XmlCommand("get_tasks") + cmd.set_attribute("usage_type", "scan") + + cmd.add_filter(filter_string, filter_id) + + if trash is not None: + cmd.set_attribute("trash", to_bool(trash)) + + if details is not None: + cmd.set_attribute("details", to_bool(details)) + + if schedules_only is not None: + cmd.set_attribute("schedules_only", to_bool(schedules_only)) + + if ignore_pagination is not None: + cmd.set_attribute("ignore_pagination", to_bool(ignore_pagination)) + + return cmd + + @classmethod + def get_task(cls, task_id: EntityID) -> Request: + """Request a single task + + Args: + task_id: UUID of an existing task + """ + if not task_id: + raise RequiredArgument( + function=cls.get_task.__name__, argument="task_id" + ) + + cmd = XmlCommand("get_tasks") + cmd.set_attribute("task_id", str(task_id)) + cmd.set_attribute("usage_type", "scan") + + # for single entity always request all details + cmd.set_attribute("details", "1") + return cmd + + @classmethod + def modify_task( + cls, + task_id: EntityID, + *, + name: Optional[str] = None, + config_id: Optional[EntityID] = None, + target_id: Optional[EntityID] = None, + scanner_id: Optional[EntityID] = None, + agent_group_id: Optional[EntityID] = None, + alterable: Optional[bool] = None, + hosts_ordering: Optional[HostsOrdering] = None, + schedule_id: Optional[EntityID] = None, + schedule_periods: Optional[int] = None, + comment: Optional[str] = None, + alert_ids: Optional[Sequence[EntityID]] = None, + observers: Optional[Sequence[str]] = None, + preferences: Optional[Mapping[str, SupportsStr]] = None, + ) -> Request: + """Modifies an existing task. + + Args: + task_id: UUID of task to modify. + name: The name of the task. + config_id: UUID of scan config to use by the task + target_id: UUID of target to be scanned + scanner_id: UUID of scanner to use for scanning the target + agent_group_id: UUID of agent group to use for scanning + comment: The comment on the task. + alert_ids: List of UUIDs for alerts to be applied to the task + hosts_ordering: The order hosts are scanned in + schedule_id: UUID of a schedule when the task should be run. + schedule_periods: A limit to the number of times the task will be + scheduled, or 0 for no limit. + observers: List of names or ids of users which should be allowed to + observe this task + preferences: Name/Value pairs of scanner preferences. + """ + if not task_id: + raise RequiredArgument( + function=cls.modify_task.__name__, argument="task_id" + ) + + if target_id and agent_group_id: + raise InvalidArgument( + function=cls.modify_task.__name__, + argument="target_id/agent_group_id", + message="Only one of target_id or agent_group_id can be modified at a time", + ) + + cmd = XmlCommand("modify_task") + cmd.set_attribute("task_id", str(task_id)) + + if name: + cmd.add_element("name", name) + + if comment: + cmd.add_element("comment", comment) + + if config_id: + cmd.add_element("config", attrs={"id": str(config_id)}) + + if target_id: + cmd.add_element("target", attrs={"id": str(target_id)}) + + if agent_group_id: + cmd.add_element("agent_group", attrs={"id": str(agent_group_id)}) + + if alterable is not None: + cmd.add_element("alterable", to_bool(alterable)) + + if hosts_ordering: + if not isinstance(hosts_ordering, HostsOrdering): + hosts_ordering = HostsOrdering(hosts_ordering) + cmd.add_element("hosts_ordering", hosts_ordering.value) + + if scanner_id: + cmd.add_element("scanner", attrs={"id": str(scanner_id)}) + + if schedule_id: + cmd.add_element("schedule", attrs={"id": str(schedule_id)}) + + if schedule_periods is not None: + if ( + not isinstance(schedule_periods, Integral) + or schedule_periods < 0 + ): + raise InvalidArgument( + "schedule_periods must be an integer greater or equal " + "than 0" + ) + cmd.add_element("schedule_periods", str(schedule_periods)) + + if alert_ids is not None: + if len(alert_ids) == 0: + cmd.add_element("alert", attrs={"id": "0"}) + else: + for alert in alert_ids: + cmd.add_element("alert", attrs={"id": str(alert)}) + + if observers is not None: + cmd.add_element("observers", to_comma_list(observers)) + + if preferences is not None: + xml_prefs = cmd.add_element("preferences") + for pref_name, pref_value in preferences.items(): + xml_pref = xml_prefs.add_element("preference") + xml_pref.add_element("scanner_name", pref_name) + xml_pref.add_element("value", str(pref_value)) + + return cmd + + @classmethod + def move_task( + cls, task_id: EntityID, *, slave_id: Optional[EntityID] = None + ) -> Request: + """Move an existing task to another GMP slave scanner or the master + + Args: + task_id: UUID of the task to be moved + slave_id: UUID of the sensor to reassign the task to, empty for master. + """ + if not task_id: + raise RequiredArgument( + function=cls.move_task.__name__, argument="task_id" + ) + + cmd = XmlCommand("move_task") + cmd.set_attribute("task_id", str(task_id)) + + if slave_id is not None: + cmd.set_attribute("slave_id", str(slave_id)) + + return cmd + + @classmethod + def start_task(cls, task_id: EntityID) -> Request: + """Start an existing task + + Args: + task_id: UUID of the task to be started + """ + if not task_id: + raise RequiredArgument( + function=cls.start_task.__name__, argument="task_id" + ) + + cmd = XmlCommand("start_task") + cmd.set_attribute("task_id", str(task_id)) + return cmd + + @classmethod + def resume_task(cls, task_id: EntityID) -> Request: + """Resume an existing stopped task + + Args: + task_id: UUID of the task to be resumed + """ + if not task_id: + raise RequiredArgument( + function=cls.resume_task.__name__, argument="task_id" + ) + + cmd = XmlCommand("resume_task") + cmd.set_attribute("task_id", str(task_id)) + return cmd + + @classmethod + def stop_task(cls, task_id: EntityID) -> Request: + """Stop an existing running task + + Args: + task_id: UUID of the task to be stopped + """ + if not task_id: + raise RequiredArgument( + function=cls.stop_task.__name__, argument="task_id" + ) + + cmd = XmlCommand("stop_task") + cmd.set_attribute("task_id", str(task_id)) + return cmd diff --git a/tests/protocols/gmpnext/entities/tasks/__init__.py b/tests/protocols/gmpnext/entities/tasks/__init__.py new file mode 100644 index 00000000..36374649 --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/__init__.py @@ -0,0 +1,32 @@ +# SPDX-FileCopyrightText: 2021-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from .test_clone_task import GmpCloneTaskTestMixin +from .test_create_agent_group_task import GmpCreateAgentGroupTaskTestMixin +from .test_create_container_task import GmpCreateContainerTaskTestMixin +from .test_create_task import GmpCreateTaskTestMixin +from .test_delete_task import GmpDeleteTaskTestMixin +from .test_get_task import GmpGetTaskTestMixin +from .test_get_tasks import GmpGetTasksTestMixin +from .test_modify_task import GmpModifyTaskTestMixin +from .test_move_task import GmpMoveTaskTestMixin +from .test_resume_task import GmpResumeTaskTestMixin +from .test_start_task import GmpStartTaskTestMixin +from .test_stop_task import GmpStopTaskTestMixin + +__all__ = ( + "GmpCloneTaskTestMixin", + "GmpCreateAgentGroupTaskTestMixin", + "GmpCreateContainerTaskTestMixin", + "GmpCreateTaskTestMixin", + "GmpDeleteTaskTestMixin", + "GmpGetTaskTestMixin", + "GmpGetTasksTestMixin", + "GmpModifyTaskTestMixin", + "GmpMoveTaskTestMixin", + "GmpResumeTaskTestMixin", + "GmpStartTaskTestMixin", + "GmpStopTaskTestMixin", +) diff --git a/tests/protocols/gmpnext/entities/tasks/test_clone_task.py b/tests/protocols/gmpnext/entities/tasks/test_clone_task.py new file mode 100644 index 00000000..a32012d0 --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_clone_task.py @@ -0,0 +1,22 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import RequiredArgument + + +class GmpCloneTaskTestMixin: + def test_clone(self): + self.gmp.clone_task("a1") + + self.connection.send.has_been_called_with( + b"a1" + ) + + def test_missing_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.clone_task("") + + with self.assertRaises(RequiredArgument): + self.gmp.clone_task(None) diff --git a/tests/protocols/gmpnext/entities/tasks/test_create_agent_group_task.py b/tests/protocols/gmpnext/entities/tasks/test_create_agent_group_task.py new file mode 100644 index 00000000..5bc78681 --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_create_agent_group_task.py @@ -0,0 +1,213 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from collections import OrderedDict + +from gvm.errors import InvalidArgument, RequiredArgument + + +class GmpCreateAgentGroupTaskTestMixin: + def test_create_agent_group_task(self): + self.gmp.create_agent_group_task( + name="foo", agent_group_id="ag1", scanner_id="s1" + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"" + ) + + def test_create_agent_group_task_missing_name(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_agent_group_task( + name=None, agent_group_id="ag1", scanner_id="s1" + ) + with self.assertRaises(RequiredArgument): + self.gmp.create_agent_group_task( + name="", agent_group_id="ag1", scanner_id="s1" + ) + + def test_create_agent_group_task_missing_agent_group_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_agent_group_task( + name="foo", agent_group_id=None, scanner_id="s1" + ) + with self.assertRaises(RequiredArgument): + self.gmp.create_agent_group_task( + name="foo", agent_group_id="", scanner_id="s1" + ) + + def test_create_agent_group_task_missing_scanner_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_agent_group_task( + name="foo", agent_group_id="ag1", scanner_id=None + ) + with self.assertRaises(RequiredArgument): + self.gmp.create_agent_group_task( + name="foo", agent_group_id="ag1", scanner_id="" + ) + + def test_create_agent_group_task_with_comment(self): + self.gmp.create_agent_group_task( + name="foo", + agent_group_id="ag1", + scanner_id="s1", + comment="my comment", + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"my comment" + b"" + ) + + def test_create_agent_group_task_with_alerts(self): + self.gmp.create_agent_group_task( + name="foo", + agent_group_id="ag1", + scanner_id="s1", + alert_ids=["a1", "a2"], + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b'' + b"" + ) + + def test_create_agent_group_task_with_empty_alerts(self): + self.gmp.create_agent_group_task( + name="foo", agent_group_id="ag1", scanner_id="s1", alert_ids=[] + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"" + ) + + def test_create_agent_group_task_with_schedule(self): + self.gmp.create_agent_group_task( + name="foo", agent_group_id="ag1", scanner_id="s1", schedule_id="s1" + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"" + ) + + def test_create_agent_group_task_with_schedule_periods(self): + self.gmp.create_agent_group_task( + name="foo", + agent_group_id="ag1", + scanner_id="s1", + schedule_id="s1", + schedule_periods=5, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"5" + b"" + ) + + def test_create_agent_group_task_with_invalid_schedule_periods(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_agent_group_task( + name="foo", + agent_group_id="ag1", + scanner_id="s1", + schedule_id="s1", + schedule_periods="invalid", + ) + + with self.assertRaises(InvalidArgument): + self.gmp.create_agent_group_task( + name="foo", + agent_group_id="ag1", + scanner_id="s1", + schedule_id="s1", + schedule_periods=-1, + ) + + def test_create_agent_group_task_with_alterable(self): + self.gmp.create_agent_group_task( + name="foo", agent_group_id="ag1", scanner_id="s1", alterable=True + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"1" + b"" + ) + + def test_create_agent_group_task_with_observers(self): + self.gmp.create_agent_group_task( + name="foo", + agent_group_id="ag1", + scanner_id="s1", + observers=["u1", "u2"], + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"u1,u2" + b"" + ) + + def test_create_agent_group_task_with_preferences(self): + self.gmp.create_agent_group_task( + name="foo", + agent_group_id="ag1", + scanner_id="s1", + preferences=OrderedDict([("pref1", "val1"), ("pref2", "val2")]), + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"" + b"pref1val1" + b"pref2val2" + b"" + b"" + ) diff --git a/tests/protocols/gmpnext/entities/tasks/test_create_container_task.py b/tests/protocols/gmpnext/entities/tasks/test_create_container_task.py new file mode 100644 index 00000000..2240de5b --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_create_container_task.py @@ -0,0 +1,36 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import RequiredArgument + + +class GmpCreateContainerTaskTestMixin: + def test_create_task(self): + self.gmp.create_container_task(name="foo") + + self.connection.send.has_been_called_with( + b"" + b"foo" + b'' + b"" + ) + + def test_create_task_missing_name(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_container_task(name=None) + + with self.assertRaises(RequiredArgument): + self.gmp.create_container_task(name="") + + def test_create_task_with_comment(self): + self.gmp.create_container_task(name="foo", comment="bar") + + self.connection.send.has_been_called_with( + b"" + b"foo" + b'' + b"bar" + b"" + ) diff --git a/tests/protocols/gmpnext/entities/tasks/test_create_task.py b/tests/protocols/gmpnext/entities/tasks/test_create_task.py new file mode 100644 index 00000000..f66205b1 --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_create_task.py @@ -0,0 +1,374 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from collections import OrderedDict + +from gvm.errors import InvalidArgument, RequiredArgument +from gvm.protocols.gmp.requests.v224 import HostsOrdering + + +class GmpCreateTaskTestMixin: + def test_create_task(self): + self.gmp.create_task( + name="foo", config_id="c1", target_id="t1", scanner_id="s1" + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"" + ) + + def test_create_task_missing_name(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name=None, config_id="c1", target_id="t1", scanner_id="s1" + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name="", config_id="c1", target_id="t1", scanner_id="s1" + ) + + def test_create_task_missing_config_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name="foo", config_id=None, target_id="t1", scanner_id="s1" + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name="foo", config_id="", target_id="t1", scanner_id="s1" + ) + + def test_create_task_missing_target_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name="foo", config_id="c1", target_id=None, scanner_id="s1" + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name="foo", config_id="c1", target_id="", scanner_id="s1" + ) + + def test_create_task_missing_scanner_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name="foo", config_id="c1", target_id="t1", scanner_id=None + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name="foo", config_id="c1", target_id="t1", scanner_id="" + ) + + def test_create_task_with_comment(self): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + comment="bar", + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"bar" + b"" + ) + + def test_create_task_single_alert(self): + # pylint: disable=invalid-name + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + alert_ids=["a1"], + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b'' + b"" + ) + + def test_create_task_multiple_alerts(self): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + alert_ids=["a1", "a2", "a3"], + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b'' + b'' + b'' + b"" + ) + + def test_create_task_with_empty_alert_ids(self): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + alert_ids=[], + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"" + ) + + def test_create_task_with_alterable(self): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + alterable=True, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"1" + b"" + ) + + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + alterable=False, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"0" + b"" + ) + + def test_create_task_with_hosts_ordering(self): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + hosts_ordering=HostsOrdering.REVERSE, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"reverse" + b"" + ) + + def test_create_task_invalid_hosts_ordering(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + hosts_ordering="foo", + ) + + def test_create_task_with_schedule(self): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + schedule_id="s1", + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b'' + b"" + ) + + def test_create_task_with_schedule_and_schedule_periods(self): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + schedule_id="s1", + schedule_periods=0, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b'' + b"0" + b"" + ) + + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + schedule_id="s1", + schedule_periods=5, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b'' + b"5" + b"" + ) + + def test_create_task_with_schedule_and_invalid_schedule_periods(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + schedule_id="s1", + schedule_periods="foo", + ) + + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + schedule_id="s1", + schedule_periods=-1, + ) + + def test_create_task_with_observers(self): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + observers=["u1", "u2"], + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"u1,u2" + b"" + ) + + def test_create_task_with_preferences(self): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="t1", + scanner_id="s1", + preferences=OrderedDict([("foo", "bar"), ("lorem", "ipsum")]), + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"" + b"" + b"foo" + b"bar" + b"" + b"" + b"lorem" + b"ipsum" + b"" + b"" + b"" + ) + + def test_create_task_don_t_allow_container_task(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id="0", + scanner_id="s1", + observers="", + ) + + # target_id=0 is considered as False + with self.assertRaises(RequiredArgument): + self.gmp.create_task( + name="foo", + config_id="c1", + target_id=0, + scanner_id="s1", + observers="", + ) diff --git a/tests/protocols/gmpnext/entities/tasks/test_delete_task.py b/tests/protocols/gmpnext/entities/tasks/test_delete_task.py new file mode 100644 index 00000000..ca495fb6 --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_delete_task.py @@ -0,0 +1,29 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import GvmError + + +class GmpDeleteTaskTestMixin: + def test_delete(self): + self.gmp.delete_task("a1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_delete_ultimate(self): + self.gmp.delete_task("a1", ultimate=True) + + self.connection.send.has_been_called_with( + b'' + ) + + def test_missing_id(self): + with self.assertRaises(GvmError): + self.gmp.delete_task(None) + + with self.assertRaises(GvmError): + self.gmp.delete_task("") diff --git a/tests/protocols/gmpnext/entities/tasks/test_get_task.py b/tests/protocols/gmpnext/entities/tasks/test_get_task.py new file mode 100644 index 00000000..cd629b30 --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_get_task.py @@ -0,0 +1,22 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import GvmError + + +class GmpGetTaskTestMixin: + def test_get_task(self): + self.gmp.get_task("a1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_fail_without_task_id(self): + with self.assertRaises(GvmError): + self.gmp.get_task(None) + + with self.assertRaises(GvmError): + self.gmp.get_task("") diff --git a/tests/protocols/gmpnext/entities/tasks/test_get_tasks.py b/tests/protocols/gmpnext/entities/tasks/test_get_tasks.py new file mode 100644 index 00000000..10bcb689 --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_get_tasks.py @@ -0,0 +1,68 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + + +class GmpGetTasksTestMixin: + def test_get_tasks_simple(self): + self.gmp.get_tasks() + + self.connection.send.has_been_called_with( + b'' + ) + + def test_get_tasks_with_filter_string(self): + self.gmp.get_tasks(filter_string="name=foo") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_get_tasks_with_filter_id(self): + self.gmp.get_tasks(filter_id="f1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_get_tasks_from_trash(self): + self.gmp.get_tasks(trash=True) + + self.connection.send.has_been_called_with( + b'' + ) + + def test_get_tasks_with_details(self): + self.gmp.get_tasks(details=True) + + self.connection.send.has_been_called_with( + b'' + ) + + def test_get_tasks_without_details(self): + self.gmp.get_tasks(details=False) + + self.connection.send.has_been_called_with( + b'' + ) + + def test_get_tasks_with_schedules_only(self): + self.gmp.get_tasks(schedules_only=True) + + self.connection.send.has_been_called_with( + b'' + ) + + def test_get_tasks_with_ignore_pagination(self): + self.gmp.get_tasks(ignore_pagination=True) + + self.connection.send.has_been_called_with( + b'' + ) + + self.gmp.get_tasks(ignore_pagination=False) + + self.connection.send.has_been_called_with( + b'' + ) diff --git a/tests/protocols/gmpnext/entities/tasks/test_modify_task.py b/tests/protocols/gmpnext/entities/tasks/test_modify_task.py new file mode 100644 index 00000000..b1281bd5 --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_modify_task.py @@ -0,0 +1,194 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from collections import OrderedDict + +from gvm.errors import InvalidArgument, RequiredArgument +from gvm.protocols.gmp.requests.v224 import HostsOrdering + + +class GmpModifyTaskTestMixin: + def test_modify_task(self): + self.gmp.modify_task("t1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_modify_task_missing_task_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.modify_task(None) + + with self.assertRaises(RequiredArgument): + self.gmp.modify_task("") + + with self.assertRaises(RequiredArgument): + self.gmp.modify_task(task_id="") + + def test_modify_task_with_name(self): + self.gmp.modify_task(task_id="t1", name="foo") + + self.connection.send.has_been_called_with( + b'foo' + ) + + def test_modify_task_with_config_id(self): + self.gmp.modify_task(task_id="t1", config_id="c1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_modify_task_with_target_id(self): + self.gmp.modify_task(task_id="t1", target_id="t1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_modify_task_with_scanner_id(self): + self.gmp.modify_task(task_id="t1", scanner_id="s1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_modify_task_with_agent_group_id(self): + self.gmp.modify_task(task_id="t1", agent_group_id="ag1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_modify_task_with_target_and_agent_group(self): + with self.assertRaises(InvalidArgument): + self.gmp.modify_task( + task_id="t1", target_id="t1", agent_group_id="ag1" + ) + + def test_modify_task_with_schedule_id(self): + self.gmp.modify_task(task_id="t1", schedule_id="s1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_modify_task_with_comment(self): + self.gmp.modify_task(task_id="t1", comment="bar") + + self.connection.send.has_been_called_with( + b'' + b"bar" + b"" + ) + + def test_modify_task_with_alerts_ids(self): + self.gmp.modify_task(task_id="t1", alert_ids=["a1", "a2", "a3"]) + + self.connection.send.has_been_called_with( + b'' + b'' + b'' + b'' + b"" + ) + + def test_modify_task_with_empty_alert_ids(self): + self.gmp.modify_task(task_id="t1", alert_ids=[]) + + self.connection.send.has_been_called_with( + b'' + ) + + def test_modify_task_with_alterable(self): + self.gmp.modify_task(task_id="t1", alterable=True) + + self.connection.send.has_been_called_with( + b'' + b"1" + b"" + ) + + self.gmp.modify_task(task_id="t1", alterable=False) + + self.connection.send.has_been_called_with( + b'' + b"0" + b"" + ) + + def test_modify_task_with_hosts_ordering(self): + self.gmp.modify_task(task_id="t1", hosts_ordering=HostsOrdering.REVERSE) + + self.connection.send.has_been_called_with( + b'' + b"reverse" + b"" + ) + + def test_modify_task_invalid_hosts_ordering(self): + with self.assertRaises(InvalidArgument): + self.gmp.modify_task(task_id="t1", hosts_ordering="foo") + + def test_modify_task_with_schedule(self): + self.gmp.modify_task(task_id="t1", schedule_id="s1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_modify_task_with_schedule_periods(self): + self.gmp.modify_task(task_id="t1", schedule_periods=0) + + self.connection.send.has_been_called_with( + b'' + b"0" + b"" + ) + + self.gmp.modify_task(task_id="t1", schedule_periods=5) + + self.connection.send.has_been_called_with( + b'' + b"5" + b"" + ) + + def test_modify_task_invalid_schedule_periods(self): + with self.assertRaises(InvalidArgument): + self.gmp.modify_task(task_id="t1", schedule_periods="foo") + + with self.assertRaises(InvalidArgument): + self.gmp.modify_task(task_id="t1", schedule_periods=-1) + + def test_modify_task_with_observers(self): + self.gmp.modify_task(task_id="t1", observers=["u1", "u2"]) + + self.connection.send.has_been_called_with( + b'' + b"u1,u2" + b"" + ) + + def test_modify_task_with_preferences(self): + self.gmp.modify_task( + task_id="t1", + preferences=OrderedDict([("foo", "bar"), ("lorem", "ipsum")]), + ) + + self.connection.send.has_been_called_with( + b'' + b"" + b"" + b"foo" + b"bar" + b"" + b"" + b"lorem" + b"ipsum" + b"" + b"" + b"" + ) diff --git a/tests/protocols/gmpnext/entities/tasks/test_move_task.py b/tests/protocols/gmpnext/entities/tasks/test_move_task.py new file mode 100644 index 00000000..efc58af5 --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_move_task.py @@ -0,0 +1,27 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import GvmError + + +class GmpMoveTaskTestMixin: + def test_move_task(self): + self.gmp.move_task("a1") + + self.connection.send.has_been_called_with(b'') + + def test_move_task_to_slave(self): + self.gmp.move_task("a1", slave_id="s1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_missing_id(self): + with self.assertRaises(GvmError): + self.gmp.move_task(None) + + with self.assertRaises(GvmError): + self.gmp.move_task("") diff --git a/tests/protocols/gmpnext/entities/tasks/test_resume_task.py b/tests/protocols/gmpnext/entities/tasks/test_resume_task.py new file mode 100644 index 00000000..6d72c26d --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_resume_task.py @@ -0,0 +1,22 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import GvmError + + +class GmpResumeTaskTestMixin: + def test_resume_task(self): + self.gmp.resume_task("a1") + + self.connection.send.has_been_called_with( + b'' + ) + + def test_missing_id(self): + with self.assertRaises(GvmError): + self.gmp.resume_task(None) + + with self.assertRaises(GvmError): + self.gmp.resume_task("") diff --git a/tests/protocols/gmpnext/entities/tasks/test_start_task.py b/tests/protocols/gmpnext/entities/tasks/test_start_task.py new file mode 100644 index 00000000..8a5fc9cd --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_start_task.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import GvmError + + +class GmpStartTaskTestMixin: + def test_start_task(self): + self.gmp.start_task("a1") + + self.connection.send.has_been_called_with(b'') + + def test_missing_id(self): + with self.assertRaises(GvmError): + self.gmp.start_task(None) + + with self.assertRaises(GvmError): + self.gmp.start_task("") diff --git a/tests/protocols/gmpnext/entities/tasks/test_stop_task.py b/tests/protocols/gmpnext/entities/tasks/test_stop_task.py new file mode 100644 index 00000000..e10681a9 --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_stop_task.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: 2018-2025 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from gvm.errors import GvmError + + +class GmpStopTaskTestMixin: + def test_stop_task(self): + self.gmp.stop_task("a1") + + self.connection.send.has_been_called_with(b'') + + def test_missing_id(self): + with self.assertRaises(GvmError): + self.gmp.stop_task(None) + + with self.assertRaises(GvmError): + self.gmp.stop_task("") diff --git a/tests/protocols/gmpnext/entities/test_tasks.py b/tests/protocols/gmpnext/entities/test_tasks.py index 0ad450b2..386e816a 100644 --- a/tests/protocols/gmpnext/entities/test_tasks.py +++ b/tests/protocols/gmpnext/entities/test_tasks.py @@ -3,8 +3,10 @@ # SPDX-License-Identifier: GPL-3.0-or-later # -from ...gmpv224.entities.tasks import ( +from ...gmpnext import GMPTestCase +from ...gmpnext.entities.tasks import ( GmpCloneTaskTestMixin, + GmpCreateAgentGroupTaskTestMixin, GmpCreateContainerTaskTestMixin, GmpCreateTaskTestMixin, GmpDeleteTaskTestMixin, @@ -16,13 +18,18 @@ GmpStartTaskTestMixin, GmpStopTaskTestMixin, ) -from ...gmpv227 import GMPTestCase class GMPCloneTaskTestCase(GmpCloneTaskTestMixin, GMPTestCase): pass +class GmpCreateAgentGroupTaskTestCase( + GmpCreateAgentGroupTaskTestMixin, GMPTestCase +): + pass + + class GMPCreateContainerTaskTestCase( GmpCreateContainerTaskTestMixin, GMPTestCase ):