Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 275 additions & 30 deletions src/pyinfra/facts/apt.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,206 @@
from __future__ import annotations

import re
from dataclasses import dataclass
from typing import Union

from typing_extensions import TypedDict, override

from pyinfra.api import FactBase

from .gpg import GpgFactBase
from .util import make_cat_files_command


@dataclass
class AptRepo:
"""Represents an APT repository configuration.

This dataclass provides type safety for APT repository definitions,
supporting both legacy .list style and modern deb822 .sources formats.

Provides dict-like access for backward compatibility while offering
full type safety for modern code.
"""

type: str # "deb" or "deb-src"
url: str # Repository URL
distribution: str # Suite/distribution name
components: list[str] # List of components (e.g., ["main", "contrib"])
options: dict[str, Union[str, list[str]]] # Repository options

# Dict-like interface for backward compatibility
def __getitem__(self, key: str):
"""Dict-like access: repo['type'] works like repo.type"""
return getattr(self, key)

def __setitem__(self, key: str, value):
"""Dict-like assignment: repo['type'] = 'deb' works like repo.type = 'deb'"""
setattr(self, key, value)

def __contains__(self, key: str) -> bool:
"""Support 'key' in repo syntax"""
return hasattr(self, key)

def get(self, key: str, default=None):
"""Dict-like get: repo.get('type', 'deb')"""
return getattr(self, key, default)

def keys(self):
"""Return dict-like keys"""
return ["type", "url", "distribution", "components", "options"]

def values(self):
"""Return dict-like values"""
return [self.type, self.url, self.distribution, self.components, self.options]

def items(self):
"""Return dict-like items"""
return [(k, getattr(self, k)) for k in self.keys()]

@override
def __eq__(self, other) -> bool:
"""Enhanced equality that works with dicts and AptRepo instances"""
if isinstance(other, dict):
return (
self.type == other.get("type")
and self.url == other.get("url")
and self.distribution == other.get("distribution")
and self.components == other.get("components")
and self.options == other.get("options")
)
elif isinstance(other, AptRepo):
return (
self.type == other.type
and self.url == other.url
and self.distribution == other.distribution
and self.components == other.components
and self.options == other.options
)
return False

def to_json(self):
"""Convert to dict for JSON serialization"""
return {
"type": self.type,
"url": self.url,
"distribution": self.distribution,
"components": self.components,
"options": self.options,
}


@dataclass
class AptSourcesFile:
"""Represents a deb822 sources file entry before expansion into individual repositories.

This preserves the original multi-value fields from deb822 format,
while AptRepo represents individual expanded repositories.
"""

types: list[str] # ["deb", "deb-src"]
uris: list[str] # ["http://deb.debian.org", "https://mirror.example.com"]
suites: list[str] # ["bookworm", "bullseye"]
components: list[str] # ["main", "contrib", "non-free"]
architectures: list[str] | None = None # ["amd64", "i386"]
signed_by: list[str] | None = None # ["/path/to/key1.gpg", "/path/to/key2.gpg"]
trusted: str | None = None # "yes"/"no"

@classmethod
def from_deb822_lines(cls, lines: list[str]) -> "AptSourcesFile | None":
"""Parse deb822 stanza lines into AptSourcesFile.

Returns None if parsing failed or repository is disabled.
"""
if not lines:
return None

data: dict[str, str] = {}
for line in lines:
if not line or line.startswith("#"):
continue
# Field-Name: value
try:
key, value = line.split(":", 1)
except ValueError: # malformed line
continue
data[key.strip()] = value.strip()

# Validate required fields
required = ("Types", "URIs", "Suites")
if not all(field in data for field in required):
return None

# Filter out disabled repositories
enabled_str = data.get("Enabled", "yes").lower()
if enabled_str != "yes":
return None

# Parse fields into appropriate types
return cls(
types=data.get("Types", "").split(),
uris=data.get("URIs", "").split(),
suites=data.get("Suites", "").split(),
components=data.get("Components", "").split(),
architectures=(
data.get("Architectures", "").split() if data.get("Architectures") else None
),
signed_by=data.get("Signed-By", "").split() if data.get("Signed-By") else None,
trusted=data.get("Trusted", "").lower() if data.get("Trusted") else None,
)

@classmethod
def parse_sources_file(cls, lines: list[str]) -> list[AptRepo]:
"""Parse a full deb822 .sources file into AptRepo instances.

Splits on blank lines into stanzas and parses each one.
Returns a combined list of AptRepo instances for all stanzas.

Args:
lines: Lines from a .sources file
"""
repos = []
stanza: list[str] = []
for raw in lines + [""]: # sentinel blank line to flush last stanza
line = raw.rstrip("\n")
if line.strip() == "":
if stanza:
sources_file = cls.from_deb822_lines(stanza)
if sources_file:
repos.extend(sources_file.expand_to_repos())
stanza = []
continue
stanza.append(line)
return repos

def expand_to_repos(self) -> list[AptRepo]:
"""Expand this sources file entry into individual AptRepo instances."""
# Build options dict in the same format as legacy parsing
options: dict[str, Union[str, list[str]]] = {}

if self.architectures:
options["arch"] = (
self.architectures if len(self.architectures) > 1 else self.architectures[0]
)
if self.signed_by:
options["signed-by"] = self.signed_by if len(self.signed_by) > 1 else self.signed_by[0]
if self.trusted:
options["trusted"] = self.trusted

repos = []
# Produce combinations – in most real-world cases these will each be one.
for repo_type in self.types:
for uri in self.uris:
for suite in self.suites:
repos.append(
AptRepo(
type=repo_type,
url=uri,
distribution=suite,
components=self.components.copy(), # copy to avoid shared reference
options=dict(options), # copy per entry
)
)
return repos


def noninteractive_apt(command: str, force=False):
Expand All @@ -32,13 +225,21 @@ def noninteractive_apt(command: str, force=False):
)


def parse_apt_repo(name):
def parse_apt_repo(name: str) -> AptRepo | None:
"""Parse a traditional apt source line into an AptRepo.

Args:
name: Apt source line (e.g., "deb [arch=amd64] http://example.com focal main")

Returns:
AptRepo instance or None if parsing failed
"""
regex = r"^(deb(?:-src)?)(?:\s+\[([^\]]+)\])?\s+([^\s]+)\s+([^\s]+)\s+([a-z-\s\d]*)$"

matches = re.match(regex, name)

if not matches:
return
return None

# Parse any options
options = {}
Expand All @@ -51,53 +252,97 @@ def parse_apt_repo(name):

options[key] = value

return {
"options": options,
"type": matches.group(1),
"url": matches.group(3),
"distribution": matches.group(4),
"components": list(matches.group(5).split()),
}
return AptRepo(
type=matches.group(1),
url=matches.group(3),
distribution=matches.group(4),
components=list(matches.group(5).split()),
options=options,
)


class AptSources(FactBase):
def parse_apt_list_file(lines: list[str]) -> list[AptRepo]:
"""Parse legacy .list style apt source file.

Each non-comment, non-empty line is a discrete repository definition in the
traditional ``deb http://... suite components`` syntax.
Returns a list of AptRepo instances.

Args:
lines: Lines from a .list file
"""
Returns a list of installed apt sources:
repos = []
for raw in lines:
line = raw.strip()
if not line or line.startswith("#"):
continue
repo = parse_apt_repo(line)
if repo:
repos.append(repo)
return repos

.. code:: python

[
{
"type": "deb",
"url": "http://archive.ubuntu.org",
"distribution": "trusty",
"components", ["main", "multiverse"],
},
]
class AptSources(FactBase):
"""Returns a list of installed apt sources (legacy .list + deb822 .sources).

Returns a list of AptRepo instances that behave like dicts for backward compatibility:

[AptRepo(type="deb", url="http://archive.ubuntu.org", ...)]

Each AptRepo can be accessed like a dict:
repo['type'] # works like repo.type
repo.get('url') # works like getattr(repo, 'url')
"""

@override
def command(self) -> str:
return make_cat_files_command(
"/etc/apt/sources.list",
"/etc/apt/sources.list.d/*.list",
# We emit file boundary markers so the parser can select the correct
# parsing function based on filename extension.
return (
"sh -c '"
"for f in "
"/etc/apt/sources.list "
"/etc/apt/sources.list.d/*.list "
"/etc/apt/sources.list.d/*.sources; do "
'[ -e "$f" ] || continue; '
'echo "##FILE $f"; '
'cat "$f"; '
"echo; "
"done'"
)

@override
def requires_command(self) -> str:
return "apt" # if apt installed, above should exist
return "apt"

default = list

@override
def process(self, output):
repos = []
def process(self, output): # type: ignore[override]
repos: list[AptRepo] = []
current_file: str | None = None
buffer: list[str] = []

def flush():
nonlocal buffer, current_file, repos
if current_file is None or not buffer:
buffer = []
return

if current_file.endswith(".sources"):
repos.extend(AptSourcesFile.parse_sources_file(buffer))
else: # .list files or /etc/apt/sources.list
repos.extend(parse_apt_list_file(buffer))
buffer = []

for line in output:
repo = parse_apt_repo(line)
if repo:
repos.append(repo)
if line.startswith("##FILE "):
flush() # flush previous file buffer
current_file = line[7:].strip() # remove "##FILE " prefix
continue
buffer.append(line)

flush() # flush the final buffer
return repos


Expand Down
Loading