Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 300 additions & 0 deletions src/pyinfra/operations/gpg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
"""
Manage GPG keys and keyrings.
"""

from pathlib import PurePosixPath
from urllib.parse import urlparse

from pyinfra import host
from pyinfra.api import OperationError, operation

from . import files


@operation()
def key(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very complex function that covers many different scenarios. I think that from a public API standpoint, that is fine, because we're dealing with 1 resource: keys. But you could break up the function into separate functions that are all called from the key() function, based on the scenario.

See the docker operations for an example of how this can be done

src: str | None = None,
dest: str | None = None,
keyserver: str | None = None,
keyid: str | list[str] | None = None,
dearmor: bool = True,
mode: str = "0644",
present: bool = True,
):
"""
Install or remove GPG keys from various sources.

Args:
src: filename or URL to a key (ASCII .asc or binary .gpg)
dest: destination path for the key file (required for installation, optional for removal)
keyserver: keyserver URL for fetching keys by ID
keyid: key ID or list of key IDs (required with keyserver, optional for removal)
dearmor: whether to convert ASCII armored keys to binary format
mode: file permissions for the installed key
present: whether the key should be present (True) or absent (False)
When False: if dest is provided, removes from specific keyring;
if dest is None, removes from all APT keyrings;
if keyid is provided, removes specific key(s);
if keyid is None, removes entire keyring file(s)

Examples:
gpg.key(
name="Install Docker GPG key",
src="https://download.docker.com/linux/debian/gpg",
dest="/etc/apt/keyrings/docker.gpg",
)

gpg.key(
name="Remove old GPG key file",
dest="/etc/apt/keyrings/old-key.gpg",
present=False,
)

gpg.key(
name="Remove specific key by ID",
dest="/etc/apt/keyrings/vendor.gpg",
keyid="0xABCDEF12",
present=False,
)

gpg.key(
name="Remove key from all APT keyrings",
keyid="0xCOMPROMISED123",
present=False,
# dest=None means search in all keyrings
)

gpg.key(
name="Fetch keys from keyserver",
keyserver="hkps://keyserver.ubuntu.com",
keyid=["0xD88E42B4", "0x7EA0A9C3"],
dest="/etc/apt/keyrings/vendor.gpg",
)
"""

# Validate parameters based on operation type
if present is True:
# For installation, dest is required
if not dest:
raise OperationError("`dest` must be provided for installation")
elif present is False:
# For removal, either dest or keyid must be provided
if not dest and not keyid:
raise OperationError("For removal, either `dest` or `keyid` must be provided")

# For removal, handle different scenarios
if present is False:
if not dest and keyid:
# Remove key(s) from all APT keyrings
if isinstance(keyid, str):
keyid = [keyid]

# Define all APT keyring locations
# Not sure this is the best way to do this
# Cannot find a more generic way to get gpg keyring locations
keyring_patterns = [
"/etc/apt/trusted.gpg.d/*.gpg",
"/etc/apt/keyrings/*.gpg",
"/usr/share/keyrings/*.gpg",
]

for pattern in keyring_patterns:
for kid in keyid:
# Remove key from all matching keyrings
yield (
f'for keyring in {pattern}; do [ -e "$keyring" ] && '
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't it make more sense to retrieve keyrings through a fact and then only try to remove the key from keyrings that actually exist?

Or maybe even better, create a fact that lists all known keyrings and then use the GpgKeys fact for each of those keyrings to get all the keys. Then you can specifically remove the keyid if it exists

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Since there’s no global or common location for keyrings, this will allow specifying the directories to search in — for example, /etc/apt/trusted.gpg.d/ and /usr/share/keyrings/ in the APT context.

f'gpg --batch --no-default-keyring --keyring "$keyring" '
f"--delete-keys {kid} 2>/dev/null || true; done"
)

# Clean up empty keyrings
yield (
f'for keyring in {pattern}; do [ -e "$keyring" ] && '
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As referenced in the other comment, if you can list all keys by keyring, then you know if keyrings are going to be empty after removing keyid from those rings and you can remove them based on that

f'! gpg --batch --no-default-keyring --keyring "$keyring" '
f'--list-keys 2>/dev/null | grep -q "pub" && rm -f "$keyring" || true; done'
)

return

elif dest and keyid:
# For APT keyring files, use a simpler approach:
# Check if keys exist in file, and if so, remove the entire file
# This is appropriate for most APT use cases
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned about "most APT use cases" here. Is there a situation we might be removed a keyfile/keyring that has multiple keys in it?

if isinstance(keyid, str):
keyid = [keyid]

# Build a condition to check if any of the keys exist in the file
key_checks = []
for kid in keyid:
# Strip 0x prefix if present and handle both short and long key formats
clean_key = kid.replace("0x", "").replace("0X", "")
key_checks.append(
f'gpg --batch --no-default-keyring --keyring "{dest}" '
f'--list-keys 2>/dev/null | grep -qi "{clean_key}"'
)

condition = " || ".join(key_checks)

# If any of the keys exist in the file, remove the entire file
yield (f'if [ -f "{dest}" ] && ({condition}); then ' f'rm -f "{dest}"; fi')
return

elif dest and not keyid:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be else instead of elif. To me, that seems cleaner because it clearly signals there are only 3 possible combinations of conditions: keyid is provided and dest isn't, keyid and dest are both provided and lastly, dest is provided and keyid isn't

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but it is more an "documentation" elif. To be more explicit.
What about add an else wich raise an exception if we go into an unhandle case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That works for me!

# Remove entire keyring file
yield from files.file._inner(
path=dest,
present=False,
)
return

# For installation, validate required parameters
if not src and not keyserver:
raise OperationError("Either `src` or `keyserver` must be provided for installation")

if keyserver and not keyid:
raise OperationError("`keyid` must be provided with `keyserver`")

if keyid and not keyserver and not src:
raise OperationError(
"When using `keyid` for installation, either `keyserver` or `src` must be provided"
)

# For installation (present=True), ensure destination directory exists
if dest is None:
raise OperationError("dest is required for installation")

dest_dir = str(PurePosixPath(dest).parent)
yield from files.directory._inner(
path=dest_dir,
mode="0755",
present=True,
)

# --- src branch: install a key from URL or local file ---
if src:
if urlparse(src).scheme in ("http", "https"):
# Remote source: download first, then process
temp_file = host.get_temp_filename(src)

yield from files.download._inner(
src=src,
dest=temp_file,
)

# Install the key and clean up temp file
yield from _install_key_file(temp_file, dest, dearmor, mode)

# Clean up temp file using pyinfra
yield from files.file._inner(
path=temp_file,
present=False,
)
else:
# Local file: install directly
yield from _install_key_file(src, dest, dearmor, mode)

# --- keyserver branch: fetch keys by ID ---
if keyserver:
if keyid is None:
raise OperationError("`keyid` must be provided with `keyserver`")

if isinstance(keyid, str):
keyid = [keyid]

joined = " ".join(keyid)

# Create temporary GPG home directory
temp_dir = f"/tmp/pyinfra-gpg-{host.get_temp_filename('')[-8:]}"

yield from files.directory._inner(
path=temp_dir,
mode="0700", # GPG directories should be more restrictive
present=True,
)

# Export GNUPGHOME and fetch keys
yield f'export GNUPGHOME="{temp_dir}" && gpg --batch --keyserver "{keyserver}" --recv-keys {joined}' # noqa: E501

# Export keys to destination
if dearmor:
# For binary output (dearmored), use --export without --armor
yield (f'export GNUPGHOME="{temp_dir}" && ' f'gpg --batch --export {joined} > "{dest}"')
else:
# For ASCII output, use --export --armor
yield (
f'export GNUPGHOME="{temp_dir}" && '
f'gpg --batch --export --armor {joined} > "{dest}"'
)

# Clean up temporary directory
yield from files.directory._inner(
path=temp_dir,
present=False,
)

# Set proper permissions
yield from files.file._inner(
path=dest,
mode=mode,
present=True,
)


@operation()
def dearmor(src: str, dest: str, mode: str = "0644"):
"""
Convert ASCII armored GPG key to binary format.

Args:
src: source ASCII armored key file
dest: destination binary key file
mode: file permissions for the output file

Example:
gpg.dearmor(
name="Convert key to binary",
src="/tmp/key.asc",
dest="/etc/apt/keyrings/key.gpg",
)
"""

# Ensure destination directory exists
dest_dir = str(PurePosixPath(dest).parent)
yield from files.directory._inner(
path=dest_dir,
mode="0755",
present=True,
)

yield f'gpg --batch --dearmor -o "{dest}" "{src}"'

# Set proper permissions
yield from files.file._inner(
path=dest,
mode=mode,
present=True,
)


def _install_key_file(src_file: str, dest_path: str, dearmor: bool, mode: str):
"""
Helper function to install a GPG key file, dearmoring if necessary.
"""
if dearmor:
# Check if it's an ASCII armored key and handle accordingly
# Note: Could be enhanced using GpgKey fact
yield (
f'if grep -q "BEGIN PGP PUBLIC KEY BLOCK" "{src_file}"; then '
f'gpg --batch --dearmor -o "{dest_path}" "{src_file}"; '
f'else cp "{src_file}" "{dest_path}"; fi'
)
else:
# Simple copy for binary keys or when dearmoring is disabled
yield f'cp "{src_file}" "{dest_path}"'

# Set proper permissions
yield from files.file._inner(
path=dest_path,
mode=mode,
present=True,
)
22 changes: 22 additions & 0 deletions tests/operations/gpg.dearmor/basic.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"args": [],
"kwargs": {
"src": "/tmp/test.asc",
"dest": "/tmp/test.gpg"
},
"facts": {
"files.Directory": {
"path=/tmp": {
"mode": 755
}
},
"files.File": {
"path=/tmp/test.gpg": null
}
},
"commands": [
"gpg --batch --dearmor -o \"/tmp/test.gpg\" \"/tmp/test.asc\"",
"touch /tmp/test.gpg",
"chmod 644 /tmp/test.gpg"
]
}
24 changes: 24 additions & 0 deletions tests/operations/gpg.dearmor/custom_mode.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"args": [],
"kwargs": {
"src": "/tmp/key.asc",
"dest": "/etc/apt/keyrings/key.gpg",
"mode": "0600"
},
"facts": {
"files.Directory": {
"path=/etc/apt/keyrings": null
},
"files.File": {
"path=/etc/apt/keyrings/key.gpg": null
}
},
"commands": [
"mkdir -p /etc/apt/keyrings",
"chmod 755 /etc/apt/keyrings",
"gpg --batch --dearmor -o \"/etc/apt/keyrings/key.gpg\" \"/tmp/key.asc\"",
"mkdir -p /etc/apt/keyrings",
"touch /etc/apt/keyrings/key.gpg",
"chmod 600 /etc/apt/keyrings/key.gpg"
]
}
24 changes: 24 additions & 0 deletions tests/operations/gpg.key/custom_mode.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"args": [],
"kwargs": {
"src": "/tmp/local-key.asc",
"dest": "/etc/apt/keyrings/test.gpg",
"mode": "0600"
},
"facts": {
"files.Directory": {
"path=/etc/apt/keyrings": null
},
"files.File": {
"path=/etc/apt/keyrings/test.gpg": null
}
},
"commands": [
"mkdir -p /etc/apt/keyrings",
"chmod 755 /etc/apt/keyrings",
"if grep -q \"BEGIN PGP PUBLIC KEY BLOCK\" \"/tmp/local-key.asc\"; then gpg --batch --dearmor -o \"/etc/apt/keyrings/test.gpg\" \"/tmp/local-key.asc\"; else cp \"/tmp/local-key.asc\" \"/etc/apt/keyrings/test.gpg\"; fi",
"mkdir -p /etc/apt/keyrings",
"touch /etc/apt/keyrings/test.gpg",
"chmod 600 /etc/apt/keyrings/test.gpg"
]
}
Loading