diff --git a/src/vinyldns/client.py b/src/vinyldns/client.py index 60a6ddf..4ac769f 100644 --- a/src/vinyldns/client.py +++ b/src/vinyldns/client.py @@ -32,10 +32,13 @@ from vinyldns.batch_change import BatchChange, ListBatchChangeSummaries, to_review_json from vinyldns.membership import Group, ListGroupsResponse, ListGroupChangesResponse, ListMembersResponse, \ - ListAdminsResponse + ListAdminsResponse, GroupChange, UserInfo from vinyldns.serdes import to_json_string -from vinyldns.zone import ListZonesResponse, ListZoneChangesResponse, Zone, ZoneChange -from vinyldns.record import ListRecordSetsResponse, ListRecordSetChangesResponse, RecordSet, RecordSetChange +from vinyldns.zone import ListZonesResponse, ListZoneChangesResponse, Zone, ZoneChange, ZoneDetails, \ + ZoneChangeFailuresResponse, DeletedZonesResponse +from vinyldns.record import ListRecordSetsResponse, ListRecordSetChangesResponse, RecordSet, RecordSetChange, \ + RecordSetCount, RecordSetChangeFailuresResponse, OwnershipTransfer, OwnershipTransferStatus +from vinyldns.status import SystemStatus try: basestring @@ -161,6 +164,24 @@ def __make_request(self, url, method=u'GET', headers=None, body_string=None, **k return self.__check_response(response, method) + def __make_request_raw(self, url, method=u'GET', headers=None, body_string=None, **kwargs): + + # remove retries arg if provided + kwargs.pop(u'retries', None) + + path = urlparse(url).path + query = parse_qs(urlsplit(url).query) + if query: + query = dict((k, v if len(v) > 1 else v[0]) + for k, v in query.items()) + + signed_headers, signed_body = self.__build_vinyldns_request(method, path, body_string, query, + with_headers=headers or {}, **kwargs) + + response = self.session.request(method, url, data=signed_body, headers=signed_headers, **kwargs) + + return self.__check_response_raw(response, method) + def __check_response(self, response, method): status = response.status_code if status == 200 or status == 202: @@ -183,6 +204,28 @@ def __check_response(self, response, method): else: raise ClientError(response.text) + def __check_response_raw(self, response, method): + status = response.status_code + if status == 200 or status == 202: + return response.status_code, response.text + elif status == 404: + if method == 'GET': + return 404, None + else: + raise NotFoundError(response.text) + elif status == 400: + raise BadRequestError(response.text) + elif status == 401: + raise UnauthorizedError(response.text) + elif status == 403: + raise ForbiddenError(response.text) + elif status == 409: + raise ConflictError(response.text) + elif status == 422: + raise UnprocessableError(response.text) + else: + raise ClientError(response.text) + def __build_vinyldns_request(self, method, path, body_data, params=None, **kwargs): if isinstance(body_data, basestring): @@ -379,6 +422,28 @@ def list_group_changes(self, group_id, start_from=None, max_items=None, **kwargs return ListGroupChangesResponse.from_dict(data) + def get_group_change(self, group_change_id, **kwargs): + """ + Get a group change by ID. + + :param group_change_id: the group change ID + :return: the group change details + """ + url = urljoin(self.index_url, u'/groups/change/{0}'.format(group_change_id)) + response, data = self.__make_request(url, u'GET', self.headers, **kwargs) + + return GroupChange.from_dict(data) if data is not None else None + + def list_group_valid_domains(self, **kwargs): + """ + List valid email domains for groups. + + :return: list of valid domains + """ + url = urljoin(self.index_url, u'/groups/valid/domains') + response, data = self.__make_request(url, u'GET', self.headers, **kwargs) + return data if data is not None else [] + def connect_zone(self, zone, **kwargs): """ Create a new zone with the given name and email. @@ -447,6 +512,71 @@ def get_zone_by_name(self, name, **kwargs): response, data = self.__make_request(url, u'GET', self.headers, **kwargs) return Zone.from_dict(data['zone']) if data is not None else None + def get_zone_details(self, zone_id, **kwargs): + """ + Get detailed zone info for the given zone id. + + :param zone_id: the id of the zone to retrieve + :return: the zone details, or will 404 if not found + """ + url = urljoin(self.index_url, u'/zones/{0}/details'.format(zone_id)) + response, data = self.__make_request(url, u'GET', self.headers, **kwargs) + + return ZoneDetails.from_dict(data['zone']) if data is not None else None + + def list_zone_backend_ids(self, **kwargs): + """ + List configured backend IDs. + """ + url = urljoin(self.index_url, u'/zones/backendids') + response, data = self.__make_request(url, u'GET', self.headers, **kwargs) + + if data is None: + return [] + if isinstance(data, dict): + return data.get('backendIds', []) + return data + + def list_zone_changes_failure(self, name_filter=None, start_from=None, max_items=None, **kwargs): + """ + List failed zone changes. + """ + args = [] + if name_filter: + args.append(u'nameFilter={0}'.format(name_filter)) + if start_from: + args.append(u'startFrom={0}'.format(start_from)) + if max_items is not None: + args.append(u'maxItems={0}'.format(max_items)) + + url = urljoin(self.index_url, u'/metrics/health/zonechangesfailure') + if args: + url = url + u'?' + u'&'.join(args) + + response, data = self.__make_request(url, u'GET', self.headers, **kwargs) + return ZoneChangeFailuresResponse.from_dict(data) + + def list_deleted_zones(self, name_filter=None, start_from=None, max_items=None, ignore_access=None, **kwargs): + """ + List deleted zone changes. + """ + args = [] + if name_filter: + args.append(u'nameFilter={0}'.format(name_filter)) + if start_from: + args.append(u'startFrom={0}'.format(start_from)) + if max_items is not None: + args.append(u'maxItems={0}'.format(max_items)) + if ignore_access is not None: + args.append(u'ignoreAccess={0}'.format(ignore_access)) + + url = urljoin(self.index_url, u'/zones/deleted/changes') + if args: + url = url + u'?' + u'&'.join(args) + + response, data = self.__make_request(url, u'GET', self.headers, **kwargs) + return DeletedZonesResponse.from_dict(data) + def list_zone_changes(self, zone_id, start_from=None, max_items=None, **kwargs): """ Get the zone changes for the given zone id. @@ -564,6 +694,84 @@ def list_record_sets(self, zone_id, start_from=None, max_items=None, record_name response, data = self.__make_request(url, u'GET', self.headers, **kwargs) return ListRecordSetsResponse.from_dict(data) + def get_record_set_count(self, zone_id, **kwargs): + """ + Get record set count for a zone. + """ + url = urljoin(self.index_url, u'/zones/{0}/recordsetcount'.format(zone_id)) + response, data = self.__make_request(url, u'GET', self.headers, **kwargs) + return RecordSetCount.from_dict(data) + + def list_record_set_change_history(self, zone_id, fqdn, record_type, start_from=None, max_items=None, **kwargs): + """ + Retrieve record set change history for a FQDN and type. + """ + args = [u'zoneId={0}'.format(zone_id), u'fqdn={0}'.format(fqdn), u'recordType={0}'.format(record_type)] + if start_from: + args.append(u'startFrom={0}'.format(start_from)) + if max_items is not None: + args.append(u'maxItems={0}'.format(max_items)) + + url = urljoin(self.index_url, u'/recordsetchange/history') + u'?' + u'&'.join(args) + response, data = self.__make_request(url, u'GET', self.headers, **kwargs) + return ListRecordSetChangesResponse.from_dict(data) + + def list_record_set_changes_failure(self, zone_id, start_from=None, max_items=None, **kwargs): + """ + List failed record set changes for a zone. + """ + args = [] + if start_from: + args.append(u'startFrom={0}'.format(start_from)) + if max_items is not None: + args.append(u'maxItems={0}'.format(max_items)) + + url = urljoin(self.index_url, u'/metrics/health/zones/{0}/recordsetchangesfailure'.format(zone_id)) + if args: + url = url + u'?' + u'&'.join(args) + + response, data = self.__make_request(url, u'GET', self.headers, **kwargs) + return RecordSetChangeFailuresResponse.from_dict(data) + + def request_record_set_ownership(self, record_set, requested_owner_group_id, **kwargs): + """ + Request record set ownership transfer. + """ + return self.__record_set_ownership_transfer(record_set, OwnershipTransferStatus.Requested, + requested_owner_group_id, **kwargs) + + def approve_record_set_ownership(self, record_set, requested_owner_group_id, **kwargs): + """ + Approve record set ownership transfer. + """ + return self.__record_set_ownership_transfer(record_set, OwnershipTransferStatus.ManuallyApproved, + requested_owner_group_id, update_owner_group=True, **kwargs) + + def reject_record_set_ownership(self, record_set, requested_owner_group_id, **kwargs): + """ + Reject record set ownership transfer. + """ + return self.__record_set_ownership_transfer(record_set, OwnershipTransferStatus.ManuallyRejected, + requested_owner_group_id, **kwargs) + + def cancel_record_set_ownership(self, record_set, requested_owner_group_id, **kwargs): + """ + Cancel record set ownership transfer. + """ + return self.__record_set_ownership_transfer(record_set, OwnershipTransferStatus.Cancelled, + requested_owner_group_id, **kwargs) + + def __record_set_ownership_transfer(self, record_set, status, requested_owner_group_id, + update_owner_group=False, **kwargs): + record_set.record_set_group_change = OwnershipTransfer( + ownership_transfer_status=status, + requested_owner_group_id=requested_owner_group_id + ) + if update_owner_group: + record_set.owner_group_id = requested_owner_group_id + + return self.update_record_set(record_set, **kwargs) + def search_record_sets(self, start_from=None, max_items=None, record_name_filter=None, record_type_filter=None, record_owner_group_filter=None, name_sort=None, **kwargs): """ @@ -745,3 +953,83 @@ def delete_zone_acl_rule(self, zone_id, acl_rule, **kwargs): to_json_string(acl_rule), **kwargs) return ZoneChange.from_dict(data) + + def ping(self, **kwargs): + """ + Simple health check. + """ + url = urljoin(self.index_url, u'/ping') + response, data = self.__make_request_raw(url, u'GET', self.headers, **kwargs) + return data + + def health(self, **kwargs): + """ + Comprehensive health check. + """ + url = urljoin(self.index_url, u'/health') + response, data = self.__make_request_raw(url, u'GET', self.headers, **kwargs) + return data + + def color(self, **kwargs): + """ + Blue/green deployment status. + """ + url = urljoin(self.index_url, u'/color') + response, data = self.__make_request_raw(url, u'GET', self.headers, **kwargs) + return data + + def metrics_prometheus(self, names=None, **kwargs): + """ + Prometheus metrics export. + """ + args = [] + if names: + for name in names: + args.append(u'name={0}'.format(name)) + + url = urljoin(self.index_url, u'/metrics/prometheus') + if args: + url = url + u'?' + u'&'.join(args) + + response, data = self.__make_request_raw(url, u'GET', self.headers, **kwargs) + return data + + def get_status(self, **kwargs): + """ + Get system processing status. + """ + url = urljoin(self.index_url, u'/status') + response, data = self.__make_request(url, u'GET', self.headers, **kwargs) + return SystemStatus.from_dict(data) + + def update_status(self, processing_disabled, **kwargs): + """ + Enable/disable processing (admin). + """ + url = urljoin(self.index_url, u'/status?processingDisabled={0}'.format(str(processing_disabled).lower())) + response, data = self.__make_request(url, u'POST', self.headers, **kwargs) + return SystemStatus.from_dict(data) + + def get_user(self, user_id, **kwargs): + """ + Get user by ID. + """ + url = urljoin(self.index_url, u'/users/{0}'.format(user_id)) + response, data = self.__make_request(url, u'GET', self.headers, **kwargs) + return UserInfo.from_dict(data) if data is not None else None + + def lock_user(self, user_id, **kwargs): + """ + Lock a user (admin). + """ + url = urljoin(self.index_url, u'/users/{0}/lock'.format(user_id)) + response, data = self.__make_request(url, u'PUT', self.headers, **kwargs) + return UserInfo.from_dict(data) + + def unlock_user(self, user_id, **kwargs): + """ + Unlock a user (admin). + """ + url = urljoin(self.index_url, u'/users/{0}/unlock'.format(user_id)) + response, data = self.__make_request(url, u'PUT', self.headers, **kwargs) + return UserInfo.from_dict(data) diff --git a/src/vinyldns/membership.py b/src/vinyldns/membership.py index 67aa948..beddb5a 100644 --- a/src/vinyldns/membership.py +++ b/src/vinyldns/membership.py @@ -46,6 +46,40 @@ def from_dict(d): ) +class UserGroup(object): + def __init__(self, id): + self.id = id + + @staticmethod + def from_dict(d): + return UserGroup(id=d.get('id')) + + +class UserInfo(object): + def __init__(self, id, user_name=None, group_ids=None, lock_status=None): + self.id = id + self.user_name = user_name + self.group_ids = group_ids or [] + self.lock_status = lock_status + + @staticmethod + def from_dict(d): + group_ids = d.get('groupId', []) + parsed_groups = [] + for entry in group_ids: + if isinstance(entry, dict): + parsed_groups.append(UserGroup.from_dict(entry)) + else: + parsed_groups.append(UserGroup(entry)) + + return UserInfo( + id=d.get('id'), + user_name=d.get('userName'), + group_ids=parsed_groups, + lock_status=d.get('lockStatus') + ) + + class Group(object): def __init__(self, name, email, description=None, created=None, members=[], admins=[], id=None): self.name = name @@ -90,13 +124,16 @@ def from_dict(d): class GroupChange(object): - def __init__(self, new_group, change_type, user_id, old_group, id, created): + def __init__(self, new_group, change_type, user_id, old_group, id, created, + user_name=None, group_change_message=None): self.new_group = new_group self.change_type = change_type self.user_id = user_id self.old_group = old_group self.id = id self.created = created + self.user_name = user_name + self.group_change_message = group_change_message @staticmethod def from_dict(d): @@ -106,7 +143,9 @@ def from_dict(d): user_id=d['userId'], old_group=map_option(d.get('oldGroup'), Group.from_dict), id=d['id'], - created=map_option(d.get('created'), parse_datetime) + created=map_option(d.get('created'), parse_datetime), + user_name=d.get('userName'), + group_change_message=d.get('groupChangeMessage') ) diff --git a/src/vinyldns/record.py b/src/vinyldns/record.py index 141dc25..32e0826 100644 --- a/src/vinyldns/record.py +++ b/src/vinyldns/record.py @@ -41,6 +41,28 @@ class RecordSetStatus: PendingDelete = "PendingDelete" +class OwnershipTransferStatus: + AutoApproved = "AutoApproved" + Cancelled = "Cancelled" + ManuallyApproved = "ManuallyApproved" + ManuallyRejected = "ManuallyRejected" + Requested = "Requested" + PendingReview = "PendingReview" + + +class OwnershipTransfer(object): + def __init__(self, ownership_transfer_status, requested_owner_group_id=None): + self.ownership_transfer_status = ownership_transfer_status + self.requested_owner_group_id = requested_owner_group_id + + @staticmethod + def from_dict(d): + return OwnershipTransfer( + ownership_transfer_status=d.get('ownershipTransferStatus'), + requested_owner_group_id=d.get('requestedOwnerGroupId') + ) + + class AData(object): def __init__(self, address): self.address = address @@ -180,7 +202,8 @@ def from_dict(d): class RecordSet(object): def __init__(self, zone_id, name, type, ttl, status=None, created=None, - updated=None, records=[], id=None, owner_group_id=None,fqdn=None): + updated=None, records=[], id=None, owner_group_id=None, fqdn=None, + record_set_group_change=None): self.zone_id = zone_id self.name = name self.type = type @@ -191,7 +214,8 @@ def __init__(self, zone_id, name, type, ttl, status=None, created=None, self.records = records self.id = id self.owner_group_id = owner_group_id - self.fqdn=fqdn + self.fqdn = fqdn + self.record_set_group_change = record_set_group_change @staticmethod def from_dict(d): @@ -206,7 +230,8 @@ def from_dict(d): records=[rdata_converters[d['type']](rd) for rd in d.get('records', [])], id=d.get('id'), owner_group_id=d.get('ownerGroupId'), - fqdn=d.get('fqdn') + fqdn=d.get('fqdn'), + record_set_group_change=map_option(d.get('recordSetGroupChange'), OwnershipTransfer.from_dict) ) @@ -273,3 +298,30 @@ def from_dict(d): return ListRecordSetChangesResponse(zone_id=d['zoneId'], record_set_changes=changes, next_id=d.get('nextId'), start_from=d.get('startFrom'), max_items=d['maxItems']) + + +class RecordSetCount(object): + def __init__(self, count): + self.count = count + + @staticmethod + def from_dict(d): + return RecordSetCount(count=d.get('count')) + + +class RecordSetChangeFailuresResponse(object): + def __init__(self, failed_record_set_changes, start_from=None, next_id=None, max_items=None): + self.failed_record_set_changes = failed_record_set_changes + self.start_from = start_from + self.next_id = next_id + self.max_items = max_items + + @staticmethod + def from_dict(d): + changes = [RecordSetChange.from_dict(elem) for elem in d.get('failedRecordSetChanges', [])] + return RecordSetChangeFailuresResponse( + failed_record_set_changes=changes, + start_from=d.get('startFrom'), + next_id=d.get('nextId'), + max_items=d.get('maxItems') + ) diff --git a/src/vinyldns/status.py b/src/vinyldns/status.py new file mode 100644 index 0000000..91c9cc2 --- /dev/null +++ b/src/vinyldns/status.py @@ -0,0 +1,31 @@ +# Copyright 2026 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""TODO: Add module docstring.""" + + +class SystemStatus(object): + def __init__(self, processing_disabled, color=None, key_name=None, version=None): + self.processing_disabled = processing_disabled + self.color = color + self.key_name = key_name + self.version = version + + @staticmethod + def from_dict(d): + return SystemStatus( + processing_disabled=d.get('processingDisabled'), + color=d.get('color'), + key_name=d.get('keyName'), + version=d.get('version') + ) diff --git a/src/vinyldns/zone.py b/src/vinyldns/zone.py index 8e71428..4a8999f 100644 --- a/src/vinyldns/zone.py +++ b/src/vinyldns/zone.py @@ -105,6 +105,25 @@ def from_dict(d): ) +class ZoneDetails(object): + def __init__(self, name, email, status, admin_group_id, admin_group_name): + self.name = name + self.email = email + self.status = status + self.admin_group_id = admin_group_id + self.admin_group_name = admin_group_name + + @staticmethod + def from_dict(d): + return ZoneDetails( + name=d.get('name'), + email=d.get('email'), + status=d.get('status'), + admin_group_id=d.get('adminGroupId'), + admin_group_name=d.get('adminGroupName') + ) + + class ListZonesResponse(object): def __init__(self, zones, name_filter, start_from=None, next_id=None, max_items=100): self.zones = zones @@ -151,3 +170,58 @@ def from_dict(d): zone_changes = [ZoneChange.from_dict(elem) for elem in d.get('zoneChanges', [])] return ListZoneChangesResponse(zone_id=d['zoneId'], zone_changes=zone_changes, next_id=d.get('nextId'), start_from=d.get('startFrom'), max_items=d.get('maxItems', 100)) + + +class ZoneChangeFailuresResponse(object): + def __init__(self, failed_zone_changes, start_from=None, next_id=None, max_items=None): + self.failed_zone_changes = failed_zone_changes + self.start_from = start_from + self.next_id = next_id + self.max_items = max_items + + @staticmethod + def from_dict(d): + changes = [ZoneChange.from_dict(elem) for elem in d.get('failedZoneChanges', [])] + return ZoneChangeFailuresResponse( + failed_zone_changes=changes, + start_from=d.get('startFrom'), + next_id=d.get('nextId'), + max_items=d.get('maxItems') + ) + + +class DeletedZoneInfo(object): + def __init__(self, zone_change, admin_group_name=None, user_name=None, access_level=None): + self.zone_change = zone_change + self.admin_group_name = admin_group_name + self.user_name = user_name + self.access_level = access_level + + @staticmethod + def from_dict(d): + return DeletedZoneInfo( + zone_change=ZoneChange.from_dict(d.get('zoneChange')), + admin_group_name=d.get('adminGroupName'), + user_name=d.get('userName'), + access_level=d.get('accessLevel') + ) + + +class DeletedZonesResponse(object): + def __init__(self, zones_deleted_info, start_from=None, next_id=None, max_items=None, ignore_access=None): + self.zones_deleted_info = zones_deleted_info + self.start_from = start_from + self.next_id = next_id + self.max_items = max_items + self.ignore_access = ignore_access + + @staticmethod + def from_dict(d): + zones = [DeletedZoneInfo.from_dict(elem) for elem in d.get('zonesDeletedInfo', [])] + return DeletedZonesResponse( + zones_deleted_info=zones, + start_from=d.get('startFrom'), + next_id=d.get('nextId'), + max_items=d.get('maxItems'), + ignore_access=d.get('ignoreAccess') + ) diff --git a/tests/test_health_status.py b/tests/test_health_status.py new file mode 100644 index 0000000..90b077f --- /dev/null +++ b/tests/test_health_status.py @@ -0,0 +1,83 @@ +# Copyright 2026 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""TODO: Add module docstring.""" + +import responses + +from vinyldns.serdes import to_json_string + + +def test_ping(mocked_responses, vinyldns_client): + mocked_responses.add( + responses.GET, 'http://test.com/ping', + body='PONG', status=200) + + assert vinyldns_client.ping() == 'PONG' + + +def test_health(mocked_responses, vinyldns_client): + mocked_responses.add( + responses.GET, 'http://test.com/health', + body='OK', status=200) + + assert vinyldns_client.health() == 'OK' + + +def test_color(mocked_responses, vinyldns_client): + mocked_responses.add( + responses.GET, 'http://test.com/color', + body='blue', status=200) + + assert vinyldns_client.color() == 'blue' + + +def test_metrics_prometheus(mocked_responses, vinyldns_client): + mocked_responses.add( + responses.GET, 'http://test.com/metrics/prometheus?name=foo&name=bar', + body='metric 1', status=200) + + assert vinyldns_client.metrics_prometheus(['foo', 'bar']) == 'metric 1' + + +def test_status_get(mocked_responses, vinyldns_client): + mocked_responses.add( + responses.GET, 'http://test.com/status', + body=to_json_string({ + 'processingDisabled': False, + 'color': 'blue', + 'keyName': 'vinyldns.', + 'version': '0.21.3' + }), + status=200) + + status = vinyldns_client.get_status() + assert status.processing_disabled is False + assert status.color == 'blue' + assert status.key_name == 'vinyldns.' + assert status.version == '0.21.3' + + +def test_status_update(mocked_responses, vinyldns_client): + mocked_responses.add( + responses.POST, 'http://test.com/status?processingDisabled=true', + body=to_json_string({ + 'processingDisabled': True, + 'color': 'blue', + 'keyName': 'vinyldns.', + 'version': '0.21.3' + }), + status=200) + + status = vinyldns_client.update_status(True) + assert status.processing_disabled is True diff --git a/tests/test_membership_extras.py b/tests/test_membership_extras.py new file mode 100644 index 0000000..3fea23a --- /dev/null +++ b/tests/test_membership_extras.py @@ -0,0 +1,44 @@ +# Copyright 2026 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""TODO: Add module docstring.""" + +import datetime + +import responses + +from sampledata import sample_group +from vinyldns.membership import GroupChange +from vinyldns.serdes import to_json_string + + +def test_get_group_change(mocked_responses, vinyldns_client): + change = GroupChange(sample_group, 'Update', 'user-id', sample_group, 'change-id', datetime.datetime.utcnow(), + user_name='user', group_change_message='updated') + mocked_responses.add( + responses.GET, 'http://test.com/groups/change/change-id', + body=to_json_string(change), status=200) + + resp = vinyldns_client.get_group_change('change-id') + assert resp.id == 'change-id' + assert resp.user_name == 'user' + assert resp.group_change_message == 'updated' + + +def test_list_group_valid_domains(mocked_responses, vinyldns_client): + mocked_responses.add( + responses.GET, 'http://test.com/groups/valid/domains', + body=to_json_string(['example.com', 'test.com']), status=200) + + resp = vinyldns_client.list_group_valid_domains() + assert resp == ['example.com', 'test.com'] diff --git a/tests/test_record_extras.py b/tests/test_record_extras.py new file mode 100644 index 0000000..260c8c9 --- /dev/null +++ b/tests/test_record_extras.py @@ -0,0 +1,159 @@ +# Copyright 2026 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""TODO: Add module docstring.""" + +import copy +import json + +import responses + +from sampledata import forward_zone, record_set_values, gen_rs_change +from vinyldns.record import OwnershipTransferStatus +from vinyldns.serdes import to_json_string + + +def _request_json(request): + body = request.body + if isinstance(body, bytes): + body = body.decode('utf-8') + return json.loads(body) + + +def test_get_record_set_count(mocked_responses, vinyldns_client): + mocked_responses.add( + responses.GET, 'http://test.com/zones/{0}/recordsetcount'.format(forward_zone.id), + body=to_json_string({'count': 5}), status=200) + + resp = vinyldns_client.get_record_set_count(forward_zone.id) + assert resp.count == 5 + + +def test_list_record_set_change_history(mocked_responses, vinyldns_client): + change = gen_rs_change(list(record_set_values)[0]) + body = { + 'zoneId': forward_zone.id, + 'recordSetChanges': [change], + 'startFrom': 'start', + 'nextId': 'next', + 'maxItems': 100 + } + mocked_responses.add( + responses.GET, + 'http://test.com/recordsetchange/history?zoneId={0}&fqdn=rs.ok.&recordType=A&startFrom=start&maxItems=100'.format( + forward_zone.id), + body=to_json_string(body), status=200) + + resp = vinyldns_client.list_record_set_change_history( + forward_zone.id, 'rs.ok.', 'A', start_from='start', max_items=100) + assert resp.zone_id == forward_zone.id + assert len(resp.record_set_changes) == 1 + + +def test_list_record_set_changes_failure(mocked_responses, vinyldns_client): + change = gen_rs_change(list(record_set_values)[0]) + body = { + 'failedRecordSetChanges': [change], + 'startFrom': 'start', + 'nextId': 'next', + 'maxItems': 100 + } + mocked_responses.add( + responses.GET, + 'http://test.com/metrics/health/zones/{0}/recordsetchangesfailure?startFrom=start&maxItems=100'.format( + forward_zone.id), + body=to_json_string(body), status=200) + + resp = vinyldns_client.list_record_set_changes_failure(forward_zone.id, start_from='start', max_items=100) + assert len(resp.failed_record_set_changes) == 1 + + +def test_record_set_ownership_request(mocked_responses, vinyldns_client): + rs = copy.deepcopy(list(record_set_values)[0]) + rs.id = 'rs-id' + + def request_callback(request): + payload = _request_json(request) + group_change = payload.get('recordSetGroupChange') + assert group_change['ownershipTransferStatus'] == OwnershipTransferStatus.Requested + assert group_change['requestedOwnerGroupId'] == 'target-group' + return (200, {}, to_json_string(gen_rs_change(rs))) + + mocked_responses.add_callback( + responses.PUT, + 'http://test.com/zones/{0}/recordsets/{1}'.format(rs.zone_id, rs.id), + callback=request_callback + ) + + vinyldns_client.request_record_set_ownership(rs, 'target-group') + + +def test_record_set_ownership_approve(mocked_responses, vinyldns_client): + rs = copy.deepcopy(list(record_set_values)[0]) + rs.id = 'rs-id' + + def request_callback(request): + payload = _request_json(request) + group_change = payload.get('recordSetGroupChange') + assert group_change['ownershipTransferStatus'] == OwnershipTransferStatus.ManuallyApproved + assert group_change['requestedOwnerGroupId'] == 'target-group' + assert payload.get('ownerGroupId') == 'target-group' + return (200, {}, to_json_string(gen_rs_change(rs))) + + mocked_responses.add_callback( + responses.PUT, + 'http://test.com/zones/{0}/recordsets/{1}'.format(rs.zone_id, rs.id), + callback=request_callback + ) + + vinyldns_client.approve_record_set_ownership(rs, 'target-group') + + +def test_record_set_ownership_reject(mocked_responses, vinyldns_client): + rs = copy.deepcopy(list(record_set_values)[0]) + rs.id = 'rs-id' + + def request_callback(request): + payload = _request_json(request) + group_change = payload.get('recordSetGroupChange') + assert group_change['ownershipTransferStatus'] == OwnershipTransferStatus.ManuallyRejected + assert group_change['requestedOwnerGroupId'] == 'target-group' + return (200, {}, to_json_string(gen_rs_change(rs))) + + mocked_responses.add_callback( + responses.PUT, + 'http://test.com/zones/{0}/recordsets/{1}'.format(rs.zone_id, rs.id), + callback=request_callback + ) + + vinyldns_client.reject_record_set_ownership(rs, 'target-group') + + +def test_record_set_ownership_cancel(mocked_responses, vinyldns_client): + rs = copy.deepcopy(list(record_set_values)[0]) + rs.id = 'rs-id' + + def request_callback(request): + payload = _request_json(request) + group_change = payload.get('recordSetGroupChange') + assert group_change['ownershipTransferStatus'] == OwnershipTransferStatus.Cancelled + assert group_change['requestedOwnerGroupId'] == 'target-group' + return (200, {}, to_json_string(gen_rs_change(rs))) + + mocked_responses.add_callback( + responses.PUT, + 'http://test.com/zones/{0}/recordsets/{1}'.format(rs.zone_id, rs.id), + callback=request_callback + ) + + vinyldns_client.cancel_record_set_ownership(rs, 'target-group') diff --git a/tests/test_users.py b/tests/test_users.py new file mode 100644 index 0000000..b936e48 --- /dev/null +++ b/tests/test_users.py @@ -0,0 +1,65 @@ +# Copyright 2026 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""TODO: Add module docstring.""" + +import responses + +from vinyldns.serdes import to_json_string + + +def test_get_user(mocked_responses, vinyldns_client): + body = { + 'id': 'user-id', + 'userName': 'user', + 'groupId': [{'id': 'group-1'}, {'id': 'group-2'}], + 'lockStatus': 'Unlocked' + } + mocked_responses.add( + responses.GET, 'http://test.com/users/user-id', + body=to_json_string(body), status=200) + + user = vinyldns_client.get_user('user-id') + assert user.id == 'user-id' + assert user.user_name == 'user' + assert len(user.group_ids) == 2 + + +def test_lock_user(mocked_responses, vinyldns_client): + body = { + 'id': 'user-id', + 'userName': 'user', + 'groupId': [{'id': 'group-1'}], + 'lockStatus': 'Locked' + } + mocked_responses.add( + responses.PUT, 'http://test.com/users/user-id/lock', + body=to_json_string(body), status=200) + + user = vinyldns_client.lock_user('user-id') + assert user.lock_status == 'Locked' + + +def test_unlock_user(mocked_responses, vinyldns_client): + body = { + 'id': 'user-id', + 'userName': 'user', + 'groupId': [{'id': 'group-1'}], + 'lockStatus': 'Unlocked' + } + mocked_responses.add( + responses.PUT, 'http://test.com/users/user-id/unlock', + body=to_json_string(body), status=200) + + user = vinyldns_client.unlock_user('user-id') + assert user.lock_status == 'Unlocked' diff --git a/tests/test_zone_extras.py b/tests/test_zone_extras.py new file mode 100644 index 0000000..da11820 --- /dev/null +++ b/tests/test_zone_extras.py @@ -0,0 +1,85 @@ +# Copyright 2026 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""TODO: Add module docstring.""" + +import responses + +from sampledata import sample_zone_change, forward_zone +from vinyldns.serdes import to_json_string + + +def test_get_zone_details(mocked_responses, vinyldns_client): + body = { + 'zone': { + 'name': forward_zone.name, + 'email': forward_zone.email, + 'status': 'Active', + 'adminGroupId': forward_zone.admin_group_id, + 'adminGroupName': 'admins' + } + } + mocked_responses.add( + responses.GET, 'http://test.com/zones/{0}/details'.format(forward_zone.id), + body=to_json_string(body), status=200) + + details = vinyldns_client.get_zone_details(forward_zone.id) + assert details.admin_group_name == 'admins' + assert details.admin_group_id == forward_zone.admin_group_id + + +def test_list_zone_backend_ids(mocked_responses, vinyldns_client): + mocked_responses.add( + responses.GET, 'http://test.com/zones/backendids', + body=to_json_string({'backendIds': ['b1', 'b2']}), status=200) + + assert vinyldns_client.list_zone_backend_ids() == ['b1', 'b2'] + + +def test_list_zone_changes_failure(mocked_responses, vinyldns_client): + body = { + 'failedZoneChanges': [sample_zone_change], + 'startFrom': 'start', + 'nextId': 'next', + 'maxItems': 100 + } + mocked_responses.add( + responses.GET, 'http://test.com/metrics/health/zonechangesfailure?startFrom=start&maxItems=100', + body=to_json_string(body), status=200) + + resp = vinyldns_client.list_zone_changes_failure(start_from='start', max_items=100) + assert resp.start_from == 'start' + assert resp.next_id == 'next' + assert len(resp.failed_zone_changes) == 1 + + +def test_list_deleted_zones(mocked_responses, vinyldns_client): + body = { + 'zonesDeletedInfo': [{ + 'zoneChange': sample_zone_change, + 'adminGroupName': 'admins', + 'userName': 'user', + 'accessLevel': 'Read' + }], + 'startFrom': 'start', + 'nextId': 'next', + 'maxItems': 100, + 'ignoreAccess': True + } + mocked_responses.add( + responses.GET, 'http://test.com/zones/deleted/changes?startFrom=start&maxItems=100&ignoreAccess=True', + body=to_json_string(body), status=200) + + resp = vinyldns_client.list_deleted_zones(start_from='start', max_items=100, ignore_access=True) + assert resp.ignore_access is True + assert len(resp.zones_deleted_info) == 1