|
1 | 1 | from sonic_platform_base.module_base import ModuleBase |
| 2 | +import pytest |
| 3 | +import json |
| 4 | +import os |
| 5 | +import fcntl |
| 6 | +from unittest.mock import patch, MagicMock, call |
| 7 | +from io import StringIO |
| 8 | +import shutil |
| 9 | + |
| 10 | +class MockFile: |
| 11 | + def __init__(self, data=None): |
| 12 | + self.data = data |
| 13 | + self.written_data = None |
| 14 | + self.closed = False |
| 15 | + self.fileno_called = False |
| 16 | + |
| 17 | + def __enter__(self): |
| 18 | + return self |
| 19 | + |
| 20 | + def __exit__(self, *args): |
| 21 | + self.closed = True |
| 22 | + |
| 23 | + def read(self): |
| 24 | + return self.data |
| 25 | + |
| 26 | + def write(self, data): |
| 27 | + self.written_data = data |
| 28 | + |
| 29 | + def fileno(self): |
| 30 | + self.fileno_called = True |
| 31 | + return 123 |
| 32 | + |
2 | 33 |
|
3 | 34 | class TestModuleBase: |
4 | 35 |
|
@@ -39,3 +70,152 @@ def test_sensors(self): |
39 | 70 | assert(module.get_all_current_sensors() == ["s1"]) |
40 | 71 | assert(module.get_current_sensor(0) == "s1") |
41 | 72 |
|
| 73 | + def test_pci_entry_state_db(self): |
| 74 | + module = ModuleBase() |
| 75 | + mock_connector = MagicMock() |
| 76 | + module.state_db_connector = mock_connector |
| 77 | + |
| 78 | + module.pci_entry_state_db("0000:00:00.0", "detaching") |
| 79 | + mock_connector.hset.assert_has_calls([ |
| 80 | + call("PCIE_DETACH_INFO|0000:00:00.0", "bus_info", "0000:00:00.0"), |
| 81 | + call("PCIE_DETACH_INFO|0000:00:00.0", "dpu_state", "detaching") |
| 82 | + ]) |
| 83 | + |
| 84 | + module.pci_entry_state_db("0000:00:00.0", "attaching") |
| 85 | + mock_connector.delete.assert_called_with("PCIE_DETACH_INFO|0000:00:00.0") |
| 86 | + |
| 87 | + mock_connector.hset.side_effect = Exception("DB Error") |
| 88 | + module.pci_entry_state_db("0000:00:00.0", "detaching") |
| 89 | + |
| 90 | + def test_pci_operation_lock(self): |
| 91 | + module = ModuleBase() |
| 92 | + mock_file = MockFile() |
| 93 | + |
| 94 | + with patch('builtins.open', return_value=mock_file) as mock_file_open, \ |
| 95 | + patch('fcntl.flock') as mock_flock, \ |
| 96 | + patch.object(module, 'get_name', return_value="DPU0"), \ |
| 97 | + patch('os.makedirs') as mock_makedirs: |
| 98 | + |
| 99 | + with module._pci_operation_lock(): |
| 100 | + mock_flock.assert_called_with(123, fcntl.LOCK_EX) |
| 101 | + |
| 102 | + mock_flock.assert_has_calls([ |
| 103 | + call(123, fcntl.LOCK_EX), |
| 104 | + call(123, fcntl.LOCK_UN) |
| 105 | + ]) |
| 106 | + assert mock_file.fileno_called |
| 107 | + |
| 108 | + def test_handle_pci_removal(self): |
| 109 | + module = ModuleBase() |
| 110 | + |
| 111 | + with patch.object(module, 'get_pci_bus_info', return_value=["0000:00:00.0"]), \ |
| 112 | + patch.object(module, 'pci_entry_state_db') as mock_db, \ |
| 113 | + patch.object(module, 'pci_detach', return_value=True), \ |
| 114 | + patch.object(module, '_pci_operation_lock') as mock_lock, \ |
| 115 | + patch.object(module, 'get_name', return_value="DPU0"): |
| 116 | + assert module.handle_pci_removal() is True |
| 117 | + mock_db.assert_called_with("0000:00:00.0", "detaching") |
| 118 | + mock_lock.assert_called_once() |
| 119 | + |
| 120 | + with patch.object(module, 'get_pci_bus_info', side_effect=Exception()): |
| 121 | + assert module.handle_pci_removal() is False |
| 122 | + |
| 123 | + def test_handle_pci_rescan(self): |
| 124 | + module = ModuleBase() |
| 125 | + |
| 126 | + with patch.object(module, 'get_pci_bus_info', return_value=["0000:00:00.0"]), \ |
| 127 | + patch.object(module, 'pci_entry_state_db') as mock_db, \ |
| 128 | + patch.object(module, 'pci_reattach', return_value=True), \ |
| 129 | + patch.object(module, '_pci_operation_lock') as mock_lock, \ |
| 130 | + patch.object(module, 'get_name', return_value="DPU0"): |
| 131 | + assert module.handle_pci_rescan() is True |
| 132 | + mock_db.assert_called_with("0000:00:00.0", "attaching") |
| 133 | + mock_lock.assert_called_once() |
| 134 | + |
| 135 | + with patch.object(module, 'get_pci_bus_info', side_effect=Exception()): |
| 136 | + assert module.handle_pci_rescan() is False |
| 137 | + |
| 138 | + def test_handle_sensor_removal(self): |
| 139 | + module = ModuleBase() |
| 140 | + |
| 141 | + with patch.object(module, 'get_name', return_value="DPU0"), \ |
| 142 | + patch('os.path.exists', return_value=True), \ |
| 143 | + patch('shutil.copy2') as mock_copy, \ |
| 144 | + patch('os.system') as mock_system: |
| 145 | + assert module.handle_sensor_removal() is True |
| 146 | + mock_copy.assert_called_once_with("/usr/share/sonic/platform/module_sensors_ignore_conf/ignore_sensors_DPU0.conf", |
| 147 | + "/etc/sensors.d/ignore_sensors_DPU0.conf") |
| 148 | + mock_system.assert_called_once_with("service sensord restart") |
| 149 | + |
| 150 | + with patch.object(module, 'get_name', return_value="DPU0"), \ |
| 151 | + patch('os.path.exists', return_value=False), \ |
| 152 | + patch('shutil.copy2') as mock_copy, \ |
| 153 | + patch('os.system') as mock_system: |
| 154 | + assert module.handle_sensor_removal() is True |
| 155 | + mock_copy.assert_not_called() |
| 156 | + mock_system.assert_not_called() |
| 157 | + |
| 158 | + with patch.object(module, 'get_name', return_value="DPU0"), \ |
| 159 | + patch('os.path.exists', return_value=True), \ |
| 160 | + patch('shutil.copy2', side_effect=Exception("Copy failed")): |
| 161 | + assert module.handle_sensor_removal() is False |
| 162 | + |
| 163 | + def test_handle_sensor_addition(self): |
| 164 | + module = ModuleBase() |
| 165 | + |
| 166 | + with patch.object(module, 'get_name', return_value="DPU0"), \ |
| 167 | + patch('os.path.exists', return_value=True), \ |
| 168 | + patch('os.remove') as mock_remove, \ |
| 169 | + patch('os.system') as mock_system: |
| 170 | + assert module.handle_sensor_addition() is True |
| 171 | + mock_remove.assert_called_once_with("/etc/sensors.d/ignore_sensors_DPU0.conf") |
| 172 | + mock_system.assert_called_once_with("service sensord restart") |
| 173 | + |
| 174 | + with patch.object(module, 'get_name', return_value="DPU0"), \ |
| 175 | + patch('os.path.exists', return_value=False), \ |
| 176 | + patch('os.remove') as mock_remove, \ |
| 177 | + patch('os.system') as mock_system: |
| 178 | + assert module.handle_sensor_addition() is True |
| 179 | + mock_remove.assert_not_called() |
| 180 | + mock_system.assert_not_called() |
| 181 | + |
| 182 | + with patch.object(module, 'get_name', return_value="DPU0"), \ |
| 183 | + patch('os.path.exists', return_value=True), \ |
| 184 | + patch('os.remove', side_effect=Exception("Remove failed")): |
| 185 | + assert module.handle_sensor_addition() is False |
| 186 | + |
| 187 | + def test_module_pre_shutdown(self): |
| 188 | + module = ModuleBase() |
| 189 | + |
| 190 | + # Test successful case |
| 191 | + with patch.object(module, 'handle_pci_removal', return_value=True), \ |
| 192 | + patch.object(module, 'handle_sensor_removal', return_value=True): |
| 193 | + assert module.module_pre_shutdown() is True |
| 194 | + |
| 195 | + # Test PCI removal failure |
| 196 | + with patch.object(module, 'handle_pci_removal', return_value=False), \ |
| 197 | + patch.object(module, 'handle_sensor_removal', return_value=True): |
| 198 | + assert module.module_pre_shutdown() is False |
| 199 | + |
| 200 | + # Test sensor removal failure |
| 201 | + with patch.object(module, 'handle_pci_removal', return_value=True), \ |
| 202 | + patch.object(module, 'handle_sensor_removal', return_value=False): |
| 203 | + assert module.module_pre_shutdown() is False |
| 204 | + |
| 205 | + def test_module_post_startup(self): |
| 206 | + module = ModuleBase() |
| 207 | + |
| 208 | + # Test successful case |
| 209 | + with patch.object(module, 'handle_pci_rescan', return_value=True), \ |
| 210 | + patch.object(module, 'handle_sensor_addition', return_value=True): |
| 211 | + assert module.module_post_startup() is True |
| 212 | + |
| 213 | + # Test PCI rescan failure |
| 214 | + with patch.object(module, 'handle_pci_rescan', return_value=False), \ |
| 215 | + patch.object(module, 'handle_sensor_addition', return_value=True): |
| 216 | + assert module.module_post_startup() is False |
| 217 | + |
| 218 | + # Test sensor addition failure |
| 219 | + with patch.object(module, 'handle_pci_rescan', return_value=True), \ |
| 220 | + patch.object(module, 'handle_sensor_addition', return_value=False): |
| 221 | + assert module.module_post_startup() is False |
0 commit comments