1
1
from __future__ import annotations
2
2
3
3
import re
4
+ from typing import List
4
5
5
6
from typing_extensions import TypedDict , override
6
7
7
8
from pyinfra .api import FactBase
8
9
9
10
from .gpg import GpgFactBase
10
- from .util import make_cat_files_command
11
11
12
12
13
13
def noninteractive_apt (command : str , force = False ):
@@ -60,44 +60,176 @@ def parse_apt_repo(name):
60
60
}
61
61
62
62
63
- class AptSources (FactBase ):
63
+ def parse_deb822_stanza (lines : list [str ]):
64
+ """Parse a deb822 style repository stanza.
65
+
66
+ deb822 sources are key/value pairs separated by blank lines, eg::
67
+
68
+ Types: deb
69
+ URIs: http://deb.debian.org/debian
70
+ Suites: bookworm
71
+ Components: main contrib
72
+ Architectures: amd64
73
+ Signed-By: /usr/share/keyrings/debian-archive-keyring.gpg
74
+
75
+ Returns a list of dicts matching the legacy ``parse_apt_repo`` output so the
76
+ rest of pyinfra can remain backwards compatible. A stanza may define
77
+ multiple types/URIs/suites which we expand into individual repo dicts.
64
78
"""
65
- Returns a list of installed apt sources:
66
79
67
- .. code:: python
80
+ if not lines :
81
+ return []
82
+
83
+ data : dict [str , str ] = {}
84
+ for line in lines :
85
+ if not line or line .startswith ("#" ):
86
+ continue
87
+ # Field-Name: value
88
+ try :
89
+ key , value = line .split (":" , 1 )
90
+ except ValueError : # malformed line
91
+ continue
92
+ data [key .strip ()] = value .strip ()
93
+
94
+ required = ("Types" , "URIs" , "Suites" )
95
+ if not all (field in data for field in required ): # not a valid stanza
96
+ return []
97
+
98
+ types = data .get ("Types" , "" ).split ()
99
+ uris = data .get ("URIs" , "" ).split ()
100
+ suites = data .get ("Suites" , "" ).split ()
101
+ components = data .get ("Components" , "" ).split ()
102
+
103
+ # Map deb822 specific fields to legacy option names
104
+ options : dict [str , object ] = {}
105
+ if architectures := data .get ("Architectures" ):
106
+ archs = architectures .split ()
107
+ if archs :
108
+ options ["arch" ] = archs if len (archs ) > 1 else archs [0 ]
109
+ if signed_by := data .get ("Signed-By" ):
110
+ signed = signed_by .split ()
111
+ options ["signed-by" ] = signed if len (signed ) > 1 else signed [0 ]
112
+ if trusted := data .get ("Trusted" ):
113
+ options ["trusted" ] = trusted .lower ()
114
+
115
+ repos = []
116
+ # Produce combinations – in most real-world cases these will each be one.
117
+ for _type in types or ["deb" ]:
118
+ for uri in uris :
119
+ for suite in suites :
120
+ repos .append (
121
+ {
122
+ "options" : dict (options ), # copy per entry
123
+ "type" : _type ,
124
+ "url" : uri ,
125
+ "distribution" : suite ,
126
+ "components" : components ,
127
+ }
128
+ )
129
+ return repos
130
+
131
+
132
+ def parse_apt_list_file (lines : List [str ]):
133
+ """Parse legacy .list style apt source file.
134
+
135
+ Each non-comment, non-empty line is a discrete repository definition in the
136
+ traditional ``deb http://... suite components`` syntax.
137
+ Returns a list of repo dicts (may be empty).
138
+ """
139
+ repos = []
140
+ for raw in lines :
141
+ line = raw .strip ()
142
+ if not line or line .startswith ("#" ):
143
+ continue
144
+ repo = parse_apt_repo (line )
145
+ if repo :
146
+ repos .append (repo )
147
+ return repos
148
+
149
+
150
+ def parse_deb822_sources_file (lines : List [str ]):
151
+ """Parse a full deb822 ``.sources`` file.
152
+
153
+ Splits on blank lines into stanzas and uses ``parse_deb822_stanza`` for each
154
+ stanza. Returns a combined list of repo dicts for all stanzas.
155
+ """
156
+ repos = []
157
+ stanza : List [str ] = []
158
+ for raw in lines + ["" ]: # sentinel blank line to flush last stanza
159
+ line = raw .rstrip ("\n " )
160
+ if line .strip () == "" :
161
+ if stanza :
162
+ repos .extend (parse_deb822_stanza (stanza ))
163
+ stanza = []
164
+ continue
165
+ stanza .append (line )
166
+ return repos
68
167
69
- [
70
- {
71
- "type": "deb",
72
- "url": "http://archive.ubuntu.org",
73
- "distribution": "trusty",
74
- "components", ["main", "multiverse"],
75
- },
76
- ]
168
+
169
+ class AptSources (FactBase ):
170
+ """Returns a list of installed apt sources (legacy .list + deb822 .sources).
171
+
172
+ Backwards compatible with historical output: a flat list of dicts:
173
+
174
+ {
175
+ "type": "deb",
176
+ "url": "http://archive.ubuntu.org",
177
+ "distribution": "bookworm",
178
+ "components": ["main", "contrib"],
179
+ "options": { ... },
180
+ }
77
181
"""
78
182
79
183
@override
80
184
def command (self ) -> str :
81
- return make_cat_files_command (
82
- "/etc/apt/sources.list" ,
83
- "/etc/apt/sources.list.d/*.list" ,
185
+ # We emit file boundary markers so the parser can select the correct
186
+ # parsing function based on filename extension.
187
+ return (
188
+ "sh -c '"
189
+ "for f in "
190
+ "/etc/apt/sources.list "
191
+ "/etc/apt/sources.list.d/*.list "
192
+ "/etc/apt/sources.list.d/*.sources; do "
193
+ '[ -e "$f" ] || continue; '
194
+ 'echo "##FILE $f"; '
195
+ 'cat "$f"; '
196
+ "echo; "
197
+ "done'"
84
198
)
85
199
86
200
@override
87
201
def requires_command (self ) -> str :
88
- return "apt" # if apt installed, above should exist
202
+ return "apt"
89
203
90
204
default = list
91
205
92
206
@override
93
- def process (self , output ):
94
- repos = []
95
-
96
- for line in output :
97
- repo = parse_apt_repo (line )
98
- if repo :
99
- repos .append (repo )
100
-
207
+ def process (self , output ): # type: ignore[override]
208
+ repos : list = []
209
+ current_file : str | None = None
210
+ buffer : list [str ] = []
211
+
212
+ def flush ():
213
+ nonlocal buffer , current_file , repos
214
+ if current_file is None or not buffer :
215
+ buffer = []
216
+ return
217
+ if current_file .endswith (".sources" ):
218
+ repos .extend (parse_deb822_sources_file (buffer ))
219
+ else : # treat anything else as legacy list syntax
220
+ repos .extend (parse_apt_list_file (buffer ))
221
+ buffer = []
222
+
223
+ for raw_line in output :
224
+ if raw_line .startswith ("##FILE " ):
225
+ # New file marker
226
+ flush ()
227
+ current_file = raw_line .split (" " , 1 )[1 ].strip ()
228
+ continue
229
+ buffer .append (raw_line )
230
+
231
+ # Flush last file
232
+ flush ()
101
233
return repos
102
234
103
235
@@ -115,14 +247,30 @@ class AptKeys(GpgFactBase):
115
247
}
116
248
"""
117
249
118
- # This requires both apt-key *and* apt-key itself requires gpg
119
250
@override
120
251
def command (self ) -> str :
121
- return "! command -v gpg || apt-key list --with-colons"
122
-
123
- @override
124
- def requires_command (self ) -> str :
125
- return "apt-key"
252
+ # Prefer not to use deprecated apt-key even if present. Iterate over keyrings
253
+ # directly. This maintains backwards compatibility of output with the
254
+ # previous implementation which fell back to this method.
255
+ return (
256
+ "for f in "
257
+ " /etc/apt/trusted.gpg "
258
+ " /etc/apt/trusted.gpg.d/*.gpg /etc/apt/trusted.gpg.d/*.asc "
259
+ " /etc/apt/keyrings/*.gpg /etc/apt/keyrings/*.asc "
260
+ " /usr/share/keyrings/*.gpg /usr/share/keyrings/*.asc "
261
+ "; do "
262
+ ' [ -e "$f" ] || continue; '
263
+ ' case "$f" in '
264
+ " *.asc) "
265
+ ' gpg --batch --show-keys --with-colons --keyid-format LONG "$f" '
266
+ " ;; "
267
+ " *) "
268
+ ' gpg --batch --no-default-keyring --keyring "$f" '
269
+ " --list-keys --with-colons --keyid-format LONG "
270
+ " ;; "
271
+ " esac; "
272
+ "done"
273
+ )
126
274
127
275
128
276
class AptSimulationDict (TypedDict ):
0 commit comments