Skip to content

Commit c1682fa

Browse files
committed
[Driver] <rdar://39504759> Continue demultiplexing subprocess output after initial read.
1 parent 5bfd8e1 commit c1682fa

File tree

6 files changed

+86
-15
lines changed

6 files changed

+86
-15
lines changed

include/swift/Basic/Statistics.def

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ DRIVER_STATISTIC(NumDriverJobsSkipped)
4141
/// EXIT_SUCCESS.
4242
DRIVER_STATISTIC(NumProcessFailures)
4343

44+
/// Total number of driver poll() calls on subprocess pipes.
45+
DRIVER_STATISTIC(NumDriverPipePolls)
46+
47+
/// Total number of driver read() calls on subprocess pipes.
48+
DRIVER_STATISTIC(NumDriverPipeReads)
49+
4450
/// Next 10 statistics count dirtying-events in the driver's dependency graph,
4551
/// which it uses to decide which files are invalid (and thus which files to
4652
/// build). There are two dimensions to each dirtying event:

include/swift/Basic/TaskQueue.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <queue>
2424

2525
namespace swift {
26+
class UnifiedStatsReporter;
2627
namespace sys {
2728

2829
class Task; // forward declared to allow for platform-specific implementations
@@ -46,13 +47,18 @@ class TaskQueue {
4647
/// The number of tasks to execute in parallel.
4748
unsigned NumberOfParallelTasks;
4849

50+
/// Optional place to count I/O and subprocess events.
51+
UnifiedStatsReporter *Stats;
52+
4953
public:
5054
/// \brief Create a new TaskQueue instance.
5155
///
5256
/// \param NumberOfParallelTasks indicates the number of tasks which should
5357
/// be run in parallel. If 0, the TaskQueue will choose the most appropriate
5458
/// number of parallel tasks for the current system.
55-
TaskQueue(unsigned NumberOfParallelTasks = 0);
59+
/// \param Optional stats reporter to count I/O and subprocess events.
60+
TaskQueue(unsigned NumberOfParallelTasks = 0,
61+
UnifiedStatsReporter *USR = nullptr);
5662
virtual ~TaskQueue();
5763

5864
// TODO: remove once -Wdocumentation stops warning for \param, \returns on

lib/Basic/TaskQueue.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ using namespace swift::sys;
2929
#include "Default/TaskQueue.inc"
3030
#endif
3131

32-
TaskQueue::TaskQueue(unsigned NumberOfParallelTasks)
33-
: NumberOfParallelTasks(NumberOfParallelTasks) {}
32+
TaskQueue::TaskQueue(unsigned NumberOfParallelTasks,
33+
UnifiedStatsReporter *USR)
34+
: NumberOfParallelTasks(NumberOfParallelTasks),
35+
Stats(USR){}
3436

3537
TaskQueue::~TaskQueue() = default;
3638

lib/Basic/Unix/TaskQueue.inc

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
//===----------------------------------------------------------------------===//
1212

1313
#include "swift/Basic/TaskQueue.h"
14+
#include "swift/Basic/Statistic.h"
1415

1516
#include "llvm/ADT/StringRef.h"
1617
#include "llvm/ADT/DenseMap.h"
@@ -84,12 +85,16 @@ class Task {
8485
/// from the Task.
8586
std::string Errors;
8687

88+
/// Optional place to count I/O and subprocess events.
89+
UnifiedStatsReporter *Stats;
90+
8791
public:
8892
Task(const char *ExecPath, ArrayRef<const char *> Args,
89-
ArrayRef<const char *> Env, void *Context, bool SeparateErrors)
93+
ArrayRef<const char *> Env, void *Context, bool SeparateErrors,
94+
UnifiedStatsReporter *USR)
9095
: ExecPath(ExecPath), Args(Args), Env(Env), Context(Context),
9196
SeparateErrors(SeparateErrors), Pid(-1), Pipe(-1), ErrorPipe(-1),
92-
State(Preparing) {
97+
State(Preparing), Stats(USR) {
9398
assert((Env.empty() || Env.back() == nullptr) &&
9499
"Env must either be empty or null-terminated!");
95100
}
@@ -108,8 +113,11 @@ public:
108113
bool execute();
109114

110115
/// \brief Reads data from the pipes, if any is available.
111-
/// \returns true on error, false on success
112-
bool readFromPipes();
116+
///
117+
/// If \p UntilEnd is true, reads until the end of the stream; otherwise reads
118+
/// once (possibly with a retry on EINTR), and returns.
119+
/// \returns true on error, false on success.
120+
bool readFromPipes(bool UntilEnd = true);
113121

114122
/// \brief Performs any post-execution work for this Task, such as reading
115123
/// piped output and closing the pipe.
@@ -241,7 +249,9 @@ bool Task::execute() {
241249
return false;
242250
}
243251

244-
static bool readFromAPipe(int Pipe, std::string &Output) {
252+
static bool readFromAPipe(int Pipe, std::string &Output,
253+
UnifiedStatsReporter *Stats,
254+
bool UntilEnd) {
245255
char outputBuffer[1024];
246256
ssize_t readBytes = 0;
247257
while ((readBytes = read(Pipe, outputBuffer, sizeof(outputBuffer))) != 0) {
@@ -253,15 +263,19 @@ static bool readFromAPipe(int Pipe, std::string &Output) {
253263
}
254264

255265
Output.append(outputBuffer, readBytes);
266+
if (Stats)
267+
Stats->getDriverCounters().NumDriverPipeReads++;
268+
if (!UntilEnd)
269+
break;
256270
}
257271

258272
return false;
259273
}
260274

261-
bool Task::readFromPipes() {
262-
bool Ret = readFromAPipe(Pipe, Output);
275+
bool Task::readFromPipes(bool UntilEnd) {
276+
bool Ret = readFromAPipe(Pipe, Output, Stats, UntilEnd);
263277
if (SeparateErrors) {
264-
Ret |= readFromAPipe(ErrorPipe, Errors);
278+
Ret |= readFromAPipe(ErrorPipe, Errors, Stats, UntilEnd);
265279
}
266280
return Ret;
267281
}
@@ -302,7 +316,7 @@ void TaskQueue::addTask(const char *ExecPath, ArrayRef<const char *> Args,
302316
ArrayRef<const char *> Env, void *Context,
303317
bool SeparateErrors) {
304318
std::unique_ptr<Task> T(
305-
new Task(ExecPath, Args, Env, Context, SeparateErrors));
319+
new Task(ExecPath, Args, Env, Context, SeparateErrors, Stats));
306320
QueuedTasks.push(std::move(T));
307321
}
308322

@@ -355,6 +369,8 @@ bool TaskQueue::execute(TaskBeganCallback Began, TaskFinishedCallback Finished,
355369
continue;
356370
return true;
357371
}
372+
if (Stats)
373+
Stats->getDriverCounters().NumDriverPipePolls++;
358374

359375
// Holds all fds which have finished during this loop iteration.
360376
std::vector<int> FinishedFds;
@@ -373,8 +389,15 @@ bool TaskQueue::execute(TaskBeganCallback Began, TaskFinishedCallback Finished,
373389
"All outstanding fds must be associated with an executing Task");
374390
Task &T = *iter->second;
375391
if (fd.revents & POLLIN || fd.revents & POLLPRI) {
376-
// There's data available to read.
377-
T.readFromPipes();
392+
// There's data available to read. Read _some_ of it here, but not
393+
// necessarily _all_, since the pipe is in blocking mode and we might
394+
// have other input pending (or soon -- before this subprocess is done
395+
// writing) from other subprocesses.
396+
//
397+
// FIXME: longer term, this should probably either be restructured to
398+
// use O_NONBLOCK, or at very least poll the stderr file descriptor as
399+
// well; the whole loop here is a bit of a mess.
400+
T.readFromPipes(/*UntilEnd = */false);
378401
}
379402

380403
if (fd.revents & POLLHUP || fd.revents & POLLERR) {

lib/Driver/Compilation.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,8 @@ namespace driver {
593593
if (Comp.SkipTaskExecution)
594594
TQ.reset(new DummyTaskQueue(Comp.NumberOfParallelCommands));
595595
else
596-
TQ.reset(new TaskQueue(Comp.NumberOfParallelCommands));
596+
TQ.reset(new TaskQueue(Comp.NumberOfParallelCommands,
597+
Comp.Stats.get()));
597598
if (Comp.ShowIncrementalBuildDecisions || Comp.Stats)
598599
IncrementalTracer = &ActualIncrementalTracer;
599600
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// RUN: %empty-directory(%t/manyfuncs)
2+
// RUN: %empty-directory(%t/stats)
3+
//
4+
// This test is looking at behaviour of the driver's task queue, checking
5+
// to make sure that it drains the output streams of multiple subprocesses
6+
// evenly, rather than one subprocess all-at-once before the next
7+
// all-at-once. This isn't batch-mode specific but it emerges somewhat
8+
// vividly there when combined with lots of files that do
9+
// -debug-time-function-bodies.
10+
//
11+
// Here we do a non-batch-mode variant that has lots of functions in each
12+
// of 4 files.
13+
//
14+
// RUN: %gyb -D N=1 %s -o %t/manyfuncs/file1.swift
15+
// RUN: %gyb -D N=2 %s -o %t/manyfuncs/file2.swift
16+
// RUN: %gyb -D N=3 %s -o %t/manyfuncs/file3.swift
17+
// RUN: %gyb -D N=4 %s -o %t/manyfuncs/file4.swift
18+
//
19+
// We calculate the ratio of poll() calls to read() calls; these should be
20+
// nearly equal (we test abs(read/poll) < 2.0) if we're doing interleaved
21+
// reading. If we're doing non-interleaved reading, they become radically
22+
// different (eg. thousands of reads per poll).
23+
//
24+
// RUN: %target-build-swift -j 4 -module-name manyfuncs -typecheck -stats-output-dir %t/stats -Xfrontend -debug-time-function-bodies %t/manyfuncs/*.swift
25+
// RUN: %utils/process-stats-dir.py --evaluate 'abs(float(NumDriverPipeReads) / float(NumDriverPipePolls)) < 2.0' %t/stats
26+
27+
% for i in range(1,1000):
28+
func process_${N}_function_${i}(_ x: Int) -> Int {
29+
let v = (1 + 2 * 3 + x * 5 + x + 6)
30+
let a = [v, 1, ${i}, 3, ${N}, 4]
31+
return a.reduce(0, {$0 + $1})
32+
}
33+
% end

0 commit comments

Comments
 (0)