Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 107 additions & 21 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical
from textual.markup import escape
from textual.reactive import reactive
from textual.widgets import DataTable, Footer, Header, Input, Tree
from textual.screen import ModalScreen
from textual.widgets import Footer, Header, Input, Pretty, Static, Tree

from secrets_manager.models.gcp_projects import GCPProject
from secrets_manager.utils.gcp import get_secret_versions, list_secrets, search_gcp_projects
from secrets_manager.utils.helpers import format_error_message, sanitize_project_id_search
from secrets_manager.utils.gcp import (
get_secret_version_value,
get_secret_versions,
list_secrets,
search_gcp_projects,
)
from secrets_manager.utils.helpers import (
format_error_message,
sanitize_project_id_search,
sanitize_secrets,
)


class SecretsManager(App):
CSS_PATH = "secrets_manager.tcss"
BINDINGS = [
Binding("ctrl+q", "quit", "Quit"),
Binding("p", "secret_preview", "Secret Preview"),
]

search_query = reactive("")
Expand All @@ -36,14 +48,10 @@ def compose(self) -> ComposeResult:
with Horizontal(id="main-container"):
yield Tree("Projects", id="projects-tree")
with Vertical(id="secrets-container"):
yield DataTable()
yield Tree("Secrets", id="secrets-tree")

yield Footer()

def on_mount(self) -> None:
"""Set up the initial state when the app starts."""
self.query_one(DataTable).add_columns("Name", "Latest Version", "State", "Create Time")

def on_input_changed(self, event: Input.Changed) -> None:
"""Update the search query reactive property."""
if event.input.id == "project-input-search":
Expand All @@ -59,7 +67,7 @@ def watch_search_query(self, search_query: str) -> None:
def watch_current_project(self, project: "GCPProject") -> None:
"""React to changes in a selected project."""
if project is not None:
self._load_secrets()
self._list_secrets()

@work(thread=True)
def _do_search(self, search_term: str) -> None:
Expand Down Expand Up @@ -98,25 +106,36 @@ def on_tree_node_selected(self, event: Tree.NodeSelected) -> None:
self.current_project = event.node.data

@work(thread=True)
def _load_secrets(self) -> None:
"""Load secrets for the selected project."""
table = self.query_one(DataTable)
table.clear()
def _list_secrets(self) -> None:
"""List secrets for the selected project."""
tree = self.query_one("#secrets-tree", Tree)
tree.clear()
tree.root.label = "Secrets"

if self.current_project:
try:
secrets = list_secrets(gcp_project=self.current_project)

for secret in secrets:
secret_name = secret.name.split("/")[-1]
create_time = secret.create_time.strftime("%Y-%m-%d %H:%M:%S")
secret_versions = get_secret_versions(secret)

# First secret in list is always the latest secret
latest_version = secret_versions[0]
latest_version_number = latest_version.name.split("/")[-1]
table.add_row(
secret_name, latest_version_number, latest_version.state.name, create_time
)
# Create a node for each secret
secret_node = tree.root.add(secret_name, data={"secret_name": secret.name})

# Add versions as children
secret_versions = get_secret_versions(secret)
for version in secret_versions:
version_number = version.name.split("/")[-1]
secret_node.add_leaf(
f"Version {version_number} - {version.state.name}",
data={
"secret_name": secret.name,
"version": version_number,
"state": version.state.name,
},
)

tree.root.expand()

except GoogleAPICallError as e:
self.notify(
Expand All @@ -130,6 +149,73 @@ def _load_secrets(self) -> None:
markup=False,
)

def action_secret_preview(self):
tree = self.query_one("#secrets-tree", Tree)
if tree.cursor_node:
data = tree.cursor_node.data
if tree.cursor_node.parent == tree.root:
# Get latest version if the cursor is on a secret node
secret_name = f"{data['secret_name']}/versions/latest"
else:
# Get specific version if the cursor is on a version node
secret_name = f"{data['secret_name']}/versions/{data['version']}"
self.push_screen(SecretPreview(secret_name))


class SecretPreview(ModalScreen):
def __init__(self, secret_name: str) -> None:
"""Initialize the modal screen with the secret value.

Args:
secret_name: The secret value to display
"""
super().__init__()
self.secret_name = secret_name

BINDINGS = [
Binding("escape", "dismiss", "Close"),
]

def compose(self) -> ComposeResult:
"""Compose the modal with a Pretty widget to display the secret."""
with Vertical(classes="preview-container"):
# Extract the actual secret name and version from the full path
parts = self.secret_name.split("/")
secret_name = parts[-3]
version = parts[-1]

yield Static(
f"Secret: [b]{secret_name}[/b]\nVersion: [b]{version}[/b]", classes="secret-header"
)
yield Pretty({}, id="pretty-preview")

def on_mount(self) -> None:
self._get_secret(self.secret_name)

def action_dismiss(self) -> None:
"""Handle the dismiss action to close the modal."""
self.app.pop_screen()

@work(thread=True)
def _get_secret(self, secret_name) -> None:
try:
secret_value = get_secret_version_value(secret_name)
secret_to_preview = sanitize_secrets(secret_value)
self.query_one(Pretty).update(secret_to_preview)
except GoogleAPICallError as e:
self.notify(
f"[b]Failed to preview secret: {e.code} {e.reason}[/b]\n[d]{escape(format_error_message(str(e.message)))}[/d]",
severity="error",
)
self.action_dismiss()
except Exception as e:
self.notify(
f"Failed to preview secret: {format_error_message(str(e), 200)}",
severity="error",
markup=False,
)
self.action_dismiss()


if __name__ == "__main__":
app = SecretsManager()
Expand Down
79 changes: 46 additions & 33 deletions secrets_manager.tcss
Original file line number Diff line number Diff line change
@@ -1,33 +1,46 @@
Input {
dock: top;
width: 100%;
height: 3;
margin: 1;
border: solid $accent;
}

#main-container {
height: 100%;
}

#projects-tree {
width: 30%;
border: solid $accent;
}

#secrets-container {
width: 70%;
}

.json-view {
background: $surface;
color: $text;
height: 100%;
padding: 1;
overflow-y: auto;
}

DataTable {
height: 100%;
border: solid $accent;
}
Input {
dock: top;
width: 100%;
height: 3;
margin: 1;
border: solid $accent;
}

#main-container {
height: 100%;
}

#projects-tree {
width: 30%;
border: solid $accent;
}

#secrets-container {
width: 70%;
}

#secrets-tree {
height: 100%;
border: solid $accent;
}

.preview-container {
background: $surface;
margin: 2 4;
height: 100%;
overflow-y: auto;
}

#pretty-preview {
padding: 1;
}

SecretPreview {
align: center middle;
}

.secret-header {
padding: 1;
border-bottom: solid $accent;
text-align: center;
}
7 changes: 3 additions & 4 deletions secrets_manager/utils/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,20 @@ def get_secret_versions(
return versions


def get_secret_version_value(secret_version: secretmanager.SecretVersion) -> dict:
def get_secret_version_value(secret_id: str) -> dict:
"""
Get the value of a specific secret version.

Args:
secret_version (SecretVersion): Secret version instance to retrieve values
secret_id (str): Secret identifier in the format "projects/{project_id}/secrets/{secret_id}/versions/{version_id}"

Returns:
str: The secret value
"""
client = secretmanager.SecretManagerServiceClient()
name = secret_version.name

# Access the secret version
response = client.access_secret_version(request={"name": name})
response = client.access_secret_version(request={"name": secret_id})

# Return the decoded payload
return json.loads(response.payload.data)
Expand Down
39 changes: 34 additions & 5 deletions secrets_manager/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,37 @@ def sanitize_project_id_search(search_term: str) -> str:
return sanitized


if __name__ == "__main__":
print(sanitize_project_id_search(""))
print(sanitize_project_id_search("-"))
print(sanitize_project_id_search("--"))
print(sanitize_project_id_search(" "))
def sanitize_secrets(data: dict) -> dict:
"""Sanitize all values in a dictionary while preserving keys.

Args:
data: Dictionary containing secret data

Returns:
Dict with all values sanitized but keys preserved
"""

def _mask_value(value: str | int | float | bool) -> str:
"""Mask any value while preserving some structure."""
if value is None:
return None
if isinstance(value, bool):
return value
if isinstance(value, int | float):
Copy link

Copilot AI May 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using 'int | float' in isinstance() might cause unexpected behavior. Consider changing this to 'if isinstance(value, (int, float)):' to ensure proper type checking.

Suggested change
if isinstance(value, int | float):
if isinstance(value, (int, float)):

Copilot uses AI. Check for mistakes.
Copy link
Owner Author

@mikaeld mikaeld May 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong

return "**NUMBER**"

str_value = str(value)
if len(str_value) <= 4:
return "*" * len(str_value)
return f"{str_value[:2]}{'*' * (len(str_value) - 4)}{str_value[-2:]}"

def _sanitize_recursive(obj):
"""Recursively sanitize values in nested structures."""
if isinstance(obj, dict):
return {k: _sanitize_recursive(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [_sanitize_recursive(item) for item in obj]
else:
return _mask_value(obj)

return _sanitize_recursive(data)
2 changes: 1 addition & 1 deletion tests/test_utils/test_gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_get_secret_version_value(mock_secret_manager_client, mock_secret_versio
mock_response.payload.data = json.dumps({"key": "value"}).encode()
mock_client.access_secret_version.return_value = mock_response

result = get_secret_version_value(mock_secret_version)
result = get_secret_version_value(mock_secret_version.name)

assert result == {"key": "value"}
mock_client.access_secret_version.assert_called_once_with(
Expand Down