From 7d27d98f3a9be6ecf28ff598713a6c7e6ccea32a Mon Sep 17 00:00:00 2001 From: Anton Kosyakov Date: Fri, 7 Feb 2025 21:35:20 +0000 Subject: [PATCH] add examples and lib --- .devcontainer/Dockerfile | 5 +- .devcontainer/devcontainer.json | 3 + examples/anthropic_tool_use.py | 140 ++++++++++++++++++ examples/fs_access.py | 109 ++++++++++++++ examples/run_command.py | 64 ++++++++ examples/run_service.py | 80 ++++++++++ pyproject.toml | 3 + requirements-dev.lock | 24 +++ requirements.lock | 13 ++ src/gitpod/lib/__init__.py | 12 ++ src/gitpod/lib/automation.py | 156 ++++++++++++++++++++ src/gitpod/lib/disposables.py | 50 +++++++ src/gitpod/lib/environment.py | 252 ++++++++++++++++++++++++++++++++ 13 files changed, 908 insertions(+), 3 deletions(-) create mode 100755 examples/anthropic_tool_use.py create mode 100755 examples/fs_access.py create mode 100755 examples/run_command.py create mode 100755 examples/run_service.py create mode 100644 src/gitpod/lib/__init__.py create mode 100644 src/gitpod/lib/automation.py create mode 100644 src/gitpod/lib/disposables.py create mode 100644 src/gitpod/lib/environment.py diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index ac9a2e7..14aa9d4 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,9 +1,8 @@ -ARG VARIANT="3.9" -FROM mcr.microsoft.com/vscode/devcontainers/python:0-${VARIANT} +FROM mcr.microsoft.com/devcontainers/python:3.12 USER vscode RUN curl -sSf https://rye.astral.sh/get | RYE_VERSION="0.35.0" RYE_INSTALL_OPTION="--yes" bash ENV PATH=/home/vscode/.rye/shims:$PATH -RUN echo "[[ -d .venv ]] && source .venv/bin/activate" >> /home/vscode/.bashrc +RUN echo "[[ -d .venv ]] && source .venv/bin/activate || export PATH=\$PATH" >> /home/vscode/.bashrc diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index bbeb30b..c17fdc1 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -24,6 +24,9 @@ } } } + }, + "features": { + "ghcr.io/devcontainers/features/node:1": {} } // Features to add to the dev container. More info: https://containers.dev/features. diff --git a/examples/anthropic_tool_use.py b/examples/anthropic_tool_use.py new file mode 100755 index 0000000..ffb5c31 --- /dev/null +++ b/examples/anthropic_tool_use.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python + +from __future__ import annotations + +from typing import cast + +from anthropic import Anthropic +from anthropic.types import ToolParam, MessageParam + +import gitpod.lib as util +from gitpod import AsyncGitpod +from gitpod.types.environment_initializer_param import Spec + +gpclient = AsyncGitpod() +llmclient = Anthropic() + +user_message: MessageParam = { + "role": "user", + "content": "What is the test coverage for this repository: https://github.com/gitpod-io/gitpod-sdk-go", +} +tools: list[ToolParam] = [ + { + "name": "create_environment", + "description": "Create a new environment for a given context URL. This will create a new environment and return the ID of the environment.", + "input_schema": { + "type": "object", + "properties": {"context_url": {"type": "string"}}, + }, + }, + { + "name": "execute_command", + "description": "Execute a command in a given environment ID. This will execute the command in the given environment and return the output of the command.", + "input_schema": { + "type": "object", + "properties": {"environment_id": {"type": "string"}, "command": {"type": "string"}}, + }, + }, +] + +async def create_environment(args: dict[str, str], cleanup: util.Disposables) -> str: + env_class = await util.find_most_used_environment_class(gpclient) + if not env_class: + raise Exception("No environment class found. Please create one first.") + env_class_id = env_class.id + assert env_class_id is not None + + environment = (await gpclient.environments.create( + spec={ + "desired_phase": "ENVIRONMENT_PHASE_RUNNING", + "content": { + "initializer": {"specs": [Spec( + context_url={ + "url": args["context_url"] + } + )]}, + }, + "machine": {"class": env_class_id}, + } + )).environment + assert environment is not None + environment_id = environment.id + assert environment_id is not None + cleanup.add(lambda: asyncio.run(gpclient.environments.delete(environment_id=environment_id))) + + print(f"\nCreated environment: {environment_id} - waiting for it to be ready...") + await util.wait_for_environment_ready(gpclient, environment_id) + print(f"\nEnvironment is ready: {environment_id}") + return environment_id + +async def execute_command(args: dict[str, str]) -> str: + lines_iter = await util.run_command(gpclient, args["environment_id"], args["command"]) + lines: list[str] = [] + async for line in lines_iter: + lines.append(line) + return "\n".join(lines) + +async def main(cleanup: util.Disposables) -> None: + messages = [user_message] + while True: + message = llmclient.messages.create( + model="claude-3-5-sonnet-latest", + max_tokens=1024, + messages=messages, + tools=tools, + ) + print(f"\nResponse: {message.model_dump_json(indent=2)}") + + if message.stop_reason != "tool_use": + print(f"\nFinal response reached! {message.model_dump_json(indent=2)}") + break + + messages.extend([ + {"role": message.role, "content": message.content} + ]) + + # Handle all tool calls in this response + for tool in (c for c in message.content if c.type == "tool_use"): + try: + if tool.name == "create_environment": + args = cast(dict[str, str], tool.input) + environment_id = await create_environment(args, cleanup) + messages.append({ + "role": "user", + "content": [{ + "type": "tool_result", + "tool_use_id": tool.id, + "content": [{"type": "text", "text": f"The environment ID is {environment_id}"}], + }], + }) + elif tool.name == "execute_command": + args = cast(dict[str, str], tool.input) + output = await execute_command(args) + messages.append({ + "role": "user", + "content": [{ + "type": "tool_result", + "tool_use_id": tool.id, + "content": [{"type": "text", "text": output}], + }], + }) + else: + raise Exception(f"Unknown tool: {tool.name}") + except Exception as e: + messages.append({ + "role": "user", + "content": [{ + "type": "tool_result", + "tool_use_id": tool.id, + "is_error": True, + "content": [{"type": "text", "text": f"Error: {e}"}], + }], + }) + + print("\nFinal response reached!") + +if __name__ == "__main__": + import asyncio + disposables = util.Disposables() + with disposables: + asyncio.run(main(disposables)) diff --git a/examples/fs_access.py b/examples/fs_access.py new file mode 100755 index 0000000..24ddf66 --- /dev/null +++ b/examples/fs_access.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python + +import sys +import asyncio +from io import StringIO + +import paramiko + +import gitpod.lib as util +from gitpod import AsyncGitpod +from gitpod.types.environment_spec_param import EnvironmentSpecParam +from gitpod.types.environment_initializer_param import Spec + + +# Examples: +# - ./examples/fs_access.py +# - ./examples/fs_access.py https://github.com/gitpod-io/empty +async def main(cleanup: util.Disposables) -> None: + client = AsyncGitpod() + + context_url = sys.argv[1] if len(sys.argv) > 1 else None + + env_class = await util.find_most_used_environment_class(client) + if not env_class: + print("Error: No environment class found. Please create one first.") + sys.exit(1) + print(f"Found environment class: {env_class.display_name} ({env_class.description})") + env_class_id = env_class.id + assert env_class_id is not None + + print("Generating SSH key pair") + key = paramiko.RSAKey.generate(2048) + private_key_file = StringIO() + key.write_private_key(private_key_file) + private_key_file.seek(0) # Reset position to start + public_key = f"{key.get_name()} {key.get_base64()}" + + print("Creating environment with SSH access") + key_id = "fs-access-example" + spec: EnvironmentSpecParam = { + "desired_phase": "ENVIRONMENT_PHASE_RUNNING", + "machine": {"class": env_class_id}, + "ssh_public_keys": [{ + "id": key_id, + "value": public_key + }] + } + if context_url: + spec["content"] = { + "initializer": {"specs": [Spec( + context_url={ + "url": context_url + } + )]} + } + + print("Creating environment") + environment = (await client.environments.create(spec=spec)).environment + assert environment is not None + environment_id = environment.id + assert environment_id is not None + cleanup.add(lambda: asyncio.run(client.environments.delete(environment_id=environment_id))) + + env = util.EnvironmentState(client, environment_id) + cleanup.add(lambda: asyncio.run(env.close())) + + print("Waiting for environment to be running") + await env.wait_until_running() + + print("Waiting for SSH key to be applied") + await env.wait_for_ssh_key_applied(key_id=key_id, key_value=public_key) + + print("Waiting for SSH URL") + ssh_url = await env.wait_for_ssh_url() + + print(f"Setting up SSH connection to {ssh_url}") + # Parse ssh://username@host:port format + url_parts = ssh_url.split('://')[-1] + username, rest = url_parts.split('@') + host, port_str = rest.split(':') + port = int(port_str) + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect( + hostname=host, + port=port, + username=username, + pkey=key + ) + cleanup.add(lambda: ssh.close()) + + print("Creating SFTP client") + sftp = ssh.open_sftp() + cleanup.add(lambda: sftp.close()) + + print("Writing test file") + test_content = "Hello from Gitpod Python SDK!" + with sftp.file('test.txt', 'w') as f: + f.write(test_content) + + with sftp.file('test.txt', 'r') as f: + content = f.read() + print(f"File content: {content.decode()}") + +if __name__ == "__main__": + disposables = util.Disposables() + with disposables: + asyncio.run(main(disposables)) \ No newline at end of file diff --git a/examples/run_command.py b/examples/run_command.py new file mode 100755 index 0000000..9c63b12 --- /dev/null +++ b/examples/run_command.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python + +import sys +import asyncio + +import gitpod.lib as util +from gitpod import AsyncGitpod +from gitpod.types.environment_spec_param import EnvironmentSpecParam +from gitpod.types.environment_initializer_param import Spec + + +# Examples: +# - ./examples/run_command.py 'echo "Hello World!"' +# - ./examples/run_command.py 'echo "Hello World!"' https://github.com/gitpod-io/empty +async def main(cleanup: util.Disposables) -> None: + client = AsyncGitpod() + + if len(sys.argv) < 2: + print("Usage: ./examples/run_command.py '' [CONTEXT_URL]") + sys.exit(1) + + command = sys.argv[1] + context_url = sys.argv[2] if len(sys.argv) > 2 else None + + env_class = await util.find_most_used_environment_class(client) + if not env_class: + print("Error: No environment class found. Please create one first.") + sys.exit(1) + print(f"Found environment class: {env_class.display_name} ({env_class.description})") + env_class_id = env_class.id + assert env_class_id is not None + + spec: EnvironmentSpecParam = { + "desired_phase": "ENVIRONMENT_PHASE_RUNNING", + "machine": {"class": env_class_id}, + } + if context_url: + spec["content"] = { + "initializer": {"specs": [Spec( + context_url={ + "url": context_url + } + )]} + } + + print("Creating environment") + environment = (await client.environments.create(spec=spec)).environment + assert environment is not None + environment_id = environment.id + assert environment_id is not None + cleanup.add(lambda: asyncio.run(client.environments.delete(environment_id=environment_id))) + + print("Waiting for environment to be ready") + await util.wait_for_environment_ready(client, environment_id) + + print("Running command") + lines = await util.run_command(client, environment_id, command) + async for line in lines: + print(line) + +if __name__ == "__main__": + disposables = util.Disposables() + with disposables: + asyncio.run(main(disposables)) diff --git a/examples/run_service.py b/examples/run_service.py new file mode 100755 index 0000000..d51d889 --- /dev/null +++ b/examples/run_service.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python + +import sys +import asyncio + +import gitpod.lib as util +from gitpod import AsyncGitpod +from gitpod.types.environment_spec_param import EnvironmentSpecParam +from gitpod.types.environment_initializer_param import Spec + + +# Examples: +# - ./examples/run_service.py +# - ./examples/run_service.py https://github.com/gitpod-io/empty +async def main(cleanup: util.Disposables) -> None: + client = AsyncGitpod() + + context_url = sys.argv[1] if len(sys.argv) > 1 else None + + env_class = await util.find_most_used_environment_class(client) + if not env_class: + print("Error: No environment class found. Please create one first.") + sys.exit(1) + print(f"Found environment class: {env_class.display_name} ({env_class.description})") + env_class_id = env_class.id + assert env_class_id is not None + + port = 8888 + spec: EnvironmentSpecParam = { + "desired_phase": "ENVIRONMENT_PHASE_RUNNING", + "machine": {"class": env_class_id}, + "ports": [{ + "name": "Lama Service", + "port": port, + "admission": "ADMISSION_LEVEL_EVERYONE" + }] + } + if context_url: + spec["content"] = { + "initializer": {"specs": [Spec( + context_url={ + "url": context_url + } + )]} + } + + print("Creating environment") + environment = (await client.environments.create(spec=spec)).environment + assert environment is not None + environment_id = environment.id + assert environment_id is not None + cleanup.add(lambda: asyncio.run(client.environments.delete(environment_id=environment_id))) + + print("Waiting for environment to be ready") + env = util.EnvironmentState(client, environment_id) + cleanup.add(lambda: asyncio.run(env.close())) + await env.wait_until_running() + + print("Starting Lama Service") + lines = await util.run_service(client, environment_id, { + "name":"Lama Service", + "description":"Lama Service", + "reference":"lama-service" + }, { + "commands": { + "start":f"curl lama.sh | LAMA_PORT={port} sh", + "ready":f"curl -s http://localhost:{port}" + } + }) + + port_url = await env.wait_for_port_url(port) + print(f"Lama Service is running at {port_url}") + + async for line in lines: + print(line) + +if __name__ == "__main__": + disposables = util.Disposables() + with disposables: + asyncio.run(main(disposables)) diff --git a/pyproject.toml b/pyproject.toml index 7c84667..76c3f69 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,6 +55,9 @@ dev-dependencies = [ "importlib-metadata>=6.7.0", "rich>=13.7.1", "nest_asyncio==1.6.0", + "paramiko>=3.5.1", + "anthropic>=0.45.2", + "types-paramiko>=3.5.0.20240928", ] [tool.rye.scripts] diff --git a/requirements-dev.lock b/requirements-dev.lock index cc60f65..4ec4622 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -11,20 +11,31 @@ -e file:. annotated-types==0.6.0 # via pydantic +anthropic==0.45.2 anyio==4.4.0 + # via anthropic # via gitpod-sdk # via httpx argcomplete==3.1.2 # via nox +bcrypt==4.2.1 + # via paramiko certifi==2023.7.22 # via httpcore # via httpx +cffi==1.17.1 + # via cryptography + # via pynacl colorlog==6.7.0 # via nox +cryptography==44.0.1 + # via paramiko + # via types-paramiko dirty-equals==0.6.0 distlib==0.3.7 # via virtualenv distro==1.8.0 + # via anthropic # via gitpod-sdk exceptiongroup==1.2.2 # via anyio @@ -36,6 +47,7 @@ h11==0.14.0 httpcore==1.0.2 # via httpx httpx==0.28.1 + # via anthropic # via gitpod-sdk # via respx idna==3.4 @@ -44,6 +56,8 @@ idna==3.4 importlib-metadata==7.0.0 iniconfig==2.0.0 # via pytest +jiter==0.8.2 + # via anthropic markdown-it-py==3.0.0 # via rich mdurl==0.1.2 @@ -58,16 +72,23 @@ nox==2023.4.22 packaging==23.2 # via nox # via pytest +paramiko==3.5.1 + # via gitpod-sdk platformdirs==3.11.0 # via virtualenv pluggy==1.5.0 # via pytest +pycparser==2.22 + # via cffi pydantic==2.10.3 + # via anthropic # via gitpod-sdk pydantic-core==2.27.1 # via pydantic pygments==2.18.0 # via rich +pynacl==1.5.0 + # via paramiko pyright==1.1.392.post0 pytest==8.3.3 # via pytest-asyncio @@ -84,13 +105,16 @@ setuptools==68.2.2 six==1.16.0 # via python-dateutil sniffio==1.3.0 + # via anthropic # via anyio # via gitpod-sdk time-machine==2.9.0 tomli==2.0.2 # via mypy # via pytest +types-paramiko==3.5.0.20240928 typing-extensions==4.12.2 + # via anthropic # via anyio # via gitpod-sdk # via mypy diff --git a/requirements.lock b/requirements.lock index 65e7618..f8d5205 100644 --- a/requirements.lock +++ b/requirements.lock @@ -14,9 +14,16 @@ annotated-types==0.6.0 anyio==4.4.0 # via gitpod-sdk # via httpx +bcrypt==4.2.1 + # via paramiko certifi==2023.7.22 # via httpcore # via httpx +cffi==1.17.1 + # via cryptography + # via pynacl +cryptography==44.0.1 + # via paramiko distro==1.8.0 # via gitpod-sdk exceptiongroup==1.2.2 @@ -30,10 +37,16 @@ httpx==0.28.1 idna==3.4 # via anyio # via httpx +paramiko==3.5.1 + # via gitpod-sdk +pycparser==2.22 + # via cffi pydantic==2.10.3 # via gitpod-sdk pydantic-core==2.27.1 # via pydantic +pynacl==1.5.0 + # via paramiko sniffio==1.3.0 # via anyio # via gitpod-sdk diff --git a/src/gitpod/lib/__init__.py b/src/gitpod/lib/__init__.py new file mode 100644 index 0000000..be9bc21 --- /dev/null +++ b/src/gitpod/lib/__init__.py @@ -0,0 +1,12 @@ +from .automation import run_command, run_service +from .disposables import Disposables +from .environment import EnvironmentState, wait_for_environment_ready, find_most_used_environment_class + +__all__ = [ + 'find_most_used_environment_class', + 'run_command', + 'run_service', + 'EnvironmentState', + 'Disposables', + 'wait_for_environment_ready', +] \ No newline at end of file diff --git a/src/gitpod/lib/automation.py b/src/gitpod/lib/automation.py new file mode 100644 index 0000000..c0022de --- /dev/null +++ b/src/gitpod/lib/automation.py @@ -0,0 +1,156 @@ +import asyncio +from typing import Callable, Optional, Awaitable, AsyncIterator + +import httpx + +from gitpod._client import AsyncGitpod +from gitpod.types.environments.automations import service_create_params + +TASK_REFERENCE = "gitpod-python-sdk" + +async def run_service( + client: AsyncGitpod, + environment_id: str, + metadata: service_create_params.ServiceMetadataParam, + spec: service_create_params.ServiceSpecParam +) -> AsyncIterator[str]: + reference = metadata.get("reference") + if not reference: + raise ValueError("metadata.reference is required") + + services = (await client.environments.automations.services.list( + filter={ + "references": [reference], + "environment_ids": [environment_id] + } + )).services + + service_id: Optional[str] = None + if not services: + service = (await client.environments.automations.services.create( + environment_id=environment_id, + spec=spec, + metadata=metadata + )).service + assert service is not None + service_id = service.id + else: + service_id = services[0].id + assert service_id is not None + + await client.environments.automations.services.start(id=service_id) + log_url = await wait_for_service_log_url(client, environment_id, service_id) + return stream_logs(client, environment_id, log_url) + +async def run_command(client: AsyncGitpod, environment_id: str, command: str) -> AsyncIterator[str]: + tasks = (await client.environments.automations.tasks.list( + filter={ + "references": [TASK_REFERENCE], + "environment_ids": [environment_id] + } + )).tasks + + task_id: Optional[str] = None + if not tasks: + task = (await client.environments.automations.tasks.create( + spec={ + "command": command, + }, + environment_id=environment_id, + metadata={ + "name": "Gitpod Python SDK Task", + "description": "Gitpod Python SDK Task", + "reference": TASK_REFERENCE, + }, + )).task + assert task is not None + task_id = task.id + else: + task_id = tasks[0].id + assert task_id is not None + await client.environments.automations.tasks.update( + id=task_id, + spec={ + "command": command, + }, + ) + assert task_id is not None + task_execution = (await client.environments.automations.tasks.start(id=task_id)).task_execution + assert task_execution is not None + task_execution_id = task_execution.id + assert task_execution_id is not None + log_url = await wait_for_task_log_url(client, environment_id, task_execution_id) + return stream_logs(client, environment_id, log_url) + +async def wait_for_task_log_url(client: AsyncGitpod, environment_id: str, task_execution_id: str) -> str: + async def get_log_url() -> Optional[str]: + execution = (await client.environments.automations.tasks.executions.retrieve(id=task_execution_id)).task_execution + if not execution or not execution.status: + return None + return execution.status.log_url + + return await wait_for_log_url(client, environment_id, task_execution_id, get_log_url, "RESOURCE_TYPE_TASK_EXECUTION") + +async def wait_for_service_log_url(client: AsyncGitpod, environment_id: str, service_id: str) -> str: + async def get_log_url() -> Optional[str]: + service = (await client.environments.automations.services.retrieve(id=service_id)).service + if not service or not service.status: + return None + if service.status.phase != "SERVICE_PHASE_RUNNING": + return None + return service.status.log_url + + return await wait_for_log_url(client, environment_id, service_id, get_log_url, "RESOURCE_TYPE_SERVICE") + +async def wait_for_log_url(client: AsyncGitpod, environment_id: str, resource_id: str, get_log_url_fn: Callable[[], Awaitable[Optional[str]]], resource_type: str) -> str: + log_url = await get_log_url_fn() + if log_url: + return log_url + + event_stream = await client.events.watch(environment_id=environment_id, timeout=None) + try: + log_url = await get_log_url_fn() + if log_url: + return log_url + + async for event in event_stream: + if event.resource_type == resource_type and event.resource_id == resource_id: + log_url = await get_log_url_fn() + if log_url is not None: + return log_url + finally: + await event_stream.http_response.aclose() + + raise Exception("Failed to get log URL") + +async def stream_logs(client: AsyncGitpod, environment_id: str, log_url: str) -> AsyncIterator[str]: + logs_access_token = (await client.environments.create_logs_token(environment_id=environment_id)).access_token + async with httpx.AsyncClient() as http_client: + retries = 3 + while retries > 0: + try: + async with http_client.stream("GET", log_url, headers={"Authorization": f"Bearer {logs_access_token}"}, timeout=None) as response: + if response.status_code == 502: # Bad Gateway + retries -= 1 + if retries == 0: + raise Exception("Failed to stream logs after 3 retries") + await asyncio.sleep(1) # Wait before retrying + continue + + buffer = "" + async for chunk in response.aiter_text(): + buffer += chunk + while "\n" in buffer: + line, buffer = buffer.split("\n", 1) + if line: + yield line + if buffer: + yield buffer + break # Success - exit retry loop + + except httpx.HTTPError as e: + if retries > 0 and (isinstance(e, httpx.HTTPStatusError) and e.response.status_code == 502): + retries -= 1 + await asyncio.sleep(1) # Wait before retrying + continue + raise # Re-raise if not a 502 or out of retries \ No newline at end of file diff --git a/src/gitpod/lib/disposables.py b/src/gitpod/lib/disposables.py new file mode 100644 index 0000000..55bed4d --- /dev/null +++ b/src/gitpod/lib/disposables.py @@ -0,0 +1,50 @@ +import logging +from typing import Any, List, Callable + +log = logging.getLogger(__name__) + +class Disposables: + """A utility class to manage cleanup actions (disposables) in a LIFO order. + + Example: + ```python + with Disposables() as disposables: + # Add cleanup actions + disposables.add(lambda: cleanup_something()) + disposables.add(lambda: cleanup_something_else()) + + # Do work that needs cleanup + do_something() + do_something_else() + # Cleanup actions will be executed in reverse order when exiting the context + ``` + """ + + def __init__(self) -> None: + self._actions: List[Callable[[], Any]] = [] + + def add(self, action: Callable[[], Any]) -> None: + """Add a cleanup action to be executed when the context exits. + + Args: + action: A callable that performs cleanup when called + """ + self._actions.append(action) + + def __enter__(self) -> 'Disposables': + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + self.cleanup() + + def cleanup(self) -> None: + """Execute all cleanup actions in reverse order. + + If any cleanup action raises an exception, it will be logged but won't prevent + other cleanup actions from executing. + """ + for action in reversed(self._actions): + try: + action() + except BaseException: + log.exception("cleanup action failed") \ No newline at end of file diff --git a/src/gitpod/lib/environment.py b/src/gitpod/lib/environment.py new file mode 100644 index 0000000..08c2e47 --- /dev/null +++ b/src/gitpod/lib/environment.py @@ -0,0 +1,252 @@ +import asyncio +import logging +from typing import List, TypeVar, Callable, Optional + +from gitpod import AsyncGitpod +from gitpod.types.shared import EnvironmentClass +from gitpod.types.environment import Environment + +T = TypeVar('T') + +log = logging.getLogger(__name__) + +class EnvironmentState: + """ + Maintains the current state of an environment and updates it via event stream. + Uses simple threading and callbacks for state updates. + """ + def __init__(self, client: AsyncGitpod, environment_id: str): + self.client = client + self.environment_id = environment_id + self._ready = asyncio.Event() + self._environment: Optional[Environment] = None + + # State management + self._listeners: List[Callable[[Environment], None]] = [] + self._should_stop = False + self.mutex = asyncio.Lock() + + # Start background task + loop = asyncio.get_event_loop() + self._update_task: asyncio.Task[None] = loop.create_task(self._start_update_loop()) + + async def get_environment(self) -> Environment: + await self._ready.wait() + assert self._environment is not None + return self._environment + + async def _update_environment(self) -> None: + try: + resp = await self.client.environments.retrieve(environment_id=self.environment_id) + env = resp.environment + assert env is not None + self._environment = env + self._ready.set() + for listener in list(self._listeners): + try: + listener(env) + except Exception: + log.exception("failed to call listener") + except BaseException: + log.exception("failed to update environment") + + async def _start_update_loop(self) -> None: + """Background coroutine that maintains the event stream""" + retry_delay = 1.0 # Initial retry delay in seconds + max_delay = 32.0 # Maximum retry delay + + await self._update_environment() + + while not self._should_stop: + try: + async with self.mutex: + if self._should_stop: + return # type: ignore[unreachable] + + event_stream = await self.client.events.watch(environment_id=self.environment_id, timeout=None) + self.event_stream = event_stream + + retry_delay = 1.0 # Reset delay on successful connection + if self._should_stop: + return # type: ignore[unreachable] + + # Immediately do one update so we don't miss changes + await self._update_environment() + if self._should_stop: + return # type: ignore[unreachable] + + try: + async for event in event_stream: + if self._should_stop: + return # type: ignore[unreachable] + + if event.resource_type == "RESOURCE_TYPE_ENVIRONMENT" and event.resource_id == self.environment_id: + await self._update_environment() + finally: + await event_stream.http_response.aclose() + + except BaseException as e: + if self._should_stop or isinstance(e, asyncio.CancelledError): + return + + log.exception("error in event stream, retrying in %s seconds", retry_delay) + await asyncio.sleep(retry_delay) + retry_delay = min(retry_delay * 2, max_delay) + + async def close(self) -> None: + """Stop the update loop and clean up resources""" + async with self.mutex: + if self._should_stop: + return + self._should_stop = True + + if self.event_stream: + await self.event_stream.http_response.aclose() + + await self._update_task + + async def wait_until(self, check_fn: Callable[[Environment], Optional[T]]) -> T: + """Wait until check function returns a value + + Args: + check_fn: Function that checks the environment and returns a value when condition is met + + Returns: + The value returned by check_fn + """ + # Check current state first + initial_env = await self.get_environment() + initial_check = check_fn(initial_env) + if initial_check is not None: + return initial_check + + event = asyncio.Event() + result: Optional[T] = None + + def listener(env: Environment) -> None: + nonlocal result + check_result = check_fn(env) + if check_result is not None: + result = check_result + event.set() + + self._listeners.append(listener) + try: + await event.wait() + if result is None: + raise RuntimeError("wait_until completed but result is None") + return result + finally: + self._listeners.remove(listener) + + def is_running(self, env: Environment) -> bool: + """Check if environment is ready""" + if not env.status: + return False + + if env.status.phase in ["ENVIRONMENT_PHASE_STOPPING", "ENVIRONMENT_PHASE_STOPPED", + "ENVIRONMENT_PHASE_DELETING", "ENVIRONMENT_PHASE_DELETED"]: + raise RuntimeError(f"Environment {env.id} is in unexpected phase: {env.status.phase}") + elif env.status.failure_message: + raise RuntimeError(f"Environment {env.id} failed: {'; '.join(env.status.failure_message)}") + + return env.status.phase == "ENVIRONMENT_PHASE_RUNNING" + + def get_ssh_url(self, env: Environment) -> Optional[str]: + """Get SSH URL if available""" + if not env.status or not env.status.environment_urls or not env.status.environment_urls.ssh: + return None + return env.status.environment_urls.ssh.url + + def get_port_url(self, env: Environment, port: int) -> Optional[str]: + """Get port URL if available""" + if not env.status or not env.status.environment_urls or not env.status.environment_urls.ports: + return None + + for p in env.status.environment_urls.ports: + if p.port == port: + return p.url + return None + + def check_ssh_key_applied(self, env: Environment, key_id: str, key_value: str) -> bool: + """Check if SSH key is applied""" + if not env.spec or not env.spec.ssh_public_keys: + return False + + key = next((k for k in env.spec.ssh_public_keys if k.id == key_id), None) + if not key: + raise RuntimeError(f"SSH key '{key_id}' not found in environment spec") + + if key.value != key_value: + raise RuntimeError(f"SSH key '{key_id}' has incorrect value") + + if not env.status or not env.status.ssh_public_keys: + return False + + key_status = next((ks for ks in env.status.ssh_public_keys if ks.id == key_id), None) + if not key_status: + return False + + if key_status.phase == "CONTENT_PHASE_FAILED": + raise RuntimeError(f"SSH key '{key_id}' failed to apply") + + return key_status.phase == "CONTENT_PHASE_READY" + + async def wait_until_running(self) -> None: + def check_running(env: Environment) -> Optional[bool]: + return True if self.is_running(env) else None + await self.wait_until(check_running) + + async def wait_for_ssh_url(self) -> str: + def get_url(env: Environment) -> Optional[str]: + return self.get_ssh_url(env) + return await self.wait_until(get_url) + + async def wait_for_port_url(self, port: int) -> str: + def check_port(env: Environment) -> Optional[str]: + return self.get_port_url(env, port) + return await self.wait_until(check_port) + + async def wait_for_ssh_key_applied(self, key_id: str, key_value: str) -> None: + def check_key(env: Environment) -> Optional[bool]: + return True if self.check_ssh_key_applied(env, key_id, key_value) else None + await self.wait_until(check_key) + +async def wait_for_environment_ready(client: AsyncGitpod, environment_id: str) -> None: + env = EnvironmentState(client, environment_id) + try: + await env.wait_until_running() + finally: + await env.close() + +async def find_most_used_environment_class(client: AsyncGitpod) -> Optional[EnvironmentClass]: + """ + Find the most used environment class. + """ + class_usage: dict[str, int] = {} + envs_resp = await client.environments.list() + while envs_resp: + for env in envs_resp.environments: + if env.spec and env.spec.machine and env.spec.machine.class_: + env_class = env.spec.machine.class_ + class_usage[env_class] = class_usage.get(env_class, 0) + 1 + if envs_resp.pagination and envs_resp.pagination.next_token: + envs_resp = await client.environments.list(token=envs_resp.pagination.next_token) + else: + break + + sorted_classes = sorted(class_usage.items(), key=lambda item: -item[1]) + environment_class_id: Optional[str] = sorted_classes[0][0] if sorted_classes else None + if not environment_class_id: + return None + + classes_resp = await client.environments.classes.list(filter={"can_create_environments": True}) + while classes_resp: + for cls in classes_resp.environment_classes: + if cls.id == environment_class_id: + return cls + if classes_resp.pagination and classes_resp.pagination.next_token: + classes_resp = await client.environments.classes.list(token=classes_resp.pagination.next_token) + else: + break + return None \ No newline at end of file