diff --git a/gvm/protocols/_protocol.py b/gvm/protocols/_protocol.py index 0dc4a4cf..310a7597 100644 --- a/gvm/protocols/_protocol.py +++ b/gvm/protocols/_protocol.py @@ -112,16 +112,22 @@ def send_command(self, cmd: str) -> str: string """ return bytes( - self._send_command(cmd.encode("utf-8", errors="ignore")) # type: ignore[arg-type] # it seems mypy on Python < 3.11 can't handle bytes here + self._send_request(cmd.encode("utf-8", errors="ignore")) # type: ignore[arg-type] # it seems mypy on Python < 3.11 can't handle bytes here ).decode("utf-8", errors="ignore") def _transform(self, response: Response) -> T: transform = self._transform_callable return transform(response) - def _send_command(self, cmd: Request) -> Response: + def _send_request(self, request: Request) -> Response: + """ + Send a request to the remote daemon and return the response + + Args: + request: The request to be send. + """ try: - send_data = self._protocol.send(cmd) + send_data = self._protocol.send(request) self._send(send_data) response: Optional[Response] = None while not response: @@ -132,5 +138,11 @@ def _send_command(self, cmd: Request) -> Response: self.disconnect() raise e - def _send_and_transform_command(self, cmd: Request) -> T: - return self._transform(self._send_command(cmd)) + def _send_request_and_transform_response(self, request: Request) -> T: + """ + Send a request and transform its response using the transform callable. + + Args: + request: The request to be send. + """ + return self._transform(self._send_request(request)) diff --git a/gvm/protocols/core/_connection.py b/gvm/protocols/core/_connection.py index 4c56cf87..8b679ace 100644 --- a/gvm/protocols/core/_connection.py +++ b/gvm/protocols/core/_connection.py @@ -184,7 +184,7 @@ def receive_data(self, data: bytes) -> Optional[Response]: Raises: An InvalidStateError if no data can be received currently. For - example if not request is send yet. + example if no request is send yet. """ return self._state.receive_data(data) diff --git a/gvm/protocols/gmp/_gmp.py b/gvm/protocols/gmp/_gmp.py index 5325d3be..6ad5fff4 100644 --- a/gvm/protocols/gmp/_gmp.py +++ b/gvm/protocols/gmp/_gmp.py @@ -71,7 +71,7 @@ def __init__( def determine_remote_gmp_version(self) -> str: """Determine the supported GMP version of the remote daemon""" self.connect() - resp = self._send_command(Version.get_version()) + resp = self._send_request(Version.get_version()) self.disconnect() version_el = resp.xml().find("version") diff --git a/gvm/protocols/gmp/_gmp224.py b/gvm/protocols/gmp/_gmp224.py index a09d7e89..77ac5366 100644 --- a/gvm/protocols/gmp/_gmp224.py +++ b/gvm/protocols/gmp/_gmp224.py @@ -169,7 +169,7 @@ def authenticate(self, username: str, password: str) -> T: username: Username password: Password """ - response = self._send_command( + response = self._send_request( Authentication.authenticate(username=username, password=password) ) @@ -184,7 +184,9 @@ def describe_auth(self) -> T: Returns a list of all used authentication methods if such a list is available. """ - return self._send_and_transform_command(Authentication.describe_auth()) + return self._send_request_and_transform_response( + Authentication.describe_auth() + ) def modify_auth( self, group_name: str, auth_conf_settings: dict[str, str] @@ -195,7 +197,7 @@ def modify_auth( group_name: Name of the group to be modified. auth_conf_settings: The new auth config. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Authentication.modify_auth(group_name, auth_conf_settings) ) @@ -203,7 +205,7 @@ def get_version(self) -> T: """Get the Greenbone Vulnerability Management Protocol (GMP) version used by the remote gvmd. """ - return self._send_and_transform_command(Version.get_version()) + return self._send_request_and_transform_response(Version.get_version()) def clone_port_list(self, port_list_id: EntityID) -> T: """Clone an existing port list @@ -211,7 +213,7 @@ def clone_port_list(self, port_list_id: EntityID) -> T: Args: port_list_id: UUID of an existing port list to clone from """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( PortLists.clone_port_list(port_list_id) ) @@ -226,7 +228,7 @@ def create_port_list( 1 - 1234 comment: Comment for the port list """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( PortLists.create_port_list(name, port_range, comment=comment) ) @@ -248,7 +250,7 @@ def create_port_range( port_range_type: The type of the ports: TCP, UDP, ... comment: Comment for the port range """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( PortLists.create_port_range( port_list_id, start, end, port_range_type, comment=comment ) @@ -263,7 +265,7 @@ def delete_port_list( port_list_id: UUID of the port list to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( PortLists.delete_port_list(port_list_id, ultimate=ultimate) ) @@ -273,7 +275,7 @@ def delete_port_range(self, port_range_id: EntityID) -> T: Args: port_range_id: UUID of the port range to be deleted. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( PortLists.delete_port_range(port_range_id) ) @@ -295,7 +297,7 @@ def get_port_lists( targets: Whether to include targets using this port list trash: Whether to get port lists in the trashcan instead """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( PortLists.get_port_lists( filter_string=filter_string, filter_id=filter_id, @@ -311,7 +313,7 @@ def get_port_list(self, port_list_id: EntityID) -> T: Args: port_list_id: UUID of an existing port list """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( PortLists.get_port_list(port_list_id) ) @@ -329,7 +331,7 @@ def modify_port_list( name: Name of port list. comment: Comment on port list. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( PortLists.modify_port_list(port_list_id, comment=comment, name=name) ) @@ -373,7 +375,7 @@ def get_aggregates( -1 for all mode: Special mode for aggregation """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Aggregates.get_aggregates( resource_type, filter_string=filter_string, @@ -392,7 +394,7 @@ def get_aggregates( def get_feeds(self) -> T: """Request the list of feeds""" - return self._send_and_transform_command(Feed.get_feeds()) + return self._send_request_and_transform_response(Feed.get_feeds()) def get_feed(self, feed_type: Union[FeedType, str]) -> T: """Request a single feed @@ -400,7 +402,9 @@ def get_feed(self, feed_type: Union[FeedType, str]) -> T: Args: feed_type: Type of single feed to get: NVT, CERT or SCAP """ - return self._send_and_transform_command(Feed.get_feed(feed_type)) + return self._send_request_and_transform_response( + Feed.get_feed(feed_type) + ) def help( self, @@ -415,7 +419,7 @@ def help( "html", "rnc", "text" or "xml brief: If True help is brief """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Help.help(help_format=help_format, brief=brief) ) @@ -442,7 +446,7 @@ def get_system_reports( brief: Whether to include the actual system reports slave_id: UUID of GMP scanner from which to get the system reports """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( SystemReports.get_system_reports( name=name, duration=duration, @@ -459,7 +463,9 @@ def empty_trashcan(self) -> T: Remove all entities from the trashcan. **Attention:** this command can not be reverted """ - return self._send_and_transform_command(TrashCan.empty_trashcan()) + return self._send_request_and_transform_response( + TrashCan.empty_trashcan() + ) def restore_from_trashcan(self, entity_id: EntityID) -> T: """Restore an entity from the trashcan @@ -467,7 +473,7 @@ def restore_from_trashcan(self, entity_id: EntityID) -> T: Args: entity_id: ID of the entity to be restored from the trashcan """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( TrashCan.restore_from_trashcan(entity_id) ) @@ -477,7 +483,7 @@ def get_user_settings(self, *, filter_string: Optional[str] = None) -> T: Args: filter_string: Filter term to use for the query """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( UserSettings.get_user_settings(filter_string=filter_string) ) @@ -487,7 +493,7 @@ def get_user_setting(self, setting_id: EntityID) -> T: Args: setting_id: UUID of an existing setting """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( UserSettings.get_user_setting(setting_id) ) @@ -506,7 +512,7 @@ def modify_user_setting( passed. value: The value of the setting. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( UserSettings.modify_user_setting( setting_id=setting_id, name=name, value=value ) @@ -518,7 +524,7 @@ def clone_scan_config(self, config_id: EntityID) -> T: Args: config_id: UUID of the existing scan config """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.clone_scan_config(config_id) ) @@ -536,7 +542,7 @@ def create_scan_config( name: Name of the new scan config comment: A comment on the config """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.create_scan_config(config_id, name, comment=comment) ) @@ -549,7 +555,7 @@ def delete_scan_config( config_id: UUID of the config to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.delete_scan_config(config_id, ultimate=ultimate) ) @@ -578,7 +584,7 @@ def get_scan_configs( requested tasks: Whether to get tasks using this config """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.get_scan_configs( filter_string=filter_string, filter_id=filter_id, @@ -599,7 +605,7 @@ def get_scan_config( config_id: UUID of an existing scan config tasks: Whether to get tasks using this config """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.get_scan_config(config_id, tasks=tasks) ) @@ -621,7 +627,7 @@ def get_scan_config_preferences( nvt_oid: OID of nvt config_id: UUID of scan config of which to show preference values """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.get_scan_config_preferences( nvt_oid=nvt_oid, config_id=config_id ) @@ -641,7 +647,7 @@ def get_scan_config_preference( nvt_oid: OID of nvt config_id: UUID of scan config of which to show preference values """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.get_scan_config_preference( name, nvt_oid=nvt_oid, config_id=config_id ) @@ -654,7 +660,7 @@ def import_scan_config(self, config: str) -> T: config: Scan Config XML as string to import. This XML must contain a :code:`` root element. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.import_scan_config(config) ) @@ -675,7 +681,7 @@ def modify_scan_config_set_nvt_preference( value: New value for the preference. None to delete the preference and to use the default instead. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.modify_scan_config_set_nvt_preference( config_id, name, nvt_oid, value=value ) @@ -688,7 +694,7 @@ def modify_scan_config_set_name(self, config_id: EntityID, name: str) -> T: config_id: UUID of scan config to modify. name: New name for the config. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.modify_scan_config_set_name(config_id, name) ) @@ -703,7 +709,7 @@ def modify_scan_config_set_comment( empty comment and the previous comment will be removed. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.modify_scan_config_set_comment( config_id, comment=comment ) @@ -724,7 +730,7 @@ def modify_scan_config_set_scanner_preference( value: New value for the preference. None to delete the preference and to use the default instead. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.modify_scan_config_set_scanner_preference( config_id, name, value=value ) @@ -746,7 +752,7 @@ def modify_scan_config_set_nvt_selection( family: Name of the NVT family to include NVTs from nvt_oids: List of NVTs to select for the family. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.modify_scan_config_set_nvt_selection( config_id, family, nvt_oids ) @@ -771,7 +777,7 @@ def modify_scan_config_set_family_selection( auto_add_new_families: Whether new families should be added to the scan config automatically. Default: True. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ScanConfigs.modify_scan_config_set_family_selection( config_id, families, auto_add_new_families=auto_add_new_families ) @@ -800,7 +806,7 @@ def create_scanner( ca_pub: Certificate of CA to verify scanner certificate comment: Comment for the scanner """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Scanners.create_scanner( name, host, @@ -837,7 +843,7 @@ def modify_scanner( ca_pub: New certificate of CA to verify scanner certificate comment: New comment for the scanner """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Scanners.modify_scanner( scanner_id, name=name, @@ -867,7 +873,7 @@ def get_scanners( details: Whether to include extra details like tasks using this scanner """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Scanners.get_scanners( filter_string=filter_string, filter_id=filter_id, @@ -882,7 +888,7 @@ def get_scanner(self, scanner_id: EntityID) -> T: Args: scanner_id: UUID of an existing scanner """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Scanners.get_scanner(scanner_id) ) @@ -892,7 +898,7 @@ def verify_scanner(self, scanner_id: EntityID) -> T: Args: scanner_id: UUID of an existing scanner """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Scanners.verify_scanner(scanner_id) ) @@ -902,7 +908,7 @@ def clone_scanner(self, scanner_id: EntityID) -> T: Args: scanner_id: UUID of an existing scanner """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Scanners.clone_scanner(scanner_id) ) @@ -914,7 +920,7 @@ def delete_scanner( Args: scanner_id: UUID of an existing scanner """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Scanners.delete_scanner(scanner_id, ultimate=ultimate) ) @@ -937,7 +943,7 @@ def create_user( deny access. Default is False for deny hosts. role_ids: A list of role UUIDs for the user """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Users.create_user( name, password=password, @@ -982,7 +988,7 @@ def modify_user( provided by the hosts parameter. group_ids: List of group UUIDs for the user. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Users.modify_user( user_id, name=name, @@ -1002,7 +1008,9 @@ def clone_user(self, user_id: EntityID) -> T: Args: user_id: UUID of the user to be cloned. """ - return self._send_and_transform_command(Users.clone_user(user_id)) + return self._send_request_and_transform_response( + Users.clone_user(user_id) + ) def delete_user( self, @@ -1023,7 +1031,7 @@ def delete_user( inheritor_name. inheritor_name: The name of the inheriting user. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Users.delete_user( user_id=user_id, name=name, @@ -1044,7 +1052,7 @@ def get_users( filter_string: Filter term to use for the query filter_id: UUID of an existing filter to use for the query """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Users.get_users(filter_string=filter_string, filter_id=filter_id) ) @@ -1054,7 +1062,9 @@ def get_user(self, user_id: EntityID) -> T: Args: user_id: UUID of the user to be requested. """ - return self._send_and_transform_command(Users.get_user(user_id)) + return self._send_request_and_transform_response( + Users.get_user(user_id) + ) def create_note( self, @@ -1082,7 +1092,7 @@ def create_note( severity: Severity to which note applies task_id: UUID of task to which note applies """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Notes.create_note( text, nvt_oid, @@ -1120,7 +1130,7 @@ def modify_note( severity: Severity to which note applies task_id: UUID of task to which note applies """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Notes.modify_note( note_id, text, @@ -1139,7 +1149,9 @@ def clone_note(self, note_id: EntityID) -> T: Args: note_id: UUID of an existing note to clone from """ - return self._send_and_transform_command(Notes.clone_note(note_id)) + return self._send_request_and_transform_response( + Notes.clone_note(note_id) + ) def delete_note( self, note_id: EntityID, *, ultimate: Optional[bool] = False @@ -1150,7 +1162,7 @@ def delete_note( note_id: UUID of the note to be deleted. ultimate: Whether to remove entirely or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Notes.delete_note(note_id, ultimate=ultimate) ) @@ -1171,7 +1183,7 @@ def get_notes( details: Add info about connected results and tasks result: Return the details of possible connected results. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Notes.get_notes( filter_string=filter_string, filter_id=filter_id, @@ -1186,7 +1198,9 @@ def get_note(self, note_id: EntityID) -> T: Arguments: note_id: UUID of an existing note """ - return self._send_and_transform_command(Notes.get_note(note_id)) + return self._send_request_and_transform_response( + Notes.get_note(note_id) + ) def create_override( self, @@ -1215,7 +1229,7 @@ def create_override( new_severity: New severity for result task_id: UUID of task to which override applies """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Overrides.create_override( text, nvt_oid, @@ -1257,7 +1271,7 @@ def modify_override( new_severity: New severity score for result. task_id: Task to which override applies. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Overrides.modify_override( override_id, text, @@ -1277,7 +1291,7 @@ def clone_override(self, override_id: EntityID) -> T: Args: override_id: UUID of an existing override to clone from """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Overrides.clone_override(override_id) ) @@ -1290,7 +1304,7 @@ def delete_override( override_id: UUID of an existing override to delete ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Overrides.delete_override(override_id, ultimate=ultimate) ) @@ -1310,7 +1324,7 @@ def get_overrides( details: Whether to include full details result: Whether to include results using the override """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Overrides.get_overrides( filter_string=filter_string, filter_id=filter_id, @@ -1325,7 +1339,7 @@ def get_override(self, override_id: EntityID) -> T: Args: override_id: UUID of an existing override """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Overrides.get_override(override_id) ) @@ -1371,7 +1385,7 @@ def create_target( port_range: Port range for the target port_list_id: UUID of the port list to use on target """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Targets.create_target( name, asset_hosts_filter=asset_hosts_filter, @@ -1432,7 +1446,7 @@ def modify_target( reverse_lookup_unify: Whether to scan only one IP when multiple IPs have the same name. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Targets.modify_target( target_id, name=name, @@ -1458,7 +1472,9 @@ def clone_target(self, target_id: EntityID) -> T: Args: target_id: UUID of an existing target to clone. """ - return self._send_and_transform_command(Targets.clone_target(target_id)) + return self._send_request_and_transform_response( + Targets.clone_target(target_id) + ) def delete_target( self, target_id: EntityID, *, ultimate: Optional[bool] = False @@ -1469,7 +1485,7 @@ def delete_target( target_id: UUID of an existing target to delete. ultimate: Whether to remove entirely or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Targets.delete_target(target_id, ultimate=ultimate) ) @@ -1482,7 +1498,7 @@ def get_target( target_id: UUID of the target to request. tasks: Whether to include list of tasks that use the target """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Targets.get_target(target_id, tasks=tasks) ) @@ -1502,7 +1518,7 @@ def get_targets( trash: Whether to include targets in the trashcan. tasks: Whether to include list of tasks that use the target. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Targets.get_targets( filter_string=filter_string, filter_id=filter_id, @@ -1547,7 +1563,7 @@ def create_alert( filter_id: Filter to apply when executing alert comment: Comment for the alert """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Alerts.create_alert( name, condition, @@ -1599,7 +1615,7 @@ def modify_alert( filter_id: Filter to apply when executing alert comment: Comment for the alert """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Alerts.modify_alert( alert_id, name=name, @@ -1620,7 +1636,9 @@ def clone_alert(self, alert_id: EntityID) -> T: Args: alert_id: UUID of the alert to clone from """ - return self._send_and_transform_command(Alerts.clone_alert(alert_id)) + return self._send_request_and_transform_response( + Alerts.clone_alert(alert_id) + ) def delete_alert( self, alert_id: EntityID, *, ultimate: Optional[bool] = False @@ -1631,7 +1649,7 @@ def delete_alert( alert_id: UUID of the alert to delete ultimate: Whether to remove entirely or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Alerts.delete_alert(alert_id, ultimate=ultimate) ) @@ -1643,7 +1661,9 @@ def test_alert(self, alert_id: EntityID) -> T: Args: alert_id: UUID of the alert to be tested """ - return self._send_and_transform_command(Alerts.test_alert(alert_id)) + return self._send_request_and_transform_response( + Alerts.test_alert(alert_id) + ) def trigger_alert( self, @@ -1668,7 +1688,7 @@ def trigger_alert( report_format_id: UUID of report format to use or ReportFormatType (enum) delta_report_id: UUID of an existing report to compare report to. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Alerts.trigger_alert( alert_id, report_id, @@ -1695,7 +1715,7 @@ def get_alerts( trash: True to request the alerts in the trashcan tasks: Whether to include the tasks using the alerts """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Alerts.get_alerts( filter_string=filter_string, filter_id=filter_id, @@ -1713,7 +1733,7 @@ def get_alert( alert_id: UUID of an existing alert tasks: Whether to include the tasks using the alert """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Alerts.get_alert(alert_id, tasks=tasks) ) @@ -1751,7 +1771,7 @@ def create_audit( observe this audit preferences: Name/Value pairs of scanner preferences. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Audits.create_audit( name, policy_id, @@ -1803,7 +1823,7 @@ def modify_audit( observe this audit preferences: Name/Value pairs of scanner preferences. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Audits.modify_audit( audit_id, name=name, @@ -1827,7 +1847,9 @@ def clone_audit(self, audit_id: EntityID) -> T: Args: audit_id: UUID of the audit to clone """ - return self._send_and_transform_command(Audits.clone_audit(audit_id)) + return self._send_request_and_transform_response( + Audits.clone_audit(audit_id) + ) def delete_audit( self, audit_id: EntityID, *, ultimate: Optional[bool] = False @@ -1838,7 +1860,7 @@ def delete_audit( audit_id: UUID of the audit to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Audits.delete_audit(audit_id, ultimate=ultimate) ) @@ -1861,7 +1883,7 @@ def get_audits( schedules_only: Whether to only include id, name and schedule details """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Audits.get_audits( filter_string=filter_string, filter_id=filter_id, @@ -1877,7 +1899,9 @@ def get_audit(self, audit_id: EntityID) -> T: Args: audit_id: UUID of an existing audit """ - return self._send_and_transform_command(Audits.get_audit(audit_id)) + return self._send_request_and_transform_response( + Audits.get_audit(audit_id) + ) def resume_audit(self, audit_id: EntityID) -> T: """Resume an existing stopped audit @@ -1885,7 +1909,9 @@ def resume_audit(self, audit_id: EntityID) -> T: Args: audit_id: UUID of the audit to be resumed """ - return self._send_and_transform_command(Audits.resume_audit(audit_id)) + return self._send_request_and_transform_response( + Audits.resume_audit(audit_id) + ) def start_audit(self, audit_id: EntityID) -> T: """Start an existing audit @@ -1893,7 +1919,9 @@ def start_audit(self, audit_id: EntityID) -> T: Args: audit_id: UUID of the audit to be started """ - return self._send_and_transform_command(Audits.start_audit(audit_id)) + return self._send_request_and_transform_response( + Audits.start_audit(audit_id) + ) def stop_audit(self, audit_id: EntityID) -> T: """Stop an existing running audit @@ -1901,7 +1929,9 @@ def stop_audit(self, audit_id: EntityID) -> T: Args: audit_id: UUID of the audit to be stopped """ - return self._send_and_transform_command(Audits.stop_audit(audit_id)) + return self._send_request_and_transform_response( + Audits.stop_audit(audit_id) + ) def clone_credential(self, credential_id: EntityID) -> T: """Clone an existing credential @@ -1909,7 +1939,7 @@ def clone_credential(self, credential_id: EntityID) -> T: Args: credential_id: UUID of the credential to clone """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Credentials.clone_credential(credential_id) ) @@ -2040,7 +2070,7 @@ def create_credential( password='foo', ) """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Credentials.create_credential( name, credential_type, @@ -2068,7 +2098,7 @@ def delete_credential( credential_id: UUID of the credential to delete ultimate: Whether to remove entirely or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Credentials.delete_credential(credential_id, ultimate=ultimate) ) @@ -2091,7 +2121,7 @@ def get_credentials( trash: Whether to get the trashcan credentials instead targets: Whether to include a list of targets using the credentials """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Credentials.get_credentials( filter_string=filter_string, filter_id=filter_id, @@ -2118,7 +2148,7 @@ def get_credential( targets: Whether to include a list of targets using the credentials credential_format: One of "key", "rpm", "deb", "exe" or "pem" """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Credentials.get_credential( credential_id, scanners=scanners, @@ -2163,7 +2193,7 @@ def modify_credential( privacy_password: The SNMP privacy password public_key: PGP public key in *armor* plain text format """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Credentials.modify_credential( credential_id, name=name, @@ -2188,7 +2218,9 @@ def clone_filter(self, filter_id: EntityID) -> T: Args: filter_id: ID of the filter to clone """ - return self._send_and_transform_command(Filters.clone_filter(filter_id)) + return self._send_request_and_transform_response( + Filters.clone_filter(filter_id) + ) def create_filter( self, @@ -2206,7 +2238,7 @@ def create_filter( comment: Comment for the filter term: Filter term e.g. 'name=foo' """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Filters.create_filter( name, filter_type=filter_type, comment=comment, term=term ) @@ -2221,7 +2253,7 @@ def delete_filter( filter_id: UUID of the filter to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Filters.delete_filter(filter_id, ultimate=ultimate) ) @@ -2241,7 +2273,7 @@ def get_filters( trash: Whether to get the trashcan filters instead alerts: Whether to include list of alerts that use the filter. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Filters.get_filters( filter_string=filter_string, filter_id=filter_id, @@ -2259,7 +2291,7 @@ def get_filter( filter_id: UUID of an existing filter alerts: Whether to include list of alerts that use the filter. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Filters.get_filter(filter_id, alerts=alerts) ) @@ -2281,7 +2313,7 @@ def modify_filter( term: Filter term. filter_type: Resource type filter applies to. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Filters.modify_filter( filter_id, comment=comment, @@ -2297,7 +2329,9 @@ def clone_group(self, group_id: EntityID) -> T: Args: group_id: UUID of an existing group to clone from """ - return self._send_and_transform_command(Groups.clone_group(group_id)) + return self._send_request_and_transform_response( + Groups.clone_group(group_id) + ) def create_group( self, @@ -2316,7 +2350,7 @@ def create_group( other's entities users: List of user names to be in the group """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Groups.create_group( name, comment=comment, special=special, users=users ) @@ -2331,7 +2365,7 @@ def delete_group( group_id: UUID of the group to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Groups.delete_group(group_id, ultimate=ultimate) ) @@ -2349,7 +2383,7 @@ def get_groups( filter_id: UUID of an existing filter to use for the query trash: Whether to get the trashcan groups instead """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Groups.get_groups( filter_string=filter_string, filter_id=filter_id, trash=trash ) @@ -2361,7 +2395,9 @@ def get_group(self, group_id: EntityID) -> T: Args: group_id: UUID of an existing group """ - return self._send_and_transform_command(Groups.get_group(group_id)) + return self._send_request_and_transform_response( + Groups.get_group(group_id) + ) def modify_group( self, @@ -2379,7 +2415,7 @@ def modify_group( name: Name of group. users: List of user names to be in the group """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Groups.modify_group( group_id, comment=comment, name=name, users=users ) @@ -2392,7 +2428,7 @@ def create_host(self, name: str, *, comment: Optional[str] = None) -> T: name: Name for the new host host comment: Comment for the new host host """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Hosts.create_host(name, comment=comment) ) @@ -2402,7 +2438,9 @@ def delete_host(self, host_id: EntityID) -> T: Args: host_id: UUID of the single host to delete. """ - return self._send_and_transform_command(Hosts.delete_host(host_id)) + return self._send_request_and_transform_response( + Hosts.delete_host(host_id) + ) def get_hosts( self, @@ -2418,7 +2456,7 @@ def get_hosts( filter_id: UUID of an existing filter to use for the query details: Whether to include additional information (e.g. tags) """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Hosts.get_hosts( filter_string=filter_string, filter_id=filter_id, @@ -2435,7 +2473,7 @@ def get_host( host_id: UUID of an existing host details: Whether to include additional information (e.g. tags) """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Hosts.get_host(host_id, details=details) ) @@ -2449,7 +2487,7 @@ def modify_host( comment: Comment for the host. Not passing a comment arguments clears the comment for this host. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Hosts.modify_host(host_id, comment=comment) ) @@ -2462,7 +2500,7 @@ def delete_operating_system( Args: operating_system_id: UUID of the single operating_system to delete. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( OperatingSystems.delete_operating_system(operating_system_id) ) @@ -2480,7 +2518,7 @@ def get_operating_systems( filter_id: UUID of an existing filter to use for the query details: Whether to include additional information (e.g. tags) """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( OperatingSystems.get_operating_systems( filter_string=filter_string, filter_id=filter_id, @@ -2497,7 +2535,7 @@ def get_operating_system( operating_system_id: UUID of an existing operating_system details: Whether to include additional information (e.g. tags) """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( OperatingSystems.get_operating_system( operating_system_id, details=details ) @@ -2513,7 +2551,7 @@ def modify_operating_system( comment: Comment for the operating_system. Not passing a comment arguments clears the comment for this operating system. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( OperatingSystems.modify_operating_system( operating_system_id, comment=comment ) @@ -2525,7 +2563,7 @@ def clone_permission(self, permission_id: EntityID) -> T: Args: permission_id: UUID of an existing permission to clone from """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Permissions.clone_permission(permission_id) ) @@ -2550,7 +2588,7 @@ def create_permission( resource_type: Type of the resource. For Super permissions user, group or role """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Permissions.create_permission( name, subject_id, @@ -2570,7 +2608,7 @@ def delete_permission( permission_id: UUID of the permission to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Permissions.delete_permission(permission_id, ultimate=ultimate) ) @@ -2588,7 +2626,7 @@ def get_permissions( filter_id: UUID of an existing filter to use for the query trash: Whether to get permissions in the trashcan instead """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Permissions.get_permissions( filter_string=filter_string, filter_id=filter_id, trash=trash ) @@ -2600,7 +2638,7 @@ def get_permission(self, permission_id: EntityID) -> T: Args: permission_id: UUID of an existing permission """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Permissions.get_permission(permission_id) ) @@ -2627,7 +2665,7 @@ def modify_permission( resource_type: Type of the resource. For Super permissions user, group or role """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Permissions.modify_permission( permission_id, comment=comment, @@ -2645,7 +2683,7 @@ def clone_policy(self, policy_id: EntityID) -> T: Args: policy_id: UUID of the existing policy """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Policies.clone_policy(policy_id) ) @@ -2664,7 +2702,7 @@ def create_policy( policy is used. comment: A comment on the policy """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Policies.create_policy(name, policy_id=policy_id, comment=comment) ) @@ -2677,7 +2715,7 @@ def delete_policy( policy_id: UUID of the policy to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Policies.delete_policy(policy_id, ultimate=ultimate) ) @@ -2706,7 +2744,7 @@ def get_policies( requested trash: Whether to get the trashcan audits instead """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Policies.get_policies( audits=audits, filter_string=filter_string, @@ -2727,7 +2765,7 @@ def get_policy( policy_id: UUID of an existing policy audits: Whether to get audits using this policy """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Policies.get_policy(policy_id, audits=audits) ) @@ -2738,7 +2776,9 @@ def import_policy(self, policy: str) -> T: policy: Policy XML as string to import. This XML must contain a :code:`` root element. """ - return self._send_and_transform_command(Policies.import_policy(policy)) + return self._send_request_and_transform_response( + Policies.import_policy(policy) + ) def modify_policy_set_nvt_preference( self, @@ -2757,7 +2797,7 @@ def modify_policy_set_nvt_preference( value: New value for the preference. None to delete the preference and to use the default instead. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Policies.modify_policy_set_nvt_preference( policy_id, name, nvt_oid, value=value ) @@ -2770,7 +2810,7 @@ def modify_policy_set_name(self, policy_id: EntityID, name: str) -> T: policy_id: UUID of policy to modify. name: New name for the policy. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Policies.modify_policy_set_name(policy_id, name) ) @@ -2785,7 +2825,7 @@ def modify_policy_set_comment( empty comment and the previous comment will be removed. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Policies.modify_policy_set_comment(policy_id, comment=comment) ) @@ -2800,7 +2840,7 @@ def modify_policy_set_scanner_preference( value: New value for the preference. None to delete the preference and to use the default instead. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Policies.modify_policy_set_scanner_preference( policy_id, name, value=value ) @@ -2819,7 +2859,7 @@ def modify_policy_set_nvt_selection( family: Name of the NVT family to include NVTs from nvt_oids: List of NVTs to select for the family. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Policies.modify_policy_set_nvt_selection( policy_id, family, nvt_oids ) @@ -2845,7 +2885,7 @@ def modify_policy_set_family_selection( auto_add_new_families: Whether new families should be added to the policy automatically. Default: True. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Policies.modify_policy_set_family_selection( policy_id, families, auto_add_new_families=auto_add_new_families ) @@ -2857,7 +2897,7 @@ def delete_report(self, report_id: EntityID) -> T: Args: report_id: UUID of the report to be deleted. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Reports.delete_report(report_id) ) @@ -2886,7 +2926,7 @@ def get_report( details: Request additional report information details defaults to True """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Reports.get_report( report_id, filter_string=filter_string, @@ -2920,7 +2960,7 @@ def get_reports( "rows". details: Whether to exclude results """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Reports.get_reports( filter_string=filter_string, filter_id=filter_id, @@ -2946,7 +2986,7 @@ def import_report( task_id: UUID of task to import report to in_asset: Whether to create or update assets using the report """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Reports.import_report(report, task_id, in_assets=in_assets) ) @@ -2956,7 +2996,9 @@ def get_result(self, result_id: EntityID) -> T: Args: result_id: UUID of an existing result """ - return self._send_and_transform_command(Results.get_result(result_id)) + return self._send_request_and_transform_response( + Results.get_result(result_id) + ) def get_results( self, @@ -2979,7 +3021,7 @@ def get_results( override details details: Whether to include additional details of the results """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Results.get_results( filter_string=filter_string, filter_id=filter_id, @@ -2996,7 +3038,9 @@ def clone_role(self, role_id: EntityID) -> T: Args: role_id: UUID of an existing role to clone from """ - return self._send_and_transform_command(Roles.clone_role(role_id)) + return self._send_request_and_transform_response( + Roles.clone_role(role_id) + ) def create_role( self, @@ -3012,7 +3056,7 @@ def create_role( comment: Comment for the role users: List of user names to add to the role """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Roles.create_role(name, comment=comment, users=users) ) @@ -3025,7 +3069,7 @@ def delete_role( role_id: UUID of the role to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Roles.delete_role(role_id, ultimate=ultimate) ) @@ -3043,7 +3087,7 @@ def get_roles( filter_id: UUID of an existing filter to use for the query trash: Whether to get the trashcan roles instead """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Roles.get_roles( filter_string=filter_string, filter_id=filter_id, trash=trash ) @@ -3055,7 +3099,9 @@ def get_role(self, role_id: EntityID) -> T: Args: role_id: UUID of an existing role """ - return self._send_and_transform_command(Roles.get_role(role_id)) + return self._send_request_and_transform_response( + Roles.get_role(role_id) + ) def modify_role( self, @@ -3073,7 +3119,7 @@ def modify_role( name: Comment on role. users: List of user names. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Roles.modify_role(role_id, comment=comment, name=name, users=users) ) @@ -3083,7 +3129,7 @@ def clone_schedule(self, schedule_id: EntityID) -> T: Args: schedule_id: UUID of an existing schedule to clone from """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Schedules.clone_schedule(schedule_id) ) @@ -3138,7 +3184,7 @@ def create_schedule( .. _iCalendar: https://tools.ietf.org/html/rfc5545 """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Schedules.create_schedule( name, icalendar, timezone, comment=comment ) @@ -3153,7 +3199,7 @@ def delete_schedule( schedule_id: UUID of the schedule to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Schedules.delete_schedule(schedule_id, ultimate=ultimate) ) @@ -3173,7 +3219,7 @@ def get_schedules( trash: Whether to get the trashcan schedules instead tasks: Whether to include tasks using the schedules """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Schedules.get_schedules( filter_string=filter_string, filter_id=filter_id, @@ -3191,7 +3237,7 @@ def get_schedule( schedule_id: UUID of an existing schedule tasks: Whether to include tasks using the schedules """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Schedules.get_schedule(schedule_id, tasks=tasks) ) @@ -3220,7 +3266,7 @@ def modify_schedule( .. _iCalendar: https://tools.ietf.org/html/rfc5545 """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Schedules.modify_schedule( schedule_id, name=name, @@ -3236,7 +3282,7 @@ def get_nvt_families(self, *, sort_order: Optional[str] = None) -> T: Args: sort_order: Sort order """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Nvts.get_nvt_families(sort_order=sort_order) ) @@ -3267,7 +3313,7 @@ def get_scan_config_nvts( sort_order: Sort order sort_field: Sort field """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Nvts.get_scan_config_nvts( details=details, preferences=preferences, @@ -3287,7 +3333,7 @@ def get_scan_config_nvt(self, nvt_oid: str) -> T: Args: nvt_oid: OID of an existing nvt """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Nvts.get_scan_config_nvt(nvt_oid) ) @@ -3328,7 +3374,7 @@ def get_nvts( sort_order: Sort order (only for extended) sort_field: Sort field (only for extended) """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Nvts.get_nvts( filter_string=filter_string, filter_id=filter_id, @@ -3354,7 +3400,7 @@ def get_nvt(self, nvt_id: str, *, extended: Optional[bool] = None) -> T: extended: Whether to receive extended NVT information (calls get_nvts, instead of get_info) """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Nvts.get_nvt(nvt_id, extended=extended) ) @@ -3371,7 +3417,7 @@ def get_nvt_preferences( Args: nvt_oid: OID of nvt """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Nvts.get_nvt_preferences(nvt_oid=nvt_oid) ) @@ -3388,7 +3434,7 @@ def get_nvt_preference( nvt_oid: OID of nvt config_id: UUID of scan config of which to show preference values """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Nvts.get_nvt_preference(name, nvt_oid=nvt_oid) ) @@ -3400,7 +3446,7 @@ def get_info(self, info_id: EntityID, info_type: InfoType) -> T: info_type: Type must be either CERT_BUND_ADV, CPE, CVE, DFN_CERT_ADV, OVALDEF, NVT """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( SecInfo.get_info(info_id, info_type) ) @@ -3424,7 +3470,7 @@ def get_info_list( details: Whether to include information about references to this information """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( SecInfo.get_info_list( info_type, filter_string=filter_string, @@ -3451,7 +3497,7 @@ def get_cves( details: Whether to include information about references to this information """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Cves.get_cves( filter_string=filter_string, filter_id=filter_id, @@ -3466,7 +3512,7 @@ def get_cve(self, cve_id: str) -> T: Args: cve_id: ID of an existing CVE """ - return self._send_and_transform_command(Cves.get_cve(cve_id)) + return self._send_request_and_transform_response(Cves.get_cve(cve_id)) def get_cpes( self, @@ -3485,7 +3531,7 @@ def get_cpes( details: Whether to include information about references to this information """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Cpes.get_cpes( filter_string=filter_string, filter_id=filter_id, @@ -3500,7 +3546,7 @@ def get_cpe(self, cpe_id: str) -> T: Args: cpe_id: ID of an existing CPE """ - return self._send_and_transform_command(Cpes.get_cpe(cpe_id)) + return self._send_request_and_transform_response(Cpes.get_cpe(cpe_id)) def get_dfn_cert_advisories( self, @@ -3519,7 +3565,7 @@ def get_dfn_cert_advisories( details: Whether to include information about references to this information """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( DfnCertAdvisories.get_dfn_cert_advisories( filter_string=filter_string, filter_id=filter_id, @@ -3534,7 +3580,7 @@ def get_dfn_cert_advisory(self, cert_id: EntityID) -> T: Args: cert_id: ID of an existing DFN-CERT Advisory """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( DfnCertAdvisories.get_dfn_cert_advisory(cert_id) ) @@ -3555,7 +3601,7 @@ def get_cert_bund_advisories( details: Whether to include information about references to this information """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( CertBundAdvisories.get_cert_bund_advisories( filter_string=filter_string, filter_id=filter_id, @@ -3570,7 +3616,7 @@ def get_cert_bund_advisory(self, cert_id: EntityID) -> T: Args: cert_id: ID of an existing CERT-BUND Advisory """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( CertBundAdvisories.get_cert_bund_advisory(cert_id) ) @@ -3580,7 +3626,7 @@ def clone_tag(self, tag_id: EntityID) -> T: Args: tag_id: UUID of an existing tag to clone from """ - return self._send_and_transform_command(Tags.clone_tag(tag_id)) + return self._send_request_and_transform_response(Tags.clone_tag(tag_id)) def create_tag( self, @@ -3608,7 +3654,7 @@ def create_tag( comment: Comment for the tag. active: Whether the tag should be active. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tags.create_tag( name, resource_type, @@ -3629,7 +3675,7 @@ def delete_tag( tag_id: UUID of the tag to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tags.delete_tag(tag_id, ultimate=ultimate) ) @@ -3649,7 +3695,7 @@ def get_tags( trash: Whether to get tags from the trashcan instead names_only: Whether to get only distinct tag names """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tags.get_tags( filter_string=filter_string, filter_id=filter_id, @@ -3664,7 +3710,7 @@ def get_tag(self, tag_id: EntityID) -> T: Args: tag_id: UUID of an existing tag """ - return self._send_and_transform_command(Tags.get_tag(tag_id)) + return self._send_request_and_transform_response(Tags.get_tag(tag_id)) def modify_tag( self, @@ -3695,7 +3741,7 @@ def modify_tag( attached to. resource_ids: IDs of the resources to which to attach the tag. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tags.modify_tag( tag_id, comment=comment, @@ -3715,7 +3761,9 @@ def clone_task(self, task_id: EntityID) -> T: Args: task_id: UUID of existing task to clone from """ - return self._send_and_transform_command(Tasks.clone_task(task_id)) + return self._send_request_and_transform_response( + Tasks.clone_task(task_id) + ) def create_container_task( self, name: str, *, comment: Optional[str] = None @@ -3729,7 +3777,7 @@ def create_container_task( name: Name of the task comment: Comment for the task """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tasks.create_container_task(name, comment=comment) ) @@ -3767,7 +3815,7 @@ def create_task( observe this task preferences: Name/Value pairs of scanner preferences. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tasks.create_task( name, config_id, @@ -3793,7 +3841,7 @@ def delete_task( task_id: UUID of the task to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tasks.delete_task(task_id, ultimate=ultimate) ) @@ -3819,7 +3867,7 @@ def get_tasks( ignore_pagination: Whether to ignore pagination settings (filter terms "first" and "rows"). Default is False. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tasks.get_tasks( filter_string=filter_string, filter_id=filter_id, @@ -3836,7 +3884,9 @@ def get_task(self, task_id: EntityID) -> T: Args: task_id: UUID of an existing task """ - return self._send_and_transform_command(Tasks.get_task(task_id)) + return self._send_request_and_transform_response( + Tasks.get_task(task_id) + ) def modify_task( self, @@ -3873,7 +3923,7 @@ def modify_task( observe this task preferences: Name/Value pairs of scanner preferences. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tasks.modify_task( task_id, name=name, @@ -3900,7 +3950,7 @@ def move_task( task_id: UUID of the task to be moved slave_id: UUID of the sensor to reassign the task to, empty for master. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tasks.move_task(task_id, slave_id=slave_id) ) @@ -3910,7 +3960,9 @@ def start_task(self, task_id: EntityID) -> T: Args: task_id: UUID of the task to be started """ - return self._send_and_transform_command(Tasks.start_task(task_id)) + return self._send_request_and_transform_response( + Tasks.start_task(task_id) + ) def resume_task(self, task_id: EntityID) -> T: """Resume an existing stopped task @@ -3918,7 +3970,9 @@ def resume_task(self, task_id: EntityID) -> T: Args: task_id: UUID of the task to be resumed """ - return self._send_and_transform_command(Tasks.resume_task(task_id)) + return self._send_request_and_transform_response( + Tasks.resume_task(task_id) + ) def stop_task(self, task_id: EntityID) -> T: """Stop an existing running task @@ -3926,7 +3980,9 @@ def stop_task(self, task_id: EntityID) -> T: Args: task_id: UUID of the task to be stopped """ - return self._send_and_transform_command(Tasks.stop_task(task_id)) + return self._send_request_and_transform_response( + Tasks.stop_task(task_id) + ) def clone_ticket(self, ticket_id: EntityID) -> T: """Clone an existing ticket @@ -3934,7 +3990,9 @@ def clone_ticket(self, ticket_id: EntityID) -> T: Args: ticket_id: UUID of an existing ticket to clone from """ - return self._send_and_transform_command(Tickets.clone_ticket(ticket_id)) + return self._send_request_and_transform_response( + Tickets.clone_ticket(ticket_id) + ) def create_ticket( self, @@ -3952,7 +4010,7 @@ def create_ticket( note: A note about opening the ticket comment: Comment for the ticket """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tickets.create_ticket( result_id=result_id, assigned_to_user_id=assigned_to_user_id, @@ -3970,7 +4028,7 @@ def delete_ticket( ticket_id: UUID of the ticket to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tickets.delete_ticket(ticket_id, ultimate=ultimate) ) @@ -3988,7 +4046,7 @@ def get_tickets( filter_id: UUID of an existing filter to use for the query trash: True to request the tickets in the trashcan """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tickets.get_tickets( filter_string=filter_string, filter_id=filter_id, trash=trash ) @@ -4000,7 +4058,9 @@ def get_ticket(self, ticket_id: EntityID) -> T: Args: ticket_id: UUID of an existing ticket """ - return self._send_and_transform_command(Tickets.get_ticket(ticket_id)) + return self._send_request_and_transform_response( + Tickets.get_ticket(ticket_id) + ) def modify_ticket( self, @@ -4021,7 +4081,7 @@ def modify_ticket( to comment: Comment for the ticket """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Tickets.modify_ticket( ticket_id, status=status, @@ -4037,7 +4097,7 @@ def clone_tls_certificate(self, tls_certificate_id: EntityID) -> T: Args: tls_certificate_id: The UUID of an existing TLS certificate """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( TLSCertificates.clone_tls_certificate(tls_certificate_id) ) @@ -4058,7 +4118,7 @@ def create_tls_certificate( comment: Comment for the TLS certificate. trust: Whether the certificate is trusted. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( TLSCertificates.create_tls_certificate( name, certificate, comment=comment, trust=trust ) @@ -4070,7 +4130,7 @@ def delete_tls_certificate(self, tls_certificate_id: EntityID) -> T: Args: tls_certificate_id: UUID of the tls certificate to be deleted. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( TLSCertificates.delete_tls_certificate(tls_certificate_id) ) @@ -4092,7 +4152,7 @@ def get_tls_certificates( details: Whether to include additional details of the tls certificates """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( TLSCertificates.get_tls_certificates( filter_string=filter_string, filter_id=filter_id, @@ -4107,7 +4167,7 @@ def get_tls_certificate(self, tls_certificate_id: EntityID) -> T: Args: tls_certificate_id: UUID of an existing TLS certificate """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( TLSCertificates.get_tls_certificate(tls_certificate_id) ) @@ -4127,7 +4187,7 @@ def modify_tls_certificate( comment: Comment for the TLS certificate. trust: Whether the certificate is trusted. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( TLSCertificates.modify_tls_certificate( tls_certificate_id, name=name, comment=comment, trust=trust ) @@ -4145,7 +4205,7 @@ def get_vulnerabilities( filter_string: Filter term to use for the query filter_id: UUID of an existing filter to use for the query """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Vulnerabilities.get_vulnerabilities( filter_string=filter_string, filter_id=filter_id ) @@ -4157,7 +4217,7 @@ def get_vulnerability(self, vulnerability_id: EntityID) -> T: Args: vulnerability_id: ID of an existing vulnerability """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Vulnerabilities.get_vulnerability(vulnerability_id) ) @@ -4170,7 +4230,7 @@ def clone_report_format( report_format_id: UUID of the existing report format or ReportFormatType (enum) """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportFormats.clone_report_format(report_format_id) ) @@ -4187,7 +4247,7 @@ def delete_report_format( or ReportFormatType (enum) ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportFormats.delete_report_format( report_format_id, ultimate=ultimate ) @@ -4213,7 +4273,7 @@ def get_report_formats( params: Whether to include report format parameters details: Include report format file, signature and parameters """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportFormats.get_report_formats( filter_string=filter_string, filter_id=filter_id, @@ -4233,7 +4293,7 @@ def get_report_format( report_format_id: UUID of an existing report format or ReportFormatType (enum) """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportFormats.get_report_format(report_format_id) ) @@ -4244,7 +4304,7 @@ def import_report_format(self, report_format: str) -> T: report_format: Report format XML as string to import. This XML must contain a :code:`` root element. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportFormats.import_report_format(report_format) ) @@ -4269,7 +4329,7 @@ def modify_report_format( param_name: The name of the param. param_value: The value of the param. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportFormats.modify_report_format( report_format_id, active=active, @@ -4295,6 +4355,6 @@ def verify_report_format( report_format_id: UUID of the report format to be verified or ReportFormatType (enum) """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportFormats.verify_report_format(report_format_id) ) diff --git a/gvm/protocols/gmp/_gmp225.py b/gvm/protocols/gmp/_gmp225.py index 1c90a689..d43f7510 100644 --- a/gvm/protocols/gmp/_gmp225.py +++ b/gvm/protocols/gmp/_gmp225.py @@ -70,7 +70,7 @@ def get_resource_names( or USER filter_string: Filter term to use for the query """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ResourceNames.get_resource_names( resource_type, filter_string=filter_string ) @@ -90,6 +90,6 @@ def get_resource_name( SCANNER, SCHEDULE, TARGET, TASK, TLS_CERTIFICATE or USER """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ResourceNames.get_resource_name(resource_id, resource_type) ) diff --git a/gvm/protocols/gmp/_gmp226.py b/gvm/protocols/gmp/_gmp226.py index 73b9ecc5..cad24f90 100644 --- a/gvm/protocols/gmp/_gmp226.py +++ b/gvm/protocols/gmp/_gmp226.py @@ -78,7 +78,7 @@ def get_resource_names( or USER filter_string: Filter term to use for the query """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ResourceNames.get_resource_names( resource_type, filter_string=filter_string ) @@ -100,7 +100,7 @@ def get_resource_name( SCANNER, SCHEDULE, TARGET, TASK, TLS_CERTIFICATE or USER """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ResourceNames.get_resource_name(resource_id, resource_type) ) @@ -110,7 +110,7 @@ def delete_report(self, report_id: EntityID) -> T: Args: report_id: UUID of the report to be deleted. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Reports.delete_report(report_id) ) @@ -139,7 +139,7 @@ def get_report( details: Request additional report information details defaults to True """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Reports.get_report( report_id, filter_string=filter_string, @@ -173,7 +173,7 @@ def get_reports( "rows". details: Whether to exclude results """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Reports.get_reports( filter_string=filter_string, filter_id=filter_id, @@ -199,7 +199,7 @@ def import_report( task_id: UUID of task to import report to in_asset: Whether to create or update assets using the report """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( Reports.import_report(report, task_id, in_assets=in_assets) ) @@ -209,7 +209,7 @@ def delete_audit_report(self, report_id: EntityID) -> T: Args: report_id: UUID of the report to be deleted. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( AuditReports.delete_report(report_id) ) @@ -238,7 +238,7 @@ def get_audit_report( details: Request additional report information details defaults to True """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( AuditReports.get_report( report_id, filter_string=filter_string, @@ -272,7 +272,7 @@ def get_audit_reports( "rows". details: Whether to exclude results """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( AuditReports.get_reports( filter_string=filter_string, filter_id=filter_id, @@ -301,7 +301,7 @@ def create_filter( """ # override create_filter because of the different FilterType enum # this avoids warnings with type checkers - return self._send_and_transform_command( + return self._send_request_and_transform_response( Filters.create_filter( name, filter_type=filter_type, comment=comment, term=term ) @@ -327,7 +327,7 @@ def modify_filter( """ # override create_filter because of the different FilterType enum # this avoids warnings with type checkers - return self._send_and_transform_command( + return self._send_request_and_transform_response( Filters.modify_filter( filter_id, comment=comment, @@ -343,7 +343,7 @@ def clone_report_config(self, report_config_id: EntityID) -> T: Args: report_config_id: UUID of the existing report config """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportConfigs.clone_report_config(report_config_id) ) @@ -359,7 +359,7 @@ def delete_report_config( report_config_id: UUID of the report config to be deleted. ultimate: Whether to remove entirely, or to the trashcan. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportConfigs.delete_report_config( report_config_id, ultimate=ultimate ) @@ -381,7 +381,7 @@ def get_report_configs( trash: Whether to get the trashcan report configs instead details: Include report config details """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportConfigs.get_report_configs( filter_string=filter_string, filter_id=filter_id, @@ -399,7 +399,7 @@ def get_report_config( Args: report_config_id: UUID of an existing report config """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportConfigs.get_report_config(report_config_id) ) @@ -419,7 +419,7 @@ def create_report_config( comment: An optional comment for the report config. params: A list of report config parameters. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportConfigs.create_report_config( name, report_format_id, comment=comment, params=params ) @@ -441,7 +441,7 @@ def modify_report_config( comment: An optional comment for the report config. params: A list of report config parameters. """ - return self._send_and_transform_command( + return self._send_request_and_transform_response( ReportConfigs.modify_report_config( report_config_id, name=name, comment=comment, params=params ) diff --git a/gvm/protocols/ospv1.py b/gvm/protocols/ospv1.py index ae4e190a..06a0ffb8 100644 --- a/gvm/protocols/ospv1.py +++ b/gvm/protocols/ospv1.py @@ -8,6 +8,7 @@ .. _Open Scanner Protocol version 1: https://docs.greenbone.net/API/OSP/osp-20.08.html """ + import logging from typing import Any, Optional @@ -78,9 +79,9 @@ def get_protocol_version() -> tuple[int, int]: """ return PROTOCOL_VERSION - def _send_command(self, cmd: Request) -> Response: + def _send_request(self, cmd: Request) -> Response: try: - return super()._send_command(cmd) + return super()._send_request(cmd) finally: # OSP is stateless. Therefore the connection is closed after each # response and we must reset the connection @@ -89,12 +90,12 @@ def _send_command(self, cmd: Request) -> Response: def get_version(self) -> T: """Get the version of the OSPD server which is connected to.""" cmd = XmlCommand("get_version") - return self._send_and_transform_command(cmd) + return self._send_request_and_transform_response(cmd) def help(self) -> T: """Get the help text.""" cmd = XmlCommand("help") - return self._send_and_transform_command(cmd) + return self._send_request_and_transform_response(cmd) def get_scans( self, @@ -118,7 +119,7 @@ def get_scans( cmd.set_attribute("details", to_bool(details)) cmd.set_attribute("pop_results", to_bool(pop_results)) - return self._send_and_transform_command(cmd) + return self._send_request_and_transform_response(cmd) def delete_scan(self, scan_id: str) -> T: """Delete a finished scan. @@ -132,12 +133,12 @@ def delete_scan(self, scan_id: str) -> T: cmd = XmlCommand("delete_scan") cmd.set_attribute("scan_id", scan_id) - return self._send_and_transform_command(cmd) + return self._send_request_and_transform_response(cmd) def get_scanner_details(self) -> T: """Return scanner description and parameters.""" cmd = XmlCommand("get_scanner_details") - return self._send_and_transform_command(cmd) + return self._send_request_and_transform_response(cmd) def get_vts(self, vt_id: Optional[str] = None) -> T: """Return information about vulnerability tests, @@ -150,7 +151,7 @@ def get_vts(self, vt_id: Optional[str] = None) -> T: if vt_id: cmd.set_attribute("vt_id", vt_id) - return self._send_and_transform_command(cmd) + return self._send_request_and_transform_response(cmd) def start_scan( self, @@ -250,7 +251,7 @@ def start_scan( cmd.add_element("vt_selection"), vt_selection ) - return self._send_and_transform_command(cmd) + return self._send_request_and_transform_response(cmd) def stop_scan(self, scan_id: str) -> T: """Stop a currently running scan. @@ -266,4 +267,4 @@ def stop_scan(self, scan_id: str) -> T: cmd = XmlCommand("stop_scan") cmd.set_attribute("scan_id", scan_id) - return self._send_and_transform_command(cmd) + return self._send_request_and_transform_response(cmd)