diff --git a/selfdrive/car/tests/test_models.py b/selfdrive/car/tests/test_models.py index 94f5b332319ebc..173b7ab2cf9280 100644 --- a/selfdrive/car/tests/test_models.py +++ b/selfdrive/car/tests/test_models.py @@ -2,7 +2,6 @@ import os import pytest import random -import unittest # noqa: TID251 from collections import defaultdict, Counter import hypothesis.strategies as st from hypothesis import Phase, given, settings @@ -60,7 +59,7 @@ def get_test_cases() -> list[tuple[str, CarTestRoute | None]]: @pytest.mark.slow @pytest.mark.shared_download_cache -class TestCarModelBase(unittest.TestCase): +class TestCarModelBase: platform: Platform | None = None test_route: CarTestRoute | None = None @@ -69,13 +68,12 @@ class TestCarModelBase(unittest.TestCase): elm_frame: int | None car_safety_mode_frame: int | None - @classmethod - def get_testing_data_from_logreader(cls, lr): + def get_testing_data_from_logreader(self, lr): car_fw = [] can_msgs = [] - cls.elm_frame = None - cls.car_safety_mode_frame = None - cls.fingerprint = gen_empty_fingerprint() + self.elm_frame = None + self.car_safety_mode_frame = None + self.fingerprint = gen_empty_fingerprint() alpha_long = False for msg in lr: if msg.which() == "can": @@ -84,82 +82,80 @@ def get_testing_data_from_logreader(cls, lr): if len(can_msgs) <= FRAME_FINGERPRINT: for m in msg.can: if m.src < 64: - cls.fingerprint[m.src][m.address] = len(m.dat) + self.fingerprint[m.src][m.address] = len(m.dat) elif msg.which() == "carParams": car_fw = msg.carParams.carFw if msg.carParams.openpilotLongitudinalControl: alpha_long = True - if cls.platform is None: + if self.platform is None: live_fingerprint = msg.carParams.carFingerprint - cls.platform = MIGRATION.get(live_fingerprint, live_fingerprint) + self.platform = MIGRATION.get(live_fingerprint, live_fingerprint) # Log which can frame the panda safety mode left ELM327, for CAN validity checks elif msg.which() == 'pandaStates': for ps in msg.pandaStates: - if cls.elm_frame is None and ps.safetyModel != SafetyModel.elm327: - cls.elm_frame = len(can_msgs) - if cls.car_safety_mode_frame is None and ps.safetyModel not in \ + if self.elm_frame is None and ps.safetyModel != SafetyModel.elm327: + self.elm_frame = len(can_msgs) + if self.car_safety_mode_frame is None and ps.safetyModel not in \ (SafetyModel.elm327, SafetyModel.noOutput): - cls.car_safety_mode_frame = len(can_msgs) + self.car_safety_mode_frame = len(can_msgs) elif msg.which() == 'pandaStateDEPRECATED': - if cls.elm_frame is None and msg.pandaStateDEPRECATED.safetyModel != SafetyModel.elm327: - cls.elm_frame = len(can_msgs) - if cls.car_safety_mode_frame is None and msg.pandaStateDEPRECATED.safetyModel not in \ + if self.elm_frame is None and msg.pandaStateDEPRECATED.safetyModel != SafetyModel.elm327: + self.elm_frame = len(can_msgs) + if self.car_safety_mode_frame is None and msg.pandaStateDEPRECATED.safetyModel not in \ (SafetyModel.elm327, SafetyModel.noOutput): - cls.car_safety_mode_frame = len(can_msgs) + self.car_safety_mode_frame = len(can_msgs) assert len(can_msgs) > int(50 / DT_CTRL), "no can data found" return car_fw, can_msgs, alpha_long - @classmethod - def get_testing_data(cls): + def get_testing_data(self): test_segs = (2, 1, 0) - if cls.test_route.segment is not None: - test_segs = (cls.test_route.segment,) + if self.test_route.segment is not None: + test_segs = (self.test_route.segment,) for seg in test_segs: - segment_range = f"{cls.test_route.route}/{seg}" + segment_range = f"{self.test_route.route}/{seg}" try: sources = [internal_source] if len(INTERNAL_SEG_LIST) else [openpilotci_source, comma_api_source] lr = LogReader(segment_range, sources=sources, sort_by_time=True) - return cls.get_testing_data_from_logreader(lr) + return self.get_testing_data_from_logreader(lr) except (LogsUnavailable, AssertionError): pass - raise Exception(f"Route: {repr(cls.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded and public?") + raise Exception(f"Route: {repr(self.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded and public?") + @pytest.fixture(autouse=True, scope="class") + def setup_class(self): + if self.__class__.__name__ == 'TestCarModel' or self.__class__.__name__.endswith('Base'): + pytest.skip("Base class or TestCarModel template") - @classmethod - def setUpClass(cls): - if cls.__name__ == 'TestCarModel' or cls.__name__.endswith('Base'): - raise unittest.SkipTest + if self.test_route is None: + if self.platform in non_tested_cars: + print(f"Skipping tests for {self.platform}: missing route") + pytest.skip(f"Skipping tests for {self.platform}: missing route") + raise Exception(f"missing test route for {self.platform}") - if cls.test_route is None: - if cls.platform in non_tested_cars: - print(f"Skipping tests for {cls.platform}: missing route") - raise unittest.SkipTest - raise Exception(f"missing test route for {cls.platform}") - - car_fw, cls.can_msgs, alpha_long = cls.get_testing_data() + car_fw, self.can_msgs, alpha_long = self.get_testing_data() # if relay is expected to be open in the route - cls.openpilot_enabled = cls.car_safety_mode_frame is not None + self.openpilot_enabled = self.car_safety_mode_frame is not None - cls.CarInterface = interfaces[cls.platform] - cls.CP = cls.CarInterface.get_params(cls.platform, cls.fingerprint, car_fw, alpha_long, False, docs=False) - assert cls.CP - assert cls.CP.carFingerprint == cls.platform + self.CarInterface = interfaces[self.platform] + self.CP = self.CarInterface.get_params(self.platform, self.fingerprint, car_fw, alpha_long, False, docs=False) + assert self.CP + assert self.CP.carFingerprint == self.platform os.environ["COMMA_CACHE"] = DEFAULT_DOWNLOAD_CACHE_ROOT + yield + if hasattr(self, 'can_msgs'): + del self.can_msgs - @classmethod - def tearDownClass(cls): - del cls.can_msgs - - def setUp(self): + @pytest.fixture(autouse=True) + def setup_method(self): self.CI = self.CarInterface(self.CP.copy()) assert self.CI @@ -168,22 +164,22 @@ def setUp(self): cfg = self.CP.safetyConfigs[-1] set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam) - self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") + assert 0 == set_status, f"failed to set safetyModel {cfg}" self.safety.init_tests() def test_car_params(self): if self.CP.dashcamOnly: - self.skipTest("no need to check carParams for dashcamOnly") + pytest.skip("no need to check carParams for dashcamOnly") # make sure car params are within a valid range - self.assertGreater(self.CP.mass, 1) + assert self.CP.mass > 1 if self.CP.steerControlType != SteerControlType.angle: tuning = self.CP.lateralTuning.which() if tuning == 'pid': - self.assertTrue(len(self.CP.lateralTuning.pid.kpV)) + assert len(self.CP.lateralTuning.pid.kpV) elif tuning == 'torque': - self.assertTrue(self.CP.lateralTuning.torque.latAccelFactor > 0) + assert self.CP.lateralTuning.torque.latAccelFactor > 0 else: raise Exception("unknown tuning") @@ -200,7 +196,7 @@ def test_car_interface(self): if i > 250: can_invalid_cnt += not CS.canValid - self.assertEqual(can_invalid_cnt, 0) + assert can_invalid_cnt == 0 def test_radar_interface(self): RI = self.CarInterface.RadarInterface(self.CP) @@ -213,11 +209,11 @@ def test_radar_interface(self): rr: structs.RadarData | None = RI.update(msg) if rr is not None and i > 50: error_cnt += rr.errors.canError - self.assertEqual(error_cnt, 0) + assert error_cnt == 0 def test_panda_safety_rx_checks(self): if self.CP.dashcamOnly: - self.skipTest("no need to check panda safety for dashcamOnly") + pytest.skip("no need to check panda safety for dashcamOnly") start_ts = self.can_msgs[0][0] @@ -239,29 +235,29 @@ def test_panda_safety_rx_checks(self): # ensure all msgs defined in the addr checks are valid self.safety.safety_tick_current_safety_config() if t > 1e6: - self.assertTrue(self.safety.safety_config_valid()) + assert self.safety.safety_config_valid() # Don't check relay malfunction on disabled routes (relay closed), # or before fingerprinting is done (elm327 and noOutput) if self.openpilot_enabled and t / 1e4 > self.car_safety_mode_frame: - self.assertFalse(self.safety.get_relay_malfunction()) + assert not self.safety.get_relay_malfunction() else: self.safety.set_relay_malfunction(False) - self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}") + assert not len(failed_addrs), f"panda safety RX check failed: {failed_addrs}" # ensure RX checks go invalid after small time with no traffic self.safety.set_timer(int(t + (2*1e6))) self.safety.safety_tick_current_safety_config() - self.assertFalse(self.safety.safety_config_valid()) + assert not self.safety.safety_config_valid() def test_panda_safety_tx_cases(self, data=None): """Asserts we can tx common messages""" if self.CP.dashcamOnly: - self.skipTest("no need to check panda safety for dashcamOnly") + pytest.skip("no need to check panda safety for dashcamOnly") if self.CP.notCar: - self.skipTest("Skipping test for notCar") + pytest.skip("Skipping test for notCar") def test_car_controller(car_control): now_nanos = 0 @@ -275,10 +271,10 @@ def test_car_controller(car_control): msgs_sent += len(sendcan) for addr, dat, bus in sendcan: to_send = libsafety_py.make_CANPacket(addr, bus % 4, dat) - self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus)) + assert self.safety.safety_tx_hook(to_send), (addr, dat, bus) # Make sure we attempted to send messages - self.assertGreater(msgs_sent, 50) + assert msgs_sent > 50 # Make sure we can send all messages while inactive CC = structs.CarControl() @@ -306,7 +302,7 @@ def test_panda_safety_carstate_fuzzy(self, data): """ if self.CP.dashcamOnly: - self.skipTest("no need to check panda safety for dashcamOnly") + pytest.skip("no need to check panda safety for dashcamOnly") valid_addrs = [(addr, bus, size) for bus, addrs in self.fingerprint.items() for addr, size in addrs.items()] address, bus, size = data.draw(st.sampled_from(valid_addrs)) @@ -339,7 +335,7 @@ def test_panda_safety_carstate_fuzzy(self, data): continue if self.safety.get_gas_pressed_prev() != prev_panda_gas: - self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev()) + assert CS.gasPressed == self.safety.get_gas_pressed_prev() if self.safety.get_brake_pressed_prev() != prev_panda_brake: # TODO: remove this exception once this mismatch is resolved @@ -348,16 +344,16 @@ def test_panda_safety_carstate_fuzzy(self, data): if self.CP.carFingerprint in (HONDA.HONDA_PILOT, HONDA.HONDA_RIDGELINE) and CS.brake > 0.05: brake_pressed = False - self.assertEqual(brake_pressed, self.safety.get_brake_pressed_prev()) + assert brake_pressed == self.safety.get_brake_pressed_prev() if self.safety.get_regen_braking_prev() != prev_panda_regen_braking: - self.assertEqual(CS.regenBraking, self.safety.get_regen_braking_prev()) + assert CS.regenBraking == self.safety.get_regen_braking_prev() if self.safety.get_steering_disengage_prev() != prev_panda_steering_disengage: - self.assertEqual(CS.steeringDisengage, self.safety.get_steering_disengage_prev()) + assert CS.steeringDisengage == self.safety.get_steering_disengage_prev() if self.safety.get_vehicle_moving() != prev_panda_vehicle_moving and not self.CP.notCar: - self.assertEqual(not CS.standstill, self.safety.get_vehicle_moving()) + assert not CS.standstill == self.safety.get_vehicle_moving() # check vehicle speed if angle control car or available if self.safety.get_vehicle_speed_min() > 0 or self.safety.get_vehicle_speed_max() > 0: @@ -366,23 +362,23 @@ def test_panda_safety_carstate_fuzzy(self, data): if vehicle_speed_seen and (self.safety.get_vehicle_speed_min() != prev_panda_vehicle_speed_min or self.safety.get_vehicle_speed_max() != prev_panda_vehicle_speed_max): v_ego_raw = CS.vEgoRaw / self.CP.wheelSpeedFactor - self.assertFalse(v_ego_raw > (self.safety.get_vehicle_speed_max() + 1e-3) or + assert not (v_ego_raw > (self.safety.get_vehicle_speed_max() + 1e-3) or v_ego_raw < (self.safety.get_vehicle_speed_min() - 1e-3)) if not (self.CP.brand == "honda" and not (self.CP.flags & HondaFlags.BOSCH)): if self.safety.get_cruise_engaged_prev() != prev_panda_cruise_engaged: - self.assertEqual(CS.cruiseState.enabled, self.safety.get_cruise_engaged_prev()) + assert CS.cruiseState.enabled == self.safety.get_cruise_engaged_prev() if self.CP.brand == "honda": if self.safety.get_acc_main_on() != prev_panda_acc_main_on: - self.assertEqual(CS.cruiseState.available, self.safety.get_acc_main_on()) + assert CS.cruiseState.available == self.safety.get_acc_main_on() def test_panda_safety_carstate(self): """ Assert that panda safety matches openpilot's carState """ if self.CP.dashcamOnly: - self.skipTest("no need to check panda safety for dashcamOnly") + pytest.skip("no need to check panda safety for dashcamOnly") # warm up pass, as initial states may be different for can in self.can_msgs[:300]: @@ -400,7 +396,7 @@ def test_panda_safety_carstate(self): for msg in filter(lambda m: m.src < 64, can[1]): to_send = libsafety_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) ret = self.safety.safety_rx_hook(to_send) - self.assertEqual(1, ret, f"safety rx failed ({ret=}): {(msg.address, msg.src % 4)}") + assert 1 == ret, f"safety rx failed ({ret=}): {(msg.address, msg.src % 4)}" # Skip first frame so CS_prev is properly initialized if idx == 0: @@ -462,7 +458,7 @@ def test_panda_safety_carstate(self): CS_prev = CS failed_checks = {k: v for k, v in checks.items() if v > 0} - self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}") + assert not len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}" @parameterized_class(('platform', 'test_route'), get_test_cases()) @@ -471,5 +467,3 @@ class TestCarModel(TestCarModelBase): pass -if __name__ == "__main__": - unittest.main() diff --git a/selfdrive/ui/mici/layouts/main.py b/selfdrive/ui/mici/layouts/main.py index b52f9ed39a06f9..dc1c897977e43f 100644 --- a/selfdrive/ui/mici/layouts/main.py +++ b/selfdrive/ui/mici/layouts/main.py @@ -73,7 +73,8 @@ def _setup_callbacks(self): device.add_interactive_timeout_callback(self._set_mode_for_started) def _scroll_to(self, layout: Widget): - layout_x = int(layout.rect.x) + # Scroll to the absolute layout_x position (negated because scroll_panel uses negative offsets for content) + layout_x = -int(layout.rect.x) self._scroller.scroll_to(layout_x, smooth=True) def _render(self, _): @@ -83,9 +84,9 @@ def _render(self, _): if not self._setup: if self._alerts_layout.active_alerts() > 0: - self._scroller.scroll_to(self._alerts_layout.rect.x) + self._scroller.scroll_to(-self._alerts_layout.rect.x) else: - self._scroller.scroll_to(self._rect.width) + self._scroller.scroll_to(-self._rect.width) self._setup = True # Render diff --git a/selfdrive/ui/mici/widgets/dialog.py b/selfdrive/ui/mici/widgets/dialog.py index abd558aa8db545..a4516120a0ebf5 100644 --- a/selfdrive/ui/mici/widgets/dialog.py +++ b/selfdrive/ui/mici/widgets/dialog.py @@ -362,7 +362,7 @@ def _on_option_selected(self, option: str): y_pos = rect_center_y - (btn.rect.y + height / 2) break - self._scroller.scroll_to(-y_pos) + self._scroller.scroll_to(y_pos) def _selected_option_changed(self): pass diff --git a/system/ui/widgets/scroller.py b/system/ui/widgets/scroller.py index f33ba941bf9190..8bd6f5ee8033f1 100644 --- a/system/ui/widgets/scroller.py +++ b/system/ui/widgets/scroller.py @@ -73,17 +73,12 @@ def __init__(self, items: list[Widget], horizontal: bool = True, snap_items: boo def set_reset_scroll_at_show(self, scroll: bool): self._reset_scroll_at_show = scroll - def scroll_to(self, pos: float, smooth: bool = False): - # already there - if abs(pos) < 1: - return - - # FIXME: the padding correction doesn't seem correct - scroll_offset = self.scroll_panel.get_offset() - pos + def scroll_to(self, target_offset: float, smooth: bool = False): + """Scrolls to an absolute offset.""" if smooth: - self._scrolling_to = scroll_offset + self._scrolling_to = float(target_offset) else: - self.scroll_panel.set_offset(scroll_offset) + self.scroll_panel.set_offset(float(target_offset)) @property def is_auto_scrolling(self) -> bool: diff --git a/tools/car_porting/test_car_model.py b/tools/car_porting/test_car_model.py index dd248f562e59e7..19c16c7b266119 100755 --- a/tools/car_porting/test_car_model.py +++ b/tools/car_porting/test_car_model.py @@ -1,23 +1,12 @@ #!/usr/bin/env python3 import argparse import sys -import unittest # noqa: TID251 from opendbc.car.tests.routes import CarTestRoute from openpilot.selfdrive.car.tests.test_models import TestCarModel from openpilot.tools.lib.route import SegmentRange -def create_test_models_suite(routes: list[CarTestRoute]) -> unittest.TestSuite: - test_suite = unittest.TestSuite() - for test_route in routes: - # create new test case and discover tests - test_case_args = {"platform": test_route.car_model, "test_route": test_route} - CarModelTestCase = type("CarModelTestCase", (TestCarModel,), test_case_args) - test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(CarModelTestCase)) - return test_suite - - if __name__ == "__main__": parser = argparse.ArgumentParser(description="Test any route against common issues with a new car port. " + "Uses selfdrive/car/tests/test_models.py") @@ -30,7 +19,12 @@ def create_test_models_suite(routes: list[CarTestRoute]) -> unittest.TestSuite: sr = SegmentRange(args.route_or_segment_name) - test_routes = [CarTestRoute(sr.route_name, args.car, segment=seg_idx) for seg_idx in sr.seg_idxs] - test_suite = create_test_models_suite(test_routes) + # Create dynamic test classes in the global scope so pytest can find them + for seg_idx in sr.seg_idxs: + test_route = CarTestRoute(sr.route_name, args.car, segment=seg_idx) + test_case_args = {"platform": test_route.car_model, "test_route": test_route} + class_name = f"TestCarModel_{test_route.car_model}_{seg_idx}".replace("|", "_").replace("-", "_").replace("/", "_") + globals()[class_name] = type(class_name, (TestCarModel,), test_case_args) - unittest.TextTestRunner().run(test_suite) + import subprocess + subprocess.run([sys.executable, "-m", "pytest", __file__, *sys.argv[2:]], check=True)