diff --git a/examples/workspace_resources.py b/examples/workspace_resources.py new file mode 100644 index 0000000..634aa98 --- /dev/null +++ b/examples/workspace_resources.py @@ -0,0 +1,119 @@ +"""Example script for working with workspace resources in Terraform Enterprise. + +This script demonstrates how to list resources within a workspace. +""" + +import argparse +import sys + +from pytfe import TFEClient +from pytfe.models import WorkspaceResourceListOptions + + +def list_workspace_resources( + client: TFEClient, + workspace_id: str, + page_number: int | None = None, + page_size: int | None = None, +) -> None: + """List all resources in a workspace.""" + try: + print(f"Listing resources for workspace: {workspace_id}") + + # Prepare list options + options = None + if page_number or page_size: + options = WorkspaceResourceListOptions() + if page_number: + options.page_number = page_number + if page_size: + options.page_size = page_size + + # List workspace resources (returns an iterator) + resources = list(client.workspace_resources.list(workspace_id, options)) + + if not resources: + print("No resources found in this workspace.") + return + + print(f"\nFound {len(resources)} resource(s):") + print("-" * 80) + + for resource in resources: + print(f"ID: {resource.id}") + print(f"Address: {resource.address}") + print(f"Name: {resource.name}") + print(f"Module: {resource.module}") + print(f"Provider: {resource.provider}") + print(f"Provider Type: {resource.provider_type}") + print(f"Created At: {resource.created_at}") + print(f"Updated At: {resource.updated_at}") + print(f"Modified By State Version: {resource.modified_by_state_version_id}") + if resource.name_index: + print(f"Name Index: {resource.name_index}") + print("-" * 80) + + except Exception as e: + print(f"Error listing workspace resources: {e}", file=sys.stderr) + sys.exit(1) + + +def main(): + """Main function to handle command line arguments and execute operations.""" + parser = argparse.ArgumentParser( + description="Manage workspace resources in Terraform Enterprise", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # List all resources in a workspace + python workspace_resources.py list ws-abc123 + + # List with pagination + python workspace_resources.py list ws-abc123 --page-number 2 --page-size 50 + +Environment variables: + TFE_TOKEN: Your Terraform Enterprise API token + TFE_URL: Your Terraform Enterprise URL (default: https://app.terraform.io) + TFE_ORG: Your Terraform Enterprise organization name + """, + ) + + subparsers = parser.add_subparsers(dest="command", help="Available commands") + + # List command + list_parser = subparsers.add_parser("list", help="List workspace resources") + list_parser.add_argument("workspace_id", help="ID of the workspace") + list_parser.add_argument( + "--page-number", type=int, help="Page number for pagination" + ) + list_parser.add_argument("--page-size", type=int, help="Page size for pagination") + + args = parser.parse_args() + + if not args.command: + parser.print_help() + sys.exit(1) + + # Initialize TFE client + try: + client = TFEClient() + except Exception as e: + print(f"Error initializing TFE client: {e}", file=sys.stderr) + print( + "Make sure TFE_TOKEN and TFE_URL environment variables are set.", + file=sys.stderr, + ) + sys.exit(1) + + # Execute the requested command + if args.command == "list": + list_workspace_resources( + client, + args.workspace_id, + args.page_number, + args.page_size, + ) + + +if __name__ == "__main__": + main() diff --git a/examples/workspace_run_tasks.py b/examples/workspace_run_tasks.py new file mode 100644 index 0000000..d67b1aa --- /dev/null +++ b/examples/workspace_run_tasks.py @@ -0,0 +1,404 @@ +""" +Terraform Cloud/Enterprise Workspace Run Tasks Management Example + +This comprehensive example demonstrates workspace run task operations using the python-tfe SDK, +providing a complete command-line interface for managing workspace run tasks with advanced +operations including list, get, create, update, and delete operations. + +Prerequisites: + - Set TFE_TOKEN environment variable with your Terraform Cloud API token + - Ensure you have access to the target organization and workspace + - Have a run task created in your organization + +Quick Start: + python examples/workspace_run_tasks.py --help + +Core Operations: + +1. List Workspace Run Tasks: + python examples/workspace_run_tasks.py --org my-org --workspace "my-workspace" + python examples/workspace_run_tasks.py --org my-org --workspace "my-workspace" --page-size 20 + python examples/workspace_run_tasks.py --org my-org --workspace "my-workspace" --include task + +2. Get Workspace Run Task: + python examples/workspace_run_tasks.py --org my-org --workspace "my-workspace" --task-id "wsrt-abc123xyz" + +3. Create Workspace Run Task: + python examples/workspace_run_tasks.py --org my-org --workspace "my-workspace" --create --run-task-id "task-abc123xyz" + +4. Update Workspace Run Task: + python examples/workspace_run_tasks.py --org my-org --workspace "my-workspace" --task-id "wsrt-abc123xyz" --update + +5. Delete Workspace Run Task: + python examples/workspace_run_tasks.py --org my-org --workspace "my-workspace" --task-id "wsrt-abc123xyz" --delete + +6. Comprehensive Testing: + python examples/workspace_run_tasks.py --org my-org --workspace "my-workspace" --all-tests +""" + +from __future__ import annotations + +import argparse +import os + +from pytfe import TFEClient, TFEConfig +from pytfe.models import ( + RunTask, + Stage, + TaskEnforcementLevel, + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskIncludeOpt, + WorkspaceRunTaskListOptions, + WorkspaceRunTaskReadOptions, + WorkspaceRunTaskUpdateOptions, +) + + +def _print_header(title: str): + print("\n" + "=" * 80) + print(title) + print("=" * 80) + + +def _print_workspace_run_task(task, title: str = "Workspace Run Task Details"): + """Print workspace run task details in a formatted way.""" + print(f"\n{title}") + print("-" * 40) + print(f"ID: {task.id}") + print(f"Type: {task.type}") + print(f"Stage: {task.stage}") + print(f"Enforcement Level: {task.enforcement_level}") + + if hasattr(task, "created_at") and task.created_at: + print(f"Created At: {task.created_at}") + if hasattr(task, "updated_at") and task.updated_at: + print(f"Updated At: {task.updated_at}") + + +def list_workspace_run_tasks(client: TFEClient, args): + """List workspace run tasks with various filters.""" + _print_header(f"Listing Workspace Run Tasks for Workspace: {args.workspace}") + + # Build list options + options = WorkspaceRunTaskListOptions() + + if args.page_size: + options.page_size = args.page_size + if args.page: + options.page_number = args.page + if args.include: + include_opts = [] + for inc in args.include.split(","): + inc = inc.strip() + if inc == "task": + include_opts.append(WorkspaceRunTaskIncludeOpt.RUN_TASK) + if include_opts: + options.include = include_opts + + # Get workspace ID from workspace name + try: + workspace = client.workspaces.read(args.workspace, organization=args.org) + workspace_id = workspace.id + + task_count = 0 + for task in client.workspace_run_tasks.list(workspace_id, options=options): + task_count += 1 + _print_workspace_run_task(task, f"Workspace Run Task #{task_count}") + + if task_count == 0: + print("No workspace run tasks found.") + else: + print(f"\nTotal workspace run tasks: {task_count}") + + except Exception as e: + print(f"Error listing workspace run tasks: {e}") + + +def get_workspace_run_task(client: TFEClient, args): + """Get a specific workspace run task.""" + _print_header(f"Getting Workspace Run Task: {args.task_id}") + + try: + # Get workspace ID from workspace name + workspace = client.workspaces.read(args.workspace, organization=args.org) + workspace_id = workspace.id + + # Build read options + options = WorkspaceRunTaskReadOptions() + if args.include: + include_opts = [] + for inc in args.include.split(","): + inc = inc.strip() + if inc == "task": + include_opts.append(WorkspaceRunTaskIncludeOpt.RUN_TASK) + if include_opts: + options.include = include_opts + + task = client.workspace_run_tasks.get( + workspace_id, args.task_id, options=options + ) + _print_workspace_run_task(task) + + except Exception as e: + print(f"Error getting workspace run task: {e}") + + +def create_workspace_run_task(client: TFEClient, args): + """Create a new workspace run task.""" + _print_header("Creating Workspace Run Task") + + try: + # Get workspace ID from workspace name + workspace = client.workspaces.read(args.workspace, organization=args.org) + workspace_id = workspace.id + + # Build create options + enforcement_level = TaskEnforcementLevel.ADVISORY # Default + if args.enforcement_level: + if args.enforcement_level.lower() == "advisory": + enforcement_level = TaskEnforcementLevel.ADVISORY + elif args.enforcement_level.lower() == "mandatory": + enforcement_level = TaskEnforcementLevel.MANDATORY + + stage = None + if args.stage: + if args.stage.lower() == "pre_plan": + stage = Stage.PRE_PLAN + elif args.stage.lower() == "post_plan": + stage = Stage.POST_PLAN + elif args.stage.lower() == "pre_apply": + stage = Stage.PRE_APPLY + elif args.stage.lower() == "post_apply": + stage = Stage.POST_APPLY + + options = WorkspaceRunTaskCreateOptions( + enforcement_level=enforcement_level, + stage=stage, + run_task=RunTask(id=args.run_task_id), + ) + + task = client.workspace_run_tasks.create(workspace_id, options) + _print_workspace_run_task(task, "Created Workspace Run Task") + + except Exception as e: + print(f"Error creating workspace run task: {e}") + + +def update_workspace_run_task(client: TFEClient, args): + """Update a workspace run task.""" + _print_header(f"Updating Workspace Run Task: {args.task_id}") + + try: + # Get workspace ID from workspace name + workspace = client.workspaces.read(args.workspace, organization=args.org) + workspace_id = workspace.id + + # Build update options + options = WorkspaceRunTaskUpdateOptions() + + # Set optional fields based on args + if args.enforcement_level: + if args.enforcement_level.lower() == "advisory": + options.enforcement_level = TaskEnforcementLevel.ADVISORY + elif args.enforcement_level.lower() == "mandatory": + options.enforcement_level = TaskEnforcementLevel.MANDATORY + + if args.stage: + if args.stage.lower() == "pre_plan": + options.stage = Stage.PRE_PLAN + elif args.stage.lower() == "post_plan": + options.stage = Stage.POST_PLAN + elif args.stage.lower() == "pre_apply": + options.stage = Stage.PRE_APPLY + elif args.stage.lower() == "post_apply": + options.stage = Stage.POST_APPLY + + task = client.workspace_run_tasks.update(workspace_id, args.task_id, options) + _print_workspace_run_task(task, "Updated Workspace Run Task") + + except Exception as e: + print(f"Error updating workspace run task: {e}") + + +def delete_workspace_run_task(client: TFEClient, args): + """Delete a workspace run task.""" + _print_header(f"Deleting Workspace Run Task: {args.task_id}") + + try: + # Get workspace ID from workspace name + workspace = client.workspaces.read(args.workspace, organization=args.org) + workspace_id = workspace.id + + client.workspace_run_tasks.delete(workspace_id, args.task_id) + print("Workspace run task deleted successfully.") + + except Exception as e: + print(f"Error deleting workspace run task: {e}") + + +def run_all_tests(client: TFEClient, args): + """Run comprehensive tests of all workspace run task operations.""" + _print_header("Running All Workspace Run Task Tests") + + if not args.run_task_id: + print("Error: --run-task-id is required for comprehensive testing") + return + + test_workspace_id = None + test_task_id = None + + try: + # Get workspace ID + workspace = client.workspaces.read(args.workspace, organization=args.org) + test_workspace_id = workspace.id + + print("\n1. Testing LIST operation...") + task_count = 0 + for task in client.workspace_run_tasks.list(test_workspace_id): + task_count += 1 + if task_count <= 3: # Show first 3 + _print_workspace_run_task(task, f"Task #{task_count}") + print(f"Found {task_count} existing workspace run tasks") + + print("\n2. Testing CREATE operation...") + create_options = WorkspaceRunTaskCreateOptions( + run_task=RunTask(id=args.run_task_id), + enforcement_level=TaskEnforcementLevel.ADVISORY, + stage=Stage.POST_PLAN, + ) + created_task = client.workspace_run_tasks.create( + test_workspace_id, create_options + ) + test_task_id = created_task.id + _print_workspace_run_task(created_task, "Created Test Task") + + print("\n3. Testing GET operation...") + retrieved_task = client.workspace_run_tasks.get(test_workspace_id, test_task_id) + _print_workspace_run_task(retrieved_task, "Retrieved Task") + + print("\n4. Testing UPDATE operation...") + update_options = WorkspaceRunTaskUpdateOptions( + enforcement_level=TaskEnforcementLevel.MANDATORY + ) + updated_task = client.workspace_run_tasks.update( + test_workspace_id, test_task_id, update_options + ) + _print_workspace_run_task(updated_task, "Updated Task") + + print("\n5. Testing DELETE operation...") + client.workspace_run_tasks.delete(test_workspace_id, test_task_id) + print("Test task deleted successfully") + + print("\nAll tests completed successfully!") + + except Exception as e: + print(f"Test failed: {e}") + # Cleanup on failure + if test_workspace_id and test_task_id: + try: + client.workspace_run_tasks.delete(test_workspace_id, test_task_id) + print("Cleaned up test task") + except Exception: + pass + + +def main(): + parser = argparse.ArgumentParser( + description="Workspace Run Tasks demo for python-tfe SDK", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + # Connection settings + parser.add_argument( + "--address", default=os.getenv("TFE_ADDRESS", "https://app.terraform.io") + ) + parser.add_argument("--token", default=os.getenv("TFE_TOKEN", "")) + + # Required arguments + parser.add_argument("--org", required=True, help="Organization name") + parser.add_argument("--workspace", required=True, help="Workspace name") + + # Identification arguments + parser.add_argument( + "--task-id", help="Workspace run task ID for get/update/delete operations" + ) + parser.add_argument("--run-task-id", help="Run task ID for create operations") + + # Operations + parser.add_argument( + "--create", action="store_true", help="Create a new workspace run task" + ) + parser.add_argument( + "--update", action="store_true", help="Update a workspace run task" + ) + parser.add_argument( + "--delete", action="store_true", help="Delete a workspace run task" + ) + parser.add_argument( + "--all-tests", action="store_true", help="Run all operation tests" + ) + + # Listing options + parser.add_argument("--page", type=int, default=1, help="Page number for listing") + parser.add_argument("--page-size", type=int, help="Page size for listing") + parser.add_argument("--include", help="Include options (task)") + + # Task configuration options + parser.add_argument( + "--enforcement-level", + choices=["advisory", "mandatory"], + help="Enforcement level for the task", + ) + parser.add_argument( + "--stage", + choices=["pre_plan", "post_plan", "pre_apply", "post_apply"], + help="Stage when the task should run", + ) + + args = parser.parse_args() + + # Validate token + if not args.token: + print("Error: TFE_TOKEN environment variable or --token argument is required") + return 1 + + # Create client + cfg = TFEConfig(address=args.address, token=args.token) + client = TFEClient(cfg) + + try: + # Determine operation + if args.all_tests: + run_all_tests(client, args) + elif args.create: + if not args.run_task_id: + print("Error: --run-task-id is required for create operation") + return 1 + create_workspace_run_task(client, args) + elif args.update: + if not args.task_id: + print("Error: --task-id is required for update operation") + return 1 + update_workspace_run_task(client, args) + elif args.delete: + if not args.task_id: + print("Error: --task-id is required for delete operation") + return 1 + delete_workspace_run_task(client, args) + elif args.task_id: + get_workspace_run_task(client, args) + else: + list_workspace_run_tasks(client, args) + + except KeyboardInterrupt: + print("\nOperation cancelled by user") + return 1 + except Exception as e: + print(f"Error: {e}") + return 1 + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/src/pytfe/client.py b/src/pytfe/client.py index 7aa1bd8..bc4ccc4 100644 --- a/src/pytfe/client.py +++ b/src/pytfe/client.py @@ -33,6 +33,8 @@ from .resources.state_versions import StateVersions from .resources.variable import Variables from .resources.variable_sets import VariableSets, VariableSetVariables +from .resources.workspace_resources import WorkspaceResourcesService +from .resources.workspace_run_tasks import WorkspaceRunTasksService from .resources.workspaces import Workspaces @@ -72,6 +74,8 @@ def __init__(self, config: TFEConfig | None = None): self.variable_sets = VariableSets(self._transport) self.variable_set_variables = VariableSetVariables(self._transport) self.workspaces = Workspaces(self._transport) + self.workspace_resources = WorkspaceResourcesService(self._transport) + self.workspace_run_tasks = WorkspaceRunTasksService(self._transport) self.registry_modules = RegistryModules(self._transport) self.registry_providers = RegistryProviders(self._transport) diff --git a/src/pytfe/errors.py b/src/pytfe/errors.py index 61853d1..3f120a9 100644 --- a/src/pytfe/errors.py +++ b/src/pytfe/errors.py @@ -470,6 +470,14 @@ def __init__(self, message: str = "invalid value for policy evaluation ID"): super().__init__(message) +# Workspace Run Task errors +class InvalidWorkspaceRunTaskIDError(InvalidValues): + """Raised when an invalid workspace run task ID is provided.""" + + def __init__(self, task_id: str): + super().__init__(f"invalid value for workspace run task ID: {task_id}") + + # Policy Set Parameter errors class InvalidParamIDError(InvalidValues): """Raised when an invalid policy set parameter ID is provided.""" diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index a3cc71f..4d5905d 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -353,6 +353,23 @@ WorkspaceUpdateRemoteStateConsumersOptions, ) +# ── Workspace Resources ──────────────────────────────────────────────────────── +from .workspace_resource import ( + WorkspaceResource, + WorkspaceResourceListOptions, +) + +# ── Workspace Run Tasks ──────────────────────────────────────────────────────── +from .workspace_run_task import ( + WorkspaceRunTask, + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskIncludeOpt, + WorkspaceRunTaskList, + WorkspaceRunTaskListOptions, + WorkspaceRunTaskReadOptions, + WorkspaceRunTaskUpdateOptions, +) + # ── Public surface ──────────────────────────────────────────────────────────── __all__ = [ # OAuth @@ -524,6 +541,17 @@ "WorkspaceTagListOptions", "WorkspaceUpdateOptions", "WorkspaceUpdateRemoteStateConsumersOptions", + # Workspace Resources + "WorkspaceResource", + "WorkspaceResourceListOptions", + # Workspace Run Tasks + "WorkspaceRunTask", + "WorkspaceRunTaskCreateOptions", + "WorkspaceRunTaskIncludeOpt", + "WorkspaceRunTaskList", + "WorkspaceRunTaskListOptions", + "WorkspaceRunTaskReadOptions", + "WorkspaceRunTaskUpdateOptions", "RunQueue", "ReadRunQueueOptions", # Runs diff --git a/src/pytfe/models/run_task.py b/src/pytfe/models/run_task.py index 8741162..67a4f07 100644 --- a/src/pytfe/models/run_task.py +++ b/src/pytfe/models/run_task.py @@ -1,28 +1,47 @@ from __future__ import annotations from enum import Enum +from typing import TYPE_CHECKING from pydantic import BaseModel, Field from ..models.common import Pagination from .agent import AgentPool from .organization import Organization -from .workspace_run_task import WorkspaceRunTask + +if TYPE_CHECKING: + from .workspace_run_task import WorkspaceRunTask + + +class Stage(str, Enum): + """Task stage options.""" + + PRE_PLAN = "pre-plan" + POST_PLAN = "post-plan" + PRE_APPLY = "pre-apply" + POST_APPLY = "post-apply" + + +class TaskEnforcementLevel(str, Enum): + """Task enforcement level options.""" + + ADVISORY = "advisory" + MANDATORY = "mandatory" class RunTask(BaseModel): id: str - name: str + name: str | None = None description: str | None = None - url: str - category: str + url: str | None = None + category: str | None = None hmac_key: str | None = None - enabled: bool + enabled: bool | None = None global_configuration: GlobalRunTask | None = None agent_pool: AgentPool | None = None organization: Organization | None = None - workspace_run_tasks: list[WorkspaceRunTask] = Field(default_factory=list) + workspace_run_tasks: list[WorkspaceRunTask] | None = None class GlobalRunTask(BaseModel): @@ -37,18 +56,6 @@ class GlobalRunTaskOptions(BaseModel): enforcement_level: TaskEnforcementLevel | None = None -class Stage(str, Enum): - PRE_PLAN = "pre-plan" - POST_PLAN = "post-plan" - PRE_APPLY = "pre-apply" - POST_APPLY = "post-apply" - - -class TaskEnforcementLevel(str, Enum): - ADVISORY = "advisory" - MANDATORY = "mandatory" - - class RunTaskIncludeOptions(str, Enum): RUN_TASK_WORKSPACE_TASKS = "workspace_tasks" RUN_TASK_WORKSPACE = "workspace_tasks.workspace" @@ -91,3 +98,13 @@ class RunTaskUpdateOptions(BaseModel): enabled: bool | None = None global_configuration: GlobalRunTaskOptions | None = None agent_pool: AgentPool | None = None + + +def _rebuild_models() -> None: + """Rebuild models to resolve forward references.""" + from .workspace_run_task import WorkspaceRunTask # noqa: F401 + + RunTask.model_rebuild() + + +_rebuild_models() diff --git a/src/pytfe/models/workspace_resource.py b/src/pytfe/models/workspace_resource.py new file mode 100644 index 0000000..78eaa40 --- /dev/null +++ b/src/pytfe/models/workspace_resource.py @@ -0,0 +1,29 @@ +"""Workspace resources models for Terraform Enterprise.""" + +from pydantic import BaseModel + + +class WorkspaceResource(BaseModel): + """Represents a Terraform Enterprise workspace resource. + + These are resources managed by Terraform in a workspace's state. + """ + + id: str + address: str + name: str + created_at: str + updated_at: str + module: str + provider: str + provider_type: str + modified_by_state_version_id: str + name_index: str | None = None + + +class WorkspaceResourceListOptions(BaseModel): + """Options for listing workspace resources.""" + + # Pagination + page_number: int | None = None + page_size: int | None = None diff --git a/src/pytfe/models/workspace_run_task.py b/src/pytfe/models/workspace_run_task.py index b5072a6..610c8aa 100644 --- a/src/pytfe/models/workspace_run_task.py +++ b/src/pytfe/models/workspace_run_task.py @@ -1,7 +1,95 @@ from __future__ import annotations -from pydantic import BaseModel +from enum import Enum +from typing import TYPE_CHECKING + +from pydantic import BaseModel, Field + +from ..models.common import Pagination +from .run_task import RunTask, Stage, TaskEnforcementLevel + +if TYPE_CHECKING: + from .workspace import Workspace class WorkspaceRunTask(BaseModel): + """A workspace run task represents the association between a run task and a workspace.""" + id: str + type: str = "workspace-tasks" + enforcement_level: TaskEnforcementLevel + # Deprecated: Use stages property instead + stage: Stage | None = None + # List of stages for the task + stages: list[Stage] | None = None + created_at: str | None = None + updated_at: str | None = None + + # Relationships + workspace: Workspace | None = None + run_task: RunTask | None = None + + +class WorkspaceRunTaskList(BaseModel): + """A list of workspace run tasks.""" + + data: list[WorkspaceRunTask] = Field(default_factory=list) + pagination: Pagination | None = None + + +class WorkspaceRunTaskCreateOptions(BaseModel): + """Options for creating a workspace run task.""" + + type: str = "workspace-tasks" + enforcement_level: TaskEnforcementLevel + # Deprecated: Use stages property instead + stage: Stage | None = None + # Optional: The stages to run the task in + stages: list[Stage] | None = None + run_task: RunTask + + +class WorkspaceRunTaskUpdateOptions(BaseModel): + """Options for updating a workspace run task.""" + + type: str = "workspace-tasks" + enforcement_level: TaskEnforcementLevel | None = None + # Deprecated: Use stages property instead + stage: Stage | None = None + # Optional: The stages to run the task in + stages: list[Stage] | None = None + + +class WorkspaceRunTaskListOptions(BaseModel): + """Options for listing workspace run tasks.""" + + # Pagination + page_number: int | None = None + page_size: int | None = None + + # Includes + include: list[WorkspaceRunTaskIncludeOpt] | None = None + + +class WorkspaceRunTaskReadOptions(BaseModel): + """Options for reading a workspace run task.""" + + # Includes + include: list[WorkspaceRunTaskIncludeOpt] | None = None + + +class WorkspaceRunTaskIncludeOpt(str, Enum): + """Include options for workspace run task API calls.""" + + RUN_TASK = "run_task" + WORKSPACE = "workspace" + + +def _rebuild_models() -> None: + """Rebuild models to resolve forward references.""" + from .workspace import Workspace # noqa: F401 + + WorkspaceRunTask.model_rebuild() + + +_rebuild_models() diff --git a/src/pytfe/resources/run_task.py b/src/pytfe/resources/run_task.py index 853eab6..52432b8 100644 --- a/src/pytfe/resources/run_task.py +++ b/src/pytfe/resources/run_task.py @@ -98,11 +98,14 @@ def _run_task_from(d: dict[str, Any], org: str | None = None) -> RunTask: wrt_data = relationships.get("workspace-tasks", {}).get("data", []) if isinstance(wrt_data, list): # Note: Full WorkspaceRunTask objects would need to be fetched separately - # Here we just create minimal objects with IDs + # Here we just create minimal objects with IDs and default enforcement level for item in wrt_data: if isinstance(item, dict) and "id" in item: workspace_run_tasks.append( - WorkspaceRunTask(id=_safe_str(item.get("id"))) + WorkspaceRunTask( + id=_safe_str(item.get("id")), + enforcement_level=TaskEnforcementLevel.ADVISORY, + ) ) return RunTask( @@ -116,7 +119,7 @@ def _run_task_from(d: dict[str, Any], org: str | None = None) -> RunTask: global_configuration=global_config, agent_pool=agent_pool, organization=organization, - workspace_run_tasks=workspace_run_tasks, + workspace_run_tasks=workspace_run_tasks if workspace_run_tasks else None, ) diff --git a/src/pytfe/resources/workspace_resources.py b/src/pytfe/resources/workspace_resources.py new file mode 100644 index 0000000..ff3e0ee --- /dev/null +++ b/src/pytfe/resources/workspace_resources.py @@ -0,0 +1,69 @@ +"""Workspace resources service for Terraform Enterprise.""" + +import urllib.parse +from collections.abc import Iterator +from typing import Any + +from pytfe.models import ( + WorkspaceResource, + WorkspaceResourceListOptions, +) + +from ._base import _Service + + +def _workspace_resource_from(data: dict[str, Any]) -> WorkspaceResource: + """Convert API response data to WorkspaceResource model.""" + attributes = data.get("attributes", {}) + + return WorkspaceResource( + id=data.get("id", ""), + address=attributes.get("address", ""), + name=attributes.get("name", ""), + created_at=attributes.get("created-at", ""), + updated_at=attributes.get("updated-at", ""), + module=attributes.get("module", ""), + provider=attributes.get("provider", ""), + provider_type=attributes.get("provider-type", ""), + modified_by_state_version_id=attributes.get("modified-by-state-version-id", ""), + name_index=attributes.get("name-index"), + ) + + +class WorkspaceResourcesService(_Service): + """Service for managing workspace resources in Terraform Enterprise. + + Workspace resources represent the infrastructure resources + managed by Terraform in a workspace's state file. + """ + + def list( + self, workspace_id: str, options: WorkspaceResourceListOptions | None = None + ) -> Iterator[WorkspaceResource]: + """List workspace resources for a given workspace. + + Args: + workspace_id: The ID of the workspace to list resources for + options: Optional query parameters for filtering and pagination + + Yields: + WorkspaceResource objects + """ + if not workspace_id or not workspace_id.strip(): + raise ValueError("workspace_id is required") + + # URL encode the workspace ID and construct URL + encoded_workspace_id = urllib.parse.quote(workspace_id, safe="") + url = f"/api/v2/workspaces/{encoded_workspace_id}/resources" + + # Handle parameters + params: dict[str, int] = {} + if options: + if options.page_number is not None: + params["page[number]"] = options.page_number + if options.page_size is not None: + params["page[size]"] = options.page_size + + # Use the _list method from base service to handle pagination + for item in self._list(url, params=params): + yield _workspace_resource_from(item) diff --git a/src/pytfe/resources/workspace_run_tasks.py b/src/pytfe/resources/workspace_run_tasks.py new file mode 100644 index 0000000..6c08cbf --- /dev/null +++ b/src/pytfe/resources/workspace_run_tasks.py @@ -0,0 +1,305 @@ +from __future__ import annotations + +from collections.abc import Iterator +from typing import Any + +from pytfe.models import ( + RunTask, + Stage, + TaskEnforcementLevel, + Workspace, + WorkspaceRunTask, + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskListOptions, + WorkspaceRunTaskReadOptions, + WorkspaceRunTaskUpdateOptions, +) + +from ..errors import ( + InvalidWorkspaceIDError, + InvalidWorkspaceRunTaskIDError, +) +from ..utils import _safe_str, valid_string_id +from ._base import _Service + + +def _workspace_run_task_from(d: dict[str, Any]) -> WorkspaceRunTask: + """ + Convert JSON API response data to WorkspaceRunTask object. + + Maps the JSON API format to Python model fields, handling: + - Basic attributes (id, enforcement_level, stage, timestamps) + - Relationships (workspace, run_task) + """ + attr: dict[str, Any] = d.get("attributes", {}) or {} + relationships: dict[str, Any] = d.get("relationships", {}) or {} + + id_str: str = _safe_str(d.get("id")) + type_str: str = _safe_str(d.get("type", "workspace-tasks")) + + # Extract enforcement level and stage + enforcement_level_str = attr.get("enforcement-level") + stage_str = attr.get("stage") + + # Convert to enum values + enforcement_level = TaskEnforcementLevel.ADVISORY # Default + if isinstance(enforcement_level_str, str): + try: + enforcement_level = TaskEnforcementLevel(enforcement_level_str) + except ValueError: + enforcement_level = TaskEnforcementLevel.ADVISORY + + stage = Stage.POST_PLAN # Default + if isinstance(stage_str, str): + try: + # API returns kebab-case (pre-plan, post-plan, etc.) + stage = Stage(stage_str) + except ValueError: + stage = Stage.POST_PLAN + + # Handle timestamps + created_at = attr.get("created-at") + updated_at = attr.get("updated-at") + + # Handle relationships + workspace_data = relationships.get("workspace", {}).get("data") + run_task_data = relationships.get("task", {}).get("data") + + workspace = None + if workspace_data and isinstance(workspace_data, dict): + workspace = Workspace(id=_safe_str(workspace_data.get("id"))) + + run_task = None + if run_task_data and isinstance(run_task_data, dict): + run_task = RunTask( + id=_safe_str(run_task_data.get("id")), + name="", # Will be populated when included in API response + description=None, + url="", # Will be populated when included + category="", # Will be populated when included + hmac_key=None, + enabled=True, + ) + + return WorkspaceRunTask( + id=id_str, + type=type_str, + enforcement_level=enforcement_level, + stage=stage, + created_at=created_at, + updated_at=updated_at, + workspace=workspace, + run_task=run_task, + ) + + +class WorkspaceRunTasksService(_Service): + """Service for managing workspace run tasks.""" + + def list( + self, + workspace_id: str, + *, + options: WorkspaceRunTaskListOptions | None = None, + ) -> Iterator[WorkspaceRunTask]: + """ + List workspace run tasks. + + Args: + workspace_id: The workspace ID to list run tasks for + options: Optional list options for pagination and includes + + Yields: + WorkspaceRunTask objects + + Raises: + InvalidWorkspaceIDError: If workspace_id is invalid + """ + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError(workspace_id) + + url = f"/api/v2/workspaces/{workspace_id}/tasks" + + params: dict[str, Any] = {} + + if options: + if options.page_number is not None: + params["page[number]"] = options.page_number + if options.page_size is not None: + params["page[size]"] = options.page_size + if options.include: + params["include"] = ",".join([opt.value for opt in options.include]) + + # Use the list method from base service + for item in self._list(url, params=params): + yield _workspace_run_task_from(item) + + def get( + self, + workspace_id: str, + task_id: str, + *, + options: WorkspaceRunTaskReadOptions | None = None, + ) -> WorkspaceRunTask: + """ + Get a specific workspace run task. + + Args: + workspace_id: The workspace ID + task_id: The workspace run task ID + options: Optional read options for includes + + Returns: + The WorkspaceRunTask object + + Raises: + InvalidWorkspaceIDError: If workspace_id is invalid + InvalidWorkspaceRunTaskIDError: If task_id is invalid + """ + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError(workspace_id) + if not valid_string_id(task_id): + raise InvalidWorkspaceRunTaskIDError(task_id) + + url = f"/api/v2/workspaces/{workspace_id}/tasks/{task_id}" + + params: dict[str, Any] = {} + if options and options.include: + params["include"] = ",".join([opt.value for opt in options.include]) + + response = self.t.request("GET", url, params=params) + json_response = response.json() or {} + return _workspace_run_task_from(json_response["data"]) + + def create( + self, + workspace_id: str, + options: WorkspaceRunTaskCreateOptions, + ) -> WorkspaceRunTask: + """ + Create a new workspace run task. + + Args: + workspace_id: The workspace ID + options: Create options + + Returns: + The created WorkspaceRunTask + + Raises: + InvalidWorkspaceIDError: If workspace_id is invalid + """ + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError(workspace_id) + + url = f"/api/v2/workspaces/{workspace_id}/tasks" + + # Build the request payload + data: dict[str, Any] = { + "data": { + "type": options.type, + "attributes": { + "enforcement-level": options.enforcement_level.value, + }, + "relationships": { + "task": {"data": {"type": "tasks", "id": options.run_task.id}} + }, + } + } + + # Add optional stages if provided (stages is the recommended approach) + if options.stages is not None: + data["data"]["attributes"]["stages"] = [s.value for s in options.stages] + + response = self.t.request("POST", url, json_body=data) + + # API returns 204 No Content on success + if response.status_code == 204: + # Try to parse response body if present, otherwise list tasks to get the created one + try: + json_response = response.json() + if json_response and "data" in json_response: + return _workspace_run_task_from(json_response["data"]) + except Exception: + pass + # If no response body, list tasks to find the newly created one + for task in self.list(workspace_id): + # Return the most recently created task (should be first after sorting) + return task + raise ValueError("Could not parse workspace run task creation response") + + json_response = response.json() or {} + return _workspace_run_task_from(json_response["data"]) + + def update( + self, + workspace_id: str, + task_id: str, + options: WorkspaceRunTaskUpdateOptions, + ) -> WorkspaceRunTask: + """ + Update a workspace run task. + + Args: + workspace_id: The workspace ID + task_id: The workspace run task ID + options: Update options + + Returns: + The updated WorkspaceRunTask + + Raises: + InvalidWorkspaceIDError: If workspace_id is invalid + InvalidWorkspaceRunTaskIDError: If task_id is invalid + """ + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError(workspace_id) + if not valid_string_id(task_id): + raise InvalidWorkspaceRunTaskIDError(task_id) + + url = f"/api/v2/workspaces/{workspace_id}/tasks/{task_id}" + + # Build the request payload + data: dict[str, Any] = { + "data": {"type": options.type, "id": task_id, "attributes": {}} + } + + # Add optional fields if provided - convert underscore to kebab-case for API + if options.enforcement_level is not None: + data["data"]["attributes"]["enforcement-level"] = ( + options.enforcement_level.value + ) + if options.stage is not None: + data["data"]["attributes"]["stage"] = options.stage.value + if options.stages is not None: + data["data"]["attributes"]["stages"] = [ + stage.value for stage in options.stages + ] + + response = self.t.request("PATCH", url, json_body=data) + json_response = response.json() or {} + return _workspace_run_task_from(json_response["data"]) + + def delete( + self, + workspace_id: str, + task_id: str, + ) -> None: + """ + Delete a workspace run task. + + Args: + workspace_id: The workspace ID + task_id: The workspace run task ID + + Raises: + InvalidWorkspaceIDError: If workspace_id is invalid + InvalidWorkspaceRunTaskIDError: If task_id is invalid + """ + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError(workspace_id) + if not valid_string_id(task_id): + raise InvalidWorkspaceRunTaskIDError(task_id) + + url = f"/api/v2/workspaces/{workspace_id}/tasks/{task_id}" + self.t.request("DELETE", url) diff --git a/tests/units/test_run_task.py b/tests/units/test_run_task.py index a428d79..75fcb6b 100644 --- a/tests/units/test_run_task.py +++ b/tests/units/test_run_task.py @@ -79,7 +79,8 @@ def test_run_task_from_comprehensive(self): assert result.organization is not None assert result.organization.id == "org-123" assert result.organization.name == "org-123" - assert isinstance(result.workspace_run_tasks, list) + # workspace_run_tasks are parsed from the relationship data + assert result.workspace_run_tasks is not None assert len(result.workspace_run_tasks) == 2 assert result.workspace_run_tasks[0].id == "wstask-1" assert result.workspace_run_tasks[1].id == "wstask-2" @@ -409,7 +410,7 @@ def test_update_task_all_fields(self, run_tasks_service): assert result.enabled is False assert result.hmac_key == "new-secret-key" assert result.organization is None - assert result.workspace_run_tasks == [] + assert result.workspace_run_tasks is None def test_update_task_validation_errors(self, run_tasks_service): """Test update method validation errors.""" diff --git a/tests/units/test_workspace_resources.py b/tests/units/test_workspace_resources.py new file mode 100644 index 0000000..05f7c15 --- /dev/null +++ b/tests/units/test_workspace_resources.py @@ -0,0 +1,297 @@ +"""Unit tests for workspace resources service.""" + +from unittest.mock import Mock + +import pytest + +from pytfe.models.workspace_resource import ( + WorkspaceResource, + WorkspaceResourceListOptions, +) +from pytfe.resources.workspace_resources import WorkspaceResourcesService + + +class TestWorkspaceResourcesService: + """Test suite for WorkspaceResourcesService.""" + + @pytest.fixture + def mock_transport(self): + """Create a mock transport for testing.""" + return Mock() + + @pytest.fixture + def service(self, mock_transport): + """Create a WorkspaceResourcesService instance for testing.""" + return WorkspaceResourcesService(mock_transport) + + @pytest.fixture + def sample_workspace_resource_response(self): + """Sample API response for workspace resources list.""" + return { + "data": [ + { + "id": "resource-1", + "type": "resources", + "attributes": { + "address": "media_bucket.aws_s3_bucket_public_access_block.this[0]", + "name": "this", + "created-at": "2023-01-01T00:00:00Z", + "updated-at": "2023-01-01T00:00:00Z", + "module": "media_bucket", + "provider": "hashicorp/aws", + "provider-type": "aws", + "modified-by-state-version-id": "sv-abc123", + "name-index": "0", + }, + }, + { + "id": "resource-2", + "type": "resources", + "attributes": { + "address": "aws_instance.example", + "name": "example", + "created-at": "2023-01-02T00:00:00Z", + "updated-at": "2023-01-02T00:00:00Z", + "module": "root", + "provider": "hashicorp/aws", + "provider-type": "aws", + "modified-by-state-version-id": "sv-def456", + "name-index": None, + }, + }, + ], + "meta": { + "pagination": { + "current_page": 1, + "total_pages": 1, + "total_count": 2, + "page_size": 20, + } + }, + } + + @pytest.fixture + def sample_empty_response(self): + """Sample API response for empty workspace resources list.""" + return { + "data": [], + "meta": { + "pagination": { + "current_page": 1, + "total_pages": 1, + "total_count": 0, + "page_size": 20, + } + }, + } + + def test_list_workspace_resources_success( + self, service, mock_transport, sample_workspace_resource_response + ): + """Test successful listing of workspace resources.""" + # Mock the transport response + mock_response = Mock() + mock_response.json.return_value = sample_workspace_resource_response + mock_transport.request.return_value = mock_response + + # Call the service + result = list(service.list("ws-abc123")) + + # Verify request was made correctly + mock_transport.request.assert_called_once_with( + "GET", + "/api/v2/workspaces/ws-abc123/resources", + params={"page[number]": 1, "page[size]": 100}, + ) + + # Verify response parsing + assert isinstance(result, list) + assert len(result) == 2 + + # Check first resource + resource1 = result[0] + assert isinstance(resource1, WorkspaceResource) + assert resource1.id == "resource-1" + assert ( + resource1.address + == "media_bucket.aws_s3_bucket_public_access_block.this[0]" + ) + assert resource1.name == "this" + assert resource1.module == "media_bucket" + assert resource1.provider == "hashicorp/aws" + assert resource1.provider_type == "aws" + assert resource1.modified_by_state_version_id == "sv-abc123" + assert resource1.name_index == "0" + assert resource1.created_at == "2023-01-01T00:00:00Z" + assert resource1.updated_at == "2023-01-01T00:00:00Z" + + # Check second resource + resource2 = result[1] + assert resource2.id == "resource-2" + assert resource2.address == "aws_instance.example" + assert resource2.name == "example" + assert resource2.module == "root" + assert resource2.name_index is None + + def test_list_workspace_resources_with_options( + self, service, mock_transport, sample_workspace_resource_response + ): + """Test listing workspace resources with pagination options.""" + # Mock the transport response + mock_response = Mock() + mock_response.json.return_value = sample_workspace_resource_response + mock_transport.request.return_value = mock_response + + # Create options + options = WorkspaceResourceListOptions(page_number=2, page_size=50) + + # Call the service + result = list(service.list("ws-abc123", options)) + + # Verify request was made correctly + mock_transport.request.assert_called_once_with( + "GET", + "/api/v2/workspaces/ws-abc123/resources", + params={"page[number]": 2, "page[size]": 50}, + ) + + # Verify response + assert isinstance(result, list) + assert len(result) == 2 + + def test_list_workspace_resources_empty( + self, service, mock_transport, sample_empty_response + ): + """Test listing workspace resources when no resources exist.""" + # Mock the transport response + mock_response = Mock() + mock_response.json.return_value = sample_empty_response + mock_transport.request.return_value = mock_response + + # Call the service + result = list(service.list("ws-abc123")) + + # Verify request was made correctly + mock_transport.request.assert_called_once_with( + "GET", + "/api/v2/workspaces/ws-abc123/resources", + params={"page[number]": 1, "page[size]": 100}, + ) + + # Verify response + assert isinstance(result, list) + assert len(result) == 0 + + def test_list_workspace_resources_invalid_workspace_id(self, service): + """Test listing workspace resources with invalid workspace ID.""" + with pytest.raises(ValueError, match="workspace_id is required"): + list(service.list("")) + + with pytest.raises(ValueError, match="workspace_id is required"): + list(service.list(None)) + + def test_list_workspace_resources_url_encoding( + self, service, mock_transport, sample_workspace_resource_response + ): + """Test that workspace ID is properly URL encoded.""" + # Mock the transport response + mock_response = Mock() + mock_response.json.return_value = sample_workspace_resource_response + mock_transport.request.return_value = mock_response + + # Call with workspace ID that needs encoding + list(service.list("ws-abc/123")) + + # Verify the URL was properly encoded + mock_transport.request.assert_called_once_with( + "GET", + "/api/v2/workspaces/ws-abc%2F123/resources", + params={"page[number]": 1, "page[size]": 100}, + ) + + def test_list_workspace_resources_malformed_response(self, service, mock_transport): + """Test handling of malformed API response.""" + # Mock malformed response + mock_response = Mock() + mock_response.json.return_value = {"invalid": "response"} + mock_transport.request.return_value = mock_response + + # Call the service + result = list(service.list("ws-abc123")) + + # Should handle gracefully and return empty list + assert isinstance(result, list) + assert len(result) == 0 + + def test_list_workspace_resources_api_error(self, service, mock_transport): + """Test handling of API errors.""" + # Mock API error + mock_transport.request.side_effect = Exception("API Error") + + # Should propagate the exception + with pytest.raises(Exception, match="API Error"): + list(service.list("ws-abc123")) + + +class TestWorkspaceResourceModel: + """Test suite for WorkspaceResource model.""" + + def test_workspace_resource_creation(self): + """Test creating a WorkspaceResource instance.""" + resource = WorkspaceResource( + id="resource-1", + address="aws_instance.example", + name="example", + created_at="2023-01-01T00:00:00Z", + updated_at="2023-01-01T00:00:00Z", + module="root", + provider="hashicorp/aws", + provider_type="aws", + modified_by_state_version_id="sv-abc123", + name_index="0", + ) + + assert resource.id == "resource-1" + assert resource.address == "aws_instance.example" + assert resource.name == "example" + assert resource.module == "root" + assert resource.provider == "hashicorp/aws" + assert resource.provider_type == "aws" + assert resource.modified_by_state_version_id == "sv-abc123" + assert resource.name_index == "0" + + def test_workspace_resource_optional_fields(self): + """Test WorkspaceResource with optional fields.""" + resource = WorkspaceResource( + id="resource-1", + address="aws_instance.example", + name="example", + created_at="2023-01-01T00:00:00Z", + updated_at="2023-01-01T00:00:00Z", + module="root", + provider="hashicorp/aws", + provider_type="aws", + modified_by_state_version_id="sv-abc123", + # name_index is optional + ) + + assert resource.name_index is None + + +class TestWorkspaceResourceListOptions: + """Test suite for WorkspaceResourceListOptions model.""" + + def test_workspace_resource_list_options_creation(self): + """Test creating WorkspaceResourceListOptions.""" + options = WorkspaceResourceListOptions(page_number=2, page_size=50) + + assert options.page_number == 2 + assert options.page_size == 50 + + def test_workspace_resource_list_options_defaults(self): + """Test WorkspaceResourceListOptions with defaults.""" + options = WorkspaceResourceListOptions() + + # Should use default values from BaseListOptions + assert options.page_number is None + assert options.page_size is None diff --git a/tests/units/test_workspace_run_tasks.py b/tests/units/test_workspace_run_tasks.py new file mode 100644 index 0000000..b7ca496 --- /dev/null +++ b/tests/units/test_workspace_run_tasks.py @@ -0,0 +1,619 @@ +"""Unit tests for the workspace run tasks module.""" + +from unittest.mock import Mock + +import pytest + +from pytfe._http import HTTPTransport +from pytfe.errors import ( + InvalidWorkspaceIDError, + InvalidWorkspaceRunTaskIDError, +) +from pytfe.models.run_task import RunTask +from pytfe.models.workspace_run_task import ( + Stage, + TaskEnforcementLevel, + WorkspaceRunTask, + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskIncludeOpt, + WorkspaceRunTaskListOptions, + WorkspaceRunTaskReadOptions, + WorkspaceRunTaskUpdateOptions, +) +from pytfe.resources.workspace_run_tasks import ( + WorkspaceRunTasksService, + _workspace_run_task_from, +) + + +class TestWorkspaceRunTaskFrom: + """Test the _workspace_run_task_from function.""" + + def test_workspace_run_task_from_comprehensive(self): + """Test _workspace_run_task_from with all fields populated.""" + data = { + "id": "wsrt-123456789", + "type": "workspace-tasks", + "attributes": { + "enforcement-level": "mandatory", + "stage": "post-plan", + "created-at": "2023-01-01T00:00:00Z", + "updated-at": "2023-01-02T00:00:00Z", + }, + "relationships": { + "workspace": {"data": {"id": "ws-abc123", "type": "workspaces"}}, + "task": {"data": {"id": "task-xyz789", "type": "tasks"}}, + }, + } + + result = _workspace_run_task_from(data) + + assert result.id == "wsrt-123456789" + assert result.type == "workspace-tasks" + assert result.enforcement_level == TaskEnforcementLevel.MANDATORY + assert result.stage == Stage.POST_PLAN + assert result.created_at == "2023-01-01T00:00:00Z" + assert result.updated_at == "2023-01-02T00:00:00Z" + assert result.workspace is not None + assert result.workspace.id == "ws-abc123" + assert result.run_task is not None + assert result.run_task.id == "task-xyz789" + + def test_workspace_run_task_from_minimal(self): + """Test _workspace_run_task_from with minimal required fields.""" + data = { + "id": "wsrt-minimal", + "attributes": { + "enforcement-level": "advisory", + "stage": "pre-plan", + }, + } + + result = _workspace_run_task_from(data) + + assert result.id == "wsrt-minimal" + assert result.type == "workspace-tasks" + assert result.enforcement_level == TaskEnforcementLevel.ADVISORY + assert result.stage == Stage.PRE_PLAN + assert result.created_at is None + assert result.updated_at is None + assert result.workspace is None + assert result.run_task is None + + def test_workspace_run_task_from_invalid_enum_values(self): + """Test _workspace_run_task_from with invalid enum values falls back to defaults.""" + data = { + "id": "wsrt-invalid", + "attributes": { + "enforcement-level": "invalid-level", + "stage": "invalid-stage", + }, + } + + result = _workspace_run_task_from(data) + + assert result.id == "wsrt-invalid" + assert result.enforcement_level == TaskEnforcementLevel.ADVISORY # Default + assert result.stage == Stage.POST_PLAN # Default + + def test_workspace_run_task_from_missing_attributes(self): + """Test _workspace_run_task_from handles missing attributes gracefully.""" + data = {"id": "wsrt-missing", "attributes": {}} + + result = _workspace_run_task_from(data) + + assert result.id == "wsrt-missing" + assert result.enforcement_level == TaskEnforcementLevel.ADVISORY # Default + assert result.stage == Stage.POST_PLAN # Default + + def test_workspace_run_task_from_none_relationships(self): + """Test _workspace_run_task_from handles None relationship data.""" + data = { + "id": "wsrt-none-rels", + "attributes": { + "enforcement-level": "mandatory", + "stage": "pre-apply", + }, + "relationships": { + "workspace": {"data": None}, + "run-task": {"data": None}, + }, + } + + result = _workspace_run_task_from(data) + + assert result.id == "wsrt-none-rels" + assert result.workspace is None + assert result.run_task is None + + +class TestWorkspaceRunTasksService: + """Test the WorkspaceRunTasksService class.""" + + @pytest.fixture + def mock_transport(self): + """Create a mock HTTPTransport.""" + return Mock(spec=HTTPTransport) + + @pytest.fixture + def service(self, mock_transport): + """Create a WorkspaceRunTasksService with mocked transport.""" + return WorkspaceRunTasksService(mock_transport) + + def test_list_valid_workspace_id(self, service, mock_transport): + """Test list method with valid workspace ID.""" + mock_response = Mock() + mock_response.json.return_value = { + "data": [ + { + "id": "wsrt-1", + "type": "workspace-tasks", + "attributes": { + "enforcement-level": "mandatory", + "stage": "post-plan", + }, + }, + { + "id": "wsrt-2", + "type": "workspace-tasks", + "attributes": { + "enforcement-level": "advisory", + "stage": "pre-plan", + }, + }, + ] + } + mock_transport.request.return_value = mock_response + + # Mock the _list method from base class + service._list = Mock( + return_value=[ + { + "id": "wsrt-1", + "type": "workspace-tasks", + "attributes": { + "enforcement-level": "mandatory", + "stage": "post-plan", + }, + }, + { + "id": "wsrt-2", + "type": "workspace-tasks", + "attributes": { + "enforcement-level": "advisory", + "stage": "pre-plan", + }, + }, + ] + ) + + result = list(service.list("ws-valid123")) + + assert len(result) == 2 + assert result[0].id == "wsrt-1" + assert result[0].enforcement_level == TaskEnforcementLevel.MANDATORY + assert result[0].stage == Stage.POST_PLAN + assert result[1].id == "wsrt-2" + assert result[1].enforcement_level == TaskEnforcementLevel.ADVISORY + assert result[1].stage == Stage.PRE_PLAN + + service._list.assert_called_once_with( + "/api/v2/workspaces/ws-valid123/tasks", params={} + ) + + def test_list_with_options(self, service): + """Test list method with pagination and include options.""" + options = WorkspaceRunTaskListOptions( + page_number=2, + page_size=10, + include=[ + WorkspaceRunTaskIncludeOpt.RUN_TASK, + WorkspaceRunTaskIncludeOpt.WORKSPACE, + ], + ) + + # Mock the _list method + service._list = Mock(return_value=[]) + + list(service.list("ws-123", options=options)) + + service._list.assert_called_once_with( + "/api/v2/workspaces/ws-123/tasks", + params={ + "page[number]": 2, + "page[size]": 10, + "include": "run_task,workspace", + }, + ) + + def test_list_invalid_workspace_id(self, service): + """Test list method with invalid workspace ID.""" + with pytest.raises(InvalidWorkspaceIDError): + list(service.list("")) + + with pytest.raises(InvalidWorkspaceIDError): + list(service.list(" ")) + + def test_get_valid_ids(self, service, mock_transport): + """Test get method with valid IDs.""" + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "wsrt-get123", + "type": "workspace-tasks", + "attributes": { + "enforcement-level": "mandatory", + "stage": "post-apply", + "created-at": "2023-01-01T00:00:00Z", + }, + "relationships": { + "workspace": {"data": {"id": "ws-123", "type": "workspaces"}}, + "task": {"data": {"id": "task-456", "type": "tasks"}}, + }, + } + } + mock_transport.request.return_value = mock_response + + result = service.get("ws-123", "wsrt-get123") + + assert result.id == "wsrt-get123" + assert result.enforcement_level == TaskEnforcementLevel.MANDATORY + assert result.stage == Stage.POST_APPLY + assert result.created_at == "2023-01-01T00:00:00Z" + assert result.workspace.id == "ws-123" + assert result.run_task.id == "task-456" + + mock_transport.request.assert_called_once_with( + "GET", "/api/v2/workspaces/ws-123/tasks/wsrt-get123", params={} + ) + + def test_get_with_include_options(self, service, mock_transport): + """Test get method with include options.""" + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "wsrt-include", + "type": "workspace-tasks", + "attributes": {"enforcement-level": "advisory", "stage": "pre-plan"}, + } + } + mock_transport.request.return_value = mock_response + + options = WorkspaceRunTaskReadOptions( + include=[WorkspaceRunTaskIncludeOpt.RUN_TASK] + ) + + result = service.get("ws-123", "wsrt-include", options=options) + + assert result.id == "wsrt-include" + mock_transport.request.assert_called_once_with( + "GET", + "/api/v2/workspaces/ws-123/tasks/wsrt-include", + params={"include": "run_task"}, + ) + + def test_get_invalid_workspace_id(self, service): + """Test get method with invalid workspace ID.""" + with pytest.raises(InvalidWorkspaceIDError): + service.get("", "wsrt-123") + + def test_get_invalid_task_id(self, service): + """Test get method with invalid task ID.""" + with pytest.raises(InvalidWorkspaceRunTaskIDError): + service.get("ws-123", "") + + def test_create_success(self, service, mock_transport): + """Test create method success.""" + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "wsrt-created", + "type": "workspace-tasks", + "attributes": { + "enforcement-level": "mandatory", + "stage": "post-plan", + }, + "relationships": { + "run-task": {"data": {"id": "task-123", "type": "tasks"}}, + }, + } + } + mock_transport.request.return_value = mock_response + + options = WorkspaceRunTaskCreateOptions( + enforcement_level=TaskEnforcementLevel.MANDATORY, + stage=Stage.POST_PLAN, + run_task=RunTask(id="task-123"), + ) + + result = service.create("ws-123", options) + + assert result.id == "wsrt-created" + assert result.enforcement_level == TaskEnforcementLevel.MANDATORY + assert result.stage == Stage.POST_PLAN + + expected_data = { + "data": { + "type": "workspace-tasks", + "attributes": { + "enforcement-level": "mandatory", + }, + "relationships": { + "task": {"data": {"type": "tasks", "id": "task-123"}} + }, + } + } + + mock_transport.request.assert_called_once_with( + "POST", "/api/v2/workspaces/ws-123/tasks", json_body=expected_data + ) + + def test_create_minimal_options(self, service, mock_transport): + """Test create method with minimal options (no stage).""" + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "wsrt-minimal", + "type": "workspace-tasks", + "attributes": {"enforcement-level": "advisory"}, + } + } + mock_transport.request.return_value = mock_response + + options = WorkspaceRunTaskCreateOptions( + enforcement_level=TaskEnforcementLevel.ADVISORY, + run_task=RunTask(id="task-456"), + ) + + result = service.create("ws-456", options) + + assert result.id == "wsrt-minimal" + assert result.enforcement_level == TaskEnforcementLevel.ADVISORY + + expected_data = { + "data": { + "type": "workspace-tasks", + "attributes": {"enforcement-level": "advisory"}, + "relationships": { + "task": {"data": {"type": "tasks", "id": "task-456"}} + }, + } + } + + mock_transport.request.assert_called_once_with( + "POST", "/api/v2/workspaces/ws-456/tasks", json_body=expected_data + ) + + def test_create_invalid_workspace_id(self, service): + """Test create method with invalid workspace ID.""" + options = WorkspaceRunTaskCreateOptions( + enforcement_level=TaskEnforcementLevel.MANDATORY, + run_task=RunTask(id="task-123"), + ) + + with pytest.raises(InvalidWorkspaceIDError): + service.create("", options) + + def test_update_success(self, service, mock_transport): + """Test update method success.""" + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "wsrt-updated", + "type": "workspace-tasks", + "attributes": { + "enforcement-level": "advisory", + "stage": "pre-apply", + }, + } + } + mock_transport.request.return_value = mock_response + + options = WorkspaceRunTaskUpdateOptions( + enforcement_level=TaskEnforcementLevel.ADVISORY, stage=Stage.PRE_APPLY + ) + + result = service.update("ws-123", "wsrt-update", options) + + assert result.id == "wsrt-updated" + assert result.enforcement_level == TaskEnforcementLevel.ADVISORY + assert result.stage == Stage.PRE_APPLY + + expected_data = { + "data": { + "type": "workspace-tasks", + "id": "wsrt-update", + "attributes": { + "enforcement-level": "advisory", + "stage": "pre-apply", + }, + } + } + + mock_transport.request.assert_called_once_with( + "PATCH", + "/api/v2/workspaces/ws-123/tasks/wsrt-update", + json_body=expected_data, + ) + + def test_update_partial_options(self, service, mock_transport): + """Test update method with partial options.""" + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "wsrt-partial", + "type": "workspace-tasks", + "attributes": {"enforcement-level": "mandatory"}, + } + } + mock_transport.request.return_value = mock_response + + # Only update enforcement level, not stage + options = WorkspaceRunTaskUpdateOptions( + enforcement_level=TaskEnforcementLevel.MANDATORY + ) + + result = service.update("ws-789", "wsrt-partial", options) + + assert result.id == "wsrt-partial" + + expected_data = { + "data": { + "type": "workspace-tasks", + "id": "wsrt-partial", + "attributes": {"enforcement-level": "mandatory"}, + } + } + + mock_transport.request.assert_called_once_with( + "PATCH", + "/api/v2/workspaces/ws-789/tasks/wsrt-partial", + json_body=expected_data, + ) + + def test_update_invalid_workspace_id(self, service): + """Test update method with invalid workspace ID.""" + options = WorkspaceRunTaskUpdateOptions( + enforcement_level=TaskEnforcementLevel.MANDATORY + ) + + with pytest.raises(InvalidWorkspaceIDError): + service.update("", "wsrt-123", options) + + def test_update_invalid_task_id(self, service): + """Test update method with invalid task ID.""" + options = WorkspaceRunTaskUpdateOptions( + enforcement_level=TaskEnforcementLevel.MANDATORY + ) + + with pytest.raises(InvalidWorkspaceRunTaskIDError): + service.update("ws-123", "", options) + + def test_delete_success(self, service, mock_transport): + """Test delete method success.""" + # DELETE requests typically return no content + mock_response = Mock() + mock_response.json.return_value = None + mock_transport.request.return_value = mock_response + + # Should not raise any exception + service.delete("ws-123", "wsrt-delete") + + mock_transport.request.assert_called_once_with( + "DELETE", "/api/v2/workspaces/ws-123/tasks/wsrt-delete" + ) + + def test_delete_invalid_workspace_id(self, service): + """Test delete method with invalid workspace ID.""" + with pytest.raises(InvalidWorkspaceIDError): + service.delete("", "wsrt-123") + + def test_delete_invalid_task_id(self, service): + """Test delete method with invalid task ID.""" + with pytest.raises(InvalidWorkspaceRunTaskIDError): + service.delete("ws-123", "") + + +class TestWorkspaceRunTaskModels: + """Test workspace run task model classes.""" + + def test_workspace_run_task_creation(self): + """Test WorkspaceRunTask model creation.""" + from pytfe.models.run_task import RunTask + from pytfe.models.workspace import Workspace + + workspace = Workspace(id="ws-123") + run_task = RunTask( + id="task-456", + name="Test Task", + url="https://example.com/webhook", + category="test", + enabled=True, + ) + + task = WorkspaceRunTask( + id="wsrt-model123", + enforcement_level=TaskEnforcementLevel.MANDATORY, + stage=Stage.PRE_PLAN, + created_at="2023-01-01T00:00:00Z", + updated_at="2023-01-02T00:00:00Z", + workspace=workspace, + run_task=run_task, + ) + + assert task.id == "wsrt-model123" + assert task.type == "workspace-tasks" # Default value + assert task.enforcement_level == TaskEnforcementLevel.MANDATORY + assert task.stage == Stage.PRE_PLAN + assert task.created_at == "2023-01-01T00:00:00Z" + assert task.updated_at == "2023-01-02T00:00:00Z" + assert task.workspace == workspace + assert task.run_task == run_task + + def test_workspace_run_task_create_options(self): + """Test WorkspaceRunTaskCreateOptions model.""" + run_task_ref = RunTask(id="task-123") + + options = WorkspaceRunTaskCreateOptions( + enforcement_level=TaskEnforcementLevel.ADVISORY, + stage=Stage.POST_APPLY, + run_task=run_task_ref, + ) + + assert options.type == "workspace-tasks" + assert options.enforcement_level == TaskEnforcementLevel.ADVISORY + assert options.stage == Stage.POST_APPLY + assert options.run_task == run_task_ref + + def test_workspace_run_task_update_options(self): + """Test WorkspaceRunTaskUpdateOptions model.""" + options = WorkspaceRunTaskUpdateOptions( + enforcement_level=TaskEnforcementLevel.MANDATORY, + stage=Stage.PRE_PLAN, + ) + + assert options.type == "workspace-tasks" + assert options.enforcement_level == TaskEnforcementLevel.MANDATORY + assert options.stage == Stage.PRE_PLAN + + def test_workspace_run_task_list_options(self): + """Test WorkspaceRunTaskListOptions model.""" + options = WorkspaceRunTaskListOptions( + page_number=3, + page_size=25, + include=[WorkspaceRunTaskIncludeOpt.RUN_TASK], + ) + + assert options.page_number == 3 + assert options.page_size == 25 + assert options.include == [WorkspaceRunTaskIncludeOpt.RUN_TASK] + + def test_workspace_run_task_read_options(self): + """Test WorkspaceRunTaskReadOptions model.""" + options = WorkspaceRunTaskReadOptions( + include=[ + WorkspaceRunTaskIncludeOpt.RUN_TASK, + WorkspaceRunTaskIncludeOpt.WORKSPACE, + ] + ) + + assert len(options.include) == 2 + assert WorkspaceRunTaskIncludeOpt.RUN_TASK in options.include + assert WorkspaceRunTaskIncludeOpt.WORKSPACE in options.include + + def test_stage_enum_values(self): + """Test Stage enum values.""" + assert Stage.PRE_PLAN.value == "pre-plan" + assert Stage.POST_PLAN.value == "post-plan" + assert Stage.PRE_APPLY.value == "pre-apply" + assert Stage.POST_APPLY.value == "post-apply" + + def test_task_enforcement_level_enum_values(self): + """Test TaskEnforcementLevel enum values.""" + assert TaskEnforcementLevel.ADVISORY.value == "advisory" + assert TaskEnforcementLevel.MANDATORY.value == "mandatory" + + def test_workspace_run_task_include_opt_enum_values(self): + """Test WorkspaceRunTaskIncludeOpt enum values.""" + assert WorkspaceRunTaskIncludeOpt.RUN_TASK.value == "run_task" + assert WorkspaceRunTaskIncludeOpt.WORKSPACE.value == "workspace"