diff --git a/src/spaceone/inventory_v2/error/__init__.py b/src/spaceone/inventory_v2/error/__init__.py index e4099d8..ef92057 100644 --- a/src/spaceone/inventory_v2/error/__init__.py +++ b/src/spaceone/inventory_v2/error/__init__.py @@ -1,3 +1,4 @@ from spaceone.inventory_v2.error.region import * from spaceone.inventory_v2.error.collector import * +from spaceone.inventory_v2.error.collector_rule import * from spaceone.inventory_v2.error.metric import * diff --git a/src/spaceone/inventory_v2/error/collector_rule.py b/src/spaceone/inventory_v2/error/collector_rule.py new file mode 100644 index 0000000..7f54b6b --- /dev/null +++ b/src/spaceone/inventory_v2/error/collector_rule.py @@ -0,0 +1,13 @@ +from spaceone.core.error import * + + +class ERROR_NOT_ALLOWED_TO_UPDATE_RULE(ERROR_INVALID_ARGUMENT): + _message = "If rule_type is MANAGED, it cannot be updated." + + +class ERROR_NOT_ALLOWED_TO_CHANGE_ORDER(ERROR_INVALID_ARGUMENT): + _message = "If rule_type is MANAGED, it cannot be changed." + + +class ERROR_NOT_ALLOWED_TO_DELETE_RULE(ERROR_INVALID_ARGUMENT): + _message = "If rule_type is MANAGED, it cannot be deleted." diff --git a/src/spaceone/inventory_v2/interface/grpc/__init__.py b/src/spaceone/inventory_v2/interface/grpc/__init__.py index cc0ee6c..5665b19 100644 --- a/src/spaceone/inventory_v2/interface/grpc/__init__.py +++ b/src/spaceone/inventory_v2/interface/grpc/__init__.py @@ -3,6 +3,7 @@ from .asset import Asset from .region import Region from .collector import Collector +from .collector_rule import CollectorRule from .job import Job from .job_task import JobTask from .metric import Metric @@ -17,6 +18,7 @@ app = GRPCServer() app.add_service(Region) app.add_service(Collector) +app.add_service(CollectorRule) app.add_service(Job) app.add_service(JobTask) app.add_service(NamespaceGroup) diff --git a/src/spaceone/inventory_v2/interface/grpc/collector_rule.py b/src/spaceone/inventory_v2/interface/grpc/collector_rule.py new file mode 100644 index 0000000..9d78ff2 --- /dev/null +++ b/src/spaceone/inventory_v2/interface/grpc/collector_rule.py @@ -0,0 +1,51 @@ +from spaceone.api.inventory_v2.v1 import collector_rule_pb2, collector_rule_pb2_grpc +from spaceone.core.pygrpc import BaseAPI + +from spaceone.inventory_v2.service.collector_rule_service import CollectorRuleService + + +class CollectorRule(BaseAPI, collector_rule_pb2_grpc.CollectorRuleServicer): + pb2 = collector_rule_pb2 + pb2_grpc = collector_rule_pb2_grpc + + def create(self, request, context): + params, metadata = self.parse_request(request, context) + collector_rule_svc = CollectorRuleService(metadata) + response: dict = collector_rule_svc.create(params) + return self.dict_to_message(response) + + def update(self, request, context): + params, metadata = self.parse_request(request, context) + collector_rule_svc = CollectorRuleService(metadata) + response: dict = collector_rule_svc.update(params) + return self.dict_to_message(response) + + def delete(self, request, context): + params, metadata = self.parse_request(request, context) + collector_rule_svc = CollectorRuleService(metadata) + collector_rule_svc.delete(params) + return self.empty() + + def get(self, request, context): + params, metadata = self.parse_request(request, context) + collector_rule_svc = CollectorRuleService(metadata) + response: dict = collector_rule_svc.get(params) + return self.dict_to_message(response) + + def change_order(self, request, context): + params, metadata = self.parse_request(request, context) + collector_rule_svc = CollectorRuleService(metadata) + response: dict = collector_rule_svc.change_order(params) + return self.dict_to_message(response) + + def list(self, request, context): + params, metadata = self.parse_request(request, context) + collector_rule_svc = CollectorRuleService(metadata) + response: dict = collector_rule_svc.list(params) + return self.dict_to_message(response) + + def stat(self, request, context): + params, metadata = self.parse_request(request, context) + collector_rule_svc = CollectorRuleService(metadata) + response: dict = collector_rule_svc.stat(params) + return self.dict_to_message(response) diff --git a/src/spaceone/inventory_v2/manager/__init__.py b/src/spaceone/inventory_v2/manager/__init__.py index 083af3e..cf9697d 100644 --- a/src/spaceone/inventory_v2/manager/__init__.py +++ b/src/spaceone/inventory_v2/manager/__init__.py @@ -1,5 +1,6 @@ from spaceone.inventory_v2.manager.asset_manager import AssetManager from spaceone.inventory_v2.manager.asset_type_manager import AssetTypeManager +from spaceone.inventory_v2.manager.collector_rule_manager import CollectorRuleManager from spaceone.inventory_v2.manager.region_manager import RegionManager from spaceone.inventory_v2.manager.collecting_manager import CollectingManager from spaceone.inventory_v2.manager.namespace_group_manager import NamespaceGroupManager diff --git a/src/spaceone/inventory_v2/model/collector_rule/database.py b/src/spaceone/inventory_v2/model/collector_rule/database.py index 3721fc9..30dec16 100644 --- a/src/spaceone/inventory_v2/model/collector_rule/database.py +++ b/src/spaceone/inventory_v2/model/collector_rule/database.py @@ -33,6 +33,7 @@ class CollectorRule(MongoModel): workspace_id = StringField(max_length=40) domain_id = StringField(max_length=40) created_at = DateTimeField(auto_now_add=True) + updated_at = DateTimeField(auto_now=True) meta = { "updatable_fields": [ diff --git a/src/spaceone/inventory_v2/model/collector_rule/request.py b/src/spaceone/inventory_v2/model/collector_rule/request.py new file mode 100644 index 0000000..437141a --- /dev/null +++ b/src/spaceone/inventory_v2/model/collector_rule/request.py @@ -0,0 +1,73 @@ +from typing import Union, Literal +from pydantic import BaseModel + +__all__ = [ + "CollectorRuleCreateRequest", + "CollectorRuleUpdateRequest", + "CollectorRuleDeleteRequest", + "CollectorRuleGetRequest", + "CollectorRuleChangeOrderRequest", + "CollectorRuleSearchQueryRequest", + "CollectorRuleStatQueryRequest", +] + +ResourceGroup = Literal["DOMAIN", "WORKSPACE"] + + +class CollectorRuleCreateRequest(BaseModel): + collector_id: str + name: Union[str, None] = None + conditions: Union[list, None] = None + conditions_policy: str + actions: dict + options: Union[dict, None] = None + tags: Union[dict, None] = None + workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorRuleUpdateRequest(BaseModel): + collector_rule_id: str + name: Union[str, None] = None + conditions: Union[list, None] = None + conditions_policy: Union[str, None] = None + actions: Union[dict, None] = None + options: Union[dict, None] = None + tags: Union[dict, None] = None + workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorRuleDeleteRequest(BaseModel): + collector_rule_id: str + workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorRuleGetRequest(BaseModel): + collector_rule_id: str + workspace_id: Union[list, str, None] = None + domain_id: str + + +class CollectorRuleChangeOrderRequest(BaseModel): + collector_rule_id: str + order: int + workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorRuleSearchQueryRequest(BaseModel): + query: Union[dict, None] = None + collector_rule_id: Union[str, None] = None + name: Union[str, None] = None + rule_type: Union[str, None] = None + collector_id: Union[str, None] = None + workspace_id: Union[list, str, None] = None + domain_id: str + + +class CollectorRuleStatQueryRequest(BaseModel): + query: dict + workspace_id: Union[list, str, None] = None + domain_id: str diff --git a/src/spaceone/inventory_v2/model/collector_rule/response.py b/src/spaceone/inventory_v2/model/collector_rule/response.py new file mode 100644 index 0000000..14b5501 --- /dev/null +++ b/src/spaceone/inventory_v2/model/collector_rule/response.py @@ -0,0 +1,36 @@ +from datetime import datetime +from typing import Union, List +from pydantic import BaseModel + +from spaceone.core import utils + +__all__ = ["CollectorRuleResponse", "CollectorRulesResponse"] + + +class CollectorRuleResponse(BaseModel): + collector_rule_id: Union[str, None] = None + name: Union[str, None] = None + rule_type: Union[str, None] = None + order: Union[int, None] = None + conditions: Union[list, None] = None + conditions_policy: Union[str, None] = None + actions: Union[dict, None] = None + options: Union[dict, None] = None + tags: Union[dict, None] = None + collector_id: Union[str, None] = None + resource_group: Union[str, None] = None + workspace_id: Union[str, None] = None + domain_id: Union[str, None] = None + created_at: Union[datetime, None] = None + updated_at: Union[datetime, None] = None + + def dict(self, *args, **kwargs): + data = super().dict(*args, **kwargs) + data["created_at"] = utils.datetime_to_iso8601(data["created_at"]) + data["updated_at"] = utils.datetime_to_iso8601(data["updated_at"]) + return data + + +class CollectorRulesResponse(BaseModel): + results: List[CollectorRuleResponse] + total_count: int diff --git a/src/spaceone/inventory_v2/model/namespace/__init__.py b/src/spaceone/inventory_v2/model/namespace/__init__.py index d939915..423a4c0 100644 --- a/src/spaceone/inventory_v2/model/namespace/__init__.py +++ b/src/spaceone/inventory_v2/model/namespace/__init__.py @@ -1 +1 @@ -from spaceone.inventory_v2.model.namespace import * \ No newline at end of file +from spaceone.inventory_v2.model.namespace import * diff --git a/src/spaceone/inventory_v2/model/namespace_group/__init__.py b/src/spaceone/inventory_v2/model/namespace_group/__init__.py index 2006a3d..e2ea8de 100644 --- a/src/spaceone/inventory_v2/model/namespace_group/__init__.py +++ b/src/spaceone/inventory_v2/model/namespace_group/__init__.py @@ -1 +1 @@ -from spaceone.inventory_v2.model.namespace_group import * \ No newline at end of file +from spaceone.inventory_v2.model.namespace_group import * diff --git a/src/spaceone/inventory_v2/service/__init__.py b/src/spaceone/inventory_v2/service/__init__.py index f815016..e996d99 100644 --- a/src/spaceone/inventory_v2/service/__init__.py +++ b/src/spaceone/inventory_v2/service/__init__.py @@ -1,7 +1,6 @@ from spaceone.inventory_v2.service.asset_service import AssetService from spaceone.inventory_v2.service.asset_type_service import AssetTypeService +from spaceone.inventory_v2.service.collector_rule_service import CollectorRuleService from spaceone.inventory_v2.service.region_service import RegionService from spaceone.inventory_v2.service.namespace_group_service import NamespaceGroupService -from spaceone.inventory_v2.service.namespace_service import NamespaceService - - +from spaceone.inventory_v2.service.namespace_service import NamespaceService diff --git a/src/spaceone/inventory_v2/service/collector_rule_service.py b/src/spaceone/inventory_v2/service/collector_rule_service.py new file mode 100644 index 0000000..98981c4 --- /dev/null +++ b/src/spaceone/inventory_v2/service/collector_rule_service.py @@ -0,0 +1,486 @@ +import logging +import fnmatch +from typing import Union + +from spaceone.core.error import * +from spaceone.core.service import * + +from spaceone.inventory_v2.error.collector_rule import * +from spaceone.inventory_v2.manager.collector_rule_manager import CollectorRuleManager +from spaceone.inventory_v2.manager.collector_manager import CollectorManager +from spaceone.inventory_v2.manager.identity_manager import IdentityManager +from spaceone.inventory_v2.model.collector_rule.request import * +from spaceone.inventory_v2.model.collector_rule.response import * +from spaceone.inventory_v2.model.collector_rule.database import CollectorRule + +__LOGGER = logging.getLogger(__name__) + +_SUPPORTED_CONDITION_KEYS = [ + "provider", + "cloud_service_group", + "cloud_service_type", + "region_code", + "account", + "reference.resource_id", + "data.", + "tags.", +] +_SUPPORTED_CONDITION_OPERATORS = ["eq", "contain", "not", "not_contain"] + + +@authentication_handler +@authorization_handler +@mutation_handler +@event_handler +class CollectorRuleService(BaseService): + resource = "CollectorRule" + + def __init__(self, metadata): + super().__init__(metadata) + self.collector_rule_mgr = CollectorRuleManager() + + @transaction( + permission="inventory-v2:CollectorRule.write", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"], + ) + @convert_model + def create( + self, params: CollectorRuleCreateRequest + ) -> Union[CollectorRuleResponse, dict]: + """Create Collector rule + + Args: + params (dict): { + 'collector_id': 'str', # required + 'name': 'str', + 'conditions': 'list', + 'conditions_policy': 'str', # required + 'actions': 'dict', # required + 'options': 'dict', + 'tags': 'dict', + 'workspace_id': 'str', # injected from auth + 'domain_id': 'str' # injected from auth (required) + } + + Returns: + CollectorRuleResponse: + """ + + collector_mgr = CollectorManager() + + domain_id = params.domain_id + workspace_id = params.workspace_id + collector_id = params.collector_id + conditions = params.conditions or [] + conditions_policy = params.conditions_policy + actions = params.actions + + collector_vo = collector_mgr.get_collector( + collector_id, domain_id, workspace_id + ) + + params_dict = params.dict() + params_dict["collector"] = collector_vo + params_dict["rule_type"] = "CUSTOM" + params_dict["resource_group"] = collector_vo.resource_group + + # Check permission by resource group + if params_dict["resource_group"] == "WORKSPACE": + if workspace_id is None: + raise ERROR_REQUIRED_PARAMETER(key="workspace_id") + + identity_mgr = IdentityManager() + identity_mgr.check_workspace(workspace_id, domain_id) + else: + params_dict["workspace_id"] = "*" + + if conditions_policy == "ALWAYS": + params_dict["conditions"] = [] + else: + if len(conditions) == 0: + raise ERROR_REQUIRED_PARAMETER(key="conditions") + else: + self._check_conditions(conditions) + + self._check_actions(actions, domain_id) + + params_dict["order"] = ( + self._get_highest_order(collector_id, params_dict["rule_type"], domain_id) + + 1 + ) + + collector_rule_vo = self.collector_rule_mgr.create_collector_rule(params_dict) + + return CollectorRuleResponse(**collector_rule_vo.to_dict()) + + @transaction( + permission="inventory-v2:CollectorRule.write", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"], + ) + @convert_model + def update( + self, params: CollectorRuleUpdateRequest + ) -> Union[CollectorRuleResponse, dict]: + """Update collector rule + Args: + params (dict): { + 'collector_rule_id': 'str', # required + 'name': 'str', + 'conditions': 'list', + 'conditions_policy': 'str', + 'actions': 'dict', + 'options': 'dict' + 'tags': 'dict' + 'workspace_id': 'str', # injected from auth + 'domain_id': 'str', # injected from auth (required) + } + + Returns: + CollectorRuleResponse: + """ + + collector_rule_id = params.collector_rule_id + domain_id = params.domain_id + workspace_id = params.workspace_id + + conditions_policy = params.conditions_policy + conditions = params.conditions or [] + + actions = params.actions + + collector_rule_vo = self.collector_rule_mgr.get_collector_rule( + collector_rule_id, domain_id, workspace_id + ) + + params_dict = params.dict(exclude_unset=True) + print(params_dict) + + if collector_rule_vo.rule_type == "MANAGED": + raise ERROR_NOT_ALLOWED_TO_UPDATE_RULE() + + if conditions_policy: + if conditions_policy == "ALWAYS": + params_dict["conditions"] = [] + else: + if len(conditions) == 0: + raise ERROR_REQUIRED_PARAMETER(key="conditions") + else: + self._check_conditions(conditions) + + if actions: + self._check_actions(actions, domain_id) + + collector_rule_vo = self.collector_rule_mgr.update_collector_rule_by_vo( + params_dict, collector_rule_vo + ) + + return CollectorRuleResponse(**collector_rule_vo.to_dict()) + + @transaction( + permission="inventory-v2:CollectorRule.write", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"], + ) + @convert_model + def change_order( + self, params: CollectorRuleChangeOrderRequest + ) -> Union[CollectorRuleResponse, dict]: + """Change collector rule's order + + Args: + params (dict): { + 'collector_rule_id': 'str', # required + 'order': 'int', # required + 'workspace_id': 'str', # injected from auth + 'domain_id': 'str', # injected from auth (required) + } + + Returns: + CollectorRuleResponse: + """ + + collector_rule_id = params.collector_rule_id + domain_id = params.domain_id + workspace_id = params.workspace_id + order = params.order + + target_rule_vo: CollectorRule = self.collector_rule_mgr.get_collector_rule( + collector_rule_id, domain_id, workspace_id + ) + + self._check_order(order) + + if target_rule_vo.rule_type == "MANAGED": + raise ERROR_NOT_ALLOWED_TO_CHANGE_ORDER() + + if target_rule_vo.order == order: + return CollectorRuleResponse(**target_rule_vo.to_dict()) + + highest_order = self._get_highest_order( + target_rule_vo.collector_id, + target_rule_vo.rule_type, + target_rule_vo.domain_id, + ) + + if order > highest_order: + raise ERROR_INVALID_PARAMETER( + key="order", + reason=f"There is no collector rules greater than the {str(order)} order.", + ) + + collector_rule_vos = self._get_all_collector_rules( + target_rule_vo.collector_id, + target_rule_vo.rule_type, + target_rule_vo.domain_id, + target_rule_vo.collector_rule_id, + ) + + collector_rule_vos.insert(order - 1, target_rule_vo) + + i = 0 + for collector_rule_vo in collector_rule_vos: + if target_rule_vo != collector_rule_vo: + self.collector_rule_mgr.update_collector_rule_by_vo( + {"order": i + 1}, collector_rule_vo + ) + i += 1 + + collector_rule_vo = self.collector_rule_mgr.update_collector_rule_by_vo( + {"order": order}, target_rule_vo + ) + + return CollectorRuleResponse(**collector_rule_vo.to_dict()) + + @transaction( + permission="inventory-v2:CollectorRule.write", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"], + ) + @convert_model + def delete(self, params: CollectorRuleDeleteRequest) -> None: + """Delete collector rule + + Args: + params (dict): { + 'collector_rule_id': 'str', # required + 'workspace_id': 'str', # injected from auth + 'domain_id': 'str' # injected from auth (required) + } + + Returns: + None + """ + + collector_rule_id = params.collector_rule_id + domain_id = params.domain_id + workspace_id = params.workspace_id + + collector_rule_vo = self.collector_rule_mgr.get_collector_rule( + collector_rule_id, domain_id, workspace_id + ) + + rule_type = collector_rule_vo.rule_type + + if rule_type == "MANAGED": + raise ERROR_NOT_ALLOWED_TO_DELETE_RULE() + + collector_id = collector_rule_vo.collector_id + self.collector_rule_mgr.delete_collector_rule_by_vo(collector_rule_vo) + + collector_rule_vos = self._get_all_collector_rules( + collector_id, rule_type, domain_id + ) + + i = 0 + for collector_rule_vo in collector_rule_vos: + self.collector_rule_mgr.update_collector_rule_by_vo( + {"order": i + 1}, collector_rule_vo + ) + i += 1 + + @transaction( + permission="inventory-v2:CollectorRule.read", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @change_value_by_rule("APPEND", "workspace_id", "*") + @convert_model + def get( + self, params: CollectorRuleGetRequest + ) -> Union[CollectorRuleResponse, dict]: + """Get collector rule + + Args: + params (dict): { + 'collector_rule_id': 'str', # required + 'workspace_id': 'str', # injected from auth + 'domain_id': 'str', # injected from auth (required) + } + + Returns: + CollectorRuleResponse: + """ + + collector_rule_vo = self.collector_rule_mgr.get_collector_rule( + params.collector_rule_id, params.domain_id, params.workspace_id + ) + + return CollectorRuleResponse(**collector_rule_vo.to_dict()) + + @transaction( + permission="inventory:CollectorRule.read", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @change_value_by_rule("APPEND", "workspace_id", "*") + @append_query_filter( + [ + "collector_rule_id", + "name", + "rule_type", + "collector_id", + "workspace_id", + "domain_id", + ] + ) + @append_keyword_filter(["collector_rule_id", "name"]) + @convert_model + def list( + self, params: CollectorRuleSearchQueryRequest + ) -> Union[CollectorRulesResponse, dict]: + """List collector rule + + Args: + params (dict): { + 'query': 'dict (spaceone.api.core.v1.Query)', + 'collector_rule_id': 'str', + 'name': 'str', + 'rule_type': 'str', + 'collector_id': 'str', + 'workspace_id': 'str', # injected from auth + 'domain_id': 'str', # injected from auth (required) + } + + Returns: + results (list) + total_count (int) + """ + + query = params.query or {} + collector_rule_vos, total_count = self.collector_rule_mgr.list_collector_rules( + query=query + ) + + collector_rule_infos = [ + collector_rule_vo.to_dict() for collector_rule_vo in collector_rule_vos + ] + + return CollectorRulesResponse( + results=collector_rule_infos, total_count=total_count + ) + + @transaction( + permission="inventory:CollectorRule.read", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @append_query_filter(["workspace_id", "domain_id"]) + @append_keyword_filter(["collector_rule_id", "name"]) + @change_value_by_rule("APPEND", "workspace_id", "*") + @convert_model + def stat(self, params: CollectorRuleStatQueryRequest) -> dict: + """ + Args: + params (dict): { + 'query': 'dict (spaceone.api.core.v1.StatisticsQuery)', # required + 'domain_id': 'str', # injected from auth (required) + } + + Returns: + values (list) : 'list of statistics data' + + """ + + query = params.query or {} + return self.collector_rule_mgr.stat_collector_rules(query) + + def _check_actions(self, actions: dict, domain_id: str) -> None: + if "change_project" in actions: + project_id = actions["change_project"] + + identity_mgr: IdentityManager = self.locator.get_manager("IdentityManager") + identity_mgr.get_project(project_id, domain_id) + + if "match_project" in actions: + if "source" not in actions["match_project"]: + raise ERROR_REQUIRED_PARAMETER(key="actions.match_project.source") + + if "match_service_account" in actions: + if "source" not in actions["match_service_account"]: + raise ERROR_REQUIRED_PARAMETER( + key="actions.match_service_account.source" + ) + + def _get_highest_order(self, collector_id: str, rule_type: str, domain_id: str): + collector_rule_vos = self.collector_rule_mgr.filter_collector_rules( + collector_id=collector_id, rule_type=rule_type, domain_id=domain_id + ) + + return collector_rule_vos.count() + + def _get_all_collector_rules( + self, + collector_id: str, + rule_type: str, + domain_id: str, + exclude_collector_rule_id: str = None, + ): + query = { + "filter": [ + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "collector_id", "v": collector_id, "o": "eq"}, + {"k": "rule_type", "v": rule_type, "o": "eq"}, + ], + "sort": [{"key": "order"}], + } + + if exclude_collector_rule_id is not None: + query["filter"].append( + {"k": "collector_rule_id", "v": exclude_collector_rule_id, "o": "not"} + ) + + collector_rule_vos, total_count = self.collector_rule_mgr.list_collector_rules( + query + ) + return list(collector_rule_vos) + + @staticmethod + def _check_conditions(conditions: list) -> None: + for condition in conditions: + key = condition.get("key") + value = condition.get("value") + operator = condition.get("operator") + + if not (key and value and operator): + raise ERROR_INVALID_PARAMETER( + key="conditions", + reason="Condition should have key, value and operator.", + ) + + if key not in _SUPPORTED_CONDITION_KEYS: + if not ( + fnmatch.fnmatch(key, "tags.*") or fnmatch.fnmatch(key, "data.*") + ): + raise ERROR_INVALID_PARAMETER( + key="conditions.key", + reason=f"Unsupported key. " + f'({" | ".join(_SUPPORTED_CONDITION_KEYS)})', + ) + if operator not in _SUPPORTED_CONDITION_OPERATORS: + raise ERROR_INVALID_PARAMETER( + key="conditions.operator", + reason=f"Unsupported operator. " + f'({" | ".join(_SUPPORTED_CONDITION_OPERATORS)})', + ) + + @staticmethod + def _check_order(order: int) -> None: + if order <= 0: + raise ERROR_INVALID_PARAMETER( + key="order", reason="The order must be greater than 0." + )