Skip to content

Commit 86a5c25

Browse files
dhsrivasAmandeep-Kaur2202
authored andcommitted
utils/pci: Introduce functions for attaching driver and interrupt capability checks
This patch introduces a set of PCI utility functions to simplify device management and capability checks: * add_vendor_id() – retrieves the vendor ID of a PCI device and associate it with the specified driver * attach_driver() – unbinds a device from its current driver and rebinds it to a given driver * check_msix_capability() – verifies whether the device supports msix interrupts * device_supports_irqs() – checks if the device supports at least a specified number of IRQs These helpers provide a cleaner and reusable interface for driver binding and interrupt capability validation. Signed-off-by: Dheeraj Kumar Srivastava <dheerajkumar.srivastava@amd.com>
1 parent e12175a commit 86a5c25

File tree

3 files changed

+154
-1
lines changed

3 files changed

+154
-1
lines changed

avocado/utils/pci.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
"""
2222

2323

24+
import errno
2425
import os
2526
import re
2627

@@ -376,6 +377,94 @@ def get_vendor_id(full_pci_address):
376377
return out.split(" ")[2].strip()
377378

378379

380+
def add_vendor_id(full_pci_address, driver):
381+
"""
382+
Retrieve and add the vendor ID of a PCI device to the specified driver.
383+
384+
:param full_pci_address: Full PCI device address, including domain
385+
(e.g., 0000:03:00.0).
386+
:param driver: Driver to associate with the vendor ID of the PCI device.
387+
"""
388+
# Get vendor id of pci device
389+
output = get_vendor_id(full_pci_address)
390+
vid = output.replace(":", " ")
391+
392+
# Add device vendor id to the driver
393+
try:
394+
genio.write_file(f"/sys/bus/pci/drivers/{driver}/new_id", f"{vid}\n")
395+
except OSError as e:
396+
if e.errno != errno.EEXIST:
397+
raise ValueError(
398+
f"Failed to add vendor ID '{vid}' to driver '{driver}': {e}"
399+
) from e
400+
401+
402+
def attach_driver(full_pci_address, driver):
403+
"""
404+
Unbind the device from its existing driver and bind it to the given driver.
405+
406+
:param full_pci_address: Full PCI device address (e.g., 0000:03:00.0)
407+
:param driver: Driver to be attached to the specified PCI device
408+
:raises ValueError: If `driver` or `full_pci_address` is None, or if the driver
409+
could not be attached successfully due to an OS error.
410+
:warning: This function may unbind the device from its current driver, which can
411+
temporarily make the device unavailable until it is reattached.
412+
"""
413+
414+
try:
415+
if not driver or not full_pci_address:
416+
raise ValueError("'driver' or 'full_pci_address' inputs are None")
417+
418+
# add vendor id of device to driver
419+
add_vendor_id(full_pci_address, driver)
420+
421+
# unbind the device from its initial driver
422+
cur_driver = get_driver(full_pci_address)
423+
if cur_driver is not None:
424+
unbind(cur_driver, full_pci_address)
425+
426+
# Bind device to driver
427+
bind(driver, full_pci_address)
428+
429+
except (OSError, ValueError, genio.GenIOError) as e:
430+
raise ValueError(
431+
f"Not able to attach {driver} to {full_pci_address}. Reason: {e}"
432+
) from e
433+
434+
435+
def check_msix_capability(full_pci_address):
436+
"""
437+
Check whether the PCI device supports Extended Message Signaled Interrupts.
438+
439+
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
440+
:return: True if supported, False otherwise
441+
"""
442+
out = process.run(f"lspci -vvs {full_pci_address}", ignore_status=True, shell=True)
443+
if out.exit_status:
444+
raise ValueError(f"lspci failed for {full_pci_address}: {out.stderr_text}")
445+
return any("MSI-X:" in line for line in out.stdout_text.splitlines())
446+
447+
448+
def device_supports_irqs(full_pci_address, count):
449+
"""
450+
Check if the device supports at least the specified number of interrupts.
451+
452+
:param full_pci_address: Full PCI device address including domain (e.g., 0000:03:00.0)
453+
:param count: Number of IRQs the device should support
454+
:return: True if supported, False otherwise
455+
"""
456+
out = process.run(
457+
f"lspci -vv -s {full_pci_address}", ignore_status=True, shell=True
458+
)
459+
if out.exit_status:
460+
raise ValueError(f"lspci failed for {full_pci_address}: {out.stderr_text}")
461+
match = re.search(r"MSI-X:.*?Count=(\d+)", out.stdout_text, re.S)
462+
if match:
463+
nirq = int(match.group(1))
464+
return nirq >= count
465+
return False
466+
467+
379468
def reset_check(full_pci_address):
380469
"""
381470
Check if reset for "full_pci_address" is successful

selftests/check.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
"job-api-check-tmp-directory-exists": 1,
2828
"nrunner-interface": 90,
2929
"nrunner-requirement": 28,
30-
"unit": 957,
30+
"unit": 965,
3131
"jobs": 11,
3232
"functional-parallel": 366,
3333
"functional-serial": 7,

selftests/unit/utils/pci.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import errno
12
import unittest.mock
23

34
from avocado.utils import pci
@@ -42,6 +43,69 @@ def test_get_slot_from_sysfs_negative(self):
4243
):
4344
self.assertRaises(ValueError, pci.get_slot_from_sysfs, "0002:01:00.1")
4445

46+
@unittest.mock.patch("avocado.utils.pci.get_vendor_id")
47+
@unittest.mock.patch("avocado.utils.pci.genio.write_file")
48+
def test_add_vendor_id_success(self, mock_write, mock_get_vid):
49+
mock_get_vid.return_value = "1234:abcd"
50+
pci.add_vendor_id("0000:03:00.0", "driver")
51+
mock_get_vid.assert_called_once_with("0000:03:00.0")
52+
mock_write.assert_called_once_with(
53+
"/sys/bus/pci/drivers/driver/new_id", "1234 abcd\n"
54+
)
55+
56+
@unittest.mock.patch("avocado.utils.pci.get_vendor_id", return_value="1234:abcd")
57+
@unittest.mock.patch(
58+
"avocado.utils.pci.genio.write_file",
59+
side_effect=OSError(errno.EEXIST, "File exists"),
60+
)
61+
def test_add_vendor_id_already_exists(self, mock_write, mock_get_vid):
62+
pci.add_vendor_id("0000:03:00.0", "driver")
63+
mock_get_vid.assert_called_once_with("0000:03:00.0")
64+
mock_write.assert_called_once_with(
65+
"/sys/bus/pci/drivers/driver/new_id", "1234 abcd\n"
66+
)
67+
68+
@unittest.mock.patch("avocado.utils.pci.bind")
69+
@unittest.mock.patch("avocado.utils.pci.unbind")
70+
@unittest.mock.patch("avocado.utils.pci.get_driver")
71+
@unittest.mock.patch("avocado.utils.pci.add_vendor_id")
72+
def test_attach_driver(self, mock_add_vid, mock_get_driver, mock_unbind, mock_bind):
73+
mock_get_driver.return_value = "old_driver"
74+
pci.attach_driver("0000:03:00.0", "new_driver")
75+
mock_add_vid.assert_called_once()
76+
mock_unbind.assert_called_once_with("old_driver", "0000:03:00.0")
77+
mock_bind.assert_called_once_with("new_driver", "0000:03:00.0")
78+
79+
@unittest.mock.patch("avocado.utils.pci.process.run")
80+
def test_check_msix_capability_supported(self, mock_run):
81+
mock_run.return_value.exit_status = 0
82+
mock_run.return_value.stdout_text = "Capabilities: [90] MSI-X: Enable+ Count=16"
83+
self.assertTrue(pci.check_msix_capability("0000:03:00.0"))
84+
85+
@unittest.mock.patch("avocado.utils.pci.process.run")
86+
def test_check_msix_capability_unsupported(self, mock_run):
87+
mock_run.return_value.exit_status = 0
88+
mock_run.return_value.stdout_text = "Capabilities: [90] MSI: Enable+ Count=16"
89+
self.assertFalse(pci.check_msix_capability("0000:03:00.0"))
90+
91+
@unittest.mock.patch("avocado.utils.pci.process.run")
92+
def test_device_supports_irqs_enough(self, mock_run):
93+
mock_run.return_value.exit_status = 0
94+
mock_run.return_value.stdout_text = "MSI-X: Enable+ Count=64"
95+
self.assertTrue(pci.device_supports_irqs("0000:03:00.0", 32))
96+
97+
@unittest.mock.patch("avocado.utils.pci.process.run")
98+
def test_device_supports_irqs_insufficient(self, mock_run):
99+
mock_run.return_value.exit_status = 0
100+
mock_run.return_value.stdout_text = "MSI-X: Enable+ Count=4"
101+
self.assertFalse(pci.device_supports_irqs("0000:03:00.0", 16))
102+
103+
@unittest.mock.patch("avocado.utils.pci.process.run")
104+
def test_device_supports_irqs_no_msix(self, mock_run):
105+
mock_run.return_value.exit_status = 0
106+
mock_run.return_value.stdout_text = "Capabilities: [90] MSI: Enable+ Count=16"
107+
self.assertFalse(pci.device_supports_irqs("0000:03:00.0", 8))
108+
45109

46110
if __name__ == "__main__":
47111
unittest.main()

0 commit comments

Comments
 (0)