-
Notifications
You must be signed in to change notification settings - Fork 6
Test installer's raid1 feature #311
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
cf6e934
53dac04
a523d85
4e9c6ee
ccd75c3
0e2304f
cb83100
f3125fa
2a2121b
d09820f
d928126
fbce339
1487351
df05671
8d9fe61
1354df4
feebb8a
16a0db1
a76a06e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,66 +1,96 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
import time | ||
import xml.etree.ElementTree as ET | ||
from typing import cast, Optional, Sequence, Union | ||
|
||
from lib.commands import ssh, SSHCommandFailed | ||
from lib.common import wait_for | ||
from lib.typing import AnswerfileDict, Self, SimpleAnswerfileDict | ||
|
||
class AnswerFile: | ||
def __init__(self, kind, /): | ||
def __init__(self, kind: str, /) -> None: | ||
from data import BASE_ANSWERFILES | ||
defn = BASE_ANSWERFILES[kind] | ||
defn: SimpleAnswerfileDict = BASE_ANSWERFILES[kind] | ||
self.defn = self._normalize_structure(defn) | ||
|
||
def write_xml(self, filename): | ||
def write_xml(self, filename: str) -> None: | ||
etree = ET.ElementTree(self._defn_to_xml_et(self.defn)) | ||
etree.write(filename) | ||
|
||
# chainable mutators for lambdas | ||
|
||
def top_append(self, *defs): | ||
def top_append(self, *defs: Union[SimpleAnswerfileDict, None, ValueError]) -> Self: | ||
assert not isinstance(self.defn['CONTENTS'], str), "a toplevel CONTENTS must be a list" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can't we check that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The goal of this is to be as close as possible for checking that
The code assumes a "toplevel" is rather typed
But yes, this is a bit hackish and should be better explained in a comment, or simply typed better. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pydantic could help here: class Foo(TypedDict):
bar: int
baz: str
plop: NotRequired[None | str]
foo_validator = TypeAdapter(Foo)
# raise an exception when the arg doesn't match what's declared in Foo
foo_validator.validate_python(dict(bar=1, baz="blah", plop=None)) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be more pythonic to keep as much as possible at the type-hint level :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's using the type hints, just replacing the assert with the pydantic validator: from typing import TypedDict
from typing_extensions import NotRequired
from pydantic import TypeAdapter
import os
class Foo(TypedDict):
bar: int
baz: str
plop: NotRequired[None | str]
foo_validator = TypeAdapter(Foo)
v = None
if os.path.exists("/tmp"):
v = dict(bar=1, baz="blah", plop=None)
# at that point v is of type dict[str, int | str | None]
v = foo_validator.validate_python(v)
# here v is of type Foo There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean, that makes the runtime check more expensive (the reason why type hints are ignored by the interpreter) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, but it's still very fast compared to almost everything we're doing in the tests
|
||
for defn in defs: | ||
if defn is None: | ||
continue | ||
self.defn['CONTENTS'].append(self._normalize_structure(defn)) | ||
return self | ||
|
||
def top_setattr(self, attrs): | ||
def top_setattr(self, attrs: "dict[str, str]") -> Self: | ||
assert 'CONTENTS' not in attrs | ||
self.defn.update(attrs) | ||
self.defn.update(cast(AnswerfileDict, attrs)) | ||
return self | ||
|
||
# makes a mutable deep copy of all `contents` | ||
@staticmethod | ||
def _normalize_structure(defn): | ||
assert isinstance(defn, dict) | ||
assert 'TAG' in defn | ||
defn = dict(defn) | ||
if 'CONTENTS' not in defn: | ||
defn['CONTENTS'] = [] | ||
if not isinstance(defn['CONTENTS'], str): | ||
defn['CONTENTS'] = [AnswerFile._normalize_structure(item) | ||
for item in defn['CONTENTS']] | ||
return defn | ||
def _normalize_structure(defn: Union[SimpleAnswerfileDict, ValueError]) -> AnswerfileDict: | ||
assert isinstance(defn, dict), f"{defn!r} is not a dict" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would raise an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, the whole idea of Else yes, that makes sense. |
||
assert 'TAG' in defn, f"{defn} has no TAG" | ||
|
||
# type mutation through nearly-shallow copy | ||
new_defn: AnswerfileDict = { | ||
'TAG': defn['TAG'], | ||
'CONTENTS': [], | ||
} | ||
for key, value in defn.items(): | ||
if key == 'CONTENTS': | ||
if isinstance(value, str): | ||
new_defn['CONTENTS'] = value | ||
else: | ||
value_as_sequence: Sequence["SimpleAnswerfileDict"] | ||
if isinstance(value, Sequence): | ||
value_as_sequence = value | ||
else: | ||
value_as_sequence = ( | ||
cast(SimpleAnswerfileDict, value), | ||
) | ||
new_defn['CONTENTS'] = [ | ||
AnswerFile._normalize_structure(item) | ||
for item in value_as_sequence | ||
if item is not None | ||
] | ||
elif key == 'TAG': | ||
pass # already copied | ||
else: | ||
new_defn[key] = value # type: ignore[literal-required] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO it would have been simpler to keep the old implementation (before the previous commit that prepared this one) and just cast the result to the expected type. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think 'pydantic' can validate a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The problem is, it cannot be just a cast. The original implementation gradually mutates a on: defn['CONTENTS'] = [AnswerFile._normalize_structure(item)
for item in defn['CONTENTS']] pyright:
I tried quite a few things and that quickly becomes a nightmare of casts which defeats the type-checking. Right now I don't see a better way to write Anyway, if we move those attributes into a separate dict as discussed in a thread below, the need for that override will vanish. |
||
|
||
return new_defn | ||
|
||
# convert to a ElementTree.Element tree suitable for further | ||
# modification before we serialize it to XML | ||
@staticmethod | ||
def _defn_to_xml_et(defn, /, *, parent=None): | ||
def _defn_to_xml_et(defn: AnswerfileDict, /, *, parent: Optional[ET.Element] = None) -> ET.Element: | ||
assert isinstance(defn, dict) | ||
defn = dict(defn) | ||
name = defn.pop('TAG') | ||
defn_copy = dict(defn) | ||
name = defn_copy.pop('TAG') | ||
assert isinstance(name, str) | ||
contents = defn.pop('CONTENTS', ()) | ||
contents = cast(Union[str, "list[AnswerfileDict]"], defn_copy.pop('CONTENTS', [])) | ||
assert isinstance(contents, (str, list)) | ||
element = ET.Element(name, **defn) | ||
defn_filtered = cast("dict[str, str]", defn_copy) | ||
element = ET.Element(name, {}, **defn_filtered) | ||
if parent is not None: | ||
parent.append(element) | ||
if isinstance(contents, str): | ||
element.text = contents | ||
else: | ||
for contents in contents: | ||
AnswerFile._defn_to_xml_et(contents, parent=element) | ||
for content in contents: | ||
AnswerFile._defn_to_xml_et(content, parent=element) | ||
return element | ||
|
||
def poweroff(ip): | ||
def poweroff(ip: str) -> None: | ||
try: | ||
ssh(ip, ["poweroff"]) | ||
except SSHCommandFailed as e: | ||
|
@@ -71,7 +101,7 @@ def poweroff(ip): | |
else: | ||
raise | ||
|
||
def monitor_install(*, ip): | ||
def monitor_install(*, ip: str) -> None: | ||
# wait for "yum install" phase to finish | ||
wait_for(lambda: ssh(ip, ["grep", | ||
"'DISPATCH: NEW PHASE: Completing installation'", | ||
|
@@ -95,7 +125,7 @@ def monitor_install(*, ip): | |
).returncode == 1, | ||
"Wait for installer to terminate") | ||
|
||
def monitor_upgrade(*, ip): | ||
def monitor_upgrade(*, ip: str) -> None: | ||
# wait for "yum install" phase to start | ||
wait_for(lambda: ssh(ip, ["grep", | ||
"'DISPATCH: NEW PHASE: Reading package information'", | ||
|
@@ -128,7 +158,7 @@ def monitor_upgrade(*, ip): | |
).returncode == 1, | ||
"Wait for installer to terminate") | ||
|
||
def monitor_restore(*, ip): | ||
def monitor_restore(*, ip: str) -> None: | ||
# wait for "yum install" phase to start | ||
wait_for(lambda: ssh(ip, ["grep", | ||
"'Restoring backup'", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,40 @@ | ||
from typing import TypedDict | ||
from typing_extensions import NotRequired | ||
from __future__ import annotations | ||
|
||
import sys | ||
from typing import Sequence, TypedDict, Union | ||
|
||
if sys.version_info >= (3, 11): | ||
from typing import NotRequired, Self | ||
else: | ||
from typing_extensions import NotRequired, Self | ||
|
||
IsoImageDef = TypedDict('IsoImageDef', | ||
{'path': str, | ||
'net-url': NotRequired[str], | ||
'net-only': NotRequired[bool], | ||
'unsigned': NotRequired[bool], | ||
}) | ||
|
||
|
||
# Dict-based description of an Answerfile object to be built. | ||
AnswerfileDict = TypedDict('AnswerfileDict', { | ||
'TAG': str, | ||
'CONTENTS': Union[str, "list[AnswerfileDict]"], | ||
}) | ||
|
||
# Simplified version of AnswerfileDict for user input. | ||
# - does not require to write 0 or 1 subelement as a list | ||
SimpleAnswerfileDict = TypedDict('SimpleAnswerfileDict', { | ||
'TAG': str, | ||
'CONTENTS': NotRequired[Union[str, "SimpleAnswerfileDict", Sequence["SimpleAnswerfileDict"]]], | ||
|
||
# No way to allow arbitrary fields in addition? This conveys the | ||
# field's type, but allows them in places we wouldn't want them, | ||
glehmann marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# and forces every XML attribute we use to appear here. | ||
'device': NotRequired[str], | ||
'guest-storage': NotRequired[str], | ||
'mode': NotRequired[str], | ||
'name': NotRequired[str], | ||
'proto': NotRequired[str], | ||
'type': NotRequired[str], | ||
}) |
Uh oh!
There was an error while loading. Please reload this page.