diff --git a/examples/oauth_token.py b/examples/oauth_token.py index 725fb59..fbaa115 100644 --- a/examples/oauth_token.py +++ b/examples/oauth_token.py @@ -30,7 +30,7 @@ from pytfe import TFEClient, TFEConfig from pytfe.errors import NotFound -from pytfe.models import OAuthTokenListOptions, OAuthTokenUpdateOptions +from pytfe.models import OAuthTokenUpdateOptions def main(): @@ -55,34 +55,18 @@ def main(): # ===================================================== print("\n1. Testing list() function:") try: - # Test basic list without options - token_list = client.oauth_tokens.list(organization_name) - print(f" ✓ Found {len(token_list.items)} OAuth tokens") - - # Show token details - for i, token in enumerate(token_list.items[:3], 1): # Show first 3 - print(f" {i}. Token ID: {token.id}") - print(f" UID: {token.uid}") - print(f" Service Provider User: {token.service_provider_user}") - print(f" Has SSH Key: {token.has_ssh_key}") - print(f" Created: {token.created_at}") + for token in client.oauth_tokens.list(organization_name): + print(f" Token ID: {token.id}") + print(f" Service Provider User: {token.service_provider_user}") + print(f" Has SSH Key: {token.has_ssh_key}") + print(f" Created: {token.created_at}") if token.oauth_client: print(f" OAuth Client: {token.oauth_client.id}") - # Store first token for subsequent tests - if token_list.items: - test_token_id = token_list.items[0].id - print(f"\n Using token {test_token_id} for subsequent tests") - - # Test list with options - print("\n Testing list() with pagination options:") - options = OAuthTokenListOptions(page_size=10, page_number=1) - token_list_with_options = client.oauth_tokens.list(organization_name, options) - print(f" ✓ Found {len(token_list_with_options.items)} tokens with options") - if token_list_with_options.current_page: - print(f" Current page: {token_list_with_options.current_page}") - if token_list_with_options.total_count: - print(f" Total count: {token_list_with_options.total_count}") + # Store first token for subsequent tests + if token and not test_token_id: + test_token_id = token.id + print(f"\n Using token {test_token_id} for subsequent tests \n") except NotFound: print( @@ -99,7 +83,6 @@ def main(): try: token = client.oauth_tokens.read(test_token_id) print(f" ✓ Read OAuth token: {token.id}") - print(f" UID: {token.uid}") print(f" Service Provider User: {token.service_provider_user}") print(f" Has SSH Key: {token.has_ssh_key}") print(f" Created: {token.created_at}") diff --git a/examples/policy_evaluation.py b/examples/policy_evaluation.py index 9a0bd05..20640ac 100644 --- a/examples/policy_evaluation.py +++ b/examples/policy_evaluation.py @@ -26,7 +26,6 @@ def main(): required=True, help="Task stage ID to list policy evaluations for", ) - parser.add_argument("--page", type=int, default=1) parser.add_argument("--page-size", type=int, default=20) args = parser.parse_args() @@ -41,62 +40,55 @@ def main(): _print_header(f"Listing policy evaluations for task stage: {args.task_stage_id}") options = PolicyEvaluationListOptions( - page_number=args.page, page_size=args.page_size, ) try: - pe_list = client.policy_evaluations.list(args.task_stage_id, options) - - print(f"Total policy evaluations: {pe_list.total_count}") - print(f"Page {pe_list.current_page} of {pe_list.total_pages}") - print() - - if not pe_list.items: + pe_count = 0 + for pe in client.policy_evaluations.list(args.task_stage_id, options): + pe_count += 1 + print(f"- ID: {pe.id}") + print(f" Status: {pe.status}") + print(f" Policy Kind: {pe.policy_kind}") + + if pe.result_count: + print(" Result Count:") + if pe.result_count.passed is not None: + print(f" - Passed: {pe.result_count.passed}") + if pe.result_count.advisory_failed is not None: + print(f" - Advisory Failed: {pe.result_count.advisory_failed}") + if pe.result_count.mandatory_failed is not None: + print(f" - Mandatory Failed: {pe.result_count.mandatory_failed}") + if pe.result_count.errored is not None: + print(f" - Errored: {pe.result_count.errored}") + + if pe.status_timestamp: + print(" Status Timestamps:") + if pe.status_timestamp.passed_at: + print(f" - Passed At: {pe.status_timestamp.passed_at}") + if pe.status_timestamp.failed_at: + print(f" - Failed At: {pe.status_timestamp.failed_at}") + if pe.status_timestamp.running_at: + print(f" - Running At: {pe.status_timestamp.running_at}") + if pe.status_timestamp.canceled_at: + print(f" - Canceled At: {pe.status_timestamp.canceled_at}") + if pe.status_timestamp.errored_at: + print(f" - Errored At: {pe.status_timestamp.errored_at}") + + if pe.policy_attachable: + print(f" Task Stage: {pe.task_stage.id} ({pe.task_stage.type})") + + if pe.created_at: + print(f" Created At: {pe.created_at}") + if pe.updated_at: + print(f" Updated At: {pe.updated_at}") + + print() + + if pe_count == 0: print("No policy evaluations found for this task stage.") else: - for pe in pe_list.items: - print(f"- ID: {pe.id}") - print(f" Status: {pe.status}") - print(f" Policy Kind: {pe.policy_kind}") - - if pe.result_count: - print(" Result Count:") - if pe.result_count.passed is not None: - print(f" - Passed: {pe.result_count.passed}") - if pe.result_count.advisory_failed is not None: - print( - f" - Advisory Failed: {pe.result_count.advisory_failed}" - ) - if pe.result_count.mandatory_failed is not None: - print( - f" - Mandatory Failed: {pe.result_count.mandatory_failed}" - ) - if pe.result_count.errored is not None: - print(f" - Errored: {pe.result_count.errored}") - - if pe.status_timestamp: - print(" Status Timestamps:") - if pe.status_timestamp.passed_at: - print(f" - Passed At: {pe.status_timestamp.passed_at}") - if pe.status_timestamp.failed_at: - print(f" - Failed At: {pe.status_timestamp.failed_at}") - if pe.status_timestamp.running_at: - print(f" - Running At: {pe.status_timestamp.running_at}") - if pe.status_timestamp.canceled_at: - print(f" - Canceled At: {pe.status_timestamp.canceled_at}") - if pe.status_timestamp.errored_at: - print(f" - Errored At: {pe.status_timestamp.errored_at}") - - if pe.task_stage: - print(f" Task Stage: {pe.task_stage.id} ({pe.task_stage.type})") - - if pe.created_at: - print(f" Created At: {pe.created_at}") - if pe.updated_at: - print(f" Updated At: {pe.updated_at}") - - print() + print(f"\nTotal: {pe_count} policy evaluations") except Exception as e: print(f"Error listing policy evaluations: {e}") diff --git a/examples/reserved_tag_key.py b/examples/reserved_tag_key.py index 8eeb5e7..59f5f3c 100644 --- a/examples/reserved_tag_key.py +++ b/examples/reserved_tag_key.py @@ -53,9 +53,7 @@ def main(): try: # 1. List existing reserved tag keys print("\n1. Listing reserved tag keys...") - reserved_tag_keys = client.reserved_tag_key.list(TFE_ORG) - print(f"✅ Found {len(reserved_tag_keys.items)} reserved tag keys:") - for rtk in reserved_tag_keys.items: + for rtk in client.reserved_tag_key.list(TFE_ORG): print( f" - ID: {rtk.id}, Key: {rtk.key}, Disable Overrides: {rtk.disable_overrides}" ) @@ -87,18 +85,16 @@ def main(): # 5. Verify deletion by listing again print("\n5. Verifying deletion...") - reserved_tag_keys_after = client.reserved_tag_key.list(TFE_ORG) - print( - f"✅ Reserved tag keys after deletion: {len(reserved_tag_keys_after.items)}" - ) + reserved_tag_keys_after = list(client.reserved_tag_key.list(TFE_ORG)) + print(f"Reserved tag keys after deletion: {len(reserved_tag_keys_after)}") # 6. Demonstrate pagination with options print("\n6. Demonstrating pagination options...") - list_options = ReservedTagKeyListOptions(page_size=5, page_number=1) - paginated_rtks = client.reserved_tag_key.list(TFE_ORG, list_options) - print(f"✅ Page 1 with page size 5: {len(paginated_rtks.items)} keys") - print(f" Total pages: {paginated_rtks.total_pages}") - print(f" Total count: {paginated_rtks.total_count}") + list_options = ReservedTagKeyListOptions(page_size=5) + for rtk in client.reserved_tag_key.list(TFE_ORG, list_options): + print( + f" - ID: {rtk.id}, Key: {rtk.key}, Disable Overrides: {rtk.disable_overrides}" + ) print("\n🎉 Reserved Tag Keys API example completed successfully!") diff --git a/src/pytfe/client.py b/src/pytfe/client.py index 3894886..e007fd4 100644 --- a/src/pytfe/client.py +++ b/src/pytfe/client.py @@ -15,13 +15,13 @@ from .resources.policy_check import PolicyChecks from .resources.policy_evaluation import PolicyEvaluations from .resources.policy_set import PolicySets -from .resources.policy_set_outcome import PolicySets as PolicySetOutcomes +from .resources.policy_set_outcome import PolicySetOutcomes from .resources.policy_set_version import PolicySetVersions from .resources.projects import Projects from .resources.query_run import QueryRuns from .resources.registry_module import RegistryModules from .resources.registry_provider import RegistryProviders -from .resources.reserved_tag_key import ReservedTagKey +from .resources.reserved_tag_key import ReservedTagKeys from .resources.run import Runs from .resources.run_event import RunEvents from .resources.run_task import RunTasks @@ -91,7 +91,7 @@ def __init__(self, config: TFEConfig | None = None): self.ssh_keys = SSHKeys(self._transport) # Reserved Tag Key - self.reserved_tag_key = ReservedTagKey(self._transport) + self.reserved_tag_key = ReservedTagKeys(self._transport) def close(self) -> None: try: diff --git a/src/pytfe/errors.py b/src/pytfe/errors.py index 3eac2be..fa52737 100644 --- a/src/pytfe/errors.py +++ b/src/pytfe/errors.py @@ -460,3 +460,11 @@ class InvalidPolicyEvaluationIDError(InvalidValues): def __init__(self, message: str = "invalid value for policy evaluation ID"): super().__init__(message) + + +# Policy Set Outcome errors +class InvalidPolicySetOutcomeIDError(InvalidValues): + """Raised when an invalid policy set outcome ID is provided.""" + + def __init__(self, message: str = "invalid value for policy set outcome ID"): + super().__init__(message) diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index f3eb33b..55ed3a1 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -71,7 +71,6 @@ ) from .oauth_token import ( OAuthToken, - OAuthTokenList, OAuthTokenListOptions, OAuthTokenUpdateOptions, ) @@ -110,7 +109,6 @@ from .policy_evaluation import ( PolicyAttachable, PolicyEvaluation, - PolicyEvaluationList, PolicyEvaluationListOptions, PolicyEvaluationStatus, PolicyEvaluationStatusTimestamps, @@ -208,7 +206,6 @@ from .reserved_tag_key import ( ReservedTagKey, ReservedTagKeyCreateOptions, - ReservedTagKeyList, ReservedTagKeyListOptions, ReservedTagKeyUpdateOptions, ) @@ -349,7 +346,6 @@ "ServiceProviderType", # OAuth token "OAuthToken", - "OAuthTokenList", "OAuthTokenListOptions", "OAuthTokenUpdateOptions", # SSH keys @@ -361,7 +357,6 @@ # Reserved tag keys "ReservedTagKey", "ReservedTagKeyCreateOptions", - "ReservedTagKeyList", "ReservedTagKeyListOptions", "ReservedTagKeyUpdateOptions", # Agent & pools @@ -559,7 +554,6 @@ # Policy Evaluation "PolicyAttachable", "PolicyEvaluation", - "PolicyEvaluationList", "PolicyEvaluationListOptions", "PolicyEvaluationStatus", "PolicyEvaluationStatusTimestamps", diff --git a/src/pytfe/models/oauth_token.py b/src/pytfe/models/oauth_token.py index c6b004b..a70c20e 100644 --- a/src/pytfe/models/oauth_token.py +++ b/src/pytfe/models/oauth_token.py @@ -15,7 +15,6 @@ class OAuthToken(BaseModel): model_config = ConfigDict(extra="forbid") id: str = Field(..., description="OAuth token ID") - uid: str = Field(..., description="OAuth token UID") created_at: datetime = Field(..., description="Creation timestamp") has_ssh_key: bool = Field(..., description="Whether the token has an SSH key") service_provider_user: str = Field(..., description="Service provider user") @@ -26,26 +25,12 @@ class OAuthToken(BaseModel): ) -class OAuthTokenList(BaseModel): - """List of OAuth tokens with pagination information.""" - - model_config = ConfigDict(extra="forbid") - - items: list[OAuthToken] = Field(default_factory=list, description="OAuth tokens") - current_page: int | None = Field(None, description="Current page number") - prev_page: int | None = Field(None, description="Previous page number") - next_page: int | None = Field(None, description="Next page number") - total_pages: int | None = Field(None, description="Total number of pages") - total_count: int | None = Field(None, description="Total count of items") - - class OAuthTokenListOptions(BaseModel): """Options for listing OAuth tokens.""" - model_config = ConfigDict(extra="forbid") + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) - page_number: int | None = Field(None, description="Page number") - page_size: int | None = Field(None, description="Page size") + page_size: int | None = Field(None, alias="page[size]", description="Page size") class OAuthTokenUpdateOptions(BaseModel): @@ -63,7 +48,6 @@ class OAuthTokenUpdateOptions(BaseModel): from .oauth_client import OAuthClient # noqa: F401 OAuthToken.model_rebuild() - OAuthTokenList.model_rebuild() except ImportError: # If OAuthClient is not available, create a dummy class pass diff --git a/src/pytfe/models/policy_evaluation.py b/src/pytfe/models/policy_evaluation.py index 86175e9..49ad257 100644 --- a/src/pytfe/models/policy_evaluation.py +++ b/src/pytfe/models/policy_evaluation.py @@ -37,7 +37,7 @@ class PolicyEvaluation(BaseModel): updated_at: datetime | None = Field(None, alias="updated-at") # The task stage the policy evaluation belongs to - task_stage: PolicyAttachable | None = Field(None, alias="policy-attachable") + policy_attachable: PolicyAttachable | None = Field(None, alias="policy-attachable") class PolicyEvaluationStatusTimestamps(BaseModel): @@ -72,23 +72,9 @@ class PolicyResultCount(BaseModel): errored: int | None = Field(None, alias="errored") -class PolicyEvaluationList(BaseModel): - """PolicyEvaluationList represents a list of policy evaluations""" - - model_config = ConfigDict(populate_by_name=True, validate_by_name=True) - - items: list[PolicyEvaluation] | None = Field(default_factory=list) - current_page: int | None = None - next_page: str | None = None - prev_page: str | None = None - total_count: int | None = None - total_pages: int | None = None - - class PolicyEvaluationListOptions(BaseModel): """PolicyEvaluationListOptions represents the options for listing policy evaluations""" model_config = ConfigDict(populate_by_name=True, validate_by_name=True) - page_number: int | None = Field(None, alias="page[number]") page_size: int | None = Field(None, alias="page[size]") diff --git a/src/pytfe/models/policy_set_outcome.py b/src/pytfe/models/policy_set_outcome.py index 4059546..ffb9723 100644 --- a/src/pytfe/models/policy_set_outcome.py +++ b/src/pytfe/models/policy_set_outcome.py @@ -34,19 +34,6 @@ class Outcome(BaseModel): description: str | None = Field(None, alias="description") -class PolicySetOutcomeList(BaseModel): - """PolicySetOutcomeList represents a list of policy set outcomes""" - - model_config = ConfigDict(populate_by_name=True, validate_by_name=True) - - items: list[PolicySetOutcome] | None = Field(default_factory=list) - current_page: int | None = None - next_page: str | None = None - prev_page: str | None = None - total_count: int | None = None - total_pages: int | None = None - - class PolicySetOutcomeListFilter(BaseModel): """PolicySetOutcomeListFilter represents the filters that are supported while listing a policy set outcome""" @@ -62,5 +49,4 @@ class PolicySetOutcomeListOptions(BaseModel): model_config = ConfigDict(populate_by_name=True, validate_by_name=True) filter: dict[str, PolicySetOutcomeListFilter] | None = None - page_number: int | None = Field(None, alias="page[number]") page_size: int | None = Field(None, alias="page[size]") diff --git a/src/pytfe/models/reserved_tag_key.py b/src/pytfe/models/reserved_tag_key.py index eb125ea..c332742 100644 --- a/src/pytfe/models/reserved_tag_key.py +++ b/src/pytfe/models/reserved_tag_key.py @@ -65,24 +65,6 @@ class ReservedTagKeyListOptions(BaseModel): model_config = ConfigDict(populate_by_name=True) - page_number: int | None = Field( - None, alias="page[number]", description="Page number to retrieve", ge=1 - ) page_size: int | None = Field( None, alias="page[size]", description="Number of items per page", ge=1, le=100 ) - - -class ReservedTagKeyList(BaseModel): - """Represents a paginated list of reserved tag keys.""" - - model_config = ConfigDict(populate_by_name=True) - - items: list[ReservedTagKey] = Field( - default_factory=list, description="List of reserved tag keys" - ) - current_page: int | None = Field(None, description="Current page number") - total_pages: int | None = Field(None, description="Total number of pages") - prev_page: str | None = Field(None, description="URL of the previous page") - next_page: str | None = Field(None, description="URL of the next page") - total_count: int | None = Field(None, description="Total number of items") diff --git a/src/pytfe/resources/oauth_token.py b/src/pytfe/resources/oauth_token.py index fb25074..337fa02 100644 --- a/src/pytfe/resources/oauth_token.py +++ b/src/pytfe/resources/oauth_token.py @@ -1,5 +1,6 @@ from __future__ import annotations +from collections.abc import Iterator from datetime import datetime from typing import Any from urllib.parse import quote @@ -7,11 +8,10 @@ from ..errors import ERR_INVALID_OAUTH_TOKEN_ID, ERR_INVALID_ORG from ..models.oauth_token import ( OAuthToken, - OAuthTokenList, OAuthTokenListOptions, OAuthTokenUpdateOptions, ) -from ..utils import encode_query, valid_string_id +from ..utils import valid_string_id from ._base import _Service @@ -20,7 +20,7 @@ class OAuthTokens(_Service): def list( self, organization: str, options: OAuthTokenListOptions | None = None - ) -> OAuthTokenList: + ) -> Iterator[OAuthToken]: """List all the OAuth tokens for a given organization.""" if not valid_string_id(organization): raise ValueError(ERR_INVALID_ORG) @@ -29,37 +29,11 @@ def list( params = {} if options: - if options.page_number: - params["page[number]"] = str(options.page_number) if options.page_size: params["page[size]"] = str(options.page_size) - query_string = encode_query(params) - full_path = f"{path}{query_string}" - - response = self.t.request("GET", full_path) - data = response.json() - - tokens = [] - if "data" in data: - for item in data["data"]: - tokens.append(self._parse_oauth_token(item)) - - # Parse pagination metadata - pagination = {} - if "meta" in data: - meta = data["meta"] - if "pagination" in meta: - page_info = meta["pagination"] - pagination = { - "current_page": page_info.get("current-page"), - "prev_page": page_info.get("prev-page"), - "next_page": page_info.get("next-page"), - "total_pages": page_info.get("total-pages"), - "total_count": page_info.get("total-count"), - } - - return OAuthTokenList(items=tokens, **pagination) + for item in self._list(path, params=params): + yield self._parse_oauth_token(item) def read(self, oauth_token_id: str) -> OAuthToken: """Read an OAuth token by its ID.""" @@ -128,7 +102,6 @@ def _parse_oauth_token(self, data: dict[str, Any]) -> OAuthToken: return OAuthToken( id=data.get("id", ""), - uid=attributes.get("uid", ""), created_at=created_at, has_ssh_key=attributes.get("has-ssh-key", False), service_provider_user=attributes.get("service-provider-user", ""), diff --git a/src/pytfe/resources/policy_evaluation.py b/src/pytfe/resources/policy_evaluation.py index bc30193..2f911f6 100644 --- a/src/pytfe/resources/policy_evaluation.py +++ b/src/pytfe/resources/policy_evaluation.py @@ -1,11 +1,12 @@ from __future__ import annotations +from collections.abc import Iterator + from ..errors import ( InvalidTaskStageIDError, ) from ..models.policy_evaluation import ( PolicyEvaluation, - PolicyEvaluationList, PolicyEvaluationListOptions, ) from ..utils import valid_string_id @@ -20,34 +21,21 @@ class PolicyEvaluations(_Service): def list( self, task_stage_id: str, options: PolicyEvaluationListOptions | None = None - ) -> PolicyEvaluationList: + ) -> Iterator[PolicyEvaluation]: """ **Note: This method is still in BETA and subject to change.** - List all policy evaluations in the task stage. Only available for OPA policies. + List all policy evaluations in the task stage. Only available for OPA policies. """ if not valid_string_id(task_stage_id): raise InvalidTaskStageIDError() params = options.model_dump(by_alias=True) if options else {} path = f"api/v2/task-stages/{task_stage_id}/policy-evaluations" - r = self.t.request("GET", path, params=params) - jd = r.json() - items = [] - meta = jd.get("meta", {}) - pagination = meta.get("pagination", {}) - for item in jd.get("data", []): + for item in self._list(path, params=params): attrs = item.get("attributes", {}) attrs["id"] = item.get("id") - attrs["task-stage"] = ( + attrs["policy-attachable"] = ( item.get("relationships", {}) .get("policy-attachable", {}) .get("data", {}) ) - items.append(PolicyEvaluation.model_validate(attrs)) - return PolicyEvaluationList( - items=items, - current_page=pagination.get("current-page"), - next_page=pagination.get("next-page"), - prev_page=pagination.get("prev-page"), - total_count=pagination.get("total-count"), - total_pages=pagination.get("total-pages"), - ) + yield PolicyEvaluation.model_validate(attrs) diff --git a/src/pytfe/resources/policy_set_outcome.py b/src/pytfe/resources/policy_set_outcome.py index 56f7f34..42389d3 100644 --- a/src/pytfe/resources/policy_set_outcome.py +++ b/src/pytfe/resources/policy_set_outcome.py @@ -1,18 +1,21 @@ from __future__ import annotations +from collections.abc import Iterator +from typing import Any + from ..errors import ( InvalidPolicyEvaluationIDError, + InvalidPolicySetOutcomeIDError, ) from ..models.policy_set_outcome import ( PolicySetOutcome, - PolicySetOutcomeList, PolicySetOutcomeListOptions, ) from ..utils import valid_string_id from ._base import _Service -class PolicySets(_Service): +class PolicySetOutcomes(_Service): """ PolicySetOutcomes describes all the policy set outcome related methods that the Terraform Enterprise API supports. TFE API docs: https://developer.hashicorp.com/terraform/cloud-docs/api-docs/policy-checks @@ -22,7 +25,7 @@ def list( self, policy_evaluation_id: str, options: PolicySetOutcomeListOptions | None = None, - ) -> PolicySetOutcomeList: + ) -> Iterator[PolicySetOutcome]: """ **Note: This method is still in BETA and subject to change.** List all policy set outcomes in the policy evaluation. Only available for OPA policies. @@ -35,28 +38,8 @@ def list( if additional_query_params: params.update(additional_query_params) path = f"api/v2/policy-evaluations/{policy_evaluation_id}/policy-set-outcomes" - r = self.t.request("GET", path, params=params) - jd = r.json() - items = [] - meta = jd.get("meta", {}) - pagination = meta.get("pagination", {}) - for item in jd.get("data", []): - attrs = item.get("attributes", {}) - attrs["id"] = item.get("id") - attrs["policy-evaluation"] = ( - item.get("relationships", {}) - .get("policy-evaluation", {}) - .get("data", {}) - ) - items.append(PolicySetOutcome.model_validate(attrs)) - return PolicySetOutcomeList( - items=items, - current_page=pagination.get("current-page"), - next_page=pagination.get("next-page"), - prev_page=pagination.get("prev-page"), - total_count=pagination.get("total-count"), - total_pages=pagination.get("total-pages"), - ) + for item in self._list(path, params=params): + yield self._policy_set_outcome_from(item) def build_query_string( self, options: PolicySetOutcomeListOptions | None @@ -77,14 +60,17 @@ def read(self, policy_set_outcome_id: str) -> PolicySetOutcome: **Note: This method is still in BETA and subject to change.** Read a single policy set outcome by ID. Only available for OPA policies.""" if not valid_string_id(policy_set_outcome_id): - raise InvalidPolicyEvaluationIDError() + raise InvalidPolicySetOutcomeIDError() path = f"api/v2/policy-set-outcomes/{policy_set_outcome_id}" r = self.t.request("GET", path) - jd = r.json() - item = jd.get("data", {}) - attrs = item.get("attributes", {}) - attrs["id"] = item.get("id") + data = r.json().get("data", {}) + return PolicySetOutcome.model_validate(data) + + def _policy_set_outcome_from(self, d: dict[str, Any]) -> PolicySetOutcome: + """Convert API response dict to PolicySetParameter model.""" + attrs = d.get("attributes", {}) + attrs["id"] = d.get("id") attrs["policy-evaluation"] = ( - item.get("relationships", {}).get("policy-evaluation", {}).get("data", {}) + d.get("relationships", {}).get("policy-evaluation", {}).get("data", {}) ) return PolicySetOutcome.model_validate(attrs) diff --git a/src/pytfe/resources/reserved_tag_key.py b/src/pytfe/resources/reserved_tag_key.py index aeff161..8eed7fa 100644 --- a/src/pytfe/resources/reserved_tag_key.py +++ b/src/pytfe/resources/reserved_tag_key.py @@ -1,5 +1,6 @@ from __future__ import annotations +from collections.abc import Iterator from typing import Any from ..errors import ( @@ -7,11 +8,8 @@ ValidationError, ) from ..models.reserved_tag_key import ( - ReservedTagKey as ReservedTagKeyModel, -) -from ..models.reserved_tag_key import ( + ReservedTagKey, ReservedTagKeyCreateOptions, - ReservedTagKeyList, ReservedTagKeyListOptions, ReservedTagKeyUpdateOptions, ) @@ -19,12 +17,12 @@ from ._base import _Service -class ReservedTagKey(_Service): +class ReservedTagKeys(_Service): """Reserved Tag Key API for Terraform Enterprise.""" def list( self, organization: str, options: ReservedTagKeyListOptions | None = None - ) -> ReservedTagKeyList: + ) -> Iterator[ReservedTagKey]: """List reserved tag keys for the given organization.""" if not valid_string_id(organization): raise InvalidOrgError() @@ -32,33 +30,13 @@ def list( params = ( options.model_dump(by_alias=True, exclude_none=True) if options else None ) - - r = self.t.request( - "GET", - f"/api/v2/organizations/{organization}/reserved-tag-keys", - params=params, - ) - - jd = r.json() - items = [] - meta = jd.get("meta", {}) - pagination = meta.get("pagination", {}) - - for d in jd.get("data", []): - items.append(self._parse_reserved_tag_key(d)) - - return ReservedTagKeyList( - items=items, - current_page=pagination.get("current-page"), - total_pages=pagination.get("total-pages"), - prev_page=pagination.get("prev-page"), - next_page=pagination.get("next-page"), - total_count=pagination.get("total-count"), - ) + path = f"/api/v2/organizations/{organization}/reserved-tag-keys" + for item in self._list(path, params=params): + yield self._parse_reserved_tag_key(item) def create( self, organization: str, options: ReservedTagKeyCreateOptions - ) -> ReservedTagKeyModel: + ) -> ReservedTagKey: """Create a new reserved tag key for the given organization.""" if not valid_string_id(organization): raise InvalidOrgError() @@ -82,20 +60,9 @@ def create( return self._parse_reserved_tag_key(data) - def read(self, reserved_tag_key_id: str) -> ReservedTagKeyModel: - """Read a reserved tag key by its ID.""" - if not valid_string_id(reserved_tag_key_id): - raise ValidationError("Invalid reserved tag key ID") - - # Note: Based on the API docs, there's no explicit GET endpoint for individual reserved tag keys - # This method would need to be implemented if such an endpoint exists - raise NotImplementedError( - "Individual reserved tag key read is not supported by the API" - ) - def update( self, reserved_tag_key_id: str, options: ReservedTagKeyUpdateOptions - ) -> ReservedTagKeyModel: + ) -> ReservedTagKey: """Update a reserved tag key.""" if not valid_string_id(reserved_tag_key_id): raise ValidationError("Invalid reserved tag key ID") @@ -125,10 +92,10 @@ def delete(self, reserved_tag_key_id: str) -> None: raise ValidationError("Invalid reserved tag key ID") self.t.request("DELETE", f"/api/v2/reserved-tag-keys/{reserved_tag_key_id}") - # DELETE returns 204 No Content on success + return None - def _parse_reserved_tag_key(self, data: dict[str, Any]) -> ReservedTagKeyModel: + def _parse_reserved_tag_key(self, data: dict[str, Any]) -> ReservedTagKey: """Parse reserved tag key data from API response.""" attrs = data.get("attributes", {}) attrs["id"] = data.get("id") - return ReservedTagKeyModel.model_validate(attrs) + return ReservedTagKey.model_validate(attrs) diff --git a/tests/units/test_oauth_token.py b/tests/units/test_oauth_token.py index 1af0708..b60ee9b 100644 --- a/tests/units/test_oauth_token.py +++ b/tests/units/test_oauth_token.py @@ -5,7 +5,7 @@ """ from datetime import datetime -from unittest.mock import Mock +from unittest.mock import Mock, patch import pytest @@ -35,7 +35,6 @@ def test_parse_oauth_token_minimal(self, oauth_tokens_service): data = { "id": "ot-test123", "attributes": { - "uid": "uid-test123", "created-at": "2023-01-01T00:00:00Z", "has-ssh-key": False, "service-provider-user": "testuser", @@ -46,7 +45,6 @@ def test_parse_oauth_token_minimal(self, oauth_tokens_service): result = oauth_tokens_service._parse_oauth_token(data) assert result.id == "ot-test123" - assert result.uid == "uid-test123" assert isinstance(result.created_at, datetime) assert result.has_ssh_key is False assert result.service_provider_user == "testuser" @@ -57,7 +55,6 @@ def test_parse_oauth_token_with_oauth_client(self, oauth_tokens_service): data = { "id": "ot-test123", "attributes": { - "uid": "uid-test123", "created-at": "2023-01-01T00:00:00Z", "has-ssh-key": True, "service-provider-user": "testuser", @@ -84,7 +81,6 @@ def test_parse_oauth_token_empty_relationships(self, oauth_tokens_service): data = { "id": "ot-test123", "attributes": { - "uid": "uid-test123", "created-at": "2023-01-01T00:00:00Z", "has-ssh-key": False, "service-provider-user": "testuser", @@ -119,7 +115,6 @@ def test_list_oauth_tokens_basic(self, oauth_tokens_service, mock_transport): { "id": "ot-test1", "attributes": { - "uid": "uid-test1", "created-at": "2023-01-01T00:00:00Z", "has-ssh-key": False, "service-provider-user": "testuser1", @@ -129,7 +124,6 @@ def test_list_oauth_tokens_basic(self, oauth_tokens_service, mock_transport): { "id": "ot-test2", "attributes": { - "uid": "uid-test2", "created-at": "2023-01-02T00:00:00Z", "has-ssh-key": True, "service-provider-user": "testuser2", @@ -149,38 +143,33 @@ def test_list_oauth_tokens_basic(self, oauth_tokens_service, mock_transport): } mock_transport.request.return_value = mock_response - result = oauth_tokens_service.list("test-org") + result = list(oauth_tokens_service.list("test-org")) - mock_transport.request.assert_called_once_with( - "GET", "/api/v2/organizations/test-org/oauth-tokens" - ) - assert len(result.items) == 2 - assert result.items[0].id == "ot-test1" - assert result.items[1].id == "ot-test2" - assert result.current_page == 1 - assert result.total_count == 2 + assert mock_transport.request.call_count == 1 + assert len(result) == 2 + assert result[0].id == "ot-test1" + assert result[1].id == "ot-test2" def test_list_oauth_tokens_with_options(self, oauth_tokens_service, mock_transport): """Test listing OAuth tokens with pagination options.""" - mock_response = Mock() - mock_response.json.return_value = { - "data": [], - "meta": {"pagination": {"current-page": 2}}, - } - mock_transport.request.return_value = mock_response + options = OAuthTokenListOptions(page_size=50) - options = OAuthTokenListOptions(page_number=2, page_size=50) - oauth_tokens_service.list("test-org", options) + with patch.object(oauth_tokens_service, "_list") as mock_list: + mock_list.return_value = [] - mock_transport.request.assert_called_once_with( - "GET", - "/api/v2/organizations/test-org/oauth-tokens?page[number]=2&page[size]=50", - ) + list(oauth_tokens_service.list("test-org", options)) + + expected_params = { + "page[size]": "50", + } + mock_list.assert_called_once_with( + "/api/v2/organizations/test-org/oauth-tokens", params=expected_params + ) def test_list_oauth_tokens_invalid_org(self, oauth_tokens_service): """Test listing OAuth tokens with invalid organization ID.""" with pytest.raises(ValueError, match=ERR_INVALID_ORG): - oauth_tokens_service.list("") + list(oauth_tokens_service.list("")) def test_read_oauth_token_success(self, oauth_tokens_service, mock_transport): """Test reading an OAuth token successfully.""" @@ -189,7 +178,6 @@ def test_read_oauth_token_success(self, oauth_tokens_service, mock_transport): "data": { "id": "ot-test123", "attributes": { - "uid": "uid-test123", "created-at": "2023-01-01T00:00:00Z", "has-ssh-key": False, "service-provider-user": "testuser", @@ -205,7 +193,6 @@ def test_read_oauth_token_success(self, oauth_tokens_service, mock_transport): "GET", "/api/v2/oauth-tokens/ot-test123" ) assert result.id == "ot-test123" - assert result.uid == "uid-test123" def test_read_oauth_token_invalid_id(self, oauth_tokens_service): """Test reading an OAuth token with invalid ID.""" @@ -219,7 +206,6 @@ def test_update_oauth_token_success(self, oauth_tokens_service, mock_transport): "data": { "id": "ot-test123", "attributes": { - "uid": "uid-test123", "created-at": "2023-01-01T00:00:00Z", "has-ssh-key": True, "service-provider-user": "testuser", @@ -253,7 +239,6 @@ def test_update_oauth_token_no_ssh_key(self, oauth_tokens_service, mock_transpor "data": { "id": "ot-test123", "attributes": { - "uid": "uid-test123", "created-at": "2023-01-01T00:00:00Z", "has-ssh-key": False, "service-provider-user": "testuser", @@ -308,9 +293,8 @@ def oauth_tokens_service(self): def test_oauth_token_list_options(self, oauth_tokens_service): """Test OAuth token list options creation.""" - options = OAuthTokenListOptions(page_number=1, page_size=25) + options = OAuthTokenListOptions(page_size=25) - assert options.page_number == 1 assert options.page_size == 25 def test_oauth_token_update_options(self, oauth_tokens_service): diff --git a/tests/units/test_policy_evaluation.py b/tests/units/test_policy_evaluation.py new file mode 100644 index 0000000..820496b --- /dev/null +++ b/tests/units/test_policy_evaluation.py @@ -0,0 +1,211 @@ +"""Unit tests for the policy evaluation module.""" + +from unittest.mock import Mock + +import pytest + +from pytfe._http import HTTPTransport +from pytfe.errors import InvalidTaskStageIDError +from pytfe.models.policy_evaluation import ( + PolicyEvaluation, + PolicyEvaluationListOptions, + PolicyEvaluationStatus, +) +from pytfe.resources.policy_evaluation import PolicyEvaluations + + +class TestPolicyEvaluations: + """Test the PolicyEvaluations service class.""" + + @pytest.fixture + def mock_transport(self): + """Create a mock HTTPTransport.""" + return Mock(spec=HTTPTransport) + + @pytest.fixture + def policy_evaluations_service(self, mock_transport): + """Create a PolicyEvaluations service with mocked transport.""" + return PolicyEvaluations(mock_transport) + + def test_list_validations(self, policy_evaluations_service): + """Test list method with invalid task stage ID.""" + + # Test empty task stage ID + with pytest.raises(InvalidTaskStageIDError): + list(policy_evaluations_service.list("")) + + # Test None task stage ID + with pytest.raises(InvalidTaskStageIDError): + list(policy_evaluations_service.list(None)) + + def test_list_success_with_options( + self, policy_evaluations_service, mock_transport + ): + """Test successful iteration with custom pagination options.""" + + mock_response_data = { + "data": [ + { + "id": "poleval-456", + "type": "policy-evaluations", + "attributes": { + "status": "failed", + "policy-kind": "opa", + "status-timestamp": { + "passed-at": None, + "failed-at": "2023-01-02T12:00:00Z", + "running-at": "2023-01-02T11:59:00Z", + "canceled-at": None, + "errored-at": None, + }, + "result-count": { + "advisory-failed": 2, + "mandatory-failed": 1, + "passed": 3, + "errored": 0, + }, + "created-at": "2023-01-02T11:58:00Z", + "updated-at": "2023-01-02T12:00:00Z", + }, + "relationships": { + "policy-attachable": { + "data": {"id": "ts-456", "type": "task-stages"} + } + }, + } + ] + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + options = PolicyEvaluationListOptions(page_size=5) + result = list(policy_evaluations_service.list("ts-456", options=options)) + + # Verify the request was made with correct parameters + assert mock_transport.request.call_count == 1 + call_args = mock_transport.request.call_args + assert call_args[0][0] == "GET" + assert call_args[0][1] == "api/v2/task-stages/ts-456/policy-evaluations" + + # Verify custom options were passed and merged with _list defaults + params = call_args[1]["params"] + assert params["page[size]"] == 5 # Custom value from options + + # Verify the result + assert len(result) == 1 + assert isinstance(result[0], PolicyEvaluation) + assert result[0].id == "poleval-456" + assert result[0].status == PolicyEvaluationStatus.POLICYEVALUATIONFAILED + assert result[0].result_count.advisory_failed == 2 + assert result[0].result_count.mandatory_failed == 1 + + def test_list_empty_result(self, policy_evaluations_service, mock_transport): + """Test iteration with no results.""" + + mock_response_data = {"data": []} + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + result = list(policy_evaluations_service.list("ts-empty")) + + # Verify the request was made + assert mock_transport.request.call_count == 1 + + # Verify iterator yields no items + assert len(result) == 0 + assert result == [] + + def test_list_with_different_statuses( + self, policy_evaluations_service, mock_transport + ): + """Test list operation returns evaluations with different statuses.""" + + mock_response_data = { + "data": [ + { + "id": "poleval-pending", + "type": "policy-evaluations", + "attributes": { + "status": "pending", + "policy-kind": "opa", + "status-timestamp": {}, + "result-count": { + "advisory-failed": 0, + "mandatory-failed": 0, + "passed": 0, + "errored": 0, + }, + "created-at": "2023-01-01T11:58:00Z", + "updated-at": "2023-01-01T11:58:00Z", + }, + "relationships": { + "policy-attachable": { + "data": {"id": "ts-multi", "type": "task-stages"} + } + }, + }, + { + "id": "poleval-running", + "type": "policy-evaluations", + "attributes": { + "status": "running", + "policy-kind": "opa", + "status-timestamp": {"running-at": "2023-01-01T11:59:00Z"}, + "result-count": { + "advisory-failed": 0, + "mandatory-failed": 0, + "passed": 0, + "errored": 0, + }, + "created-at": "2023-01-01T11:58:00Z", + "updated-at": "2023-01-01T11:59:00Z", + }, + "relationships": { + "policy-attachable": { + "data": {"id": "ts-multi", "type": "task-stages"} + } + }, + }, + { + "id": "poleval-errored", + "type": "policy-evaluations", + "attributes": { + "status": "errored", + "policy-kind": "opa", + "status-timestamp": {"errored-at": "2023-01-01T12:00:00Z"}, + "result-count": { + "advisory-failed": 0, + "mandatory-failed": 0, + "passed": 0, + "errored": 1, + }, + "created-at": "2023-01-01T11:58:00Z", + "updated-at": "2023-01-01T12:00:00Z", + }, + "relationships": { + "policy-attachable": { + "data": {"id": "ts-multi", "type": "task-stages"} + } + }, + }, + ] + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + result = list(policy_evaluations_service.list("ts-multi")) + + # Verify the iterator yields all items with correct statuses + assert len(result) == 3 + assert result[0].status == PolicyEvaluationStatus.POLICYEVALUATIONPENDING + assert result[1].status == PolicyEvaluationStatus.POLICYEVALUATIONRUNNING + assert result[2].status == PolicyEvaluationStatus.POLICYEVALUATIONERRORED + + # Verify all are PolicyEvaluation instances + assert all(isinstance(item, PolicyEvaluation) for item in result) diff --git a/tests/units/test_reserved_tag_key.py b/tests/units/test_reserved_tag_key.py index 490a93a..d7ca66b 100644 --- a/tests/units/test_reserved_tag_key.py +++ b/tests/units/test_reserved_tag_key.py @@ -14,7 +14,7 @@ ReservedTagKeyListOptions, ReservedTagKeyUpdateOptions, ) -from pytfe.resources.reserved_tag_key import ReservedTagKey +from pytfe.resources.reserved_tag_key import ReservedTagKeys class TestReservedTagKeyParsing: @@ -24,7 +24,7 @@ class TestReservedTagKeyParsing: def reserved_tag_key_service(self): """Create a ReservedTagKey service for testing parsing.""" mock_transport = Mock(spec=HTTPTransport) - return ReservedTagKey(mock_transport) + return ReservedTagKeys(mock_transport) def test_parse_reserved_tag_key_minimal(self, reserved_tag_key_service): """Test _parse_reserved_tag_key with minimal data.""" @@ -68,12 +68,12 @@ class TestReservedTagKey: def reserved_tag_key_service(self): """Create a ReservedTagKey service for testing.""" mock_transport = Mock(spec=HTTPTransport) - return ReservedTagKey(mock_transport) + return ReservedTagKeys(mock_transport) def test_list_reserved_tag_keys_invalid_org(self, reserved_tag_key_service): """Test listing reserved tag keys with invalid organization.""" with pytest.raises(InvalidOrgError): - reserved_tag_key_service.list("") + list(reserved_tag_key_service.list("")) def test_create_reserved_tag_key_invalid_org(self, reserved_tag_key_service): """Test creating reserved tag key with invalid organization.""" @@ -83,11 +83,6 @@ def test_create_reserved_tag_key_invalid_org(self, reserved_tag_key_service): with pytest.raises(InvalidOrgError): reserved_tag_key_service.create("", options) - def test_read_reserved_tag_key_not_implemented(self, reserved_tag_key_service): - """Test reading reserved tag key raises NotImplementedError.""" - with pytest.raises(NotImplementedError): - reserved_tag_key_service.read("rtk-123") - def test_update_reserved_tag_key_invalid_id(self, reserved_tag_key_service): """Test updating reserved tag key with invalid ID.""" options = ReservedTagKeyUpdateOptions(key="updated-key") @@ -115,6 +110,5 @@ def test_reserved_tag_key_update_options_model(self): def test_reserved_tag_key_list_options_model(self): """Test ReservedTagKeyListOptions model validation.""" - options = ReservedTagKeyListOptions(page_number=2, page_size=50) - assert options.page_number == 2 + options = ReservedTagKeyListOptions(page_size=50) assert options.page_size == 50