Skip to content
86 changes: 86 additions & 0 deletions bioblend/_tests/TestGalaxyUsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,89 @@ def test_create_user_apikey(self):
# Test regenerating an API key for a user that already has one
regenerated_apikey = self.gi.users.create_user_apikey(new_user_id)
assert regenerated_apikey and regenerated_apikey not in (new_apikey, "Not available.")

@test_util.skip_unless_galaxy("release_25.1")
def test_get_credentials_empty(self):
user_id = self.gi.users.get_current_user()["id"]
creds = self.gi.users.get_credentials(user_id, source_id="nonexistent_tool_id")
assert creds == []

@test_util.skip_unless_galaxy("release_25.1")
def test_create_and_get_credentials(self):
# Requires a Galaxy tool with credential definitions; skip if unavailable
user_id = self.gi.users.get_current_user()["id"]
tool_id = "random_lines1"
tool = self.gi.tools.show_tool(tool_id)
try:
cred = self.gi.users.create_credentials(
user_id=user_id,
source_type="tool",
source_id=tool_id,
source_version=tool["version"],
service_name="test_service",
service_version="1.0",
group_name="default",
variables=[{"name": "api_url", "value": "https://example.org"}],
secrets=[{"name": "api_key", "value": "secret123"}],
)
except ConnectionError as e:
if "does not require any credentials" in str(e) or "not defined" in str(e):
self.skipTest("Test tool does not have credential definitions")
Comment thread
dannon marked this conversation as resolved.
Outdated
raise
assert cred is not None
creds = self.gi.users.get_credentials(user_id, source_id=tool_id)
assert len(creds) >= 1
matching = [c for c in creds if c["name"] == "test_service"]
assert len(matching) == 1
assert matching[0]["source_id"] == tool_id
assert matching[0]["version"] == "1.0"
assert len(matching[0]["groups"]) >= 1

@test_util.skip_unless_galaxy("release_25.1")
def test_get_credentials_for_tool(self):
# Requires a Galaxy tool with credential definitions; skip if unavailable
user_id = self.gi.users.get_current_user()["id"]
tool_id = "random_lines1"
tool = self.gi.tools.show_tool(tool_id)
try:
group = self.gi.users.create_credentials(
user_id=user_id,
source_type="tool",
source_id=tool_id,
source_version=tool["version"],
service_name="test_service_run",
service_version="2.0",
group_name="default",
secrets=[{"name": "token", "value": "abc123"}],
)
except ConnectionError as e:
if "does not require any credentials" in str(e) or "not defined" in str(e):
self.skipTest("Test tool does not have credential definitions")
raise
# Set the active credential group (not auto-set on creation)
creds = self.gi.users.get_credentials(user_id, source_id=tool_id)
matching = [c for c in creds if c["name"] == "test_service_run"]
self.gi.users.select_credential_group(
user_id=user_id,
source_type="tool",
source_id=tool_id,
source_version=tool["version"],
user_credentials_id=matching[0]["id"],
group_id=group["id"],
)
context = self.gi.users.get_credentials_for_tool(user_id, tool_id, tool_version=tool["version"])
assert context is not None
assert len(context) >= 1
entries = [e for e in context if e["name"] == "test_service_run"]
assert len(entries) == 1
entry = entries[0]
assert "user_credentials_id" in entry
assert entry["version"] == "2.0"
assert "id" in entry["selected_group"]
assert entry["selected_group"]["name"] == "default"

@test_util.skip_unless_galaxy("release_25.1")
def test_get_credentials_for_tool_none(self):
user_id = self.gi.users.get_current_user()["id"]
context = self.gi.users.get_credentials_for_tool(user_id, "nonexistent_tool_id")
assert context is None
13 changes: 12 additions & 1 deletion bioblend/galaxy/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ def run_tool(
tool_inputs: InputsBuilder | dict,
input_format: Literal["21.01", "legacy"] = "legacy",
data_manager_mode: Literal["populate", "dry_run", "bundle"] | None = None,
credentials_context: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
"""
Runs tool specified by ``tool_id`` in history indicated
Expand All @@ -404,6 +405,13 @@ def run_tool(

'bundle' will create a data manager bundle that can be imported on other Galaxy servers.

:type credentials_context: list of dicts
:param credentials_context: list of credential context dicts for tools
that require credentials (e.g. API keys). Each dict should contain
``user_credentials_id``, ``name``, ``version``, and ``selected_group``
(with ``id`` and ``name`` keys). Obtain credential IDs by storing
credentials via the Galaxy credentials API first.

:type tool_inputs: dict
:param tool_inputs: dictionary of input datasets and parameters
for the tool (see below)
Expand Down Expand Up @@ -466,7 +474,7 @@ def run_tool(
You can also check the examples in `Galaxy's API test suite
<https://github.com/galaxyproject/galaxy/blob/dev/lib/galaxy_test/api/test_tools.py>`_.
"""
payload: dict[str, str | dict] = {
payload: dict[str, Any] = {
"history_id": history_id,
"tool_id": tool_id,
"input_format": input_format,
Expand All @@ -480,6 +488,9 @@ def run_tool(
if data_manager_mode:
payload["data_manager_mode"] = data_manager_mode

if credentials_context is not None:
payload["credentials_context"] = credentials_context

return self._post(payload)

def upload_file(
Expand Down
196 changes: 196 additions & 0 deletions bioblend/galaxy/users/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,199 @@ def update_user(self, user_id: str, user_data: dict | None = None, **kwargs: Any
user_data.update(kwargs)
url = self._make_url(user_id) + "/information/inputs"
return self._put(url=url, payload=user_data, id=user_id)

def get_credentials(
self,
user_id: str,
source_type: str = "tool",
source_id: str | None = None,
source_version: str | None = None,
) -> list[dict[str, Any]]:
"""
Get stored credentials for a user, optionally filtered by tool.

:type user_id: str
:param user_id: encoded user ID

:type source_type: str
:param source_type: credential source type (default: 'tool')

:type source_id: str
:param source_id: tool ID to filter by

:type source_version: str
:param source_version: tool version to filter by

:rtype: list of dicts
:return: list of stored credentials
"""
url = self._make_url(user_id) + "/credentials"
params: dict[str, str] = {"source_type": source_type}
if source_id is not None:
params["source_id"] = source_id
if source_version is not None:
params["source_version"] = source_version
return self._get(url=url, params=params)

def create_credentials(
self,
user_id: str,
source_type: str,
source_id: str,
source_version: str,
service_name: str,
service_version: str,
group_name: str,
variables: list[dict[str, str]] | None = None,
secrets: list[dict[str, str]] | None = None,
) -> dict[str, Any]:
"""
Store credentials for a user (e.g. API keys for external services).

:type user_id: str
:param user_id: encoded user ID

:type source_type: str
:param source_type: credential source type (e.g. 'tool')

:type source_id: str
:param source_id: tool ID

:type source_version: str
:param source_version: tool version

:type service_name: str
:param service_name: name of the credential service

:type service_version: str
:param service_version: version of the credential service

:type group_name: str
:param group_name: name for the credential group (minimum 3 characters)

:type variables: list of dicts
:param variables: list of variable dicts with 'name' and 'value' keys

:type secrets: list of dicts
:param secrets: list of secret dicts with 'name' and 'value' keys

:rtype: dict
:return: the created credential group (with ``id``, ``name``,
``variables``, ``secrets``, and ``update_time``)
"""
url = self._make_url(user_id) + "/credentials"
payload = {
"source_type": source_type,
"source_id": source_id,
"source_version": source_version,
"service_credential": {
"name": service_name,
"version": service_version,
"group": {
"name": group_name,
"variables": variables or [],
"secrets": secrets or [],
},
},
}
return self._post(url=url, payload=payload)

def select_credential_group(
self,
user_id: str,
source_type: str,
source_id: str,
source_version: str,
user_credentials_id: str,
group_id: str | None,
) -> None:
"""
Select the active credential group for a set of user credentials.
This must be called after ``create_credentials()`` before the
Comment thread
dannon marked this conversation as resolved.
Outdated
credentials can be used with ``run_tool()``.

:type user_id: str
:param user_id: encoded user ID

:type source_type: str
:param source_type: credential source type (e.g. 'tool')

:type source_id: str
:param source_id: tool ID

:type source_version: str
:param source_version: tool version

:type user_credentials_id: str
:param user_credentials_id: encoded ID of the user credentials entry

:type group_id: str or None
:param group_id: encoded ID of the credential group to activate,
or ``None`` to unset
"""
url = self._make_url(user_id) + "/credentials"
payload = {
"source_type": source_type,
"source_id": source_id,
"source_version": source_version,
"service_credentials": [
{
"user_credentials_id": user_credentials_id,
"current_group_id": group_id,
},
],
}
try:
self._put(url=url, payload=payload)
except ConnectionError as e:
if e.status_code == 204:
return None
raise

def get_credentials_for_tool(
self,
user_id: str,
tool_id: str,
tool_version: str | None = None,
) -> list[dict[str, Any]] | None:
"""
Build a credentials_context list suitable for passing to
``tools.run_tool()``. Returns None if no credentials are stored.

:type user_id: str
:param user_id: encoded user ID

:type tool_id: str
:param tool_id: tool ID to look up credentials for

:type tool_version: str
:param tool_version: tool version

:rtype: list of dicts or None
:return: credentials_context list for run_tool(), or None
"""
creds = self.get_credentials(user_id, source_type="tool", source_id=tool_id, source_version=tool_version)
if not creds:
return None
context = []
for cred in creds:
current_group_id = cred.get("current_group_id")
if not current_group_id:
continue
group_name = "default"
for group in cred.get("groups", []):
if group["id"] == current_group_id:
group_name = group.get("name", "default")
break
context.append(
{
"user_credentials_id": cred["id"],
"name": cred.get("name", ""),
"version": cred.get("version", ""),
"selected_group": {
"id": current_group_id,
"name": group_name,
},
}
)
return context if context else None