diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b48d894..3e047ad 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -66,12 +66,12 @@ repos: - id: "editorconfig-checker" - repo: "https://github.com/python-jsonschema/check-jsonschema" - rev: "0.36.1" + rev: "0.36.2" hooks: - id: "check-dependabot" - repo: "https://github.com/rhysd/actionlint" - rev: "v1.7.10" + rev: "v1.7.11" hooks: - id: "actionlint" diff --git a/changelog.d/20260224_121813_derek_cli_config_management_sc_48273.rst b/changelog.d/20260224_121813_derek_cli_config_management_sc_48273.rst new file mode 100644 index 0000000..c2189e2 --- /dev/null +++ b/changelog.d/20260224_121813_derek_cli_config_management_sc_48273.rst @@ -0,0 +1,11 @@ + +Added +----- + +* Add ``gra init`` command to initialize a new gra repository. +* Add ``gra manage`` command to configure a gra repository. + +Development +----------- + +* Add re-usable constructs for requesting single- and multi-value user selections. diff --git a/pyproject.toml b/pyproject.toml index 3ead69a..5c5a89d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,8 @@ classifiers = [ dependencies = [ "click >=8,<9", + "prompt-toolkit >=3.0,<4.0", + "rich >=14.0,<15.0", "globus-sdk >=4", "requests", "types-requests (>=2.32.4.20260107,<3.0.0.0)", diff --git a/requirements/mypy/requirements.txt b/requirements/mypy/requirements.txt index 4824a99..b02443b 100644 --- a/requirements/mypy/requirements.txt +++ b/requirements/mypy/requirements.txt @@ -1,4 +1,4 @@ -librt==0.7.8 ; python_version >= "3.10" and platform_python_implementation != "PyPy" +librt==0.8.1 ; python_version >= "3.10" and platform_python_implementation != "PyPy" mypy-extensions==1.1.0 ; python_version >= "3.10" mypy==1.19.1 ; python_version >= "3.10" pathspec==1.0.4 ; python_version >= "3.10" diff --git a/src/globus_registered_api/cli.py b/src/globus_registered_api/cli.py index dcd6862..10b259c 100644 --- a/src/globus_registered_api/cli.py +++ b/src/globus_registered_api/cli.py @@ -8,18 +8,18 @@ import json import os import typing as t -from collections.abc import Iterable from uuid import UUID import click import click.exceptions -from globus_sdk import AuthClient from globus_sdk import ClientApp -from globus_sdk import FlowsClient from globus_sdk import GlobusAPIError from globus_sdk import GlobusAppConfig -from globus_sdk import Scope from globus_sdk import UserApp +from globus_sdk.scopes import AuthScopes +from globus_sdk.scopes import FlowsScopes +from globus_sdk.scopes import GroupsScopes +from globus_sdk.scopes import SearchScopes from globus_sdk.token_storage import JSONTokenStorage from globus_sdk.token_storage import TokenStorage @@ -96,9 +96,15 @@ def for_globus_app( ) -SCOPE_REQUIREMENTS: dict[str, str | Scope | Iterable[str | Scope]] = { - AuthClient.scopes.resource_server: [AuthClient.scopes.openid], - FlowsClient.scopes.resource_server: [FlowsClient.scopes.all], +SCOPE_REQUIREMENTS = { + AuthScopes.resource_server: [ + AuthScopes.openid, + AuthScopes.profile, + AuthScopes.email, + ], + GroupsScopes.resource_server: [GroupsScopes.view_my_groups_and_memberships], + SearchScopes.resource_server: [SearchScopes.search], + FlowsScopes.resource_server: [FlowsScopes.all], } diff --git a/src/globus_registered_api/clients.py b/src/globus_registered_api/clients.py index fb0cf69..d138960 100644 --- a/src/globus_registered_api/clients.py +++ b/src/globus_registered_api/clients.py @@ -5,6 +5,8 @@ from globus_sdk import AuthClient from globus_sdk import GlobusApp +from globus_sdk import GroupsClient +from globus_sdk import SearchClient from globus_registered_api import ExtendedFlowsClient @@ -31,3 +33,27 @@ def create_flows_client(app: GlobusApp) -> ExtendedFlowsClient: :return: An ExtendedFlowsClient configured with the provided app """ return ExtendedFlowsClient(app=app) + + +def create_groups_client(app: GlobusApp) -> GroupsClient: + """ + Create a GroupsClient for the given app. + + This function mostly exists for unified testing fixtures. + + :param app: A Globus app instance to use for authentication + :return: A GroupsClient configured with the provided app + """ + return GroupsClient(app=app) + + +def create_search_client(app: GlobusApp) -> SearchClient: + """ + Create a SearchClient for the given app. + + This function mostly exists for unified testing fixtures. + + :param app: A Globus app instance to use for authentication + :return: A SearchClient configured with the provided app + """ + return SearchClient(app=app) diff --git a/src/globus_registered_api/commands/init.py b/src/globus_registered_api/commands/init.py index 69dc26e..157f125 100644 --- a/src/globus_registered_api/commands/init.py +++ b/src/globus_registered_api/commands/init.py @@ -3,15 +3,209 @@ # Copyright 2025-2026 Globus # SPDX-License-Identifier: Apache-2.0 +import os +import typing as t + import click +import openapi_pydantic as oa +import prompt_toolkit +from prompt_toolkit.completion import CompleteEvent +from prompt_toolkit.completion import Completer +from prompt_toolkit.completion import Completion +from prompt_toolkit.completion import PathCompleter +from prompt_toolkit.document import Document +from prompt_toolkit.validation import ValidationError +from prompt_toolkit.validation import Validator +from globus_registered_api.clients import create_auth_client +from globus_registered_api.config import CoreConfig +from globus_registered_api.config import RegisteredAPIConfig +from globus_registered_api.config import RoleConfig from globus_registered_api.context import CLIContext from globus_registered_api.context import with_cli_context +from globus_registered_api.openapi import OpenAPISpecAnalyzer +from globus_registered_api.openapi.loader import OpenAPILoadError +from globus_registered_api.openapi.loader import load_openapi_spec +from globus_registered_api.rendering import prompt_selection + + +class OpenAPISpecPath(click.ParamType): + name = "openapi_spec_path" + + def convert( + self, + value: t.Any, + param: click.Parameter | None, + ctx: click.Context | None, + ) -> str: + if not isinstance(value, str): + self.fail(f"{value!r} is not a valid string", param, ctx) + + try: + load_openapi_spec(value) + except OpenAPILoadError as e: + self.fail(str(e), param, ctx) + + return value + + +class OpenAPISpecPathValidator(Validator): + def validate(self, document: Document) -> None: + try: + load_openapi_spec(document.text) + except OpenAPILoadError as e: + raise ValidationError(cursor_position=len(document.text), message=str(e)) + + +class ClickURLParam(click.ParamType): + name = "url" + def convert( + self, + value: t.Any, + param: click.Parameter | None, + ctx: click.Context | None, + ) -> str: + if not isinstance(value, str): + self.fail(f"{value!r} is not a valid string", param, ctx) -@click.command + if not (value.startswith("http://") or value.startswith("https://")): + self.fail(f"{value!r} is not a valid URL", param, ctx) + + return value + + +@click.command("init") @with_cli_context def init_command(ctx: CLIContext) -> None: + """Initialize a local Registered API Repository.""" + if RegisteredAPIConfig.exists(): + click.echo("Error: Cannot re-initialize an existing repository.") + click.echo("Please use 'gra manage' instead.") + raise click.Abort() + + identity_id = create_auth_client(ctx.globus_app).userinfo()["sub"] + + click.echo("Initializing a new local registered API repository...") + click.echo("Registered APIs leverage the OpenAPI specification.") + click.echo("Many common frameworks, like FastAPI, will generate these for you.") + click.echo("For more information see: https://learn.openapis.org/") + click.echo() + + if click.confirm("Does your service have an OpenAPI specification?", default=True): + core_config = _prompt_for_reference_core_config() + else: + core_config = _prompt_for_inline_core_config() + + initial_role = RoleConfig(type="identity", id=identity_id, access_level="owner") + config = RegisteredAPIConfig(core=core_config, targets=[], roles=[initial_role]) + config.commit() + + click.echo() + click.echo("Successfully initialized repository!") + click.echo() + click.echo("To start configuring Registered APIs, use 'gra manage'.") + + +def _prompt_for_inline_core_config() -> CoreConfig: + click.echo("No problem, I'll need 2 pieces of basic info about it then.") + click.echo("You can always update the OpenAPI spec later.") + click.echo() + + # Prompt the user to fill in basic openapi.info fields + # https://swagger.io/specification/#info-object + click.echo("1. What is your service's name?") + click.echo("This should be something human readable, for example, 'Globus Search'.") + name = click.prompt("Service Name", type=str) + click.echo() + + click.echo("2. What is the base URL for your service?") + click.echo("This is the common http prefix for all of the API endpoints.") + click.echo("For example, 'https://api.example.com'.") + base_url = click.prompt("Base URL", type=ClickURLParam()) + + return CoreConfig( + base_url=base_url, + specification=oa.OpenAPI( + openapi="3.1.0", + info=oa.Info(title=name, version="-1"), + paths={}, + ), + ) + + +def _prompt_for_reference_core_config() -> CoreConfig: + click.echo("Great, can I find this specification?") + click.echo("This may be either a URL or a local filesystem path.") + click.echo() + + spec_path = prompt_toolkit.prompt( + "Specification Location (Path or URL): ", + completer=OpenAPISpecCompleter(), + validator=OpenAPISpecPathValidator(), + validate_while_typing=False, + ) + click.echo("Looks like an OpenAPI specification, perfect.") + + openapi_spec = load_openapi_spec(spec_path) + analysis = OpenAPISpecAnalyzer().analyze(openapi_spec) + + if len(analysis.https_servers) == 1: + server = analysis.https_servers[0] + click.echo(f"I found one HTTPS server in the specification: {server}.") + if click.confirm("Should I use this as the base URL of your service?"): + return CoreConfig(base_url=server, specification=spec_path) + else: + click.echo("No problem, I'll need you to tell me the base URL then.") + + elif len(analysis.https_servers) > 1: + click.echo("I found multiple HTTPS servers in the specification:") + for server in analysis.https_servers: + click.echo(f" - {server}") + if click.confirm("Would you like to use one of these as the service base URL?"): + server_options = [(server, server) for server in analysis.https_servers] + selected_server = prompt_selection("Server", server_options) + return CoreConfig(base_url=selected_server, specification=spec_path) + else: + click.echo("No problem, I'll need you to tell me the base URL then.") + + else: + click.echo("I couldn't find any HTTPS servers in the specification.") + click.echo("Please enter the base URL manually.") + + base_url = click.prompt("Base URL", type=ClickURLParam()) + return CoreConfig(base_url=base_url, specification=spec_path) + + +class OpenAPISpecCompleter(Completer): """ - [Placeholder] Initialize Repository Config. + Specialized Completer for OpenAPI specs. + + If the input looks like it could be a URL, do nothing. + Otherwise, complete filesystem paths. """ + + def __init__(self) -> None: + self._path_completer = PathCompleter( + expanduser=True, + file_filter=_is_dir_or_data_file, + ) + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> t.Iterable[Completion]: + text = document.text_before_cursor + + could_input_be_http = text.startswith( + "http://"[: len(text)] + ) or text.startswith("https://"[: len(text)]) + if not could_input_be_http: + yield from self._path_completer.get_completions(document, complete_event) + + +def _is_dir_or_data_file(filename: str) -> bool: + if os.path.isdir(filename): + return True + + _, ext = os.path.splitext(filename) + return ext.lower() in {".json", ".yaml", ".yml"} diff --git a/src/globus_registered_api/commands/manage/dispatch.py b/src/globus_registered_api/commands/manage/dispatch.py index 6e4d295..1cf7430 100644 --- a/src/globus_registered_api/commands/manage/dispatch.py +++ b/src/globus_registered_api/commands/manage/dispatch.py @@ -3,15 +3,85 @@ # Copyright 2025-2026 Globus # SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + import click +from globus_registered_api.config import RegisteredAPIConfig from globus_registered_api.context import CLIContext from globus_registered_api.context import with_cli_context +from globus_registered_api.openapi import OpenAPISpecAnalyzer +from globus_registered_api.openapi.loader import load_openapi_spec +from globus_registered_api.rendering import prompt_selection +from .domain import BACK_SENTINEL +from .domain import EXIT_SENTINEL +from .domain import BackSentinel +from .domain import ConfiguratorMenu +from .domain import ExitSentinel +from .domain import MainMenu +from .domain import ManageContext +from .roles import RoleConfigurator +from .targets import TargetConfigurator -@click.command + +@click.command("manage") @with_cli_context def manage_command(ctx: CLIContext) -> None: - """ - [Placeholder] Manage Repository Config. - """ + """Interactively configure your GRA repo.""" + + # This command has a lot of back and forth interaction with the user. + # At a high level, it offers a main menu to select different subcommands from + # either the `TargetConfigurator` or the `RoleConfigurator`. + # Subcommands may be invoked over and over until the user chooses to "exit" from + # the dispatch menu handled through this function. + + manage_context = _create_manage_context(ctx) + main_menu = _create_main_menu(manage_context) + + current_menu: MainMenu | ConfiguratorMenu = main_menu + with manage_context.globus_app: + while True: + option = prompt_selection("Menu Option", current_menu, show_selection=False) + if isinstance(option, ExitSentinel): + # Exit the command. + break + elif isinstance(option, BackSentinel): + # Return to the main menu. + current_menu = main_menu + elif callable(option): + # Invoke a subcommand, remaining at the current menu. + option() + else: + # Descend into a sub-menu. + current_menu = option + + +def _create_main_menu(manage_context: ManageContext) -> MainMenu: + """Create a main navigation menu.""" + target_menu = TargetConfigurator(manage_context).menu_options + role_menu = RoleConfigurator(manage_context).menu_options + + target_menu += [(BACK_SENTINEL, ""), (EXIT_SENTINEL, "")] + role_menu += [(BACK_SENTINEL, ""), (EXIT_SENTINEL, "")] + return [ + (target_menu, "Targets"), + (role_menu, "Roles"), + (EXIT_SENTINEL, ""), + ] + + +def _create_manage_context(ctx: CLIContext) -> ManageContext: + """Create a context object to be provided to configurator subcommand objects.""" + config = RegisteredAPIConfig.load() + if isinstance(config.core.specification, str): + spec = load_openapi_spec(config.core.specification) + else: + spec = config.core.specification + analysis = OpenAPISpecAnalyzer().analyze(spec) + + context = ManageContext(config=config, analysis=analysis, globus_app=ctx.globus_app) + context.globus_app.login() + + return context diff --git a/src/globus_registered_api/commands/manage/domain.py b/src/globus_registered_api/commands/manage/domain.py new file mode 100644 index 0000000..c48498f --- /dev/null +++ b/src/globus_registered_api/commands/manage/domain.py @@ -0,0 +1,41 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +import typing as t +from dataclasses import dataclass + +from globus_sdk import GlobusApp + +from globus_registered_api.config import RegisteredAPIConfig +from globus_registered_api.openapi import SpecAnalysis + + +@dataclass +class ManageContext: + """Context object for configurator subcommands.""" + + config: RegisteredAPIConfig + analysis: SpecAnalysis + globus_app: GlobusApp + + +# Navigation Controls and Types +class BackSentinel: ... + + +BACK_SENTINEL = BackSentinel() + + +class ExitSentinel: ... + + +EXIT_SENTINEL = ExitSentinel() + +ControlSignal: t.TypeAlias = BackSentinel | ExitSentinel + +ConfiguratorSubcommand: t.TypeAlias = t.Callable[[], None] +ConfiguratorMenu: t.TypeAlias = list[tuple[ConfiguratorSubcommand | ControlSignal, str]] + +MainMenu: t.TypeAlias = list[tuple[ConfiguratorMenu | ExitSentinel, str]] diff --git a/src/globus_registered_api/commands/manage/roles.py b/src/globus_registered_api/commands/manage/roles.py new file mode 100644 index 0000000..bbe00ce --- /dev/null +++ b/src/globus_registered_api/commands/manage/roles.py @@ -0,0 +1,376 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import typing as t +from uuid import UUID + +import click +from globus_sdk import GlobusApp +from rich.console import Console +from rich.table import Table + +from globus_registered_api.clients import create_auth_client +from globus_registered_api.clients import create_groups_client +from globus_registered_api.clients import create_search_client +from globus_registered_api.config import RegisteredAPIConfig +from globus_registered_api.config import RoleAccessLevel +from globus_registered_api.config import RoleConfig +from globus_registered_api.rendering import prompt_selection + +from .domain import ConfiguratorMenu +from .domain import ManageContext + +_GROUPS_SEARCH_INDEX_ID = "fcd4d0ab-f48d-4b13-af61-a5d40832192f" + + +class RoleSummaryTable(Table): + _console: t.ClassVar[Console] = Console() + + def __init__(self, roles: list[RoleConfig], *, resolver: _RoleNameResolver) -> None: + super().__init__() + self.add_column() + self.add_column("Entity Type", style="red") + self.add_column("Identifier", style="magenta") + self.add_column("Access Level", style="cyan") + + self.resolver = resolver + self.resolver.populate_cache(roles) + + for role in sorted(roles, key=lambda r: r.sort_key): + self.add_row( + str(self.row_count + 1), + role.type, + self.resolver.resolve(role) or str(role.id), + role.access_level, + ) + + def print(self) -> None: + self._console.print(self) + + +class RoleConfigurator: + """ + Central management pane of role configuration. + + Each method corresponds to a subcommand of `gra manage` related to role management. + """ + + def __init__(self, context: ManageContext) -> None: + self.config = context.config + self._name_resolver = _RoleNameResolver(context.globus_app) + self._role_prompter = _RolePrompter( + context.config, context.globus_app, self._name_resolver + ) + + @property + def menu_options(self) -> ConfiguratorMenu: + """The available role management subcommands.""" + return [ + (self.list_roles, "List Roles"), + (self.modify_role, "Modify Role"), + (self.add_role, "Add Role"), + (self.remove_role, "Remove Role"), + ] + + def list_roles(self) -> None: + """Display a summary table of all configured roles.""" + table = RoleSummaryTable(self.config.roles, resolver=self._name_resolver) + table.print() + + def modify_role(self) -> None: + """Modify a configured role's access level.""" + role = self._role_prompter.prompt_for_config() + + access_level_options: list[tuple[RoleAccessLevel, str]] = [ + ("owner", "Owner"), + ("admin", "Admin"), + ("viewer", "Viewer"), + ] + access_level = prompt_selection( + "Access Level", access_level_options, default=role.access_level + ) + + role.access_level = access_level + self.config.roles.sort(key=lambda r: r.sort_key) + self.config.commit() + + def add_role(self) -> None: + """Add a new role to the configuration.""" + role_type, entity_id = self._role_prompter.prompt_for_entity() + existing_roles = {(r.type, r.id) for r in self.config.roles} + if (role_type, entity_id) in existing_roles: + click.echo("This entity already has a role configured.") + click.echo("Please use the 'Modify Role' option instead.") + return + + access_level_options: list[tuple[RoleAccessLevel, str]] = [ + ("owner", "Owner"), + ("admin", "Admin"), + ("viewer", "Viewer"), + ] + access_level = prompt_selection("Access Level", options=access_level_options) + + new_role = RoleConfig(type=role_type, id=entity_id, access_level=access_level) + self.config.roles.append(new_role) + self.config.roles.sort(key=lambda r: r.sort_key) + self.config.commit() + + def remove_role(self) -> None: + """Remove a role from the configuration.""" + role = self._role_prompter.prompt_for_config() + role_name = self._name_resolver.resolve(role) or str(role.id) + + if click.confirm(f"You'd like to remove access from '{role_name}'?"): + self.config.roles.remove(role) + self.config.commit() + + +class _RolePrompter: + """ + Common and/or complex user prompting logic for role management subcommands. + """ + + def __init__( + self, + config: RegisteredAPIConfig, + globus_app: GlobusApp, + resolver: _RoleNameResolver, + ) -> None: + self._config = config + self._resolver = resolver + self._groups_client = create_groups_client(globus_app) + + def prompt_for_config(self) -> RoleConfig: + """ + Prompt the user to select an already-configured RoleConfig object. + """ + + self._resolver.populate_cache(self._config.roles) + + role_options = [ + (role, f"{self._resolver.resolve(role) or role.id} ({role.type})") + for role in sorted(self._config.roles, key=lambda role: role.sort_key) + ] + return prompt_selection("Role", role_options) + + def prompt_for_entity(self) -> tuple[t.Literal["group", "identity"], UUID]: + """ + Prompt the user to select an unconfigured entity. + """ + + role_type_options = [("identity", "User"), ("group", "Group")] + role_type = prompt_selection("Role Type", options=role_type_options) + + if role_type == "group": + return "group", self._prompt_for_group_id() + elif role_type == "identity": + return "identity", self._prompt_for_entity_id("identity") + raise RuntimeError(f"Unrecognized role type: {role_type}") + + def _prompt_for_group_id(self) -> UUID: + """ + Prompt the user to select a group from a list of groups which they are both + 1. currently a member of + 2. have not already configured a role for. + + Additionally, they may choose to enter a UUID manually. + :return: A group UUID. + """ + + config_ids = {role.id for role in self._config.roles if role.type == "group"} + groups_resp = self._groups_client.get_my_groups() + + # Process the groups response into a list of known_group options. + known_group_options: list[tuple[UUID, str]] = [ + (UUID(group["id"]), group["name"]) + for group in groups_resp + if UUID(group["id"]) not in config_ids + ] + known_group_options.sort(key=lambda g: g[1].lower()) + + # Prepend an option to manually enter a Group ID. + group_options = [(None, "")] + known_group_options + + selection = prompt_selection("Group", group_options) + if selection is not None: + return selection + + return self._prompt_for_entity_id("group") + + def _prompt_for_entity_id(self, role_type: t.Literal["group", "identity"]) -> UUID: + """ + Prompt the user to enter a UUID for the specified role type. + + :return: A UUID of the specified role type. + """ + resolver = self._resolver.get_specific_resolver(role_type) + + while True: + prompt_msg = f"{role_type.capitalize()} UUID" + entity_id: UUID = click.prompt(prompt_msg, type=click.UUID) + resolved_name = resolver.resolve(entity_id) + + if resolved_name is None: + click.echo("The UUID you provided could not be resolved to a name.") + if click.confirm("Would you like to proceed anyways?"): + return entity_id + else: + click.echo(f"The UUID you provided resolved to: '{resolved_name}'") + if click.confirm("Would you like to proceed?"): + return entity_id + + +class _RoleNameResolver: + """ + An ID -> display name resolver. + + Usage: + resolver = _RoleNameResolver(globus_app) + display_name = resolver.resolve(role_config) + + A `populate_cache` method if provided to facilitate batch resolution. + Unresolvable IDs fail silently (returning None). + + Groups display names are sourced from the groups-maintained search index. + Identity usernames are sourced from the auth service. + """ + + def __init__(self, globus_app: GlobusApp) -> None: + self._group_name_resolver = _GroupNameResolver(globus_app) + self._identity_name_resolver = _IdentityNameResolver(globus_app) + + def resolve(self, role_config: RoleConfig) -> str | None: + resolver = self.get_specific_resolver(role_config.type) + return resolver.resolve(role_config.id) + + def populate_cache(self, role_configs: list[RoleConfig]) -> None: + group_ids = [r.id for r in role_configs if r.type == "group"] + identity_ids = [r.id for r in role_configs if r.type == "identity"] + + self._group_name_resolver.populate_cache(group_ids) + self._identity_name_resolver.populate_cache(identity_ids) + + def get_specific_resolver( + self, role_type: t.Literal["identity", "group"] + ) -> _GroupNameResolver | _IdentityNameResolver: + if role_type == "group": + return self._group_name_resolver + else: + return self._identity_name_resolver + + +class _GroupNameResolver: + """ + A group ID -> name resolver. + Failures are silently ignored and will return None on resolution attempts. + """ + + def __init__(self, globus_app: GlobusApp) -> None: + self._search_client = create_search_client(globus_app) + self._name_cache: dict[UUID, str | None] = {} + + def resolve(self, group_id: UUID) -> str | None: + """ + Resolve a group ID to its display name, if possible. + + :returns: The group's display name or None if the name could not be resolved. + """ + if group_id not in self._name_cache: + self.populate_cache([group_id]) + + return self._name_cache[group_id] + + def populate_cache(self, group_ids: list[UUID]) -> None: + """ + Populate the name cache for a list of group IDs. + This batch operation optimizes the number of API calls needed for a list of + group IDs. + + Groups will only ever have resolution attempted once. Subsequent attempts will + silently fail. + """ + group_ids = [gid for gid in group_ids if gid not in self._name_cache] + if len(group_ids) == 0: + return + + search_resp = self._search_client.paginated.post_search( + _GROUPS_SEARCH_INDEX_ID, + { + "filters": [ + { + "type": "match_any", + "field_name": "id", + "values": [str(gid) for gid in group_ids], + } + ], + }, + ) + + for page in search_resp.pages(): + for result in page["gmeta"]: + try: + group_id = UUID(result["entries"][0]["content"]["id"]) + group_name = result["entries"][0]["content"]["name"] + self._name_cache[group_id] = group_name + except (KeyError, IndexError): + # Malformed Group Index Data + continue + + # Mark any IDs we attempted to resolve but didn't. + for gid in group_ids: + if gid not in self._name_cache: + self._name_cache[gid] = None + + +class _IdentityNameResolver: + """ + An identity ID -> username resolver. + Failures are silently ignored and will return None on resolution attempts. + """ + + def __init__(self, globus_app: GlobusApp) -> None: + self._auth_client = create_auth_client(globus_app) + self._name_cache: dict[UUID, str | None] = {} + + def resolve(self, identity_id: UUID) -> str | None: + """ + Resolve an identity ID to its display name, if possible. + + :returns: The identity's display name or None if the name could not be resolved. + """ + if identity_id not in self._name_cache: + self.populate_cache([identity_id]) + + return self._name_cache[identity_id] + + def populate_cache(self, identity_ids: list[UUID]) -> None: + """ + Populate the name cache for a list of identity IDs. + This batch operation optimizes the number of API calls needed for a list of + identity IDs. + + Identities will only ever have resolution attempted once. Subsequent attempts + will silently fail. + """ + identity_ids = [iid for iid in identity_ids if iid not in self._name_cache] + if len(identity_ids) == 0: + return + + resp = self._auth_client.get_identities(ids=identity_ids) + + for identity_info in resp["identities"]: + try: + identity_id = UUID(identity_info["id"]) + display_name = identity_info["username"] + self._name_cache[identity_id] = display_name + except (KeyError, ValueError): + # Malformed identity response from auth. + continue + + for identity_id in identity_ids: + if identity_id not in self._name_cache: + self._name_cache[identity_id] = None diff --git a/src/globus_registered_api/commands/manage/targets.py b/src/globus_registered_api/commands/manage/targets.py new file mode 100644 index 0000000..3765be2 --- /dev/null +++ b/src/globus_registered_api/commands/manage/targets.py @@ -0,0 +1,243 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import click +from rich.console import Console +from rich.panel import Panel +from rich.pretty import Pretty +from rich.table import Table + +from globus_registered_api.config import RegisteredAPIConfig +from globus_registered_api.config import TargetConfig +from globus_registered_api.domain import HTTP_METHODS +from globus_registered_api.domain import TargetSpecifier +from globus_registered_api.openapi import SpecAnalysis +from globus_registered_api.rendering import prompt_multiselection +from globus_registered_api.rendering import prompt_selection + +from .domain import ConfiguratorMenu +from .domain import ManageContext + +console = Console() + + +class TargetSummaryTable(Table): + def __init__(self, targets: list[TargetConfig]) -> None: + super().__init__(title="Target List") + self.add_column() + self.add_column("Alias", style="red") + self.add_column("Path", style="magenta") + self.add_column("Method", style="cyan") + + for target in sorted(targets, key=lambda ta: ta.sort_key): + self.add_row( + str(self.row_count + 1), + target.alias, + target.path, + target.method, + ) + + def print(self) -> None: + console.print(self) + + +class TargetConfigurator: + def __init__(self, context: ManageContext) -> None: + self.config = context.config + self.analysis = context.analysis + self._target_prompter = _TargetPrompter(context.config, context.analysis) + self._scope_prompter = _TargetScopePrompter(context.config, context.analysis) + + @property + def menu_options(self) -> ConfiguratorMenu: + """The available target management subcommands.""" + return [ + (self.display_target, "Display Target"), + (self.list_targets, "List Targets"), + (self.modify_target, "Modify Target"), + (self.add_target, "Add Target"), + (self.remove_target, "Remove Target"), + ] + + def display_target(self, target: TargetConfig | None = None) -> None: + if target is None: + target = self._target_prompter.prompt_for_config() + + if not target.scope_strings: + # Impute scopes from the spec analysis if they aren't defined explicitly. + if spec_scopes := self.analysis.scopes_by_target.get(target.specifier): + target = target.model_copy() + target.scope_strings = [f" {scope}" for scope in spec_scopes] + + panel = Panel( + Pretty(target, expand_all=True), + title=str(target), + ) + console.print(panel) + + def list_targets(self) -> None: + table = TargetSummaryTable(self.config.targets) + table.print() + + def modify_target(self) -> None: + target = self._target_prompter.prompt_for_config() + target.alias = click.prompt("Target Alias", type=str, default=target.alias) + target.scope_strings = self._scope_prompter.prompt_for_existing_target(target) + self.config.targets.sort(key=lambda ta: ta.sort_key) + self.config.commit() + self.display_target(target) + + def add_target(self) -> None: + target_specifier = self._target_prompter.prompt_specifier() + + click.echo(f"\nRegistering Target: {target_specifier}") + click.echo("Provide a human friendly name like 'create-resource'.") + target_alias = click.prompt("Target Alias", type=str) + + scope_strings = self._scope_prompter.prompt_for_new_target(target_specifier) + + target = TargetConfig( + path=target_specifier.path, + method=target_specifier.method, + alias=target_alias, + scope_strings=scope_strings, + ) + self.config.targets.append(target) + self.config.targets.sort(key=lambda ta: ta.sort_key) + self.config.commit() + self.display_target(target) + + def remove_target(self) -> None: + target = self._target_prompter.prompt_for_config() + self.config.targets.remove(target) + self.config.commit() + + +class _TargetPrompter: + def __init__(self, config: RegisteredAPIConfig, analysis: SpecAnalysis) -> None: + self._config = config + self._analysis = analysis + + def prompt_for_config(self) -> TargetConfig: + """ + Prompt for a target config from the existing config. + """ + target_options = [ + (target, str(target)) + for target in sorted(self._config.targets, key=lambda ta: ta.sort_key) + ] + return prompt_selection("Target", target_options) + + def prompt_specifier(self) -> TargetSpecifier: + """ + Prompt for a target specifier (a method and path). + + Target specifiers are derived from the OpenAPI spec. An escape hatch however + allows inputting explicit method and path strings in the case that the spec + is missing targets or targets are being registered multiple times. + """ + selection = self._prompt_specifier_from_spec() + if selection is not None: + return selection + + return self._prompt_specifier_from_user_input() + + def _prompt_specifier_from_spec(self) -> TargetSpecifier | None: + """ + Prompt for a target specifier that exists in the OpenAPI spec but isn't already + registered in the config. + """ + + configured_specifiers = {target.specifier for target in self._config.targets} + spec_targets = set(self._analysis.targets) - configured_specifiers + if not spec_targets: + return None + + target_options: list[tuple[TargetSpecifier | None, str]] = [ + (None, "") + ] + [ + (target, f"{target.path} ({target.method})") + for target in sorted(spec_targets, key=lambda target: target.path) + ] + return prompt_selection("Target", target_options) + + def _prompt_specifier_from_user_input(self) -> TargetSpecifier: + """ + Prompt for a target specifier by explicitly entering a path and method. + """ + + click.echo("Enter the path and method for the target you want to register.") + path = click.prompt("Path", type=str) + + method_options = [(m, m) for m in HTTP_METHODS] + method = prompt_selection("Method", method_options) + return TargetSpecifier(path=path, method=method) + + +class _TargetScopePrompter: + """ + Class responsible for prompting the user to select scopes for a target. + + Scope prompting is skipped if the spec already defines scopes for the target. + """ + + def __init__(self, config: RegisteredAPIConfig, analysis: SpecAnalysis) -> None: + self._config = config + self._analysis = analysis + + def prompt_for_new_target(self, target_specifier: TargetSpecifier) -> list[str]: + """ + Prompt for scopes to include in a newly registered target config. + + Target scope prompting is silently skipped if the spec explicitly defines them. + """ + if self._analysis.scopes_by_target.get(target_specifier): + return [] + + click.echo( + f"\nThe {target_specifier} OpenAPI specification doesn't define any " + "Globus Auth scopes." + ) + # TODO - link to gra docs on GlobusAuth scopes once they exist. + if not click.confirm("Would you like to register any?", default=True): + return [] + + all_scopes = self._all_scopes() + return prompt_multiselection( + "Scope", + [(scope, scope) for scope in sorted(all_scopes)], + defaults=[scope for scope in all_scopes if scope.endswith(":all")], + custom_input=True, + ) + + def prompt_for_existing_target(self, target: TargetConfig) -> list[str]: + """ + Prompt for scopes to include in an already registered target config. + + Target scope prompting is silently skipped if the spec explicitly defines them. + """ + if self._analysis.scopes_by_target.get(target.specifier): + # Special case - + # The `gra manage` command only allows attaching scopes to targets that + # lack them in the specification. But since the config file can be + # manually edited; always respect an explicitly defined scope list. + if not target.scope_strings: + return [] + + return prompt_multiselection( + "Scope", + [(scope, scope) for scope in sorted(self._all_scopes())], + defaults=target.scope_strings, + custom_input=True, + ) + + def _all_scopes(self) -> set[str]: + """Get the set of all scope strings between the spec and config.""" + scope_strings = set(self._analysis.scope_strings) + for target in self._config.targets: + scope_strings.update(target.scope_strings) + return scope_strings diff --git a/src/globus_registered_api/config.py b/src/globus_registered_api/config.py index bd5ec53..1ba8569 100644 --- a/src/globus_registered_api/config.py +++ b/src/globus_registered_api/config.py @@ -49,7 +49,7 @@ def load(cls) -> RegisteredAPIConfig: """ if not cls.exists(): click.echo("Error: Missing repository config file.") - click.echo("Run 'globus-registered-api init' first to create a repository.") + click.echo("Run 'gra init' first to create a repository.") raise click.Abort() return cls.model_validate_json(_CONFIG_PATH.read_text()) @@ -107,6 +107,9 @@ def __str__(self) -> str: return f"{self.alias} ({self.method} {self.path})" +RoleAccessLevel = t.Literal["owner", "admin", "viewer"] + + class RoleConfig(BaseModel): """ A configuration entry for a single identity or group. @@ -121,7 +124,7 @@ class RoleConfig(BaseModel): id: UUID # The degree of permission granted to this entity. - access_level: t.Literal["owner", "admin", "viewer"] + access_level: RoleAccessLevel @property def sort_key(self) -> tuple[str, ...]: diff --git a/src/globus_registered_api/domain.py b/src/globus_registered_api/domain.py index e9f116a..686972a 100644 --- a/src/globus_registered_api/domain.py +++ b/src/globus_registered_api/domain.py @@ -9,10 +9,19 @@ import typing as t from dataclasses import dataclass -HTTP_METHODS = ("GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "TRACE", "OPTIONS") HTTPMethod = t.Literal[ "GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "TRACE", "OPTIONS" ] +HTTP_METHODS: tuple[HTTPMethod, ...] = ( + "GET", + "POST", + "PUT", + "DELETE", + "PATCH", + "HEAD", + "TRACE", + "OPTIONS", +) _TARGET_SPECIFIER_REGEX = re.compile( diff --git a/src/globus_registered_api/openapi/__init__.py b/src/globus_registered_api/openapi/__init__.py index aa1b673..03a67f4 100644 --- a/src/globus_registered_api/openapi/__init__.py +++ b/src/globus_registered_api/openapi/__init__.py @@ -9,10 +9,15 @@ from globus_registered_api.openapi.selector import AmbiguousContentTypeError from globus_registered_api.openapi.selector import TargetNotFoundError +from .analyzer import OpenAPISpecAnalyzer +from .analyzer import SpecAnalysis + __all__ = [ "OpenAPILoadError", "process_target", "ProcessingResult", "AmbiguousContentTypeError", "TargetNotFoundError", + "OpenAPISpecAnalyzer", + "SpecAnalysis", ] diff --git a/src/globus_registered_api/openapi/analyzer.py b/src/globus_registered_api/openapi/analyzer.py new file mode 100644 index 0000000..195e205 --- /dev/null +++ b/src/globus_registered_api/openapi/analyzer.py @@ -0,0 +1,65 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +import typing as t +from dataclasses import dataclass +from urllib.parse import urlparse + +import openapi_pydantic as oa + +from globus_registered_api.domain import HTTP_METHODS +from globus_registered_api.domain import TargetSpecifier + + +@dataclass +class SpecAnalysis: + # A list of all targets defined in the specification + targets: list[TargetSpecifier] + + # A list of all unique scope strings used in operations across the specification. + # Note: these only include "well-formed" scopes in the shape + # `{"GlobusAuth": [scope_string]}` with a single scope string in the list. + scope_strings: list[str] + + # A mapping of targets and scopes, identifying well-defined scopes per target. + scopes_by_target: dict[TargetSpecifier, list[str]] + + # All servers defined in the specification with an HTTPS scheme. + https_servers: list[str] + + +class OpenAPISpecAnalyzer: + def analyze(self, spec: oa.OpenAPI) -> SpecAnalysis: + targets: list[TargetSpecifier] = [] + scopes: t.Set[str] = set() + scopes_by_target: dict[TargetSpecifier, list[str]] = {} + + for path, path_schema in (spec.paths or {}).items(): + for method in HTTP_METHODS: + if operation := getattr(path_schema, method.lower(), None): + target = TargetSpecifier.create(method, path) + targets.append(target) + scopes_by_target[target] = [] + for requirement in operation.security or []: + if ( + len(requirement) == 1 + and (globus_auth_scopes := requirement.get("GlobusAuth")) + and len(globus_auth_scopes) == 1 + ): + scopes.add(globus_auth_scopes[0]) + scopes_by_target[target].append(globus_auth_scopes[0]) + + https_servers: list[str] = [ + server.url + for server in spec.servers + if urlparse(server.url).scheme == "https" + ] + + return SpecAnalysis( + targets=targets, + scope_strings=sorted(scopes), + scopes_by_target=scopes_by_target, + https_servers=https_servers, + ) diff --git a/src/globus_registered_api/rendering/__init__.py b/src/globus_registered_api/rendering/__init__.py new file mode 100644 index 0000000..b0a00c4 --- /dev/null +++ b/src/globus_registered_api/rendering/__init__.py @@ -0,0 +1,12 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +from .prompt import prompt_multiselection +from .prompt import prompt_selection + +__all__ = ( + "prompt_selection", + "prompt_multiselection", +) diff --git a/src/globus_registered_api/rendering/prompt/__init__.py b/src/globus_registered_api/rendering/prompt/__init__.py new file mode 100644 index 0000000..71e8e67 --- /dev/null +++ b/src/globus_registered_api/rendering/prompt/__init__.py @@ -0,0 +1,12 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +from .multiselector import prompt_multiselection +from .selector import prompt_selection + +__all__ = ( + "prompt_selection", + "prompt_multiselection", +) diff --git a/src/globus_registered_api/rendering/prompt/multiselector.py b/src/globus_registered_api/rendering/prompt/multiselector.py new file mode 100644 index 0000000..f0c5c0a --- /dev/null +++ b/src/globus_registered_api/rendering/prompt/multiselector.py @@ -0,0 +1,259 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import typing as t +from dataclasses import dataclass + +from prompt_toolkit import Application +from prompt_toolkit.filters import is_done +from prompt_toolkit.formatted_text import AnyFormattedText +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.key_binding import KeyPressEvent +from prompt_toolkit.layout import AnyContainer +from prompt_toolkit.layout import ConditionalContainer +from prompt_toolkit.layout import HSplit +from prompt_toolkit.layout import Layout +from prompt_toolkit.widgets import Box +from prompt_toolkit.widgets import CheckboxList +from prompt_toolkit.widgets import Label +from prompt_toolkit.widgets import TextArea + +T = t.TypeVar("T") + + +class _SubmitSentinel: ... + + +class _CustomInputSentinel: ... + + +_SUBMIT_SENTINEL = _SubmitSentinel() +_CUSTOM_INPUT_SENTINEL = _CustomInputSentinel() + + +def prompt_multiselection( + option_type: str, + options: t.Sequence[tuple[T, AnyFormattedText]], + *, + defaults: t.Sequence[T] | None = None, + custom_input: bool = False, +) -> list[T]: + """ + Prompt the user to select 0 or more options from a supplied list. + + Note: for single selection, use ``prompt_selection`` instead. + + Usage: + >>> from globus_registered_api.rendering.prompt import prompt_selection + >>> options = [(1, "Red"), (2, "Green"), (3, "Blue")] + >>> # color_index will be a list of 0 or more values from 1, 2, and 3 + >>> color_index = prompt_selection("Color", options, defaults=[2, 3]) + + :param option_type: A string describing the category of options being presented. + e.g., "Color", "File", "Path", etc. + :param options: A sequence of tuples with 2 elements, each representing an option. + The first element of each option represents the value to be returned. + The second represents the text to be displayed to the user during selection. + e.g., [(1, "Red"), (2, "Green"), (3, "Blue")] + :param defaults: The values that should start as selected. + If None, no options will be selected by default. + :param custom_input: If True, the user will have the ability to enter custom + input that is not in the original list. This return type of this custom input + will be a string. + :returns: The list of values selected by the user. + """ + + message = f"Select one or more {option_type}s:" + selector = MultiSelector( + message=message, + options=options, + defaults=list(defaults) if defaults else None, + custom_input=custom_input, + ) + return selector.prompt() + + +@dataclass +class _CustomCheckboxResponse(t.Generic[T]): + add_custom_input: bool + current_values: list[T] + + +class MultiSelector(t.Generic[T]): + def __init__( + self, + *, + message: str, + options: t.Sequence[tuple[T, AnyFormattedText]], + defaults: list[T] | None, + custom_input: bool, + ) -> None: + if custom_input and not all(isinstance(options[0], str) for options in options): + raise ValueError("Custom input is only supported for string options.") + + self.message = message + self.options = list(options) + self.defaults = defaults + self.custom_input = custom_input + + def prompt(self) -> list[T]: + response = self._create_selection_application().run() + while response.add_custom_input: + custom_input_str = self._create_custom_input_application().run() + self.options.append((custom_input_str, custom_input_str)) # type: ignore[arg-type] + self.defaults = response.current_values + [custom_input_str] # type: ignore[list-item] + + response = self._create_selection_application().run() + + return response.current_values + + def _create_selection_application(self) -> Application[_CustomCheckboxResponse[T]]: + checkbox_list = self._create_checkbox_list() + header = Box( + Label(text=self.message, dont_extend_height=True), + padding_top=0, + padding_left=1, + padding_right=1, + padding_bottom=0, + ) + body = Box( + checkbox_list, + padding_top=0, + padding_left=2, + padding_right=1, + padding_bottom=0, + ) + container: AnyContainer = HSplit([header, body]) + # Use a conditional container to hide the checkbox list after submission. + container = ConditionalContainer(content=container, filter=~is_done) + layout = Layout(container=container, focused_element=checkbox_list) + + kb = KeyBindings() + + @kb.add("c-c") + @kb.add("") + def _keyboard_interrupt(event: KeyPressEvent) -> None: + """Abort when Control-C has been pressed.""" + event.app.exit(exception=KeyboardInterrupt(), style="class:aborting") + + return Application(layout=layout, full_screen=False, key_bindings=kb) + + def _create_custom_input_application(self) -> Application[str]: + text_area = TextArea( + prompt="Enter a new value: ", + multiline=False, + style="class:custom-input", + ) + container = ConditionalContainer(content=text_area, filter=~is_done) + layout = Layout(container=container, focused_element=text_area) + + kb = KeyBindings() + + @kb.add("enter", eager=True) + def _accept_input(event: KeyPressEvent) -> None: + event.app.exit(result=text_area.text, style="class:accepted") + + @kb.add("c-c") + @kb.add("") + def _keyboard_interrupt(event: KeyPressEvent) -> None: + """Abort when Control-C has been pressed.""" + event.app.exit(exception=KeyboardInterrupt(), style="class:aborting") + + return Application( + layout=layout, + full_screen=False, + key_bindings=kb, + ) + + def _create_checkbox_list(self) -> _CustomCheckboxList[T]: + return _CustomCheckboxList( + values=self.options, + custom_input=self.custom_input, + default_values=self.defaults, + open_character="", + select_character=">", + close_character="", + container_style="class:input-selection", + default_style="class:option", + selected_style="", + checked_style="class:selected-option", + ) + + +class _CustomCheckboxList( + t.Generic[T], CheckboxList[T | _SubmitSentinel | _CustomInputSentinel] +): + """ + An extended CheckboxList widget. + + In addition to normal CheckboxList behavior, this widget includes: + - a "Submit" option which causes the current application to exit, returning the + current selection. + - an optional "" option which causes the current application to exit, + returning the current selection and a signal that additional input is desired. + """ + + def __init__( + self, + values: t.Sequence[tuple[T, AnyFormattedText]], + default_values: t.Sequence[T] | None, + open_character: str, + select_character: str, + close_character: str, + container_style: str, + default_style: str, + selected_style: str, + checked_style: str, + custom_input: bool, + ) -> None: + combined_values: list[ + tuple[T | _SubmitSentinel | _CustomInputSentinel, AnyFormattedText] + ] = list(values) + + # Attach custom control buttons to the end of the list. + if custom_input: + combined_values += [(_CUSTOM_INPUT_SENTINEL, "")] + combined_values += [(_SUBMIT_SENTINEL, "")] + + super().__init__( + values=combined_values, + default_values=default_values, + open_character=open_character, + select_character=select_character, + close_character=close_character, + container_style=container_style, + default_style=default_style, + selected_style=selected_style, + checked_style=checked_style, + ) + + key_bindings = self.control.key_bindings + if not isinstance(key_bindings, KeyBindings): + raise RuntimeError("Unexpected key bindings type on CheckboxList control.") + + key_bindings.add_binding("enter")(self._intercept_enter) + if default_values: + # If defaults are provided, start on the "Submit" option. + self._selected_index = len(combined_values) - 1 + + def _intercept_enter(self, event: KeyPressEvent) -> None: + # Override default enter behavior if control options are selected. + selected_val = self.values[self._selected_index][0] + if selected_val is _SUBMIT_SENTINEL: + result = _CustomCheckboxResponse( + add_custom_input=False, + current_values=self.current_values, + ) + event.app.exit(result=result) + elif selected_val is _CUSTOM_INPUT_SENTINEL: + result = _CustomCheckboxResponse( + add_custom_input=True, + current_values=self.current_values, + ) + event.app.exit(result=result) + else: + self._handle_enter() diff --git a/src/globus_registered_api/rendering/prompt/selector.py b/src/globus_registered_api/rendering/prompt/selector.py new file mode 100644 index 0000000..b6d0f1f --- /dev/null +++ b/src/globus_registered_api/rendering/prompt/selector.py @@ -0,0 +1,149 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import typing as t + +from prompt_toolkit import Application +from prompt_toolkit.filters import is_done +from prompt_toolkit.formatted_text import AnyFormattedText +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.key_binding import KeyPressEvent +from prompt_toolkit.layout import AnyContainer +from prompt_toolkit.layout import ConditionalContainer +from prompt_toolkit.layout import HSplit +from prompt_toolkit.layout import Layout +from prompt_toolkit.widgets import Box +from prompt_toolkit.widgets import Label +from prompt_toolkit.widgets import RadioList + +T = t.TypeVar("T") + + +def prompt_selection( + option_type: str, + options: t.Sequence[tuple[T, AnyFormattedText]], + *, + default: T | None = None, + show_selection: bool = True, +) -> T: + """ + Prompt the user to select a single option from a supplied list. + + Note: for multiple selection, use ``prompt_multiselection`` instead. + + Usage: + >>> from globus_registered_api.rendering.prompt import prompt_selection + >>> options = [(1, "Red"), (2, "Green"), (3, "Blue")] + >>> # color_index will be one of 1, 2, or 3 + >>> color_index = prompt_selection("Color", options, default=2) + + :param option_type: A string describing the category of options being presented. + e.g., "Color", "File", "Path", etc. + :param options: A sequence of tuples with 2 elements, each representing an option. + The first element of each option represents the value to be returned. + The second represents the text to be displayed to the user during selection. + e.g., [(1, "Red"), (2, "Green"), (3, "Blue")] + :param default: The value of the option to start as 'selected'. If None, the + first option will be selected by default. + :param show_selection: Whether to print the selected option after selection. + :returns: The value of the user's selected option. + """ + n = "n" if option_type[0] in "aeiouAEIOU" else "" + message = f"Select a{n} {option_type}:" + selector = Selector( + message=message, + options=options, + default=default, + show_selection=show_selection, + ) + return selector.prompt() + + +class Selector(t.Generic[T]): + def __init__( + self, + *, + message: str | None, + options: t.Sequence[tuple[T, AnyFormattedText]], + default: T | None = None, + show_selection: bool = True, + ) -> None: + self.message = message + self.options = options + self.default = default + self.show_selection = show_selection + + def prompt(self) -> T: + return self._create_selection_application().run() + + def _create_selection_application(self) -> Application[T]: + radio_list = self._create_radio_list() + container: AnyContainer = Box( + radio_list, + padding_top=0, + padding_left=2, + padding_right=1, + padding_bottom=0, + ) + if self.message: + header = Box( + Label(text=self.message, dont_extend_height=True), + padding_top=0, + padding_left=1, + padding_right=1, + padding_bottom=0, + ) + container = HSplit([header, container]) + + selection_label = Label("") + layout = Layout( + container=ConditionalContainer( + content=container, + alternative_content=selection_label if self.show_selection else None, + filter=~is_done, + ), + focused_element=radio_list, + ) + + kb = KeyBindings() + + @kb.add("enter", eager=True) + def _accept_input(event: KeyPressEvent) -> None: + """Accept input when enter has been pressed.""" + current_key = radio_list.values[radio_list._selected_index][1] + selection_label.text = f"Selected: {current_key}\n" + event.app.exit(result=radio_list.current_value, style="class:accepted") + + @kb.add("c-c") + @kb.add("") + def _keyboard_interrupt(event: KeyPressEvent) -> None: + """Abort when Control-C has been pressed.""" + event.app.exit(exception=KeyboardInterrupt(), style="class:aborting") + + return Application( + layout=layout, + full_screen=False, + key_bindings=kb, + ) + + def _create_radio_list(self) -> RadioList[T]: + return RadioList( + values=self.options, + default=self.default, + select_on_focus=True, + open_character="", + select_character=">", + close_character="", + show_cursor=False, + show_numbers=True, + container_style="class:input-selection", + default_style="class:option", + selected_style="", + checked_style="class:selected-option", + number_style="class:number", + show_scrollbar=False, + ) diff --git a/tests/commands/conftest.py b/tests/commands/conftest.py index e902c19..ab12890 100644 --- a/tests/commands/conftest.py +++ b/tests/commands/conftest.py @@ -5,11 +5,21 @@ import functools import typing as t +from unittest.mock import MagicMock +import click +import openapi_pydantic as oa +import prompt_toolkit import pytest from click.testing import CliRunner +from prompt_toolkit.formatted_text import AnyFormattedText +import globus_registered_api.cli as cli_module +import globus_registered_api.rendering.prompt.multiselector as multiselector_module +import globus_registered_api.rendering.prompt.selector as selector_module from globus_registered_api.cli import cli as root_gra_command +from globus_registered_api.config import CoreConfig +from globus_registered_api.config import RegisteredAPIConfig @pytest.fixture @@ -24,3 +34,165 @@ def test_something(invoke_gra): """ runner = CliRunner() yield functools.partial(runner.invoke, root_gra_command) + + +@pytest.fixture(autouse=True) +def patched_globusapp(monkeypatch): + """ + Always patch out the creation of a GlobusApp to avoid real authentication attempts. + """ + monkeypatch.setattr(cli_module, "_create_globus_app", lambda: MagicMock()) + + +@pytest.fixture +def openapi_schema() -> oa.OpenAPI: + schema = { + "openapi": "3.1.0", + "info": {"title": "Minimal API", "version": "1.0.0"}, + "paths": { + "/example": { + "get": {"summary": "Example GET endpoint"}, + "post": {"summary": "Example POST endpoint"}, + } + }, + } + return oa.OpenAPI.model_validate(schema) + + +@pytest.fixture +def config(openapi_schema) -> RegisteredAPIConfig: + core = CoreConfig( + base_url="https://api.example.com", + specification=openapi_schema, + ) + config = RegisteredAPIConfig(core=core, targets=[], roles=[]) + return config + + +@pytest.fixture +def prompt_patcher(monkeypatch): + return PromptPatcher(monkeypatch) + + +_PromptType = t.Literal[ + "click_prompt", + "prompt_toolkit_prompt", + "confirmation", + "selection", + "multiselection", +] +T = t.TypeVar("T") + + +class PromptPatcher: + """ + Utility to aid tests in patching out user input from various prompt types: + + Supported prompt types: + - click_prompt: click.prompt + - prompt_toolkit_prompt: prompt_toolkit.prompt + - confirmation: click.confirm + - selection: rendering.prompt_selection + - Input are compared against the supplied list of keys. + If a match is found the corresponding value is returned, otherwise + the input is returned as the value. + - multiselection: rendering.prompt_multiselection + - Similar behavior to `selection`, both keys and values are accepted. + """ + + def __init__(self, monkeypatch: pytest.MonkeyPatch) -> None: + self._monkeypatch = monkeypatch + + self._click_prompt_responses = self._patch_function(click, "prompt") + self._confirm_responses = self._patch_function(click, "confirm") + self._prompt_toolkit_responses = self._patch_function(prompt_toolkit, "prompt") + + # Selectors and MultiSelector get special handling to account for key mapping. + self._select_responses = self._patch_selector( + selector_module, + "Selector", + ) + self._multiselect_responses = self._patch_selector( + multiselector_module, + "MultiSelector", + ) + + def add_input(self, prompt_type: _PromptType, response: t.Any) -> None: + if prompt_type == "click_prompt": + self._click_prompt_responses.append(response) + elif prompt_type == "confirmation": + self._confirm_responses.append(response) + elif prompt_type == "prompt_toolkit_prompt": + self._prompt_toolkit_responses.append(response) + elif prompt_type == "selection": + self._select_responses.append(response) + elif prompt_type == "multiselection": + self._multiselect_responses.append(response) + else: + raise ValueError(f"Invalid prompt type: {prompt_type}") + + def _patch_function(self, target: object, func_name: str) -> list[t.Any]: + """ + Monkeypatch out a target's function. + + :target: The object containing the element (func_name). + :func_name: The attribute to patch on the target. + :return: An empty list of responses to be subsequently populated by tests. + """ + responses: list[t.Any] = [] + response_idx = 0 + + def return_responses(*args, **kwargs): + nonlocal response_idx + if response_idx >= len(responses): + name = f"{target.__name__}.{func_name}" + raise AssertionError(f"Ran out of prompt inputs for function '{name}'") + resp = responses[response_idx] + + response_idx += 1 + return resp + + self._monkeypatch.setattr(target, func_name, return_responses) + return responses + + def _patch_selector(self, target: object, clazz: str) -> list[t.Any]: + """ + Monkeypatch out a target's Selector-style class. + + In addition to regular response values, this patching will compare responses + against user-facing keys. So if an option is (complex_value, "user key"), the + response may be passed as either `complex_value` or `"user key"`. + + :target: The object containing the element (clazz). + :clazz: The class to patch on the target. + :meth: The method of the class to patch to return responses. + :return: An empty list of responses to be subsequently populated by tests. + """ + responses: list[t.Any] = [] + response_idx = 0 + + class PatchedSelector(t.Generic[T]): + def __init__( + self, /, options: t.Sequence[tuple[T, AnyFormattedText]], **__: t.Any + ) -> None: + self.value_map = {k: v for v, k in options if isinstance(k, str)} + + def prompt(self) -> t.Any: + nonlocal response_idx + if response_idx >= len(responses): + name = f"{target.__name__}.{clazz}.prompt" + raise AssertionError(f"Ran out of prompt inputs for '{name}'") + resp = responses[response_idx] + if clazz == "Selector": + if isinstance(resp, str) and resp in self.value_map: + resp = self.value_map[resp] + elif clazz == "MultiSelector": + for i, r in enumerate(resp): + if isinstance(r, str) and r in self.value_map: + resp[i] = self.value_map[r] + + response_idx += 1 + return resp + + self._monkeypatch.setattr(target, clazz, PatchedSelector) + return responses diff --git a/tests/commands/manage/test_dispatch.py b/tests/commands/manage/test_dispatch.py new file mode 100644 index 0000000..45b2081 --- /dev/null +++ b/tests/commands/manage/test_dispatch.py @@ -0,0 +1,48 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +from uuid import uuid4 + +from globus_registered_api.commands.manage.domain import BACK_SENTINEL +from globus_registered_api.commands.manage.domain import EXIT_SENTINEL +from globus_registered_api.config import RoleConfig + + +def test_manage_dispatch_requires_a_config(gra): + result = gra("manage") + + assert result.exit_code == 1 + assert "Missing repository config file" in result.output + assert "gra init" in result.output + + +def test_manage_dispatch_selecting_exit(prompt_patcher, config, gra): + config.commit() + + prompt_patcher.add_input("selection", EXIT_SENTINEL) + + result = gra("manage", catch_exceptions=False) + + assert result.exit_code == 0 + assert result.output == "" + + +def test_manage_dispatch_selecting_subcommands(prompt_patcher, config, gra): + group_id = uuid4() + config.roles.append(RoleConfig(type="group", id=group_id, access_level="owner")) + config.commit() + + prompt_patcher.add_input("selection", "Roles") + prompt_patcher.add_input("selection", "List Roles") + prompt_patcher.add_input("selection", BACK_SENTINEL) + prompt_patcher.add_input("selection", "Targets") + prompt_patcher.add_input("selection", "List Targets") + prompt_patcher.add_input("selection", EXIT_SENTINEL) + + result = gra("manage", catch_exceptions=False) + + assert result.exit_code == 0 + assert str(group_id) in result.output + assert "Path" in result.output diff --git a/tests/commands/manage/test_roles.py b/tests/commands/manage/test_roles.py new file mode 100644 index 0000000..6a7a72b --- /dev/null +++ b/tests/commands/manage/test_roles.py @@ -0,0 +1,222 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +import enum +from unittest.mock import MagicMock +from uuid import UUID +from uuid import uuid4 + +import pytest + +from globus_registered_api.commands.manage.domain import ManageContext +from globus_registered_api.commands.manage.roles import RoleConfigurator +from globus_registered_api.config import RegisteredAPIConfig +from globus_registered_api.config import RoleConfig + + +class Identity(enum.Enum): + """ + The suite of identities to be "registered" in auth. + + These are patched out in `setup_globus_responses` to be returned by + mocked `auth.get_identities` calls. + """ + + Alice = "alice@gmail.com" + Bob = "bob@hotmail.edu" + Carol = "carol@globus.org" + + def __init__(self, username: str) -> None: + self.id = uuid4() + self.username = username + self.data = {"id": str(self.id), "username": username} + + @classmethod + def get_identities(cls, ids: list[UUID]) -> dict[str, list[dict[str, str]]]: + return {"identities": [identity.data for identity in cls if identity.id in ids]} + + +class Group(enum.Enum): + """ + The suite of groups to be "registered" in groups. + + These are patched out in `setup_globus_responses` to be returned by mocked + `groups.get_my_groups` and `search.post_search` calls. + (Only two are returned by `get_my_groups`) + """ + + Leos = "leos" + Pisceses = "pisceses" + Toruses = "toruses" + + def __init__(self, name: str) -> None: + self.id = uuid4() + # Note: `name` is a protected attribute of Enum. + self.gname = name + + self.data = {"id": str(self.id), "name": name} + + @classmethod + def get_my_groups(cls) -> list[dict[str, str]]: + return [cls.Leos.data, cls.Pisceses.data] + + @classmethod + def search_paginated_groups(cls, _: str, query: dict): + filter_ids = [UUID(gid) for gid in query["filters"][0]["values"]] + gmeta = [{"entries": [{"content": g.data}]} for g in cls if g.id in filter_ids] + resp = MagicMock() + resp.pages.return_value = [{"gmeta": gmeta}] + return resp + + +@pytest.fixture(autouse=True) +def setup_globus_responses(mock_auth_client, mock_groups_client, mock_search_client): + """ + Configure: + * auth-service -> response from `get_identities` + * groups-service -> response from `get_my_groups` + * search-service -> response from `post_search` against the groups index. + """ + mock_auth_client.get_identities.side_effect = Identity.get_identities + mock_groups_client.get_my_groups.side_effect = Group.get_my_groups + + mock_search_client.paginated.post_search = Group.search_paginated_groups + + +@pytest.fixture +def role_configurator(config): + ctx = ManageContext(config=config, analysis=MagicMock(), globus_app=MagicMock()) + return RoleConfigurator(ctx) + + +def test_role_management_add_group(prompt_patcher, role_configurator): + # Set up a sequence of selections to be made by the mocked selector. + prompt_patcher.add_input("selection", "Group") + prompt_patcher.add_input("selection", Group.Leos.id) + prompt_patcher.add_input("selection", "owner") + + # Execute + role_configurator.add_role() + + # Verify we've added the expected role to the config and committed it. + expected = RoleConfig(type="group", id=Group.Leos.id, access_level="owner") + assert RegisteredAPIConfig.load().roles == [expected] + + +def test_role_management_add_identity(prompt_patcher, role_configurator): + # Set up a sequence of selections to be made by the mocked selector. + prompt_patcher.add_input("selection", "User") + prompt_patcher.add_input("click_prompt", Identity.Alice.id) + prompt_patcher.add_input("confirmation", True) + prompt_patcher.add_input("selection", "viewer") + + # Execute + role_configurator.add_role() + + # Verify we've added the expected role to the config and committed it. + expected = RoleConfig(type="identity", id=Identity.Alice.id, access_level="viewer") + assert RegisteredAPIConfig.load().roles == [expected] + + +def test_role_management_add_duplicate_identity_is_rejected( + prompt_patcher, role_configurator, config, capsys +): + # Configure a role to be duplicated. + role = RoleConfig(type="identity", id=Identity.Alice.id, access_level="viewer") + config.roles = [role] + config.commit() + + # Set up a sequence of selections to be made by the mocked selector. + prompt_patcher.add_input("selection", "User") + prompt_patcher.add_input("click_prompt", Identity.Alice.id) + prompt_patcher.add_input("confirmation", True) + + # Execute + role_configurator.add_role() + + # Verify that Alice still has viewer access and that we printed a warning. + expected = RoleConfig(type="identity", id=Identity.Alice.id, access_level="viewer") + assert RegisteredAPIConfig.load().roles == [expected] + + outstream = capsys.readouterr().out + assert "use the 'Modify Role' option instead" in outstream + + +def test_role_management_remove_role(prompt_patcher, role_configurator, config): + # Configure a role to be removed. + initial_role = RoleConfig(type="group", id=Group.Pisceses.id, access_level="viewer") + config.roles = [initial_role] + config.commit() + + # Set up a sequence of selections to be made by the mocked selector. + prompt_patcher.add_input("selection", initial_role) + prompt_patcher.add_input("confirmation", True) + + # Execute + role_configurator.remove_role() + + # Verify we've removed the role from the config and committed it. + assert RegisteredAPIConfig.load().roles == [] + + +def test_role_management_modify_role(prompt_patcher, role_configurator, config): + # Configure some roles to be displayed. + leos = RoleConfig(type="group", id=Group.Leos.id, access_level="owner") + bob = RoleConfig(type="identity", id=Identity.Bob.id, access_level="viewer") + config.roles = [leos, bob] + config.commit() + + # Set up a selection to be made by the mocked selector. + prompt_patcher.add_input("selection", bob) + prompt_patcher.add_input("selection", "admin") + + # Execute + role_configurator.modify_role() + + # Verify we've updated the role in the config and committed it. + old_bob = RoleConfig(type="identity", id=Identity.Bob.id, access_level="viewer") + expected_bob = RoleConfig(type="identity", id=Identity.Bob.id, access_level="admin") + committed_roles = RegisteredAPIConfig.load().roles + assert expected_bob in committed_roles + assert old_bob not in committed_roles + assert leos in committed_roles + + +def test_role_management_list_roles_group_resolution(role_configurator, config, capsys): + # Configure some roles to be displayed. + leos = RoleConfig(type="group", id=Group.Leos.id, access_level="owner") + toruses = RoleConfig(type="group", id=Group.Toruses.id, access_level="viewer") + other = RoleConfig(type="group", id=uuid4(), access_level="admin") + config.roles = [leos, toruses, other] + config.commit() + + # Execute + role_configurator.list_roles() + + # Verify that known groups were rendered with names and unknown ones with IDs. + outstream = capsys.readouterr().out + assert str(other.id) in outstream + for group in (Group.Leos, Group.Toruses): + assert group.gname in outstream + assert str(group.id) not in outstream + + +def test_role_management_list_roles_identity_resolution( + role_configurator, config, capsys +): + # Configure some roles to be displayed. + alice = RoleConfig(type="identity", id=Identity.Alice.id, access_level="owner") + other = RoleConfig(type="identity", id=uuid4(), access_level="viewer") + config.roles = [alice, other] + config.commit() + + # Execute + role_configurator.list_roles() + + # Verify that known users were rendered with usernames and unknown ones with IDs. + outstream = capsys.readouterr().out + assert str(other.id) in outstream + assert Identity.Alice.username in outstream + assert str(Identity.Alice.id) not in outstream diff --git a/tests/commands/manage/test_targets.py b/tests/commands/manage/test_targets.py new file mode 100644 index 0000000..23e765b --- /dev/null +++ b/tests/commands/manage/test_targets.py @@ -0,0 +1,176 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import MagicMock + +import pytest +from rich.console import Console + +import globus_registered_api.commands.manage.targets as targets_module +from globus_registered_api.commands.manage.domain import ManageContext +from globus_registered_api.commands.manage.targets import TargetConfigurator +from globus_registered_api.config import RegisteredAPIConfig +from globus_registered_api.config import TargetConfig +from globus_registered_api.openapi import OpenAPISpecAnalyzer + + +@pytest.fixture +def rich_disabled_colors(monkeypatch): + monkeypatch.setattr(targets_module, "console", Console(color_system=None)) + + +@pytest.fixture +def target_configurator(config): + analysis = OpenAPISpecAnalyzer().analyze(config.core.specification) + ctx = ManageContext(config=config, analysis=analysis, globus_app=MagicMock()) + return TargetConfigurator(ctx) + + +def test_target_management_add_target(prompt_patcher, target_configurator): + # Set up a sequence of selections to be made by the mocked selector. + prompt_patcher.add_input("selection", "/example (GET)") + prompt_patcher.add_input("click_prompt", "get-example") + prompt_patcher.add_input("confirmation", False) # Skip scope configurations. + + target_configurator.add_target() + + # Verify we've added the expected target to the config and committed it. + expected = TargetConfig(path="/example", method="GET", alias="get-example") + assert RegisteredAPIConfig.load().targets == [expected] + + +def test_target_management_add_target_with_manual_scopes( + prompt_patcher, target_configurator +): + # Set up a sequence of selections to be made by the mocked selector. + prompt_patcher.add_input("selection", "/example (GET)") + prompt_patcher.add_input("click_prompt", "get-example") + prompt_patcher.add_input("confirmation", True) # Configure scopes. + prompt_patcher.add_input("multiselection", ["example:read", "example:write"]) + + target_configurator.add_target() + + # Verify we've added the expected target to the config and committed it. + expected = TargetConfig( + path="/example", + method="GET", + alias="get-example", + scope_strings=["example:read", "example:write"], + ) + assert RegisteredAPIConfig.load().targets == [expected] + + +def test_target_management_add_target_with_defined_scopes( + prompt_patcher, config, capsys, rich_disabled_colors +): + # Update the spec to define scopes for a target. + config.core.specification.paths["/example"].get.security = [ + {"GlobusAuth": ["example:read"]}, + {"GlobusAuth": ["example:write"]}, + ] + config.commit() + + # Re-analyze the updated specification instead of using the fixture-provided one. + analysis = OpenAPISpecAnalyzer().analyze(config.core.specification) + ctx = ManageContext(config=config, analysis=analysis, globus_app=MagicMock()) + target_configurator = TargetConfigurator(ctx) + + # Set up a sequence of selections to be made by the mocked selector. + prompt_patcher.add_input("selection", "/example (GET)") + prompt_patcher.add_input("click_prompt", "get-example") + + target_configurator.add_target() + + expected = TargetConfig(path="/example", method="GET", alias="get-example") + assert RegisteredAPIConfig.load().targets == [expected] + + # Spec-defined scopes are not committed to config, but are displayed as "imputed". + outstream = capsys.readouterr().out + assert " example:read" in outstream + assert " example:write" in outstream + + +def test_target_management_add_manual_target(prompt_patcher, target_configurator): + # Set up a sequence of selections to be made by the mocked selector. + prompt_patcher.add_input("selection", "") + prompt_patcher.add_input("click_prompt", "/manual") + prompt_patcher.add_input("selection", "POST") + prompt_patcher.add_input("click_prompt", "post-manual") + prompt_patcher.add_input("confirmation", False) # Skip scope configurations. + + target_configurator.add_target() + + expected = TargetConfig(path="/manual", method="POST", alias="post-manual") + assert RegisteredAPIConfig.load().targets == [expected] + + +def test_target_management_list_targets( + prompt_patcher, config, target_configurator, capsys +): + # Add some targets to the config. + config.targets = [ + TargetConfig(path="/example", method="GET", alias="get-example"), + TargetConfig(path="/example", method="POST", alias="post-example"), + ] + config.commit() + + target_configurator.list_targets() + + outstream = capsys.readouterr().out + for key in ("/example", "GET", "POST", "get-example", "post-example"): + assert key in outstream + + +def test_target_management_display_target( + prompt_patcher, config, target_configurator, rich_disabled_colors, capsys +): + # Add some targets to the config. + get_target = TargetConfig(path="/example", method="GET", alias="get-example") + post_target = TargetConfig(path="/example", method="POST", alias="post-example") + config.targets = [get_target, post_target] + config.commit() + + # Set up a sequence of selections to be made by the mocked selector. + prompt_patcher.add_input("selection", get_target) + + target_configurator.display_target() + + outstream = capsys.readouterr().out + assert "TargetConfig" in outstream + assert "path='/example'" in outstream + assert "method='GET'" in outstream + assert "alias='get-example'" in outstream + + +def test_target_management_remove_target(prompt_patcher, config, target_configurator): + # Add some targets to the config. + get_target = TargetConfig(path="/example", method="GET", alias="get-example") + post_target = TargetConfig(path="/example", method="POST", alias="post-example") + config.targets = [get_target, post_target] + config.commit() + + # Set up a sequence of selections to be made by the mocked selector. + prompt_patcher.add_input("selection", get_target) + + target_configurator.remove_target() + + assert RegisteredAPIConfig.load().targets == [post_target] + + +def test_target_management_modify_target(prompt_patcher, config, target_configurator): + # Add a target to the config. + target = TargetConfig(path="/example", method="GET", alias="get-example") + config.targets = [target] + config.commit() + + # Set up a sequence of selections to be made by the mocked selector. + prompt_patcher.add_input("selection", target) + prompt_patcher.add_input("click_prompt", "get-example-updated") + prompt_patcher.add_input("multiselection", []) + + target_configurator.modify_target() + + expected = TargetConfig(path="/example", method="GET", alias="get-example-updated") + assert RegisteredAPIConfig.load().targets == [expected] diff --git a/tests/commands/test_init.py b/tests/commands/test_init.py new file mode 100644 index 0000000..0e463fa --- /dev/null +++ b/tests/commands/test_init.py @@ -0,0 +1,120 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +from uuid import UUID + +import responses + +from globus_registered_api.config import RegisteredAPIConfig +from globus_registered_api.config import RoleConfig +from globus_registered_api.openapi.loader import load_openapi_spec + + +def test_init_errors_if_config_exists(gra, config): + config.commit() + + result = gra(["init"]) + + assert result.exit_code != 0 + assert "Cannot re-initialize" in result.output + assert "gra manage" in result.output + + +def test_init_gives_the_caller_owner_permissions(gra, prompt_patcher, mock_auth_client): + user_id = mock_auth_client.userinfo()["sub"] + + # Set up a sequence of inputs to be made by the mocked user. + prompt_patcher.add_input("confirmation", False) # No I don't have an OpenAPI spec. + prompt_patcher.add_input("click_prompt", "Test Service") + prompt_patcher.add_input("click_prompt", "https://api.testservice.com") + + gra(["init"], catch_exceptions=False) + + expected = RoleConfig(type="identity", id=UUID(user_id), access_level="owner") + assert RegisteredAPIConfig.load().roles == [expected] + + +def test_init_service_without_openapi_spec(gra, prompt_patcher): + # Set up a sequence of inputs to be made by the mocked user. + prompt_patcher.add_input("confirmation", False) # No I don't have an OpenAPI spec. + prompt_patcher.add_input("click_prompt", "Test Service") + prompt_patcher.add_input("click_prompt", "https://api.testservice.com") + + result = gra(["init"], catch_exceptions=False) + + assert result.exit_code == 0 + assert "Successfully initialized repository!" in result.output + + RegisteredAPIConfig.load() + + +def test_init_service_with_local_openapi_spec(gra, prompt_patcher, spec_path): + local_spec_path = str(spec_path("minimal.json")) + + # Set up a sequence of inputs to be made by the mocked user. + prompt_patcher.add_input("confirmation", True) # Yes, I have an OpenAPI spec. + prompt_patcher.add_input("prompt_toolkit_prompt", local_spec_path) + prompt_patcher.add_input("confirmation", True) # Use the server as base url. + + result = gra(["init"], catch_exceptions=False) + + assert result.exit_code == 0 + assert "Successfully initialized repository!" in result.output + + specification = load_openapi_spec(local_spec_path) + config = RegisteredAPIConfig.load() + assert config.core.specification == local_spec_path + assert config.core.base_url == specification.servers[0].url + + +def test_init_service_with_remote_openapi_spec(gra, prompt_patcher): + remote_spec_url = "https://random-domain.com/openapi.json" + specification = { + "openapi": "3.1.0", + "info": {"title": "Remote API", "version": "1.0.0"}, + } + responses.get(remote_spec_url, json=specification) + + # Set up a sequence of inputs to be made by the mocked user. + prompt_patcher.add_input("confirmation", True) # Yes, I have an OpenAPI spec. + prompt_patcher.add_input("prompt_toolkit_prompt", remote_spec_url) + prompt_patcher.add_input("click_prompt", "https://api.remote-service.com") + + result = gra(["init"], catch_exceptions=False) + + assert result.exit_code == 0 + assert "Successfully initialized repository!" in result.output + + config = RegisteredAPIConfig.load() + assert config.core.specification == remote_spec_url + assert config.core.base_url == "https://api.remote-service.com" + + +def test_init_service_with_multiple_servers(gra, prompt_patcher): + remote_spec_url = "https://random-domain.com/openapi.json" + specification = { + "openapi": "3.1.0", + "info": {"title": "Remote API", "version": "1.0.0"}, + "servers": [ + {"url": "https://api.server1.com"}, + {"url": "https://api.server2.com"}, + ], + } + responses.get(remote_spec_url, json=specification) + + # Set up a sequence of inputs to be made by the mocked user. + prompt_patcher.add_input("confirmation", True) # Yes, I have an OpenAPI Spec. + prompt_patcher.add_input("prompt_toolkit_prompt", remote_spec_url) + prompt_patcher.add_input("confirmation", True) # Yes, use of the servers. + prompt_patcher.add_input("selection", "https://api.server2.com") + + result = gra(["init"], catch_exceptions=False) + + assert result.exit_code == 0 + assert "Successfully initialized repository!" in result.output + + config = RegisteredAPIConfig.load() + assert config.core.specification == remote_spec_url + assert config.core.base_url == "https://api.server2.com" diff --git a/tests/conftest.py b/tests/conftest.py index cba2852..97625bf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,7 +12,7 @@ import pytest import responses -import globus_registered_api.clients +import globus_registered_api.clients as src_clients import globus_registered_api.config from globus_registered_api import ExtendedFlowsClient @@ -91,49 +91,59 @@ def __getitem__(self, key: str) -> t.Any: @pytest.fixture(autouse=True) -def mock_auth_client(monkeypatch): +def config_path(monkeypatch, tmp_path): """ - Fixture that patches _create_auth_client and returns a configured mock. + Fixture that patches the config path to a temporary directory for all tests. - Usage: - def test_something(mock_auth_client): - # _create_auth_client is already patched and returns a mock - # with userinfo() returning {"preferred_username": "testuser", ...} + Ensure that tests don't write to the runners invocation directory. """ + new_path = tmp_path / ".globus_registered_api/config.json" + monkeypatch.setattr(globus_registered_api.config, "_CONFIG_PATH", new_path) + + yield new_path + + +@pytest.fixture(autouse=True) +def mock_auth_client(monkeypatch): + """Fixture that patches create_auth_client and returns a configured MagicMock.""" client = MagicMock() - client.userinfo.return_value = MockResponse( - {"preferred_username": "testuser", "email": "test@example.com"} - ) - monkeypatch.setattr( - globus_registered_api.clients, - "AuthClient", - lambda *args, **kwargs: client, - ) + monkeypatch.setattr(src_clients, "AuthClient", lambda *_, **__: client) + + # Set up a default userinfo response. + resp = { + "preferred_username": "testuser", + "email": "test@example.com", + "sub": "00000000-0000-0000-0000-000000000000", + } + client.userinfo.return_value = MockResponse(resp) return client @pytest.fixture(autouse=True) -def config_path(monkeypatch, tmp_path): - """ - Fixture that patches the config path to a temporary directory for all tests. +def mock_groups_client(monkeypatch): + """Fixture that patches create_groups_client and returns a configured MagicMock.""" + client = MagicMock() + monkeypatch.setattr(src_clients, "GroupsClient", lambda *_, **__: client) + return client - Ensure that tests don't write to the runners invocation directory. - """ - config_path = tmp_path / ".globus_registered_api/config.json" - monkeypatch.setattr(globus_registered_api.config, "_CONFIG_PATH", config_path) - yield config_path +@pytest.fixture(autouse=True) +def mock_search_client(monkeypatch): + """Fixture that patches create_search_client and returns a configured MagicMock.""" + client = MagicMock() + monkeypatch.setattr(src_clients, "SearchClient", lambda *_, **__: client) + return client @pytest.fixture(autouse=True) def mock_flows_client(monkeypatch): """ - Fixture that patches ExtendedFlowsClient and returns a mock instance. + Fixture that patches ExtendedFlowsClient with a pre-initialized instance. + + Note: + Unlike other clients, flows is only patched to prevent GlobusApp-binding. + Calls will be made against the real api domains (but intercepted by responses). """ client = ExtendedFlowsClient() - monkeypatch.setattr( - globus_registered_api.clients, - "ExtendedFlowsClient", - lambda *args, **kwargs: client, - ) + monkeypatch.setattr(src_clients, "ExtendedFlowsClient", lambda *_, **__: client) return client diff --git a/tests/rendering/prompt/test_selection.py b/tests/rendering/prompt/test_selection.py new file mode 100644 index 0000000..e0395d0 --- /dev/null +++ b/tests/rendering/prompt/test_selection.py @@ -0,0 +1,139 @@ +# This file is a part of globus-registered-api. +# https://github.com/globus/globus-registered-api +# Copyright 2025-2026 Globus +# SPDX-License-Identifier: Apache-2.0 + +import typing as t + +import pytest +from prompt_toolkit.application import create_app_session +from prompt_toolkit.input import PipeInput +from prompt_toolkit.input import create_pipe_input +from prompt_toolkit.output import DummyOutput + +from globus_registered_api.rendering import prompt_multiselection +from globus_registered_api.rendering import prompt_selection + +KEY_UP = "\x1b[A" +KEY_DOWN = "\x1b[B" +KEY_ENTER = "\r" + + +@pytest.fixture +def input_simulator() -> t.Iterator[PipeInput]: + """ + A PipeInput which will be patched in to any PromptToolkit application created in + the context of this fixture. + + Usage: + def test_something(input_simulator): + input_simulator.send_text('inputdata') + + response = prompt_toolkit.shortcuts.prompt('> ') + assert response == 'inputdata' + """ + + with create_pipe_input() as pipe_input: + with create_app_session(input=pipe_input, output=DummyOutput()): + yield pipe_input + + +@pytest.mark.parametrize( + "input_sequence, expected_response", + [ + (KEY_ENTER, 1), # Select first option + (KEY_DOWN + KEY_ENTER, 2), # Select second option + (KEY_DOWN * 2 + KEY_ENTER, 3), # Select third option + (KEY_DOWN * 3 + KEY_ENTER, 3), # Don't allow going below the last option + (KEY_UP + KEY_ENTER, 1), # Don't allow going above the first option + ], +) +def test_prompt_selection(input_sequence, expected_response, input_simulator): + """ + Rendering + ========= + Select a Color: + > Red + Green + Blue + + """ + input_simulator.send_text(input_sequence) + + options = [(1, "Red"), (2, "Green"), (3, "Blue")] + response = prompt_selection("Color", options) + + assert response == expected_response + + +def test_prompt_selection_with_default(input_simulator): + input_simulator.send_text(KEY_ENTER) + + options = [(1, "Red"), (2, "Green"), (3, "Blue")] + response = prompt_selection("Color", options, default=2) + + assert response == 2 + + +def test_prompt_multiselection(input_simulator): + """ + Rendering + ========= + Select one or more Colors: + > Red + Green + Blue + + """ + input_simulator.send_text(KEY_ENTER) # Select the first option. + input_simulator.send_text(KEY_DOWN + KEY_ENTER) # Select the second option. + input_simulator.send_text(KEY_DOWN * 2 + KEY_ENTER) # Select the submit button. + + options = [(1, "Red"), (2, "Green"), (3, "Blue")] + response = prompt_multiselection("Color", options) + + assert response == [1, 2] + + +def test_prompt_multiselection_with_defaults(input_simulator): + # When defaults are provided, "Submit" will be hovered at the start, so just enter. + input_simulator.send_text(KEY_ENTER) + + options = [(1, "Red"), (2, "Green"), (3, "Blue")] + response = prompt_multiselection("Color", options, defaults=[1, 2]) + + assert response == [1, 2] + + +def test_prompt_multiselection_deselection(input_simulator): + input_simulator.send_text(KEY_ENTER) # Select the first option. + input_simulator.send_text(KEY_ENTER) # Deselect the first option. + input_simulator.send_text(KEY_DOWN * 4 + KEY_ENTER) # Select the submit button. + + options = [(1, "Red"), (2, "Green"), (3, "Blue")] + response = prompt_multiselection("Color", options) + + assert response == [] + + +def test_prompt_multiselection_enter_custom_input(input_simulator): + """ + Rendering + ========= + Select one or more Colors: + > Red + Green + Blue + + + """ + + # Select the 'Enter custom input' option. + input_simulator.send_text(KEY_DOWN * 3 + KEY_ENTER) + input_simulator.send_text("pink" + KEY_ENTER) # Enter "pink" as the custom input. + input_simulator.send_text(KEY_ENTER) # Select the submit button. + + options = [("red", "Red"), ("green", "Green"), ("blue", "Blue")] + response = prompt_multiselection("Color", options, custom_input=True) + + assert response == ["pink"]