Skip to content

Commit 6bf4cc2

Browse files
authored
Merge pull request #88 from ploxiln/parallel_fork
Fix parallel mode on macOS with python-3.8+ (which changed the default multiprocessing start method from "fork" to "spawn", due to macOS system libraries using threads ...). This situation may affect Linux too starting with Python-3.14, because "spawn" will be the default *if any threads are created*. (In practice Fabric / fab-classic has worked OK with "fork" mode, even with some threads running ...) Also fix up support for Threads in job_queue.py (even though fab-classic doesn't use it)
2 parents 7c4f256 + a2a278f commit 6bf4cc2

File tree

2 files changed

+22
-16
lines changed

2 files changed

+22
-16
lines changed

fabric/job_queue.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77

88
import time
99
import queue as Queue
10-
from multiprocessing import Process
10+
from collections import namedtuple
1111

1212
from fabric.network import ssh
1313
from fabric.context_managers import settings
1414

1515

16+
DoneProc = namedtuple('DoneProc', ['name', 'exitcode'])
17+
1618
class JobQueue(object):
1719
"""
1820
The goal of this class is to make a queue of processes to run, and go
@@ -73,7 +75,7 @@ def close(self):
7375

7476
def append(self, process):
7577
"""
76-
Add the Process() to the queue, so that later it can be checked up on.
78+
Add the Process to the queue, so that later it can be checked up on.
7779
That is if the JobQueue is still open.
7880
7981
If the queue is closed, this will just silently do nothing.
@@ -145,12 +147,15 @@ def _advance_the_queue():
145147
if self._debug:
146148
print("Job queue found finished proc: %s." % job.name)
147149
done = self._running.pop(id)
148-
self._completed.append((done.name, done.exitcode))
149-
if hasattr(done, 'close') and callable(done.close):
150-
done.close()
151-
# multiprocessing.Process.close() added in Python-3.7
152-
# for older versions of python, GC will have to do
153-
del done
150+
# might be a Process or a Thread
151+
if hasattr(done, 'exitcode'):
152+
proc = done
153+
done = DoneProc(proc.name, proc.exitcode)
154+
# multiprocessing.Process.close() added in Python-3.7
155+
if hasattr(proc, 'close'):
156+
proc.close()
157+
del proc
158+
self._completed.append(done)
154159

155160
if self._debug:
156161
print("Job queue has %d running." % len(self._running))
@@ -174,9 +179,9 @@ def _advance_the_queue():
174179
self._fill_results(results)
175180

176181
# Attach exit codes now that we're all done & have joined all jobs
177-
for job_name, exit_code in self._completed:
178-
if isinstance(job, Process):
179-
results[job_name]['exit_code'] = exit_code
182+
for job in self._completed:
183+
if hasattr(job, 'exitcode'):
184+
results[job.name]['exit_code'] = job.exitcode
180185

181186
return results
182187

fabric/tasks.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,13 +325,15 @@ def execute(task, *args, **kwargs):
325325
parallel = requires_parallel(task)
326326
if parallel:
327327
import multiprocessing
328+
ctx = multiprocessing.get_context('fork')
329+
# Set up job queue for parallel cases
330+
queue = ctx.Queue()
328331
else:
329-
multiprocessing = None
332+
ctx = None
333+
queue = None
330334

331335
# Get pool size for this task
332336
pool_size = task.get_pool_size(my_env['all_hosts'], state.env.pool_size)
333-
# Set up job queue in case parallel is needed
334-
queue = multiprocessing.Queue() if parallel else None
335337
jobs = JobQueue(pool_size, queue)
336338
if state.output.debug:
337339
jobs._debug = True
@@ -342,8 +344,7 @@ def execute(task, *args, **kwargs):
342344
for host in my_env['all_hosts']:
343345
try:
344346
results[host] = _execute(
345-
task, host, my_env, args, new_kwargs, jobs, queue,
346-
multiprocessing
347+
task, host, my_env, args, new_kwargs, jobs, queue, ctx,
347348
)
348349
except NetworkError as e:
349350
results[host] = e

0 commit comments

Comments (0)