
Commit fd3aaf5

avar authored and gitster committed
run-command: add an "ungroup" option to run_processes_parallel()
Extend the parallel execution API added in c553c72 (run-command: add an
asynchronous parallel child processor, 2015-12-15) to support a mode
where the stdout and stderr of the processes aren't captured and output
in a deterministic order; instead we'll leave it to the kernel and stdio
to sort it out.

This gives the API the same functionality as GNU parallel's --ungroup
option. As we'll see in a subsequent commit the main reason to want
this is to support stdout and stderr being connected to the TTY in the
case of jobs=1, demonstrated here with GNU parallel:

	$ parallel --ungroup 'test -t {} && echo TTY || echo NTTY' ::: 1 2
	TTY
	TTY
	$ parallel 'test -t {} && echo TTY || echo NTTY' ::: 1 2
	NTTY
	NTTY

Another is, as GNU parallel's documentation notes, a potential for
optimization. As demonstrated in the next commit our results with "git
hook run" will be similar, but generally speaking this shows that if
you want to run processes in parallel where the exact order isn't
important this can be a lot faster:

	$ hyperfine -r 3 -L o ,--ungroup 'parallel {o} seq ::: 10000000 >/dev/null '
	Benchmark 1: parallel seq ::: 10000000 >/dev/null
	  Time (mean ± σ):     220.2 ms ±   9.3 ms    [User: 124.9 ms, System: 96.1 ms]
	  Range (min … max):   212.3 ms … 230.5 ms    3 runs

	Benchmark 2: parallel --ungroup seq ::: 10000000 >/dev/null
	  Time (mean ± σ):     154.7 ms ±   0.9 ms    [User: 136.2 ms, System: 25.1 ms]
	  Range (min … max):   153.9 ms … 155.7 ms    3 runs

	Summary
	  'parallel --ungroup seq ::: 10000000 >/dev/null ' ran
	    1.42 ± 0.06 times faster than 'parallel seq ::: 10000000 >/dev/null '

A large part of the juggling in the API is to make the API safer for
its maintenance and consumers alike.

For the maintenance of the API we e.g. avoid malloc()-ing the
"pp->pfd", ensuring that SANITIZE=address and other similar tools will
catch any unexpected misuse.

For API consumers we take pains to never pass the non-NULL "out"
buffer to an API user that provided the "ungroup" option. The
resulting code in t/helper/test-run-command.c isn't typical of such a
user, i.e. they'd typically use one mode or the other, and would know
whether they'd provided "ungroup" or not.

We could also avoid the strbuf_init() for "buffered_output" by having
"struct parallel_processes" use a static PARALLEL_PROCESSES_INIT
initializer, but let's leave that cleanup for later.

Using a global "run_processes_parallel_ungroup" variable to enable
this option is rather nasty, but is being done here to produce as
minimal a change as possible for a subsequent regression fix. This
change is extracted from a larger initial version[1] which ends up
with a better end-state for the API, but in doing so needed to modify
all existing callers of the API. Let's defer that for now, and
narrowly focus on what we need for fixing the regression in the
subsequent commit.

It's safe to do this with a global variable because:

 A) hook.c is the only user of it that sets it to non-zero, and before
    we'll get any other API users we'll refactor away this method of
    passing in the option, i.e. re-roll [1].

 B) Even if hook.c wasn't the only user we don't have callers of this
    API that concurrently invoke this parallel process starting API
    itself in parallel.

As noted above "A" && "B" are rather nasty, and we don't want to live
with those caveats long-term, but for now they should be an acceptable
compromise.

1. https://lore.kernel.org/git/[email protected]/

Signed-off-by: Ævar Arnfjörð Bjarmason <[email protected]>
Signed-off-by: Junio C Hamano <[email protected]>
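
The TTY behaviour described in the commit message can be reproduced without GNU parallel. The following is a minimal sketch, assuming a POSIX system; child_stderr_is_tty() is a hypothetical helper and not part of git. It forks a child and reports whether that child's stderr is a terminal, optionally redirecting stderr into a pipe first, which is what a grouping (capturing) mode has to do.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical helper, not part of git: fork a child and report whether
 * the child's stderr is a terminal. With capture != 0 the child's stderr
 * is redirected into a pipe first, as a grouping mode would do.
 * Returns 1 (TTY), 0 (not a TTY) or -1 on error.
 */
static int child_stderr_is_tty(int capture)
{
	int fds[2] = { -1, -1 };
	int status;
	pid_t pid;

	if (capture && pipe(fds) < 0)
		return -1;

	pid = fork();
	if (pid < 0) {
		if (capture) {
			close(fds[0]);
			close(fds[1]);
		}
		return -1;
	}
	if (!pid) {
		if (capture) {
			close(fds[0]);
			if (dup2(fds[1], STDERR_FILENO) < 0)
				_exit(2);
			if (fds[1] != STDERR_FILENO)
				close(fds[1]);
		}
		/* Exit status 0 means "stderr is a TTY", like "test -t 2". */
		_exit(isatty(STDERR_FILENO) ? 0 : 1);
	}

	if (capture) {
		close(fds[0]);
		close(fds[1]);
	}
	if (waitpid(pid, &status, 0) < 0 || !WIFEXITED(status))
		return -1;
	if (WEXITSTATUS(status) > 1)
		return -1;
	return WEXITSTATUS(status) == 0;
}
```

With capture set, the child never sees a terminal on stderr, whatever the parent is attached to. Without it, the child inherits the parent's stderr unchanged, so the result matches isatty(STDERR_FILENO) in the parent.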
1 parent 6cd33dc commit fd3aaf5

4 files changed: +123 -29 lines changed

run-command.c

Lines changed: 51 additions & 19 deletions
@@ -1471,6 +1471,7 @@ enum child_state {
 	GIT_CP_WAIT_CLEANUP,
 };
 
+int run_processes_parallel_ungroup;
 struct parallel_processes {
 	void *data;
 
@@ -1494,6 +1495,7 @@ struct parallel_processes {
 	struct pollfd *pfd;
 
 	unsigned shutdown : 1;
+	unsigned ungroup : 1;
 
 	int output_owner;
 	struct strbuf buffered_output; /* of finished children */
@@ -1537,7 +1539,7 @@ static void pp_init(struct parallel_processes *pp,
 		    get_next_task_fn get_next_task,
 		    start_failure_fn start_failure,
 		    task_finished_fn task_finished,
-		    void *data)
+		    void *data, int ungroup)
 {
 	int i;
 
@@ -1559,15 +1561,21 @@ static void pp_init(struct parallel_processes *pp,
 	pp->nr_processes = 0;
 	pp->output_owner = 0;
 	pp->shutdown = 0;
+	pp->ungroup = ungroup;
 	CALLOC_ARRAY(pp->children, n);
-	CALLOC_ARRAY(pp->pfd, n);
+	if (pp->ungroup)
+		pp->pfd = NULL;
+	else
+		CALLOC_ARRAY(pp->pfd, n);
 	strbuf_init(&pp->buffered_output, 0);
 
 	for (i = 0; i < n; i++) {
 		strbuf_init(&pp->children[i].err, 0);
 		child_process_init(&pp->children[i].process);
-		pp->pfd[i].events = POLLIN | POLLHUP;
-		pp->pfd[i].fd = -1;
+		if (pp->pfd) {
+			pp->pfd[i].events = POLLIN | POLLHUP;
+			pp->pfd[i].fd = -1;
+		}
 	}
 
 	pp_for_signal = pp;
@@ -1615,32 +1623,40 @@ static int pp_start_one(struct parallel_processes *pp)
 		BUG("bookkeeping is hard");
 
 	code = pp->get_next_task(&pp->children[i].process,
-				 &pp->children[i].err,
+				 pp->ungroup ? NULL : &pp->children[i].err,
 				 pp->data,
 				 &pp->children[i].data);
 	if (!code) {
-		strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
-		strbuf_reset(&pp->children[i].err);
+		if (!pp->ungroup) {
+			strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+			strbuf_reset(&pp->children[i].err);
+		}
 		return 1;
 	}
-	pp->children[i].process.err = -1;
-	pp->children[i].process.stdout_to_stderr = 1;
+	if (!pp->ungroup) {
+		pp->children[i].process.err = -1;
+		pp->children[i].process.stdout_to_stderr = 1;
+	}
 	pp->children[i].process.no_stdin = 1;
 
 	if (start_command(&pp->children[i].process)) {
-		code = pp->start_failure(&pp->children[i].err,
+		code = pp->start_failure(pp->ungroup ? NULL :
+					 &pp->children[i].err,
 					 pp->data,
 					 pp->children[i].data);
-		strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
-		strbuf_reset(&pp->children[i].err);
+		if (!pp->ungroup) {
+			strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+			strbuf_reset(&pp->children[i].err);
+		}
 		if (code)
 			pp->shutdown = 1;
 		return code;
 	}
 
 	pp->nr_processes++;
 	pp->children[i].state = GIT_CP_WORKING;
-	pp->pfd[i].fd = pp->children[i].process.err;
+	if (pp->pfd)
+		pp->pfd[i].fd = pp->children[i].process.err;
 	return 0;
 }
 
@@ -1674,6 +1690,7 @@ static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
 static void pp_output(struct parallel_processes *pp)
 {
 	int i = pp->output_owner;
+
 	if (pp->children[i].state == GIT_CP_WORKING &&
 	    pp->children[i].err.len) {
 		strbuf_write(&pp->children[i].err, stderr);
@@ -1696,7 +1713,7 @@ static int pp_collect_finished(struct parallel_processes *pp)
 
 		code = finish_command(&pp->children[i].process);
 
-		code = pp->task_finished(code,
+		code = pp->task_finished(code, pp->ungroup ? NULL :
 					 &pp->children[i].err, pp->data,
 					 pp->children[i].data);
 
@@ -1707,10 +1724,13 @@ static int pp_collect_finished(struct parallel_processes *pp)
 
 		pp->nr_processes--;
 		pp->children[i].state = GIT_CP_FREE;
-		pp->pfd[i].fd = -1;
+		if (pp->pfd)
+			pp->pfd[i].fd = -1;
 		child_process_init(&pp->children[i].process);
 
-		if (i != pp->output_owner) {
+		if (pp->ungroup) {
+			; /* no strbuf_*() work to do here */
+		} else if (i != pp->output_owner) {
 			strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 			strbuf_reset(&pp->children[i].err);
 		} else {
@@ -1747,9 +1767,14 @@ int run_processes_parallel(int n,
 	int i, code;
 	int output_timeout = 100;
 	int spawn_cap = 4;
+	int ungroup = run_processes_parallel_ungroup;
 	struct parallel_processes pp;
 
-	pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb);
+	/* unset for the next API user */
+	run_processes_parallel_ungroup = 0;
+
+	pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb,
+		ungroup);
 	while (1) {
 		for (i = 0;
 		    i < spawn_cap && !pp.shutdown &&
@@ -1766,8 +1791,15 @@ int run_processes_parallel(int n,
 		}
 		if (!pp.nr_processes)
 			break;
-		pp_buffer_stderr(&pp, output_timeout);
-		pp_output(&pp);
+		if (ungroup) {
+			int i;
+
+			for (i = 0; i < pp.max_processes; i++)
+				pp.children[i].state = GIT_CP_WAIT_CLEANUP;
+		} else {
+			pp_buffer_stderr(&pp, output_timeout);
+			pp_output(&pp);
+		}
 		code = pp_collect_finished(&pp);
 		if (code) {
 			pp.shutdown = 1;
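
The difference between the two modes in run-command.c comes down to whether the parent places a pipe between itself and the child's stderr. The following is a simplified sketch of that idea, assuming a POSIX system. It does not use git's start_command() or "struct child_process", and run_one_child() is a hypothetical name. In grouped mode the parent drains the pipe into a buffer. In ungrouped mode there is no pipe and nothing to buffer, which loosely corresponds to "pp->pfd" being NULL in the patch.

```c
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical stand-in for one child slot; none of these names exist
 * in git. Forks a child that writes "msg" to its stderr. In grouped
 * mode the parent captures that output into "buf" through a pipe; in
 * ungrouped mode the child keeps the parent's stderr and nothing is
 * captured. Returns the number of bytes captured, or -1 on error.
 */
static ssize_t run_one_child(int ungroup, const char *msg,
			     char *buf, size_t buflen)
{
	int fds[2] = { -1, -1 };
	ssize_t total = 0;
	pid_t pid;

	if (!buflen)
		return -1;
	buf[0] = '\0';
	if (!ungroup && pipe(fds) < 0)
		return -1;

	pid = fork();
	if (pid < 0) {
		if (!ungroup) {
			close(fds[0]);
			close(fds[1]);
		}
		return -1;
	}
	if (!pid) {
		if (!ungroup) {
			close(fds[0]);
			if (dup2(fds[1], STDERR_FILENO) < 0)
				_exit(127);
			if (fds[1] != STDERR_FILENO)
				close(fds[1]);
		}
		if (write(STDERR_FILENO, msg, strlen(msg)) < 0)
			_exit(1);
		_exit(0);
	}

	if (!ungroup) {
		ssize_t n;

		close(fds[1]);
		/* Drain the pipe until the child closes its end. */
		while ((size_t)total < buflen - 1 &&
		       (n = read(fds[0], buf + total,
				 buflen - 1 - (size_t)total)) > 0)
			total += n;
		buf[total] = '\0';
		close(fds[0]);
	}
	if (waitpid(pid, NULL, 0) < 0)
		return -1;
	return total;
}
```

Calling run_one_child(0, "hello\n", buf, sizeof(buf)) leaves the message in buf. Calling it with ungroup set to 1 leaves buf empty, and the message goes directly to the parent's stderr.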

run-command.h

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,9 @@ void check_pipe(int err);
  * pp_cb is the callback cookie as passed to run_processes_parallel.
  * You can store a child process specific callback cookie in pp_task_cb.
  *
+ * See run_processes_parallel() below for a discussion of the "struct
+ * strbuf *out" parameter.
+ *
  * Even after returning 0 to indicate that there are no more processes,
  * this function will be called again until there are no more running
  * child processes.
@@ -424,9 +427,8 @@ typedef int (*get_next_task_fn)(struct child_process *cp,
  * This callback is called whenever there are problems starting
  * a new process.
  *
- * You must not write to stdout or stderr in this function. Add your
- * message to the strbuf out instead, which will be printed without
- * messing up the output of the other parallel processes.
+ * See run_processes_parallel() below for a discussion of the "struct
+ * strbuf *out" parameter.
  *
  * pp_cb is the callback cookie as passed into run_processes_parallel,
  * pp_task_cb is the callback cookie as passed into get_next_task_fn.
@@ -442,9 +444,8 @@ typedef int (*start_failure_fn)(struct strbuf *out,
 /**
  * This callback is called on every child process that finished processing.
  *
- * You must not write to stdout or stderr in this function. Add your
- * message to the strbuf out instead, which will be printed without
- * messing up the output of the other parallel processes.
+ * See run_processes_parallel() below for a discussion of the "struct
+ * strbuf *out" parameter.
  *
  * pp_cb is the callback cookie as passed into run_processes_parallel,
  * pp_task_cb is the callback cookie as passed into get_next_task_fn.
@@ -465,11 +466,26 @@ typedef int (*task_finished_fn)(int result,
  *
  * The children started via this function run in parallel. Their output
  * (both stdout and stderr) is routed to stderr in a manner that output
- * from different tasks does not interleave.
+ * from different tasks does not interleave (but see "ungroup" below).
  *
  * start_failure_fn and task_finished_fn can be NULL to omit any
  * special handling.
+ *
+ * If the "ungroup" option isn't specified, the API will set the
+ * "stdout_to_stderr" parameter in "struct child_process" and provide
+ * the callbacks with a "struct strbuf *out" parameter to write output
+ * to. In this case the callbacks must not write to stdout or
+ * stderr as such output will mess up the output of the other parallel
+ * processes. If "ungroup" option is specified callbacks will get a
+ * NULL "struct strbuf *out" parameter, and are responsible for
+ * emitting their own output, including dealing with any race
+ * conditions due to writing in parallel to stdout and stderr.
+ * The "ungroup" option can be enabled by setting the global
+ * "run_processes_parallel_ungroup" to "1" before invoking
+ * run_processes_parallel(), it will be set back to "0" as soon as the
+ * API reads that setting.
  */
+extern int run_processes_parallel_ungroup;
 int run_processes_parallel(int n,
 			   get_next_task_fn,
 			   start_failure_fn,
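
The header comment describes a one-shot global: the caller sets it, and the API resets it as soon as it has read it. The following minimal sketch shows only that read-then-reset pattern. The names demo_parallel_ungroup and demo_run_parallel() are hypothetical stand-ins, and the real run_processes_parallel() does much more than return the mode.

```c
/* Hypothetical stand-in for the "run_processes_parallel_ungroup" global. */
int demo_parallel_ungroup;

/*
 * Hypothetical stand-in for run_processes_parallel(): it copies the
 * global into a local, clears the global so the next caller starts
 * from the default, and returns the mode this invocation runs in.
 */
static int demo_run_parallel(void)
{
	int ungroup = demo_parallel_ungroup;

	/* unset for the next API user */
	demo_parallel_ungroup = 0;

	return ungroup;
}
```

A caller that wants ungrouped output sets demo_parallel_ungroup to 1 immediately before the call. A later call that does not set it again runs grouped. As the commit message notes, the pattern is only safe while no two callers invoke the API concurrently.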

t/helper/test-run-command.c

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@ static int parallel_next(struct child_process *cp,
 		return 0;
 
 	strvec_pushv(&cp->args, d->args.v);
-	strbuf_addstr(err, "preloaded output of a child\n");
+	if (err)
+		strbuf_addstr(err, "preloaded output of a child\n");
+	else
+		fprintf(stderr, "preloaded output of a child\n");
+
 	number_callbacks++;
 	return 1;
 }
@@ -41,7 +45,10 @@ static int no_job(struct child_process *cp,
 		  void *cb,
 		  void **task_cb)
 {
-	strbuf_addstr(err, "no further jobs available\n");
+	if (err)
+		strbuf_addstr(err, "no further jobs available\n");
+	else
+		fprintf(stderr, "no further jobs available\n");
 	return 0;
 }
 
@@ -50,7 +57,10 @@ static int task_finished(int result,
 			 void *pp_cb,
 			 void *pp_task_cb)
 {
-	strbuf_addstr(err, "asking for a quick stop\n");
+	if (err)
+		strbuf_addstr(err, "asking for a quick stop\n");
+	else
+		fprintf(stderr, "asking for a quick stop\n");
 	return 1;
 }
 
@@ -407,6 +417,12 @@ int cmd__run_command(int argc, const char **argv)
 	if (!strcmp(argv[1], "run-command"))
 		exit(run_command(&proc));
 
+	if (!strcmp(argv[1], "--ungroup")) {
+		argv += 1;
+		argc -= 1;
+		run_processes_parallel_ungroup = 1;
+	}
+
 	jobs = atoi(argv[2]);
 	strvec_clear(&proc.args);
 	strvec_pushv(&proc.args, (const char **)argv + 3);
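
The three callbacks in the test helper repeat the same "buffer if given, otherwise print" branch. One way a callback that has to support both modes might factor that out is sketched below. The "struct demo_buf" type is a hypothetical fixed-size stand-in for git's "struct strbuf", and demo_emit() is not a git function.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical fixed-capacity stand-in for git's "struct strbuf". */
struct demo_buf {
	char data[256];
	size_t len;
};

/*
 * Append "msg" to "out" when the API handed us a buffer (grouped mode),
 * otherwise write it straight to "fallback" (ungrouped mode, where the
 * callback receives a NULL buffer). Returns 1 if the message was
 * buffered and 0 if it was written directly.
 */
static int demo_emit(struct demo_buf *out, FILE *fallback, const char *msg)
{
	size_t n = strlen(msg);
	size_t room;

	if (!out) {
		fputs(msg, fallback);
		return 0;
	}

	/* Sketch only: truncate silently instead of growing the buffer. */
	room = sizeof(out->data) - 1 - out->len;
	if (n > room)
		n = room;
	memcpy(out->data + out->len, msg, n);
	out->len += n;
	out->data[out->len] = '\0';
	return 1;
}
```

A callback would then call demo_emit(err, stderr, "asking for a quick stop\n") without caring which mode it runs in. As the commit message points out, a typical API user knows which mode it asked for and would not need the branch at all.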

t/t0061-run-command.sh

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,16 +134,34 @@ test_expect_success 'run_command runs in parallel with more jobs available than
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command runs ungrouped in parallel with more jobs available than tasks' '
+	test-tool run-command --ungroup run-command-parallel 5 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_line_count = 8 out &&
+	test_line_count = 4 err
+'
+
 test_expect_success 'run_command runs in parallel with as many jobs as tasks' '
 	test-tool run-command run-command-parallel 4 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command runs ungrouped in parallel with as many jobs as tasks' '
+	test-tool run-command --ungroup run-command-parallel 4 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_line_count = 8 out &&
+	test_line_count = 4 err
+'
+
 test_expect_success 'run_command runs in parallel with more tasks than jobs available' '
 	test-tool run-command run-command-parallel 3 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command runs ungrouped in parallel with more tasks than jobs available' '
+	test-tool run-command --ungroup run-command-parallel 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_line_count = 8 out &&
+	test_line_count = 4 err
+'
+
 cat >expect <<-EOF
 preloaded output of a child
 asking for a quick stop
@@ -158,6 +176,12 @@ test_expect_success 'run_command is asked to abort gracefully' '
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command is asked to abort gracefully (ungroup)' '
+	test-tool run-command --ungroup run-command-abort 3 false >out 2>err &&
+	test_must_be_empty out &&
+	test_line_count = 6 err
+'
+
 cat >expect <<-EOF
 no further jobs available
 EOF
@@ -167,6 +191,12 @@ test_expect_success 'run_command outputs ' '
 	test_cmp expect actual
 '
 
+test_expect_success 'run_command outputs (ungroup) ' '
+	test-tool run-command --ungroup run-command-no-jobs 3 sh -c "printf \"%s\n%s\n\" Hello World" >out 2>err &&
+	test_must_be_empty out &&
+	test_cmp expect err
+'
+
 test_trace () {
 	expect="$1"
 	shift
