diff --git a/README.md b/README.md index ec2c41d..b123ef1 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,121 @@ To spaw a new shell within the virtual environment use: $ poetry shell ``` +## Credential Manager Library + +This is a module made to retrieve credentials from different secrets management systems like Bitwarden. +It accesses the secrets management service, looks for the desired credential and returns it in String form. + +To use the module in your python code + +### Bitwarden + +``` +from grimoirelab_toolkit.credential_manager import BitwardenManager + + +# Instantiate the Bitwarden manager using the api credentials for login +bw_manager = BitwardenManager("your_client_id", "your_client_secret", "your_master_password") + +# Login +bw_manager.login() + +# Retrieve a secret from Bitwarden +username = bw_manager.get_secret("github") +password = bw_manager.get_secret("elasticsearch") + +# Logout +bw_manager.logout() +``` + + +#### Response format + +When calling `get_secret(item_name)`, the method returns a JSON object with the following structure: + +_NOTE: the parameter "item_name" corresponds with the field "name" of the json. That's the name of the item._ +(in this case, GitHub) + + +##### Example Response + + ```json + { + "passwordHistory": [ + { + "lastUsedDate": "2024-11-05T10:27:18.411Z", + "password": "previous_password_value_1" + }, + { + "lastUsedDate": "2024-11-05T09:20:06.512Z", + "password": "previous_password_value_2" + } + ], + "revisionDate": "2025-05-11T14:40:19.456Z", + "creationDate": "2024-10-30T18:56:41.023Z", + "object": "item", + "id": "91300380-620f-4707-8de1-b21901383315", + "organizationId": null, + "folderId": null, + "type": 1, + "reprompt": 0, + "name": "GitHub", + "notes": null, + "favorite": false, + "fields": [ + { + "name": "api-token", + "value": "TOKEN" + "type": 0, + "linkedId": null + }, + { + "name": "api_key", + "value": "APIKEY", + "type": 0, + "linkedId": null + } + ], + "login": { + "uris": [], + "username": "your_username", + "password": "your_password", + "totp": null, + "passwordRevisionDate": "2024-11-05T10:27:18.411Z" + }, + "collectionIds": [], + "attachments": [] + } +``` + + Field Descriptions + + - passwordHistory: Array of previously used passwords with timestamps + - revisionDate: Last modification timestamp (ISO 8601 format) + - creationDate: Item creation timestamp (ISO 8601 format) + - object: Always "item" for credential items + - id: Unique identifier for this item + - organizationId: Organization ID if shared, null for personal items + - folderId: Folder ID if organized, null otherwise + - type: Item type (1 = login, 2 = secure note, 3 = card, 4 = identity) + - name: Display name of the credential item (name used as argument in get_secret()) + - notes: Optional notes field + - favorite: Boolean indicating if item is favorited + - fields: Array of custom fields with name-value pairs + - name: Field name + - value: Field value (can contain secrets) + - type: Field type (0 = text, 1 = hidden, 2 = boolean) + - login: Login credentials object + - username: Login username + - password: Login password + - totp: TOTP secret for 2FA (if configured) + - uris: Array of associated URIs/URLs + - passwordRevisionDate: Last password change timestamp + - collectionIds: Array of collection IDs this item belongs to + - attachments: Array of file attachments + +The module uses the [Bitwarden CLI](https://bitwarden.com/help/cli/) to interact with Bitwarden. + ## License Licensed under GNU General Public License (GPL), version 3 or later. diff --git a/grimoirelab_toolkit/credential_manager/__init__.py b/grimoirelab_toolkit/credential_manager/__init__.py new file mode 100644 index 0000000..c0ff6b2 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/__init__.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) Grimoirelab Contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +from .bw_manager import BitwardenManager +from .exceptions import ( + CredentialManagerError, + InvalidCredentialsError, + CredentialNotFoundError, + BitwardenCLIError, +) + +__all__ = [ + "BitwardenManager", + "CredentialManagerError", + "InvalidCredentialsError", + "CredentialNotFoundError", + "BitwardenCLIError", +] diff --git a/grimoirelab_toolkit/credential_manager/__main__.py b/grimoirelab_toolkit/credential_manager/__main__.py new file mode 100644 index 0000000..2b89c9e --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/__main__.py @@ -0,0 +1,4 @@ +from .credential_manager import main + +if __name__ == "__main__": + main() diff --git a/grimoirelab_toolkit/credential_manager/bw_manager.py b/grimoirelab_toolkit/credential_manager/bw_manager.py new file mode 100644 index 0000000..1c07b43 --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/bw_manager.py @@ -0,0 +1,212 @@ +# +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# +import json +import subprocess +import logging +import shutil + +from .exceptions import ( + BitwardenCLIError, + InvalidCredentialsError, + CredentialNotFoundError, +) + +logger = logging.getLogger(__name__) + + +class BitwardenManager: + """Retrieve credentials from Bitwarden. + + This class defines functions to log in, retrieve secrets + and log out of Bitwarden using the Bitwarden CLI. The + workflow is: + + manager = BitwardenManager(client_id, client_secret, master_password) + manager.login() + manager.get_secret("github") + manager.get_secret("elasticsearch") + manager.logout() + + The manager logs in using the client_id, client_secret, and + master_password given as arguments when creating the instance, + so the object is reusable along the program. + + The path of Bitwarden CLI (bw) is retrieved using shutil. + """ + + def __init__(self, client_id: str, client_secret: str, master_password: str): + """ + Creates BitwardenManager object using API key authentication + + :param str client_id: Bitwarden API client ID + :param str client_secret: Bitwarden API client secret + :param str master_password: Master password for unlocking the vault + """ + # Session key of the bw session + self.session_key = None + + # API credentials + self.client_id = client_id + self.client_secret = client_secret + self.master_password = master_password + + # Get the absolute path to the bw executable + self.bw_path = shutil.which("bw") + if not self.bw_path: + raise BitwardenCLIError("Bitwarden CLI (bw) not found in PATH") + + # Set up environment variables for consistent execution context + self.env = { + "LANG": "C", + "BW_CLIENTID": client_id, + "BW_CLIENTSECRET": client_secret, + } + + def login(self) -> str | None: + """Log into Bitwarden. + + Use the API authentication key to log in and unlock the vault. After it, + it will obtain a session key that will be used by to access the vault. + + :returns: The session key for the current Bitwarden session. + + :raises InvalidCredentialsError: If invalid credentials are provided + :raises BitwardenCLIError: If Bitwarden CLI operations fail + """ + # Log in using API key + login_result = subprocess.run( + [self.bw_path, "login", "--apikey"], + input=f"{self.client_id}\n{self.client_secret}\n", + capture_output=True, + text=True, + env=self.env, + ) + + if login_result.returncode != 0: + error_msg = ( + login_result.stderr.strip() if login_result.stderr else "Unknown error" + ) + logger.error("Error logging in with API key: %s", error_msg) + raise InvalidCredentialsError( + "Invalid API credentials provided for Bitwarden" + ) + + # After login, we need to unlock the vault to get a session key + self.session_key = self._unlock_vault() + + return self.session_key + + def _unlock_vault(self) -> str: + """Unlock the vault after authentication. + + Executes the Bitwarden unlock command to obtain a session key + for an already authenticated user but locked vault. + + :returns: Session key for the unlocked vault + :raises BitwardenCLIError: If unlock operation fails or returns empty session key + """ + # this uses the master password to unlock the vault + unlock_result = subprocess.run( + [self.bw_path, "unlock", "--raw"], + input=f"{self.master_password}\n", + capture_output=True, + text=True, + env=self.env, + ) + + if unlock_result.returncode != 0: + error_msg = ( + unlock_result.stderr.strip() + if unlock_result.stderr + else "Unknown error" + ) + logger.error("Error unlocking vault: %s", error_msg) + raise BitwardenCLIError(f"Failed to unlock vault: {error_msg}") + + # the session key is used when retrieving the secrets with get_secret + session_key = unlock_result.stdout.strip() + if not session_key: + raise BitwardenCLIError("Empty session key received from unlock command") + + return session_key + + def get_secret(self, item_name: str) -> dict: + """Retrieve an item from the Bitwarden vault. + + Retrieves all the fields stored for an item with the name + provided as an argument and returns them as a dictionary. + + The returned dictionary includes fields such as: + - login: username, password, URIs, TOTP + - fields: custom fields + - notes: secure notes + - name, id, and other metadata + + :param str item_name: The name of the item to retrieve + + :returns: Dictionary containing the item data + :rtype: dict + + :raises CredentialNotFoundError: If the specific credential is not found + :raises BitwardenCLIError: If Bitwarden CLI operations fail + """ + # Pass session key via command line parameter + result = subprocess.run( + [self.bw_path, "get", "item", item_name, "--session", self.session_key], + capture_output=True, + text=True, + env=self.env, + ) + + if result.returncode != 0: + raise CredentialNotFoundError(f"Credential not found: '{item_name}'") + + # Parse the JSON response returned in stdout + try: + item = json.loads(result.stdout) + except json.JSONDecodeError as e: + logger.error("Failed to parse Bitwarden response: %s", str(e)) + raise BitwardenCLIError(f"Invalid JSON response from Bitwarden: {e}") + + return item + + def logout(self) -> None: + """Log out from Bitwarden and invalidate the session. + + This method ends the current session and clears the session key. + """ + logger.info("Logging out from Bitwarden") + + # Execute logout command + result = subprocess.run( + [self.bw_path, "logout"], + capture_output=True, + text=True, + env=self.env, + ) + + if result.returncode != 0: + error_msg = result.stderr.strip() if result.stderr else "Unknown error" + logger.error("Error during logout: %s", error_msg) + + # Clear session key for security + self.session_key = None + + logger.info("Successfully logged out from Bitwarden") diff --git a/grimoirelab_toolkit/credential_manager/exceptions.py b/grimoirelab_toolkit/credential_manager/exceptions.py new file mode 100644 index 0000000..c9fd26c --- /dev/null +++ b/grimoirelab_toolkit/credential_manager/exceptions.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) Grimoirelab Contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +"""Custom exceptions for the credential manager module.""" + +__all__ = [ + "CredentialManagerError", + "InvalidCredentialsError", + "CredentialNotFoundError", + "BitwardenCLIError", +] + + +class CredentialManagerError(Exception): + """Base exception for all credential manager errors.""" + + pass + + +class InvalidCredentialsError(CredentialManagerError): + """Raised when invalid credentials are provided.""" + + pass + + +class CredentialNotFoundError(CredentialManagerError): + """Raised when a specific credential is not found in a secret.""" + + pass + + +class BitwardenCLIError(CredentialManagerError): + """Raised for Bitwarden CLI specific errors.""" + + pass diff --git a/releases/unreleased/credential-manager.yaml b/releases/unreleased/credential-manager.yaml new file mode 100644 index 0000000..8351b29 --- /dev/null +++ b/releases/unreleased/credential-manager.yaml @@ -0,0 +1,9 @@ +title: 'Add Bitwarden credential manager' +category: added +author: Alberto Ferrer Sánchez +issue: +notes: > + Implement automated credential management system with Bitwarden + integration. This allows the user to retrieve secrets for said + service without the need to hardcode them in config files or + having to write them manually. diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_bw_manager.py b/tests/test_bw_manager.py new file mode 100644 index 0000000..f3ff18c --- /dev/null +++ b/tests/test_bw_manager.py @@ -0,0 +1,265 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) Grimoirelab Contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: +# Alberto Ferrer Sánchez (alberefe@gmail.com) +# + +import unittest +from unittest.mock import patch, MagicMock, call + +from grimoirelab_toolkit.credential_manager.bw_manager import BitwardenManager +from grimoirelab_toolkit.credential_manager.exceptions import ( + InvalidCredentialsError, + BitwardenCLIError, + CredentialNotFoundError, +) + + +class TestBitwardenManager(unittest.TestCase): + """Tests for BitwardenManager class.""" + + def setUp(self): + """Set up common test fixtures.""" + + self.client_id = "test_client_id" + self.client_secret = "test_client_secret" + self.master_password = "test_master_password" + self.session_key = "test_session_key" + + @patch("grimoirelab_toolkit.credential_manager.bw_manager.shutil.which") + def test_initialization_success(self, mock_which): + """Test successful initialization with valid credentials.""" + + mock_which.return_value = "/usr/bin/bw" + + manager = BitwardenManager( + self.client_id, self.client_secret, self.master_password + ) + + self.assertEqual(manager.client_id, self.client_id) + self.assertEqual(manager.client_secret, self.client_secret) + self.assertEqual(manager.master_password, self.master_password) + self.assertIsNone(manager.session_key) + self.assertEqual(manager.bw_path, "/usr/bin/bw") + self.assertIn("BW_CLIENTID", manager.env) + self.assertIn("BW_CLIENTSECRET", manager.env) + + @patch("grimoirelab_toolkit.credential_manager.bw_manager.shutil.which") + def test_initialization_bw_not_found(self, mock_which): + """Test initialization fails when bw CLI is not found.""" + + mock_which.return_value = None + + with self.assertRaises(BitwardenCLIError) as context: + BitwardenManager(self.client_id, self.client_secret, self.master_password) + + self.assertIn("Bitwarden CLI (bw) not found in PATH", str(context.exception)) + + @patch("grimoirelab_toolkit.credential_manager.bw_manager.subprocess.run") + @patch("grimoirelab_toolkit.credential_manager.bw_manager.shutil.which") + def test_login_success(self, mock_which, mock_run): + """Test successful login and unlock.""" + + mock_which.return_value = "/usr/bin/bw" + mock_run.side_effect = [ + MagicMock(returncode=0, stdout="Logged in!", stderr=""), # login + MagicMock(returncode=0, stdout="test_session_key\n", stderr=""), # unlock + ] + + manager = BitwardenManager( + self.client_id, self.client_secret, self.master_password + ) + session_key = manager.login() + + self.assertEqual(session_key, "test_session_key") + self.assertEqual(manager.session_key, "test_session_key") + self.assertEqual(mock_run.call_count, 2) + + @patch("grimoirelab_toolkit.credential_manager.bw_manager.subprocess.run") + @patch("grimoirelab_toolkit.credential_manager.bw_manager.shutil.which") + def test_login_failure(self, mock_which, mock_run): + """Test login failure with invalid credentials.""" + + mock_which.return_value = "/usr/bin/bw" + mock_run.return_value = MagicMock(returncode=1, stderr="Invalid credentials") + + manager = BitwardenManager( + self.client_id, self.client_secret, self.master_password + ) + + with self.assertRaises(InvalidCredentialsError): + manager.login() + + @patch("grimoirelab_toolkit.credential_manager.bw_manager.subprocess.run") + @patch("grimoirelab_toolkit.credential_manager.bw_manager.shutil.which") + def test_unlock_failure(self, mock_which, mock_run): + """Test unlock failure after successful login.""" + + mock_which.return_value = "/usr/bin/bw" + mock_run.side_effect = [ + MagicMock(returncode=0, stdout="Logged in!", stderr=""), # login + MagicMock(returncode=1, stderr="Unlock failed", stdout=""), # unlock + ] + + manager = BitwardenManager( + self.client_id, self.client_secret, self.master_password + ) + + with self.assertRaises(BitwardenCLIError) as context: + manager.login() + + self.assertIn("Failed to unlock vault", str(context.exception)) + + @patch("grimoirelab_toolkit.credential_manager.bw_manager.subprocess.run") + @patch("grimoirelab_toolkit.credential_manager.bw_manager.shutil.which") + def test_get_secret_success(self, mock_which, mock_run): + """Test successful secret retrieval.""" + + mock_which.return_value = "/usr/bin/bw" + secret_result = MagicMock( + returncode=0, stdout='{"name":"github","login":{"password":"secret123"}}' + ) + mock_run.return_value = secret_result + + manager = BitwardenManager( + self.client_id, self.client_secret, self.master_password + ) + manager.session_key = self.session_key + result = manager.get_secret("github") + + # Now returns a parsed dict, not subprocess result + self.assertIsInstance(result, dict) + self.assertEqual(result["name"], "github") + self.assertEqual(result["login"]["password"], "secret123") + mock_run.assert_called_once() + + # Verify the get_secret call includes session key + call_args = mock_run.call_args + self.assertEqual( + call_args[0][0], + ["/usr/bin/bw", "get", "item", "github", "--session", self.session_key], + ) + self.assertTrue(call_args[1]["capture_output"]) + self.assertTrue(call_args[1]["text"]) + + @patch("grimoirelab_toolkit.credential_manager.bw_manager.subprocess.run") + @patch("grimoirelab_toolkit.credential_manager.bw_manager.shutil.which") + def test_get_secret_returns_parsed_dict(self, mock_which, mock_run): + """Test that get_secret returns parsed dict from JSON response.""" + + mock_which.return_value = "/usr/bin/bw" + secret_result = MagicMock(returncode=0, stdout='{"data":"value"}', stderr="") + mock_run.return_value = secret_result + + manager = BitwardenManager( + self.client_id, self.client_secret, self.master_password + ) + manager.session_key = self.session_key + result = manager.get_secret("my_item") + + # The method returns a parsed dict, not subprocess result + self.assertIsInstance(result, dict) + self.assertEqual(result["data"], "value") + + @patch("grimoirelab_toolkit.credential_manager.bw_manager.subprocess.run") + @patch("grimoirelab_toolkit.credential_manager.bw_manager.shutil.which") + def test_get_secret_not_found(self, mock_which, mock_run): + """Test get_secret raises error when item not found.""" + + mock_which.return_value = "/usr/bin/bw" + secret_result = MagicMock(returncode=1, stderr="Not found") + mock_run.return_value = secret_result + + manager = BitwardenManager( + self.client_id, self.client_secret, self.master_password + ) + manager.session_key = self.session_key + + with self.assertRaises(CredentialNotFoundError) as context: + manager.get_secret("nonexistent") + + self.assertIn("Credential not found", str(context.exception)) + + @patch("grimoirelab_toolkit.credential_manager.bw_manager.subprocess.run") + @patch("grimoirelab_toolkit.credential_manager.bw_manager.shutil.which") + def test_get_secret_invalid_json(self, mock_which, mock_run): + """Test get_secret raises error when response is not valid JSON.""" + + mock_which.return_value = "/usr/bin/bw" + secret_result = MagicMock(returncode=0, stdout="not valid json") + mock_run.return_value = secret_result + + manager = BitwardenManager( + self.client_id, self.client_secret, self.master_password + ) + manager.session_key = self.session_key + + with self.assertRaises(BitwardenCLIError) as context: + manager.get_secret("github") + + self.assertIn("Invalid JSON response", str(context.exception)) + + @patch("grimoirelab_toolkit.credential_manager.bw_manager.subprocess.run") + @patch("grimoirelab_toolkit.credential_manager.bw_manager.shutil.which") + def test_logout_success(self, mock_which, mock_run): + """Test successful logout clears session data.""" + + mock_which.return_value = "/usr/bin/bw" + mock_run.side_effect = [ + MagicMock(returncode=0, stdout="Logged in!"), # login + MagicMock(returncode=0, stdout="test_session_key"), # unlock + MagicMock(returncode=0, stdout="You have logged out."), # logout + ] + + manager = BitwardenManager( + self.client_id, self.client_secret, self.master_password + ) + manager.login() + + self.assertEqual(manager.session_key, "test_session_key") + + manager.logout() + + self.assertIsNone(manager.session_key) + self.assertEqual(mock_run.call_count, 3) + + # Verify logout was called + logout_call = call( + ["/usr/bin/bw", "logout"], capture_output=True, text=True, env=manager.env + ) + self.assertIn(logout_call, mock_run.call_args_list) + + @patch("grimoirelab_toolkit.credential_manager.bw_manager.subprocess.run") + @patch("grimoirelab_toolkit.credential_manager.bw_manager.shutil.which") + def test_logout_failure_still_clears_data(self, mock_which, mock_run): + """Test logout still clears session data even when command fails.""" + + mock_which.return_value = "/usr/bin/bw" + mock_run.side_effect = [ + MagicMock(returncode=0, stdout="Logged in!"), # login + MagicMock(returncode=0, stdout="test_session_key"), # unlock + MagicMock(returncode=1, stderr="Logout failed"), # logout + ] + + manager = BitwardenManager( + self.client_id, self.client_secret, self.master_password + ) + manager.login() + manager.logout() + + self.assertIsNone(manager.session_key)