Patching does not work when running functions inside a multiprocessing pool #176

@martin-s-a

Description

I have this module in my code:

import multiprocessing
from typing import List, Tuple, Callable, Any

def run_parallel_jobs(
	funcs: List[Callable],
	func_args: List[Tuple[Any]],
	n_jobs: int = multiprocessing.cpu_count()
) -> List:
	"""
	### Runs the given command list in parallel.

	#### Params:
	- funcs (list(callable)): Functions to run.
	- func_args (list(tuple(any))): Arguments for each function.
	- n_jobs (int): Optional. Number of parallel jobs.

	#### Returns:
	- (list): List containing the return of each function.
	"""
	with multiprocessing.Pool(n_jobs) as p:
		results = p.starmap(__run_lambda, zip(funcs, func_args))
	return results

def __run_lambda(func: Callable, args: Tuple[Any]) -> Any:
	"""
	### Runs the given function with its args.
	
	#### Params:
	- func (callable): Function to run.
	- args (tuple(any)): Arguments for the function.
	
	#### Returns:
	- (any): Return of the called function.
	"""
	return func(*args)

(in case you were wondering, __run_lambda is needed because lambda expressions cannot be pickled, and starmap has to pickle the callables it sends to the workers)
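As a quick standalone illustration of that constraint (not part of the issue's codebase): a module-level function pickles fine because it is stored by qualified name, while a lambda has no importable name, so pickling it raises an error.

```python
import pickle

def add_one(x):
    return x + 1

# A module-level function pickles fine: pickle stores it by qualified name
restored = pickle.loads(pickle.dumps(add_one))
assert restored(1) == 2

# A lambda has no importable name, so pickling it fails
try:
    pickle.dumps(lambda x: x + 1)
except (pickle.PicklingError, AttributeError):
    print("lambdas cannot be pickled")
```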

Also, I have this function to run a shell command using subprocess.Popen:

import os
import subprocess

def run_command(cmd: str, cwd: str='./') -> Tuple[int, str]:
	"""
	### Runs the given shell command.

	#### Params:
	- cmd (str): Command to run.
	- cwd (str): Optional. Path to run the command from.

	#### Returns:
	- (int): Return code of the operation.
	- (str): Return message of the operation.
	"""
	process = subprocess.Popen(cmd, cwd=cwd, env=os.environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
	# communicate() already waits for the process; calling wait() first can
	# deadlock when the PIPE buffers fill up
	output, err = process.communicate()
	ret_code = process.returncode
	output_str = output.decode() if not ret_code and output else err.decode()
	output_str = output_str[:-1] if output_str.endswith('\n') else output_str
	return ret_code, output_str

And, in one of my tests, I am combining both functions like this:

import shlex

from my_package import orquestrator

def cat_os_release():
	# This will only work on unix-based systems
	retcode, output = run_command("cat /etc/os-release")
	return output if not retcode else "FAILED"

def cat_pmu_name():
	# This will only work on Unix-based machines with an Intel CPU
	retcode, output = run_command("cat /sys/devices/cpu/caps/pmu_name")
	return output if not retcode else "FAILED"

def test_subprocess_in_pool(fake_process):
	fake_process.register_subprocess(
		shlex.split("cat /etc/os-release"),
		stdout="OS_RELEASE"
	)
	fake_process.register_subprocess(
		shlex.split("cat /sys/devices/cpu/caps/pmu_name"),
		stdout="PMU_NAME"
	)
	results = orquestrator.run_parallel_jobs(
		[cat_os_release, cat_pmu_name],
		[(), ()]
	)
	assert results == ["OS_RELEASE", "PMU_NAME"]

When I run this test (I am on a Windows machine, so both commands should fail natively), the results variable is ['FAILED', 'FAILED'].
However, if I additionally run each function directly, outside the multiprocessing.Pool:

def test_subprocess_in_pool(fake_process):
	fake_process.register_subprocess(
		shlex.split("cat /etc/os-release"),
		stdout="OS_RELEASE"
	)
	fake_process.register_subprocess(
		shlex.split("cat /sys/devices/cpu/caps/pmu_name"),
		stdout="PMU_NAME"
	)
	results = orquestrator.run_parallel_jobs(
		[cat_os_release, cat_pmu_name],
		[(), ()]
	)
	os_release = cat_os_release()
	pmu_name = cat_pmu_name()
	assert results == ["OS_RELEASE", "PMU_NAME"]

and check with the debugger, then os_release and pmu_name individually do have the values I set when registering the commands.

So my conclusion is that, when a command runs inside a process pool, the fake generated by this library is not applied to the worker process, and the original command is executed instead.
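For context on why this can happen (a minimal sketch, not using pytest-subprocess itself): this kind of patching replaces attributes in the current interpreter's memory, but on Windows multiprocessing always uses the "spawn" start method, so every pool worker is a fresh interpreter that re-imports your modules and never sees the parent's in-memory patch. The same effect can be shown with any fresh interpreter:

```python
import subprocess
import sys
import json

# Stand-in for what a fixture like fake_process does: patch an attribute
# in *this* process (json is just a convenient stdlib module to mark).
json.PATCHED = True
assert hasattr(json, "PATCHED")  # the patch is visible in the parent

# A "spawn"ed pool worker is a fresh interpreter that re-imports modules;
# a fresh interpreter never saw the parent's in-memory patch:
result = subprocess.run(
    [sys.executable, "-c", "import json; print(hasattr(json, 'PATCHED'))"],
    capture_output=True,
    text=True,
)
print(result.stdout.strip())  # False
```

Under the "fork" start method (the Linux default) the worker inherits the parent's memory, which is why this kind of patching often appears to work there but not on Windows.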
