Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions avocado/utils/vfio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: 2025 Advanced Micro Devices, Inc.
# Author: Dheeraj Kumar Srivastava <[email protected]> # pylint: disable=C0401

# pylint: disable=C0402

"""APIs for virtual function I/O management."""


import ctypes
import os
import struct
from fcntl import ioctl

from avocado.utils import pci


def get_vfio_container_fd():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you see this function being used publicly or only by the vfio module itself? If this is probably not going to be used by end users, maybe making it a private function can give more API flexiblity. And, TBH, it seems to provide little value other than standardization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback. The intention is to keep this helper public, as it’s expected to be reused when testing devices whose functionality is exposed to guest space, where VFIO becomes part of the test flow. In that context, having a small, standardized utility to obtain the VFIO container FD helps avoid duplication and keeps VFIO-related setup consistent across tests.

If this use case aligns with Avocado’s API expectations, I believe keeping it public is reasonable; otherwise, I’m open to revisiting the scope as the usage evolves.

"""get vfio container file descriptor

:return: file descriptor of the vfio container.
:rtype: int
:raises ValueError: if the vfio container cannot be opened.
"""
try:
return os.open("/dev/vfio/vfio", os.O_RDWR | os.O_CLOEXEC)
except OSError as e:
raise ValueError(f"Failed to open VFIO container: {e}") from e


def check_vfio_container(
container_fd,
vfio_get_api_version,
vfio_api_version,
vfio_check_extension,
vfio_type_iommu,
):
"""Validate a vfio container by verifying the api version and ensuring that the
required iommu extension is supported. If either validation fails, an exception
is raised with an appropriate message

:param container_fd: vfio container file descriptor
:type container_fd: int
:param vfio_get_api_version: ioctl to retrieve the vfio container api version
:type vfio_get_api_version: int
:param vfio_api_version: expected vfio api version
:type vfio_api_version: int
:param vfio_check_extension: ioctl to check iommu extension support
:type vfio_check_extension: int
:param vfio_type_iommu: expected vfio iommu type
:type vfio_type_iommu: int
:raises ValueError: if the vfio api version is invalid or if the required
iommu extension is not supported.
:return: True if vfio container fd check passes.
:rtype: bool
"""
try:
if ioctl(container_fd, vfio_get_api_version) != vfio_api_version:
raise OSError("Failed to get right API version")
except OSError as e:
raise ValueError(f"Failed to get API version: {e}") from e

try:
ioctl(container_fd, vfio_check_extension, vfio_type_iommu)
except OSError as e:
raise ValueError(f"does not support type 1 iommu: {e}") from e

return True


def get_iommu_group_fd(device, vfio_group_get_status, vfio_group_flags_viable):
"""get iommu group fd for the pci device

:param device: full pci address including domain (0000:03:00.0)
:type device: str
:param vfio_group_get_status: ioctl to get iommu group status
:type vfio_group_get_status: int
:param vfio_group_flags_viable: ioctl to check if iommu group is viable
:type vfio_group_flags_viable: int
:raises ValueError: if the vfio group device cannot be opened or the group
is not viable.
:return: file descriptor for the iommu group.
:rtype: int
"""
vfio_group = f"/dev/vfio/{pci.get_iommu_group(device)}"
try:
group_fd = os.open(vfio_group, os.O_RDWR)
except OSError as e:
raise ValueError(f"Failed to open {vfio_group}: {e}") from e

argsz = struct.calcsize("II")
group_status_request = struct.pack("II", argsz, 2)
group_status_response = ioctl(group_fd, vfio_group_get_status, group_status_request)
group_status = struct.unpack("II", group_status_response)

if not group_status[1] & vfio_group_flags_viable:
raise ValueError("Group not viable, are all devices attached to vfio?")

return group_fd
Comment on lines +81 to +109
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add exception handling for ioctl call.

The ioctl() call at line 98 is not wrapped in a try/except block. If the ioctl fails, an unhandled OSError will propagate instead of a descriptive ValueError. This is inconsistent with other functions in this module (e.g., get_device_fd() which properly wraps ioctl).

Apply this diff to add proper exception handling:

     vfio_group = f"/dev/vfio/{pci.get_iommu_group(device)}"
     try:
         group_fd = os.open(vfio_group, os.O_RDWR)
     except OSError as e:
         raise ValueError(f"Failed to open {vfio_group}: {e}") from e

     argsz = struct.calcsize("II")
     group_status_request = struct.pack("II", argsz, 2)
-    group_status_response = ioctl(group_fd, vfio_group_get_status, group_status_request)
+    try:
+        group_status_response = ioctl(group_fd, vfio_group_get_status, group_status_request)
+    except OSError as e:
+        os.close(group_fd)
+        raise ValueError(f"Failed to get group status: {e}") from e
+    
     group_status = struct.unpack("II", group_status_response)

     if not group_status[1] & vfio_group_flags_viable:
+        os.close(group_fd)
         raise ValueError("Group not viable, are all devices attached to vfio?")

     return group_fd

Note: Added os.close(group_fd) in error paths to prevent file descriptor leaks.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def get_iommu_group_fd(device, vfio_group_get_status, vfio_group_flags_viable):
"""get iommu group fd for the pci device
:param device: full pci address including domain (0000:03:00.0)
:type device: str
:param vfio_group_get_status: ioctl to get iommu group status
:type vfio_group_get_status: int
:param vfio_group_flags_viable: ioctl to check if iommu group is viable
:type vfio_group_flags_viable: int
:raises ValueError: if the vfio group device cannot be opened or the group
is not viable.
:return: file descriptor for the iommu group.
:rtype: int
"""
vfio_group = f"/dev/vfio/{pci.get_iommu_group(device)}"
try:
group_fd = os.open(vfio_group, os.O_RDWR)
except OSError as e:
raise ValueError(f"Failed to open {vfio_group}: {e}") from e
argsz = struct.calcsize("II")
group_status_request = struct.pack("II", argsz, 2)
group_status_response = ioctl(group_fd, vfio_group_get_status, group_status_request)
group_status = struct.unpack("II", group_status_response)
if not group_status[1] & vfio_group_flags_viable:
raise ValueError("Group not viable, are all devices attached to vfio?")
return group_fd
def get_iommu_group_fd(device, vfio_group_get_status, vfio_group_flags_viable):
"""get iommu group fd for the pci device
:param device: full pci address including domain (0000:03:00.0)
:type device: str
:param vfio_group_get_status: ioctl to get iommu group status
:type vfio_group_get_status: int
:param vfio_group_flags_viable: ioctl to check if iommu group is viable
:type vfio_group_flags_viable: int
:raises ValueError: if the vfio group device cannot be opened or the group
is not viable.
:return: file descriptor for the iommu group.
:rtype: int
"""
vfio_group = f"/dev/vfio/{pci.get_iommu_group(device)}"
try:
group_fd = os.open(vfio_group, os.O_RDWR)
except OSError as e:
raise ValueError(f"Failed to open {vfio_group}: {e}") from e
argsz = struct.calcsize("II")
group_status_request = struct.pack("II", argsz, 2)
try:
group_status_response = ioctl(group_fd, vfio_group_get_status, group_status_request)
except OSError as e:
os.close(group_fd)
raise ValueError(f"Failed to get group status: {e}") from e
group_status = struct.unpack("II", group_status_response)
if not group_status[1] & vfio_group_flags_viable:
os.close(group_fd)
raise ValueError("Group not viable, are all devices attached to vfio?")
return group_fd
🧰 Tools
🪛 Ruff (0.14.0)

94-94: Avoid specifying long messages outside the exception class

(TRY003)


102-102: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In avocado/utils/vfio.py around lines 76 to 104, the ioctl() call that checks
group status is not exception-handled so an OSError would propagate and leak the
opened group_fd; wrap the ioctl and subsequent unpack in a try/except OSError
block, on exception close group_fd and raise a ValueError with a descriptive
message (including the original error), and likewise ensure that if the group is
not viable you close group_fd before raising the ValueError to avoid file
descriptor leaks.



def attach_group_to_container(group_fd, container_fd, vfio_group_set_container):
"""attach the iommu group of pci device to the vfio container.

:param group_fd: iommu group file descriptor
:type group_fd: int
:param container_fd: vfio container file descriptor
:type container_fd: int
:param vfio_group_set_container: vfio ioctl to add iommu group to the container fd
:type vfio_group_set_container: int
:raises ValueError: if attaching the group to the container fails.
"""

try:
ioctl(group_fd, vfio_group_set_container, ctypes.c_void_p(container_fd))
except OSError as e:
raise ValueError(
f"failed to attach pci device's iommu group to the vfio container: {e}"
) from e


def detach_group_from_container(group_fd, container_fd, vfio_group_unset_container):
"""detach the iommu group of pci device from vfio container

:param group_fd: iommu group file descriptor
:type group_fd: int
:param container_fd: vfio container file descriptor
:type container_fd: int
:param vfio_group_unset_container: vfio ioctl to detach iommu group from the
container fd
:type vfio_group_unset_container: int
:raises ValueError: if detaching the group to the container fails.
"""

try:
ioctl(group_fd, vfio_group_unset_container, ctypes.c_void_p(container_fd))
except OSError as e:
raise ValueError(
f"failed to detach pci device's iommu group from vfio container: {e}"
) from e


def get_device_fd(device, group_fd, vfio_group_get_device_fd):
"""Get device file descriptor

:param device: full pci address including domain (0000:03:00.0)
:type device: str
:param group_fd: iommu group file descriptor
:type group_fd: int
:param vfio_group_get_device_fd: ioctl to get device fd
:type vfio_group_get_device_fd: int
:raises ValueError: if not able to get device descriptor
:return: device descriptor
:rtype: int
"""
buf = ctypes.create_string_buffer(device.encode("utf-8") + b"\x00")
try:
device_fd = ioctl(group_fd, vfio_group_get_device_fd, buf)
except OSError as e:
raise ValueError("failed to get vfio device fd") from e

return device_fd


def vfio_device_supports_irq(
device_fd, vfio_pci_msix_irq_index, vfio_device_get_irq_info, count
):
"""Check if device supports at least count number of interrupts

:param device_fd: device file descriptor
:type device_fd: int
:param vfio_pci_msix_irq_index: vfio ioctl to get irq index for msix
:type vfio_pci_msix_irq_index: int
:param vfio_device_get_irq_info: vfio ioctl to get vfio device irq information
:type vfio_device_get_irq_info: int
:param count: number of irqs the device should support
:type count: int
:return: true if supported, false otherwise
:rtype: bool
"""
argsz = struct.calcsize("IIII")
index = vfio_pci_msix_irq_index
irq_info_request = struct.pack("IIII", argsz, 1, index, 1)
irq_info_response = ioctl(device_fd, vfio_device_get_irq_info, irq_info_request)
nirq = (struct.unpack("IIII", irq_info_response))[3]
if nirq < count:
return False
return True
Comment on lines +175 to +198
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add exception handling for ioctl call.

The ioctl() call at line 186 is not wrapped in a try/except block. If the ioctl fails, an unhandled OSError will propagate. For consistency with get_device_fd() and proper error handling, this should be wrapped in try/except.

Apply this diff to add proper exception handling:

 def vfio_device_supports_irq(
     device_fd, vfio_pci_msix_irq_index, vfio_device_get_irq_info, count
 ):
     """Check if device supports at least count number of interrupts

     :param device_fd: device file descriptor
     :type device_fd: int
     :param vfio_pci_msix_irq_index: vfio ioctl to get irq index for msix
     :type vfio_pci_msix_irq_index: int
     :param vfio_device_get_irq_info: vfio ioctl to get vfio device irq information
     :type vfio_device_get_irq_info: int
     :param count: number of irqs the device should support
     :type count: int
+    :raises ValueError: if unable to query device IRQ information
     :return: true if supported, false otherwise
     :rtype: bool
     """
     argsz = struct.calcsize("IIII")
     index = vfio_pci_msix_irq_index
     irq_info_request = struct.pack("IIII", argsz, 1, index, 1)
-    irq_info_response = ioctl(device_fd, vfio_device_get_irq_info, irq_info_request)
+    try:
+        irq_info_response = ioctl(device_fd, vfio_device_get_irq_info, irq_info_request)
+    except OSError as e:
+        raise ValueError(f"Failed to get device IRQ info: {e}") from e
+    
     nirq = (struct.unpack("IIII", irq_info_response))[3]
     if nirq < count:
         return False
     return True
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def vfio_device_supports_irq(
device_fd, vfio_pci_msix_irq_index, vfio_device_get_irq_info, count
):
"""Check if device supports at least count number of interrupts
:param device_fd: device file descriptor
:type device_fd: int
:param vfio_pci_msix_irq_index: vfio ioctl to get irq index for msix
:type vfio_pci_msix_irq_index: int
:param vfio_device_get_irq_info: vfio ioctl to get vfio device irq information
:type vfio_device_get_irq_info: int
:param count: number of irqs the device should support
:type count: int
:return: true if supported, false otherwise
:rtype: bool
"""
argsz = struct.calcsize("IIII")
index = vfio_pci_msix_irq_index
irq_info_request = struct.pack("IIII", argsz, 1, index, 1)
irq_info_response = ioctl(device_fd, vfio_device_get_irq_info, irq_info_request)
nirq = (struct.unpack("IIII", irq_info_response))[3]
if nirq < count:
return False
return True
def vfio_device_supports_irq(
device_fd, vfio_pci_msix_irq_index, vfio_device_get_irq_info, count
):
"""Check if device supports at least count number of interrupts
:param device_fd: device file descriptor
:type device_fd: int
:param vfio_pci_msix_irq_index: vfio ioctl to get irq index for msix
:type vfio_pci_msix_irq_index: int
:param vfio_device_get_irq_info: vfio ioctl to get vfio device irq information
:type vfio_device_get_irq_info: int
:param count: number of irqs the device should support
:type count: int
:raises ValueError: if unable to query device IRQ information
:return: true if supported, false otherwise
:rtype: bool
"""
argsz = struct.calcsize("IIII")
index = vfio_pci_msix_irq_index
irq_info_request = struct.pack("IIII", argsz, 1, index, 1)
try:
irq_info_response = ioctl(device_fd, vfio_device_get_irq_info, irq_info_request)
except OSError as e:
raise ValueError(f"Failed to get device IRQ info: {e}") from e
nirq = (struct.unpack("IIII", irq_info_response))[3]
if nirq < count:
return False
return True
🤖 Prompt for AI Agents
In avocado/utils/vfio.py around lines 167 to 190, the ioctl() call that fetches
IRQ info is not protected and will let OSError propagate; wrap the
ioctl(device_fd, vfio_device_get_irq_info, irq_info_request) call in a
try/except OSError block and on failure raise a RuntimeError (or rethrow a
contextual exception) that includes a clear message and the original exception
details so callers get consistent, descriptive error handling similar to
get_device_fd().

2 changes: 1 addition & 1 deletion selftests/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"job-api-check-tmp-directory-exists": 1,
"nrunner-interface": 90,
"nrunner-requirement": 28,
"unit": 934,
"unit": 948,
"jobs": 11,
"functional-parallel": 353,
"functional-serial": 7,
Expand Down
175 changes: 175 additions & 0 deletions selftests/unit/utils/vfio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import os
import struct
import unittest
from unittest import mock

from avocado import Test
from avocado.utils import vfio


class VfioUtilsTests(Test):
@mock.patch("os.open")
def test_get_vfio_container_fd_fail(self, mock_open):
mock_open.side_effect = OSError("No such file or directory")
with self.assertRaises(ValueError) as e:
vfio.get_vfio_container_fd()
self.assertIn("Failed to open VFIO container", str(e.exception))

@mock.patch("avocado.utils.vfio.ioctl")
def test_invalid_api_version(self, mock_ioctl):
mock_ioctl.side_effect = OSError("Failed")
with self.assertRaises(ValueError) as e:
vfio.check_vfio_container(
container_fd=3,
vfio_get_api_version=15204,
vfio_api_version=0,
vfio_check_extension=15205,
vfio_type_iommu=1,
)
self.assertIn("Failed to get API version", str(e.exception))

@mock.patch("avocado.utils.vfio.ioctl")
def test_missing_iommu_extension(self, mock_ioctl):
mock_ioctl.side_effect = [0, OSError("Failed")]
with self.assertRaises(ValueError) as e:
vfio.check_vfio_container(
container_fd=3,
vfio_get_api_version=15204,
vfio_api_version=0,
vfio_check_extension=15205,
vfio_type_iommu=1,
)
self.assertIn("does not support type 1 iommu", str(e.exception))

@mock.patch("avocado.utils.vfio.os.open")
@mock.patch("avocado.utils.vfio.pci.get_iommu_group")
def test_get_group_fd_fail_1(self, mock_get_group, mock_open):
mock_open.side_effect = OSError("No such file or directory")
mock_get_group.return_value = "42"
with self.assertRaises(ValueError) as e:
vfio.get_iommu_group_fd(
device="0000:03:00.0",
vfio_group_get_status=100,
vfio_group_flags_viable=0x1,
)
self.assertIn("Failed to open /dev/vfio/42", str(e.exception))

@mock.patch("avocado.utils.vfio.ioctl")
@mock.patch("avocado.utils.vfio.os.open")
@mock.patch("avocado.utils.vfio.pci.get_iommu_group")
def test_get_group_fd_fail_2(self, mock_get_group, mock_open, mock_ioctl):
mock_open.return_value = 3
mock_get_group.return_value = "42"
argsz = struct.calcsize("II")
# Pack request, ioctl returns same but with flags = 0
mock_ioctl.return_value = struct.pack("II", argsz, 0)

with self.assertRaises(ValueError) as ctx:
vfio.get_iommu_group_fd(
device="0000:03:00.0",
vfio_group_get_status=100,
vfio_group_flags_viable=0x1,
)
self.assertIn("Group not viable", str(ctx.exception))

@mock.patch("avocado.utils.vfio.ioctl")
@mock.patch("avocado.utils.vfio.os.open")
@mock.patch("avocado.utils.vfio.pci.get_iommu_group")
def test_get_group_fd_success(self, mock_get_group, mock_open, mock_ioctl):
mock_open.return_value = 3
mock_get_group.return_value = "42"
argsz = struct.calcsize("II")
# Pack request, ioctl returns same but with flags = 1
mock_ioctl.return_value = struct.pack("II", argsz, 0x1)

fd = vfio.get_iommu_group_fd(
device="0000:03:00.0",
vfio_group_get_status=100,
vfio_group_flags_viable=0x1,
)

self.assertEqual(fd, 3)
mock_open.assert_called_once_with("/dev/vfio/42", os.O_RDWR)
mock_ioctl.assert_called_once()

@mock.patch("avocado.utils.vfio.ioctl")
def test_attach_group_to_container_success(self, mock_ioctl):
mock_ioctl.return_value = 0
# Should not raise
vfio.attach_group_to_container(
group_fd=10, container_fd=20, vfio_group_set_container=100
)

@mock.patch("avocado.utils.vfio.ioctl")
def test_attach_group_to_container_failure(self, mock_ioctl):
mock_ioctl.side_effect = OSError("Failed")
with self.assertRaises(ValueError) as ctx:
vfio.attach_group_to_container(
group_fd=10, container_fd=20, vfio_group_set_container=100
)
self.assertIn("failed to attach pci device", str(ctx.exception))

@mock.patch("avocado.utils.vfio.ioctl")
def test_detach_group_from_container_success(self, mock_ioctl):
mock_ioctl.return_value = 0
vfio.detach_group_from_container(
group_fd=10, container_fd=20, vfio_group_unset_container=200
)

@mock.patch("avocado.utils.vfio.ioctl")
def test_detach_group_from_container_failure(self, mock_ioctl):
mock_ioctl.side_effect = OSError("Failed")
with self.assertRaises(ValueError) as ctx:
vfio.detach_group_from_container(
group_fd=10, container_fd=20, vfio_group_unset_container=200
)
self.assertIn("failed to detach pci device", str(ctx.exception))

@mock.patch("avocado.utils.vfio.ioctl")
def test_get_device_fd_success(self, mock_ioctl):
mock_ioctl.return_value = 50
device_fd = vfio.get_device_fd(
"0000:03:00.0", group_fd=10, vfio_group_get_device_fd=200
)
self.assertEqual(device_fd, 50)

@mock.patch("avocado.utils.vfio.ioctl")
def test_get_device_fd_failure(self, mock_ioctl):
mock_ioctl.side_effect = OSError("Device not found")
with self.assertRaises(ValueError) as e:
vfio.get_device_fd(
"0000:03:00.0", group_fd=10, vfio_group_get_device_fd=200
)
self.assertIn("failed to get vfio device fd", str(e.exception))

@mock.patch("avocado.utils.vfio.ioctl")
def test_vfio_device_supports_irq_success(self, mock_ioctl):
argsz = struct.calcsize("IIII")
response = struct.pack("IIII", argsz, 0, 0, 8)
mock_ioctl.return_value = response

result = vfio.vfio_device_supports_irq(
device_fd=30,
vfio_pci_msix_irq_index=100,
vfio_device_get_irq_info=300,
count=4,
)
self.assertTrue(result)

@mock.patch("avocado.utils.vfio.ioctl")
def test_vfio_device_supports_irq_fail(self, mock_ioctl):
argsz = struct.calcsize("IIII")
response = struct.pack("IIII", argsz, 0, 0, 2)
mock_ioctl.return_value = response

result = vfio.vfio_device_supports_irq(
device_fd=30,
vfio_pci_msix_irq_index=100,
vfio_device_get_irq_info=300,
count=4,
)
self.assertFalse(result)


if __name__ == "__main__":
unittest.main()
Loading