Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 288 additions & 2 deletions gvm/protocols/gmp/_gmpnext.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later

from typing import Optional
from typing import Mapping, Optional, Sequence

from gvm.protocols.gmp.requests import EntityID

from ...utils import SupportsStr
from .._protocol import T
from ._gmp227 import GMPv227
from .requests.next import AgentGroups, AgentInstallers, Agents, OCIImageTargets
from .requests.next import (
AgentGroups,
AgentInstallers,
Agents,
OCIImageTargets,
Tasks,
)
from .requests.v224 import HostsOrdering


class GMPNext(GMPv227[T]):
Expand Down Expand Up @@ -392,3 +400,281 @@ def get_oci_image_targets(
tasks=tasks,
)
)

def clone_task(self, task_id: EntityID) -> T:
"""Clone an existing task

Args:
task_id: UUID of existing task to clone from
"""
return self._send_request_and_transform_response(
Tasks.clone_task(task_id)
)

def create_agent_group_task(
self,
name: str,
agent_group_id: EntityID,
scanner_id: EntityID,
*,
comment: Optional[str] = None,
alterable: Optional[bool] = None,
schedule_id: Optional[EntityID] = None,
alert_ids: Optional[Sequence[EntityID]] = None,
schedule_periods: Optional[int] = None,
observers: Optional[Sequence[str]] = None,
preferences: Optional[Mapping[str, SupportsStr]] = None,
) -> T:
"""Create a new scan task using an agent group.

Args:
name: Name of the new task.
agent_group_id: UUID of the agent group to be scanned.
scanner_id: UUID of scanner to use for scanning the agents.
comment: Optional comment for the task.
alterable: Whether the task should be alterable.
alert_ids: List of UUIDs for alerts to be applied to the task.
schedule_id: UUID of a schedule when the task should be run.
schedule_periods: Limit to number of scheduled runs, 0 for unlimited.
observers: List of usernames or IDs allowed to observe the task.
preferences: Scanner preferences as name/value pairs.
"""
return self._send_request_and_transform_response(
Tasks.create_agent_group_task(
name=name,
agent_group_id=agent_group_id,
scanner_id=scanner_id,
comment=comment,
alterable=alterable,
schedule_id=schedule_id,
alert_ids=alert_ids,
schedule_periods=schedule_periods,
observers=observers,
preferences=preferences,
)
)

def create_container_task(
self, name: str, *, comment: Optional[str] = None
) -> T:
"""Create a new container task

A container task is a "meta" task to import and view reports from other
systems.

Args:
name: Name of the task
comment: Comment for the task
"""
return self._send_request_and_transform_response(
Tasks.create_container_task(name=name, comment=comment)
)

def create_task(
self,
name: str,
config_id: EntityID,
target_id: EntityID,
scanner_id: EntityID,
*,
alterable: Optional[bool] = None,
hosts_ordering: Optional[HostsOrdering] = None,
schedule_id: Optional[EntityID] = None,
alert_ids: Optional[Sequence[EntityID]] = None,
comment: Optional[str] = None,
schedule_periods: Optional[int] = None,
observers: Optional[Sequence[str]] = None,
preferences: Optional[Mapping[str, SupportsStr]] = None,
) -> T:
"""Create a new scan task

Args:
name: Name of the new task
config_id: UUID of config to use by the task
target_id: UUID of target to be scanned
scanner_id: UUID of scanner to use for scanning the target
comment: Comment for the task
alterable: Whether the task should be alterable
alert_ids: List of UUIDs for alerts to be applied to the task
hosts_ordering: The order hosts are scanned in
schedule_id: UUID of a schedule when the task should be run.
schedule_periods: A limit to the number of times the task will be
scheduled, or 0 for no limit
observers: List of names or ids of users which should be allowed to
observe this task
preferences: Name/Value pairs of scanner preferences.
"""
return self._send_request_and_transform_response(
Tasks.create_task(
name=name,
config_id=config_id,
target_id=target_id,
scanner_id=scanner_id,
alterable=alterable,
hosts_ordering=hosts_ordering,
schedule_id=schedule_id,
alert_ids=alert_ids,
comment=comment,
schedule_periods=schedule_periods,
observers=observers,
preferences=preferences,
)
)

def delete_task(
self, task_id: EntityID, *, ultimate: Optional[bool] = False
) -> T:
"""Deletes an existing task

Args:
task_id: UUID of the task to be deleted.
ultimate: Whether to remove entirely, or to the trashcan.
"""
return self._send_request_and_transform_response(
Tasks.delete_task(task_id=task_id, ultimate=ultimate)
)

def get_tasks(
self,
*,
filter_string: Optional[str] = None,
filter_id: Optional[EntityID] = None,
trash: Optional[bool] = None,
details: Optional[bool] = None,
schedules_only: Optional[bool] = None,
ignore_pagination: Optional[bool] = None,
) -> T:
"""Request a list of tasks

Args:
filter_string: Filter term to use for the query
filter_id: UUID of an existing filter to use for the query
trash: Whether to get the trashcan tasks instead
details: Whether to include full task details
schedules_only: Whether to only include id, name and schedule
details
ignore_pagination: Whether to ignore pagination settings (filter
terms "first" and "rows"). Default is False.
"""
return self._send_request_and_transform_response(
Tasks.get_tasks(
filter_string=filter_string,
filter_id=filter_id,
trash=trash,
details=details,
schedules_only=schedules_only,
ignore_pagination=ignore_pagination,
)
)

def get_task(self, task_id: EntityID) -> T:
"""Request a single task

Args:
task_id: UUID of an existing task
"""
return self._send_request_and_transform_response(
Tasks.get_task(task_id=task_id)
)

def modify_task(
self,
task_id: EntityID,
*,
name: Optional[str] = None,
config_id: Optional[EntityID] = None,
target_id: Optional[EntityID] = None,
scanner_id: Optional[EntityID] = None,
agent_group_id: Optional[EntityID] = None,
alterable: Optional[bool] = None,
hosts_ordering: Optional[HostsOrdering] = None,
schedule_id: Optional[EntityID] = None,
schedule_periods: Optional[int] = None,
comment: Optional[str] = None,
alert_ids: Optional[Sequence[EntityID]] = None,
observers: Optional[Sequence[str]] = None,
preferences: Optional[Mapping[str, SupportsStr]] = None,
) -> T:
"""Modifies an existing task.

Args:
task_id: UUID of task to modify.
name: The name of the task.
config_id: UUID of scan config to use by the task
target_id: UUID of target to be scanned
scanner_id: UUID of scanner to use for scanning the target
agent_group_id: UUID of agent group to use for scanning
comment: The comment on the task.
alert_ids: List of UUIDs for alerts to be applied to the task
hosts_ordering: The order hosts are scanned in
schedule_id: UUID of a schedule when the task should be run.
schedule_periods: A limit to the number of times the task will be
scheduled, or 0 for no limit.
observers: List of names or ids of users which should be allowed to
observe this task
preferences: Name/Value pairs of scanner preferences.
"""
return self._send_request_and_transform_response(
Tasks.modify_task(
task_id=task_id,
name=name,
config_id=config_id,
target_id=target_id,
scanner_id=scanner_id,
agent_group_id=agent_group_id,
alterable=alterable,
hosts_ordering=hosts_ordering,
schedule_id=schedule_id,
alert_ids=alert_ids,
comment=comment,
schedule_periods=schedule_periods,
observers=observers,
preferences=preferences,
)
)

def move_task(
self, task_id: EntityID, *, slave_id: Optional[EntityID] = None
) -> T:
"""Move an existing task to another GMP slave scanner or the master

Args:
task_id: UUID of the task to be moved
slave_id: UUID of the sensor to reassign the task to, empty for master.
"""
return self._send_request_and_transform_response(
Tasks.move_task(
task_id=task_id,
slave_id=slave_id,
)
)

def start_task(self, task_id: EntityID) -> T:
"""Start an existing task

Args:
task_id: UUID of the task to be started
"""
return self._send_request_and_transform_response(
Tasks.start_task(task_id=task_id)
)

def resume_task(self, task_id: EntityID) -> T:
"""Resume an existing stopped task

Args:
task_id: UUID of the task to be resumed
"""
return self._send_request_and_transform_response(
Tasks.resume_task(task_id=task_id)
)

def stop_task(self, task_id: EntityID) -> T:
"""Stop an existing running task

Args:
task_id: UUID of the task to be stopped
"""
return self._send_request_and_transform_response(
Tasks.stop_task(task_id=task_id)
)
2 changes: 1 addition & 1 deletion gvm/protocols/gmp/requests/next/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from gvm.protocols.gmp.requests.next._agent_installers import AgentInstallers
from gvm.protocols.gmp.requests.next._agents import Agents
from gvm.protocols.gmp.requests.next._oci_image_targets import OCIImageTargets
from gvm.protocols.gmp.requests.next._tasks import Tasks

from .._entity_id import EntityID
from .._version import Version
Expand Down Expand Up @@ -68,7 +69,6 @@
SystemReports,
Tags,
Targets,
Tasks,
Tickets,
TicketStatus,
TLSCertificates,
Expand Down
Loading
Loading