1
+ from __future__ import annotations
2
+
1
3
import logging
2
4
import time
3
5
import xml .etree .ElementTree as ET
6
+ from typing import cast , Optional , Sequence , Union
4
7
5
8
from lib .commands import ssh , SSHCommandFailed
6
9
from lib .common import wait_for
10
+ from lib .typing import AnswerfileDict , Self , SimpleAnswerfileDict
7
11
8
12
class AnswerFile :
9
- def __init__ (self , kind , / ):
13
+ def __init__ (self , kind : str , / ) -> None :
10
14
from data import BASE_ANSWERFILES
11
- defn = BASE_ANSWERFILES [kind ]
15
+ defn : SimpleAnswerfileDict = BASE_ANSWERFILES [kind ]
12
16
self .defn = self ._normalize_structure (defn )
13
17
14
- def write_xml (self , filename ) :
18
+ def write_xml (self , filename : str ) -> None :
15
19
etree = ET .ElementTree (self ._defn_to_xml_et (self .defn ))
16
20
etree .write (filename )
17
21
18
22
# chainable mutators for lambdas
19
23
20
- def top_append (self , * defs ):
24
+ def top_append (self , * defs : Union [SimpleAnswerfileDict , None , ValueError ]) -> Self :
25
+ assert not isinstance (self .defn ['CONTENTS' ], str ), "a toplevel CONTENTS must be a list"
21
26
for defn in defs :
22
27
if defn is None :
23
28
continue
24
29
self .defn ['CONTENTS' ].append (self ._normalize_structure (defn ))
25
30
return self
26
31
27
- def top_setattr (self , attrs ) :
32
+ def top_setattr (self , attrs : "dict[str, str]" ) -> Self :
28
33
assert 'CONTENTS' not in attrs
29
- self .defn .update (attrs )
34
+ self .defn .update (cast ( AnswerfileDict , attrs ) )
30
35
return self
31
36
32
37
# makes a mutable deep copy of all `contents`
33
38
@staticmethod
34
- def _normalize_structure (defn ) :
39
+ def _normalize_structure (defn : Union [ SimpleAnswerfileDict , ValueError ]) -> AnswerfileDict :
35
40
assert isinstance (defn , dict ), f"{ defn !r} is not a dict"
36
41
assert 'TAG' in defn , f"{ defn } has no TAG"
37
42
38
43
# type mutation through nearly-shallow copy
39
- new_defn = {
44
+ new_defn : AnswerfileDict = {
40
45
'TAG' : defn ['TAG' ],
41
46
'CONTENTS' : [],
42
47
}
@@ -45,29 +50,37 @@ def _normalize_structure(defn):
45
50
if isinstance (value , str ):
46
51
new_defn ['CONTENTS' ] = value
47
52
else :
53
+ value_as_sequence : Sequence ["SimpleAnswerfileDict" ]
54
+ if isinstance (value , Sequence ):
55
+ value_as_sequence = value
56
+ else :
57
+ value_as_sequence = (
58
+ cast (SimpleAnswerfileDict , value ),
59
+ )
48
60
new_defn ['CONTENTS' ] = [
49
61
AnswerFile ._normalize_structure (item )
50
- for item in value
62
+ for item in value_as_sequence
51
63
if item is not None
52
64
]
53
65
elif key == 'TAG' :
54
66
pass # already copied
55
67
else :
56
- new_defn [key ] = value
68
+ new_defn [key ] = value # type: ignore[literal-required]
57
69
58
70
return new_defn
59
71
60
72
# convert to a ElementTree.Element tree suitable for further
61
73
# modification before we serialize it to XML
62
74
@staticmethod
63
- def _defn_to_xml_et (defn , / , * , parent = None ):
75
+ def _defn_to_xml_et (defn : AnswerfileDict , / , * , parent : Optional [ ET . Element ] = None ) -> ET . Element :
64
76
assert isinstance (defn , dict )
65
- defn = dict (defn )
66
- name = defn .pop ('TAG' )
77
+ defn_copy = dict (defn )
78
+ name = defn_copy .pop ('TAG' )
67
79
assert isinstance (name , str )
68
- contents = defn .pop ('CONTENTS' , ( ))
80
+ contents = cast ( Union [ str , "list[AnswerfileDict]" ], defn_copy .pop ('CONTENTS' , [] ))
69
81
assert isinstance (contents , (str , list ))
70
- element = ET .Element (name , {}, ** defn )
82
+ defn_filtered = cast ("dict[str, str]" , defn_copy )
83
+ element = ET .Element (name , {}, ** defn_filtered )
71
84
if parent is not None :
72
85
parent .append (element )
73
86
if isinstance (contents , str ):
@@ -77,7 +90,7 @@ def _defn_to_xml_et(defn, /, *, parent=None):
77
90
AnswerFile ._defn_to_xml_et (content , parent = element )
78
91
return element
79
92
80
- def poweroff (ip ) :
93
+ def poweroff (ip : str ) -> None :
81
94
try :
82
95
ssh (ip , ["poweroff" ])
83
96
except SSHCommandFailed as e :
@@ -88,7 +101,7 @@ def poweroff(ip):
88
101
else :
89
102
raise
90
103
91
- def monitor_install (* , ip ) :
104
+ def monitor_install (* , ip : str ) -> None :
92
105
# wait for "yum install" phase to finish
93
106
wait_for (lambda : ssh (ip , ["grep" ,
94
107
"'DISPATCH: NEW PHASE: Completing installation'" ,
@@ -112,7 +125,7 @@ def monitor_install(*, ip):
112
125
).returncode == 1 ,
113
126
"Wait for installer to terminate" )
114
127
115
- def monitor_upgrade (* , ip ) :
128
+ def monitor_upgrade (* , ip : str ) -> None :
116
129
# wait for "yum install" phase to start
117
130
wait_for (lambda : ssh (ip , ["grep" ,
118
131
"'DISPATCH: NEW PHASE: Reading package information'" ,
@@ -145,7 +158,7 @@ def monitor_upgrade(*, ip):
145
158
).returncode == 1 ,
146
159
"Wait for installer to terminate" )
147
160
148
- def monitor_restore (* , ip ) :
161
+ def monitor_restore (* , ip : str ) -> None :
149
162
# wait for "yum install" phase to start
150
163
wait_for (lambda : ssh (ip , ["grep" ,
151
164
"'Restoring backup'" ,
0 commit comments