7
7
from pyinfra .api import FactBase
8
8
9
9
from .gpg import GpgFactBase
10
- from .util import make_cat_files_command
11
10
12
11
13
12
def noninteractive_apt (command : str , force = False ):
@@ -60,44 +59,178 @@ def parse_apt_repo(name):
60
59
}
61
60
62
61
63
- class AptSources (FactBase ):
62
+ def parse_deb822_stanza (lines : list [str ]) -> list [dict [str , object ]]:
63
+ """Parse a deb822 style repository stanza.
64
+
65
+ deb822 sources are key/value pairs separated by blank lines, eg::
66
+
67
+ Types: deb
68
+ URIs: http://deb.debian.org/debian
69
+ Suites: bookworm
70
+ Components: main contrib
71
+ Architectures: amd64
72
+ Signed-By: /usr/share/keyrings/debian-archive-keyring.gpg
73
+
74
+ Returns a list of dicts matching the legacy ``parse_apt_repo`` output so the
75
+ rest of pyinfra can remain backwards compatible. A stanza may define
76
+ multiple types/URIs/suites which we expand into individual repo dicts.
64
77
"""
65
- Returns a list of installed apt sources:
66
78
67
- .. code:: python
79
+ if not lines :
80
+ return []
81
+
82
+ data : dict [str , str ] = {}
83
+ for line in lines :
84
+ if not line or line .startswith ("#" ):
85
+ continue
86
+ # Field-Name: value
87
+ try :
88
+ key , value = line .split (":" , 1 )
89
+ except ValueError : # malformed line
90
+ continue
91
+ data [key .strip ()] = value .strip ()
92
+
93
+ required = ("Types" , "URIs" , "Suites" )
94
+ if not all (field in data for field in required ): # not a valid stanza
95
+ return []
96
+
97
+ types = data .get ("Types" , "" ).split ()
98
+ uris = data .get ("URIs" , "" ).split ()
99
+ suites = data .get ("Suites" , "" ).split ()
100
+ components = data .get ("Components" , "" ).split ()
101
+
102
+ # Map deb822 specific fields to legacy option names
103
+ options : dict [str , object ] = {}
104
+ if architectures := data .get ("Architectures" ):
105
+ archs = architectures .split ()
106
+ if archs :
107
+ options ["arch" ] = archs if len (archs ) > 1 else archs [0 ]
108
+ if signed_by := data .get ("Signed-By" ):
109
+ signed = signed_by .split ()
110
+ options ["signed-by" ] = signed if len (signed ) > 1 else signed [0 ]
111
+ if trusted := data .get ("Trusted" ):
112
+ options ["trusted" ] = trusted .lower ()
113
+
114
+ repos = []
115
+ # Produce combinations – in most real-world cases these will each be one.
116
+ for _type in types or ["deb" ]:
117
+ for uri in uris :
118
+ for suite in suites :
119
+ repos .append (
120
+ {
121
+ "options" : dict (options ), # copy per entry
122
+ "type" : _type ,
123
+ "url" : uri ,
124
+ "distribution" : suite ,
125
+ "components" : components ,
126
+ }
127
+ )
128
+ return repos
129
+
130
+
131
+ def parse_apt_list_file (lines : list [str ]) -> list [dict [str , object ]]:
132
+ """Parse legacy .list style apt source file.
133
+
134
+ Each non-comment, non-empty line is a discrete repository definition in the
135
+ traditional ``deb http://... suite components`` syntax.
136
+ Returns a list of repo dicts (may be empty).
137
+ """
138
+ repos = []
139
+ for raw in lines :
140
+ line = raw .strip ()
141
+ if not line or line .startswith ("#" ):
142
+ continue
143
+ repo = parse_apt_repo (line )
144
+ if repo :
145
+ repos .append (repo )
146
+ return repos
147
+
148
+
149
+ def parse_deb822_sources_file (
150
+ lines : list [str ],
151
+ ) -> list [dict [str , object ]]:
152
+ """Parse a full deb822 ``.sources`` file.
153
+
154
+ Splits on blank lines into stanzas and uses ``parse_deb822_stanza`` for each
155
+ stanza. Returns a combined list of repo dicts for all stanzas.
156
+ """
157
+ repos = []
158
+ stanza : list [str ] = []
159
+ for raw in lines + ["" ]: # sentinel blank line to flush last stanza
160
+ line = raw .rstrip ("\n " )
161
+ if line .strip () == "" :
162
+ if stanza :
163
+ repos .extend (parse_deb822_stanza (stanza ))
164
+ stanza = []
165
+ continue
166
+ stanza .append (line )
167
+ return repos
68
168
69
- [
70
- {
71
- "type": "deb",
72
- "url": "http://archive.ubuntu.org",
73
- "distribution": "trusty",
74
- "components", ["main", "multiverse"],
75
- },
76
- ]
169
+
170
+ class AptSources (FactBase ):
171
+ """Returns a list of installed apt sources (legacy .list + deb822 .sources).
172
+
173
+ Backwards compatible with historical output: a flat list of dicts:
174
+
175
+ {
176
+ "type": "deb",
177
+ "url": "http://archive.ubuntu.org",
178
+ "distribution": "bookworm",
179
+ "components": ["main", "contrib"],
180
+ "options": { ... },
181
+ }
77
182
"""
78
183
79
184
@override
80
185
def command (self ) -> str :
81
- return make_cat_files_command (
82
- "/etc/apt/sources.list" ,
83
- "/etc/apt/sources.list.d/*.list" ,
186
+ # We emit file boundary markers so the parser can select the correct
187
+ # parsing function based on filename extension.
188
+ return (
189
+ "sh -c '"
190
+ "for f in "
191
+ "/etc/apt/sources.list "
192
+ "/etc/apt/sources.list.d/*.list "
193
+ "/etc/apt/sources.list.d/*.sources; do "
194
+ '[ -e "$f" ] || continue; '
195
+ 'echo "##FILE $f"; '
196
+ 'cat "$f"; '
197
+ "echo; "
198
+ "done'"
84
199
)
85
200
86
201
@override
87
202
def requires_command (self ) -> str :
88
- return "apt" # if apt installed, above should exist
203
+ return "apt"
89
204
90
205
default = list
91
206
92
207
@override
93
- def process (self , output ):
94
- repos = []
95
-
96
- for line in output :
97
- repo = parse_apt_repo (line )
98
- if repo :
99
- repos .append (repo )
100
-
208
+ def process (self , output ): # type: ignore[override]
209
+ repos : list = []
210
+ current_file : str | None = None
211
+ buffer : list [str ] = []
212
+
213
+ def flush ():
214
+ nonlocal buffer , current_file , repos
215
+ if current_file is None or not buffer :
216
+ buffer = []
217
+ return
218
+ if current_file .endswith (".sources" ):
219
+ repos .extend (parse_deb822_sources_file (buffer ))
220
+ else : # treat anything else as legacy list syntax
221
+ repos .extend (parse_apt_list_file (buffer ))
222
+ buffer = []
223
+
224
+ for raw_line in output :
225
+ if raw_line .startswith ("##FILE " ):
226
+ # New file marker
227
+ flush ()
228
+ current_file = raw_line .split (" " , 1 )[1 ].strip ()
229
+ continue
230
+ buffer .append (raw_line )
231
+
232
+ # Flush last file
233
+ flush ()
101
234
return repos
102
235
103
236
@@ -115,14 +248,30 @@ class AptKeys(GpgFactBase):
115
248
}
116
249
"""
117
250
118
- # This requires both apt-key *and* apt-key itself requires gpg
119
251
@override
120
252
def command (self ) -> str :
121
- return "! command -v gpg || apt-key list --with-colons"
122
-
123
- @override
124
- def requires_command (self ) -> str :
125
- return "apt-key"
253
+ # Prefer not to use deprecated apt-key even if present. Iterate over keyrings
254
+ # directly. This maintains backwards compatibility of output with the
255
+ # previous implementation which fell back to this method.
256
+ return (
257
+ "for f in "
258
+ " /etc/apt/trusted.gpg "
259
+ " /etc/apt/trusted.gpg.d/*.gpg /etc/apt/trusted.gpg.d/*.asc "
260
+ " /etc/apt/keyrings/*.gpg /etc/apt/keyrings/*.asc "
261
+ " /usr/share/keyrings/*.gpg /usr/share/keyrings/*.asc "
262
+ "; do "
263
+ ' [ -e "$f" ] || continue; '
264
+ ' case "$f" in '
265
+ " *.asc) "
266
+ ' gpg --batch --show-keys --with-colons --keyid-format LONG "$f" '
267
+ " ;; "
268
+ " *) "
269
+ ' gpg --batch --no-default-keyring --keyring "$f" '
270
+ " --list-keys --with-colons --keyid-format LONG "
271
+ " ;; "
272
+ " esac; "
273
+ "done"
274
+ )
126
275
127
276
128
277
class AptSimulationDict (TypedDict ):
0 commit comments