Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions subprojects/robotpy-wpilib/gen/Alert.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,15 @@ classes:
SetText:
GetText:
GetType:
inline_code: |
.def("close", [](frc::Alert &self) {
py::gil_scoped_release release;
self.Set(false);
}, py::doc("Disables the alert"))
.def("__enter__", [](frc::Alert &self) -> frc::Alert& {
return self;
})
.def("__exit__", [](frc::Alert &self, py::args args) {
py::gil_scoped_release release;
self.Set(false);
})
12 changes: 10 additions & 2 deletions subprojects/robotpy-wpilib/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
import ntcore
import wpilib
from wpilib.simulation._simulation import _resetWpilibSimulationData


@pytest.fixture
Expand All @@ -11,7 +12,15 @@ def cfg_logging(caplog):


@pytest.fixture(scope="function")
def nt(cfg_logging):
def wpilib_state():
try:
yield None
finally:
_resetWpilibSimulationData()


@pytest.fixture(scope="function")
def nt(cfg_logging, wpilib_state):
instance = ntcore.NetworkTableInstance.getDefault()
instance.startLocal()

Expand All @@ -20,4 +29,3 @@ def nt(cfg_logging):
finally:
instance.stopLocal()
instance._reset()
wpilib._wpilib._clearSmartDashboardData()
223 changes: 223 additions & 0 deletions subprojects/robotpy-wpilib/tests/test_alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import typing as T

import pytest

from ntcore import NetworkTableInstance
from wpilib import Alert, SmartDashboard
from wpilib.simulation import pauseTiming, resumeTiming, stepTiming

AlertType = Alert.AlertType


@pytest.fixture(scope="function")
def group_name(nt, request):

group_name = f"AlertTest_{request.node.name}"
yield group_name

SmartDashboard.updateValues()
assert len(get_active_alerts(nt, group_name, AlertType.kError)) == 0
assert len(get_active_alerts(nt, group_name, AlertType.kWarning)) == 0
assert len(get_active_alerts(nt, group_name, AlertType.kInfo)) == 0


def get_subscriber_for_type(
nt: NetworkTableInstance, group_name: str, alert_type: AlertType
):
subtable_name = {
AlertType.kError: "errors",
AlertType.kWarning: "warnings",
AlertType.kInfo: "infos",
}.get(alert_type, "unknown")
topic = f"/SmartDashboard/{group_name}/{subtable_name}"
return nt.getStringArrayTopic(topic).subscribe([])


def get_active_alerts(
nt: NetworkTableInstance, group_name: str, alert_type: AlertType
) -> T.List[str]:
SmartDashboard.updateValues()
with get_subscriber_for_type(nt, group_name, alert_type) as sub:
return sub.get()


def is_alert_active(
nt: NetworkTableInstance, group_name: str, text: str, alert_type: AlertType
):
active_alerts = get_active_alerts(nt, group_name, alert_type)
return text in active_alerts


def assert_state(
nt: NetworkTableInstance,
group_name: str,
alert_type: AlertType,
expected_state: T.List[str],
):
assert expected_state == get_active_alerts(nt, group_name, alert_type)


def test_set_unset_single(nt, group_name):
with Alert(group_name, "one", AlertType.kError) as one:

assert not is_alert_active(nt, group_name, "one", AlertType.kError)
assert not is_alert_active(nt, group_name, "two", AlertType.kInfo)

one.set(True)
assert is_alert_active(nt, group_name, "one", AlertType.kError)

one.set(True)
assert is_alert_active(nt, group_name, "one", AlertType.kError)

one.set(False)
assert not is_alert_active(nt, group_name, "one", AlertType.kError)


def test_set_unset_multiple(nt, group_name):
with (
Alert(group_name, "one", AlertType.kError) as one,
Alert(group_name, "two", AlertType.kInfo) as two,
):

assert not is_alert_active(nt, group_name, "one", AlertType.kError)
assert not is_alert_active(nt, group_name, "two", AlertType.kInfo)

one.set(True)
assert is_alert_active(nt, group_name, "one", AlertType.kError)
assert not is_alert_active(nt, group_name, "two", AlertType.kInfo)

one.set(True)
two.set(True)
assert is_alert_active(nt, group_name, "one", AlertType.kError)
assert is_alert_active(nt, group_name, "two", AlertType.kInfo)

one.set(False)
assert not is_alert_active(nt, group_name, "one", AlertType.kError)
assert is_alert_active(nt, group_name, "two", AlertType.kInfo)


def test_set_is_idempotent(nt, group_name):
group_name = group_name
with (
Alert(group_name, "A", AlertType.kInfo) as a,
Alert(group_name, "B", AlertType.kInfo) as b,
Alert(group_name, "C", AlertType.kInfo) as c,
):

a.set(True)
b.set(True)
c.set(True)

start_state = get_active_alerts(nt, group_name, AlertType.kInfo)
assert set(start_state) == {"A", "B", "C"}

b.set(True)
assert_state(nt, group_name, AlertType.kInfo, start_state)

a.set(True)
assert_state(nt, group_name, AlertType.kInfo, start_state)


def test_close_unsets_alert(nt, group_name):
group_name = group_name
with Alert(group_name, "alert", AlertType.kWarning) as alert:
alert.set(True)
assert is_alert_active(nt, group_name, "alert", AlertType.kWarning)
assert not is_alert_active(nt, group_name, "alert", AlertType.kWarning)


def test_set_text_while_unset(nt, group_name):
group_name = group_name
with Alert(group_name, "BEFORE", AlertType.kInfo) as alert:
assert alert.getText() == "BEFORE"
alert.set(True)
assert is_alert_active(nt, group_name, "BEFORE", AlertType.kInfo)
alert.set(False)
assert not is_alert_active(nt, group_name, "BEFORE", AlertType.kInfo)
alert.setText("AFTER")
assert alert.getText() == "AFTER"
alert.set(True)
assert not is_alert_active(nt, group_name, "BEFORE", AlertType.kInfo)
assert is_alert_active(nt, group_name, "AFTER", AlertType.kInfo)


def test_set_text_while_set(nt, group_name):
with Alert(group_name, "BEFORE", AlertType.kInfo) as alert:
assert alert.getText() == "BEFORE"
alert.set(True)
assert is_alert_active(nt, group_name, "BEFORE", AlertType.kInfo)
alert.setText("AFTER")
assert alert.getText() == "AFTER"
assert not is_alert_active(nt, group_name, "BEFORE", AlertType.kInfo)
assert is_alert_active(nt, group_name, "AFTER", AlertType.kInfo)


def test_set_text_does_not_affect_sort(nt, group_name):
pauseTiming()
try:
with (
Alert(group_name, "A", AlertType.kInfo) as a,
Alert(group_name, "B", AlertType.kInfo) as b,
Alert(group_name, "C", AlertType.kInfo) as c,
):

a.set(True)
stepTiming(1)
b.set(True)
stepTiming(1)
c.set(True)

expected_state = get_active_alerts(nt, group_name, AlertType.kInfo)
expected_state[expected_state.index("B")] = "AFTER"

b.setText("AFTER")
assert_state(nt, group_name, AlertType.kInfo, expected_state)
finally:
resumeTiming()


def test_sort_order(nt, group_name):
pauseTiming()
try:
with (
Alert(group_name, "A", AlertType.kInfo) as a,
Alert(group_name, "B", AlertType.kInfo) as b,
Alert(group_name, "C", AlertType.kInfo) as c,
):

a.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["A"])

stepTiming(1)
b.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["B", "A"])

stepTiming(1)
c.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["C", "B", "A"])

stepTiming(1)
c.set(False)
assert_state(nt, group_name, AlertType.kInfo, ["B", "A"])

stepTiming(1)
c.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["C", "B", "A"])

stepTiming(1)
a.set(False)
assert_state(nt, group_name, AlertType.kInfo, ["C", "B"])

stepTiming(1)
b.set(False)
assert_state(nt, group_name, AlertType.kInfo, ["C"])

stepTiming(1)
b.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["B", "C"])

stepTiming(1)
a.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["A", "B", "C"])
finally:
resumeTiming()
Loading