1
- import json
2
1
import logging
3
2
import os
4
3
import shlex
4
+ import subprocess
5
5
import tempfile
6
6
import uuid
7
7
8
8
from packaging import version
9
- from typing import TYPE_CHECKING
9
+ from typing import TYPE_CHECKING , Dict , List , Literal , Optional , Union , overload
10
10
11
11
import lib .commands as commands
12
12
import lib .pif as pif
@@ -40,9 +40,7 @@ class Host:
40
40
def __init__ (self , pool : 'lib.pool.Pool' , hostname_or_ip ):
41
41
self .pool = pool
42
42
self .hostname_or_ip = hostname_or_ip
43
- self .inventory = None
44
- self .uuid = None
45
- self .xo_srv_id = None
43
+ self .xo_srv_id : Optional [str ] = None
46
44
47
45
h_data = host_data (self .hostname_or_ip )
48
46
self .user = h_data ['user' ]
@@ -59,11 +57,41 @@ def __init__(self, pool: 'lib.pool.Pool', hostname_or_ip):
59
57
def __str__ (self ):
60
58
return self .hostname_or_ip
61
59
62
- def ssh (self , cmd , check = True , simple_output = True , suppress_fingerprint_warnings = True ,
63
- background = False , decode = True ):
60
+ @overload
61
+ def ssh (self , cmd : Union [str , List [str ]], * , check : bool = True , simple_output : Literal [True ] = True ,
62
+ suppress_fingerprint_warnings : bool = True , background : Literal [False ] = False ,
63
+ decode : Literal [True ] = True ) -> str :
64
+ ...
65
+
66
+ @overload
67
+ def ssh (self , cmd : Union [str , List [str ]], * , check : bool = True , simple_output : Literal [True ] = True ,
68
+ suppress_fingerprint_warnings : bool = True , background : Literal [False ] = False ,
69
+ decode : Literal [False ]) -> bytes :
70
+ ...
71
+
72
+ @overload
73
+ def ssh (self , cmd : Union [str , List [str ]], * , check : bool = True , simple_output : Literal [False ],
74
+ suppress_fingerprint_warnings : bool = True , background : Literal [False ] = False ,
75
+ decode : bool ) -> commands .SSHResult :
76
+ ...
77
+
78
+ @overload
79
+ def ssh (self , cmd : Union [str , List [str ]], * , check : bool = True , simple_output : bool = True ,
80
+ suppress_fingerprint_warnings : bool = True , background : Literal [True ],
81
+ decode : bool = True ) -> subprocess .Popen :
82
+ ...
83
+
84
+ @overload
85
+ def ssh (self , cmd : Union [str , List [str ]], * , check : bool = True , simple_output : bool = True ,
86
+ suppress_fingerprint_warnings : bool = True , background : bool = False , decode : bool = True ) \
87
+ -> Union [str , bytes , commands .SSHResult , subprocess .Popen ]:
88
+ ...
89
+
90
+ def ssh (self , cmd , * , check = True , simple_output = True , suppress_fingerprint_warnings = True ,
91
+ background = False , decode = True ) -> Union [str , bytes , commands .SSHResult , subprocess .Popen ]:
64
92
return commands .ssh (self .hostname_or_ip , cmd , check = check , simple_output = simple_output ,
65
- suppress_fingerprint_warnings = suppress_fingerprint_warnings , background = background ,
66
- decode = decode )
93
+ suppress_fingerprint_warnings = suppress_fingerprint_warnings ,
94
+ background = background , decode = decode )
67
95
68
96
def ssh_with_result (self , cmd ) -> commands .SSHResult :
69
97
# doesn't raise if the command's return is nonzero, unless there's a SSH error
@@ -75,7 +103,18 @@ def scp(self, src, dest, check=True, suppress_fingerprint_warnings=True, local_d
75
103
suppress_fingerprint_warnings = suppress_fingerprint_warnings , local_dest = local_dest
76
104
)
77
105
78
- def xe (self , action , args = {}, check = True , simple_output = True , minimal = False , force = False ):
106
+ @overload
107
+ def xe (self , action : str , args : Dict [str , Union [str , bool ]] = {}, * , check : bool = True ,
108
+ simple_output : Literal [True ] = True , minimal : bool = False , force : bool = False ) -> Union [bool , str ]:
109
+ ...
110
+
111
+ @overload
112
+ def xe (self , action : str , args : Dict [str , Union [str , bool ]] = {}, * , check : bool = True ,
113
+ simple_output : Literal [False ], minimal : bool = False , force : bool = False ) -> commands .SSHResult :
114
+ ...
115
+
116
+ def xe (self , action , args = {}, * , check = True , simple_output = True , minimal = False , force = False ) \
117
+ -> Union [bool , str , commands .SSHResult ]:
79
118
maybe_param_minimal = ['--minimal' ] if minimal else []
80
119
maybe_param_force = ['--force' ] if force else []
81
120
@@ -89,13 +128,14 @@ def stringify(key, value):
89
128
return ret .rstrip ()
90
129
return "{}={}" .format (key , shlex .quote (value ))
91
130
92
- command = ['xe' , action ] + maybe_param_minimal + maybe_param_force + \
93
- [stringify (key , value ) for key , value in args .items ()]
131
+ command : List [ str ] = ['xe' , action ] + maybe_param_minimal + maybe_param_force + \
132
+ [stringify (key , value ) for key , value in args .items ()]
94
133
result = self .ssh (
95
134
command ,
96
135
check = check ,
97
136
simple_output = simple_output
98
137
)
138
+ assert isinstance (result , (str , commands .SSHResult ))
99
139
100
140
if result == 'true' :
101
141
return True
@@ -163,7 +203,7 @@ def execute_script(self, script_contents, shebang='sh', simple_output=True):
163
203
164
204
def _get_xensource_inventory (self ):
165
205
output = self .ssh (['cat' , '/etc/xensource-inventory' ])
166
- inventory = {}
206
+ inventory : dict [ str , str ] = {}
167
207
for line in output .splitlines ():
168
208
key , raw_value = line .split ('=' )
169
209
inventory [key ] = raw_value .strip ('\' ' )
@@ -202,7 +242,7 @@ def xo_server_add(self, username, password, label=None, unregister_first=True):
202
242
'allowUnauthorized' : 'true' ,
203
243
'label' : label
204
244
}
205
- )
245
+ ). decode ()
206
246
self .xo_srv_id = xo_srv_id
207
247
208
248
def xo_server_status (self ):
@@ -216,6 +256,7 @@ def xo_server_connected(self):
216
256
return self .xo_server_status () == "connected"
217
257
218
258
def xo_server_reconnect (self ):
259
+ assert self .xo_srv_id is not None
219
260
logging .info ("Reconnect XO to host %s" % self )
220
261
xo_cli ('server.disable' , {'id' : self .xo_srv_id })
221
262
xo_cli ('server.enable' , {'id' : self .xo_srv_id })
@@ -285,6 +326,7 @@ def import_iso(self, uri, sr: SR):
285
326
},
286
327
)
287
328
329
+ download_path = None
288
330
try :
289
331
params = {'uuid' : vdi_uuid }
290
332
if '://' in uri :
@@ -293,7 +335,6 @@ def import_iso(self, uri, sr: SR):
293
335
self .ssh (f"curl -o '{ download_path } ' '{ uri } '" )
294
336
params ['filename' ] = download_path
295
337
else :
296
- download_path = None
297
338
params ['filename' ] = uri
298
339
logging .info (f"Import ISO { uri } : name { random_name } , uuid { vdi_uuid } " )
299
340
@@ -355,7 +396,7 @@ def get_last_yum_history_tid(self):
355
396
"""
356
397
try :
357
398
history_str = self .ssh (['yum' , 'history' , 'list' , '--noplugins' ])
358
- except commands .SSHCommandFailed as e :
399
+ except commands .SSHCommandFailed :
359
400
# yum history list fails if the list is empty, and it's also not possible to rollback
360
401
# to before the first transaction, so "0" would not be appropriate as last transaction.
361
402
# To workaround this, create transactions: install and remove a small package.
0 commit comments