Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
migrate_desturi_type = "ssh"
virsh_migrate_desturi = "qemu+ssh://%s@${migrate_dest_host}/session"
vm_ping_outside = "pass"
host_iface =
remote_host_iface =
# Ethernet interface specific settings
bridge_name = "br0"
tap_name = "mytap0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from provider.migration import base_steps
from provider.virtual_network import network_base

virsh_dargs = {'debug': True, 'ignore_status': False}
virsh_dargs = {"debug": True, "ignore_status": False}


def run(test, params, env):
Expand All @@ -41,17 +41,31 @@ def run(test, params, env):
:param params: Dictionary with the test parameters
:param env: Dictionary with test environment.
"""

def _prepare_tap_for_vm():
remote_host_iface = utils_net.get_default_gateway(
iface_name=True, session=remote_session, force_dhcp=False, json=True)
remote_host_iface = params.get("remote_host_iface")
if not remote_host_iface:
remote_host_iface = utils_net.get_default_gateway(
iface_name=True, session=remote_session, force_dhcp=False, json=True
)
params.update({"remote_host_iface": remote_host_iface})

utils_net.create_linux_bridge_tmux(bridge_name, host_iface)
network_base.create_tap(tap_name, bridge_name, unpr_user, tap_mtu=tap_mtu, flag=tap_flag)
network_base.create_tap(
tap_name, bridge_name, unpr_user, tap_mtu=tap_mtu, flag=tap_flag
)

utils_net.create_linux_bridge_tmux(bridge_name, remote_host_iface, session=remote_session)
network_base.create_tap(tap_name, bridge_name, unpr_user,
session=remote_session, tap_mtu=tap_mtu, flag=tap_flag)
utils_net.create_linux_bridge_tmux(
bridge_name, remote_host_iface, session=remote_session
)
network_base.create_tap(
tap_name,
bridge_name,
unpr_user,
session=remote_session,
tap_mtu=tap_mtu,
flag=tap_flag,
)

def create_unpr_user_and_vm(user_name, user_passwd, main_vm_name, remote_session):
"""
Expand All @@ -65,8 +79,10 @@ def create_unpr_user_and_vm(user_name, user_passwd, main_vm_name, remote_session
"""
test.log.info(f"Creating unprivileged user for local and remote: {user_name}")
try:
add_user, add_pwd = (f'useradd -m {user_name}',
f'echo "{user_name}:{user_passwd}" | chpasswd')
add_user, add_pwd = (
f"useradd -m {user_name}",
f'echo "{user_name}:{user_passwd}" | chpasswd',
)
process.run(add_user, shell=True)
process.run(add_pwd, shell=True)
if remote_session:
Expand All @@ -79,18 +95,21 @@ def create_unpr_user_and_vm(user_name, user_passwd, main_vm_name, remote_session
_prepare_tap_for_vm()
root_vmxml = vm_xml.VMXML.new_from_inactive_dumpxml(main_vm_name)
logging.debug("Remove 'dac' security driver for unprivileged user")
root_vmxml.del_seclabel(by_attr=[('model', 'dac')])
root_vmxml.del_seclabel(by_attr=[("model", "dac")])

# Define VM XML for unprivileged user
unpr_vmxml = network_base.prepare_vmxml_for_unprivileged_user(user_name, root_vmxml)
libvirt_vmxml.modify_vm_device(unpr_vmxml, 'interface', iface_attrs)
unpr_vmxml = network_base.prepare_vmxml_for_unprivileged_user(
user_name, root_vmxml
)
libvirt_vmxml.modify_vm_device(unpr_vmxml, "interface", iface_attrs)
new_vm_name = unpr_vmxml.vm_name
test.log.info(f"Defining VM for unprivileged user: {user_name}")
network_base.define_vm_for_unprivileged_user(user_name, unpr_vmxml)

# Get unprivileged VM object
unpr_vm = libvirt_unprivileged.get_unprivileged_vm(
new_vm_name, user_name, user_passwd, **unpr_vm_args)
new_vm_name, user_name, user_passwd, **unpr_vm_args
)
params.update({"migrate_main_vm": new_vm_name})

return unpr_vm
Expand All @@ -101,7 +120,9 @@ def setup_test():
"""
test.log.info("TEST_SETUP: Setting up test environment")
virsh.start(unpr_vm_obj.name, debug=True, unprivileged_user=unpr_user)
test.log.debug("Test with guest xml:%s", vm_xml.VMXML.new_from_dumpxml(unpr_vm_obj.name))
test.log.debug(
"Test with guest xml:%s", vm_xml.VMXML.new_from_dumpxml(unpr_vm_obj.name)
)
unpr_vm_obj.wait_for_login().close()

def run_test():
Expand All @@ -119,14 +140,18 @@ def run_test():
option=virsh_migrate_options,
extra=virsh_migrate_extra,
unprivileged_user=unpr_user,
**virsh_dargs
**virsh_dargs,
)

if not cancel_migration:
test.log.info("TEST_STEP: Checking VM network connectivity on target host")
session = network_base.unprivileged_user_login(unpr_vm_obj.name, unpr_user, params.get('username'),
params.get('password'))
ips = {'outside_ip': outside_ip}
session = network_base.unprivileged_user_login(
unpr_vm_obj.name,
unpr_user,
params.get("username"),
params.get("password"),
)
ips = {"outside_ip": outside_ip}
network_base.ping_check(params, ips, session)

if migrate_vm_back:
Expand All @@ -142,20 +167,26 @@ def teardown_test():
network_base.delete_tap(tap_name, session=remote_session)
utils_net.delete_linux_bridge_tmux(bridge_name, iface_name=host_iface)
utils_net.delete_linux_bridge_tmux(
bridge_name, iface_name=params.get("remote_host_iface"),
session=remote_session)
bridge_name,
iface_name=params.get("remote_host_iface"),
session=remote_session,
)

if unpr_vm_obj.is_alive():
virsh.destroy(unpr_vm_obj.name, unprivileged_user=unpr_user, debug=True)
virsh.undefine(unpr_vm_obj.name, options='--nvram',
unprivileged_user=unpr_user, debug=True)
virsh.undefine(
unpr_vm_obj.name, options="--nvram", unprivileged_user=unpr_user, debug=True
)

migration_obj.cleanup_connection()
if remote_session:
remote_session.cmd(f'pkill -u {unpr_user};userdel -f -r {unpr_user}')
remote_session.cmd(f"pkill -u {unpr_user};userdel -f -r {unpr_user}")
remote_session.close()
process.run(f'pkill -u {unpr_user};userdel -f -r {unpr_user}',
shell=True, ignore_status=True)
process.run(
f"pkill -u {unpr_user};userdel -f -r {unpr_user}",
shell=True,
ignore_status=True,
)
Comment on lines +183 to +189
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid shell command construction with untrusted user input.

unpr_user is configurable; interpolating it into shell strings risks command injection. Prefer argv lists and/or quoting.

🔒 Suggested fix (avoid shell strings)
@@
-import logging
+import logging
+import shlex
@@
-        if remote_session:
-            remote_session.cmd(f"pkill -u {unpr_user};userdel -f -r {unpr_user}")
+        if remote_session:
+            safe_user = shlex.quote(unpr_user)
+            remote_session.cmd(f"pkill -u {safe_user}; userdel -f -r {safe_user}")
             remote_session.close()
-        process.run(
-            f"pkill -u {unpr_user};userdel -f -r {unpr_user}",
-            shell=True,
-            ignore_status=True,
-        )
+        process.run(["pkill", "-u", unpr_user], ignore_status=True)
+        process.run(["userdel", "-f", "-r", unpr_user], ignore_status=True)
🧰 Tools
🪛 Ruff (0.14.13)

185-185: Function call with shell=True parameter identified, security issue

(S604)

🤖 Prompt for AI Agents
In `@libvirt/tests/src/virtual_network/migrate/migrate_with_ethernet_interface.py`
around lines 183 - 189, The cleanup uses unpr_user directly in
shell-interpolated commands (remote_session.cmd and process.run), which risks
injection; change both calls to avoid shell string interpolation by passing argv
lists (e.g., pass ["pkill", "-u", unpr_user] and ["userdel", "-f", "-r",
unpr_user] to process.run with shell=False) or, if the remote session API only
accepts a command string, safely escape the username with shlex.quote before
composing the command; update the remote_session.cmd invocation and the
process.run call to use these safe forms and ensure ignore_status behavior is
preserved.


server_ip = params.get("migrate_dest_host")
server_user = params.get("server_user")
Expand All @@ -168,28 +199,34 @@ def teardown_test():
cancel_migration = params.get_boolean("cancel_migration")
migrate_vm_back = params.get_boolean("migrate_vm_back", True)
iface_attrs = eval(params.get("iface_attrs", "{}"))
host_iface = params.get('host_iface')
host_iface = host_iface if host_iface else utils_net.get_default_gateway(
iface_name=True, json=True)
host_iface = params.get("host_iface")
host_iface = (
host_iface
if host_iface
else utils_net.get_default_gateway(iface_name=True, json=True)
)

rand_id = utils_misc.generate_random_string(3)
vm_name = params.get('main_vm')
unpr_user = params.get('unpr_user', 'test_unpr') + rand_id
unpr_passwd = params.get('test_passwd', params.get("password"))
vm_name = params.get("main_vm")
unpr_user = params.get("unpr_user", "test_unpr") + rand_id
unpr_passwd = params.get("test_passwd", params.get("password"))
desturi = params.get("virsh_migrate_desturi") % unpr_user
migrate_source_host = params.get("migrate_source_host")
migrate_uri = "qemu+ssh://%s@%s/session" % (unpr_user, migrate_source_host)

virsh_migrate_options = params.get("virsh_migrate_options")
virsh_migrate_extra = params.get("virsh_migrate_extra")
unpr_vm_args = {
'username': params.get('username'),
'password': params.get('password'),
"username": params.get("username"),
"password": params.get("password"),
}

remote_session = aexpect.remote.remote_login(
"ssh", server_ip, "22", server_user, server_pwd, r'[$#%]')
unpr_vm_obj = create_unpr_user_and_vm(unpr_user, unpr_passwd, vm_name, remote_session)
"ssh", server_ip, "22", server_user, server_pwd, r"[$#%]"
)
unpr_vm_obj = create_unpr_user_and_vm(
unpr_user, unpr_passwd, vm_name, remote_session
)
migration_obj = base_steps.MigrationBase(test, unpr_vm_obj, params)

try:
Expand Down