Skip to content

Commit 4516211

Browse files
committed
Make -P work with --recursive-max-concurrent=false
1 parent c5fa108 commit 4516211

File tree

3 files changed

+31
-24
lines changed

3 files changed

+31
-24
lines changed

main.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -245,18 +245,18 @@ func executeAndFlushTty(command []string) (exitCode int) {
245245
log.Fatalf("Could not find executable %s: %v\n", command[0], err)
246246
}
247247

248-
// this process won't be used for anything much more, let's cap memory usage a bit
249-
// this reduces memory usage by a couple of megabytes when running a lot of executeAndFlushTtys
250-
debug.SetMemoryLimit(0)
251-
debug.FreeOSMemory()
252-
253248
process, err := os.StartProcess(path, command, &os.ProcAttr{
254249
Files: standardFdToFile,
255250
})
256251
if err != nil {
257252
log.Fatalf("Could not displaySequentially %s: %v\n", shellescape.QuoteCommand(command), err)
258253
}
259254

255+
// this process won't be used for anything much more, let's cap memory usage a bit
256+
// this reduces memory usage by a couple of megabytes when running a lot of executeAndFlushTtys
257+
debug.SetMemoryLimit(0)
258+
debug.FreeOSMemory()
259+
260260
processState, err := process.Wait()
261261
if err != nil {
262262
log.Fatalf("Could not wait for process %v, %v\n", shellescape.QuoteCommand(command), err)
@@ -291,10 +291,11 @@ func main() {
291291
os.Exit(0)
292292
}
293293

294-
if *flRecursiveProcessLimit {
295-
if _, hasMasterLimitServer := os.LookupEnv(EnvGparallelChildLimitSocket); !hasMasterLimitServer {
296-
createLimitServer()
297-
}
294+
if !*flRecursiveProcessLimit {
295+
_ = os.Unsetenv(EnvGparallelChildLimitSocket)
296+
}
297+
if _, hasMasterLimitServer := os.LookupEnv(EnvGparallelChildLimitSocket); !hasMasterLimitServer {
298+
createLimitServer()
298299
}
299300

300301
processes := chann.New[*ProcessResult]()

recursivetasklimit.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"context"
55
"errors"
6+
"io"
67
"io/fs"
78
"log"
89
"net"
@@ -15,6 +16,17 @@ import (
1516

1617
const EnvGparallelChildLimitSocket = "_GPARALLEL_CHILD_LIMIT_SOCKET"
1718

19+
func readOneByte(reader io.Reader) error {
20+
var b [1]byte
21+
_, err := reader.Read(b[:])
22+
return err
23+
}
24+
25+
func writeOneByte(writer io.Writer) error {
26+
_, err := writer.Write([]byte{1})
27+
return err
28+
}
29+
1830
func serveClients(listener net.Listener) {
1931
for {
2032
conn, err := listener.Accept()
@@ -25,10 +37,9 @@ func serveClients(listener net.Listener) {
2537
log.Fatalf("Error accepting connection on the %s unix socket: %v\n", os.Getenv(EnvGparallelChildLimitSocket), err)
2638
}
2739

28-
_, _ = conn.Write([]byte{1})
40+
_ = writeOneByte(conn)
2941

30-
var buf [1]byte
31-
_, err = conn.Read(buf[:])
42+
err = readOneByte(conn)
3243
if errors.Is(err, net.ErrClosed) {
3344
break
3445
}
@@ -56,6 +67,9 @@ func createLimitServer() {
5667
log.Fatalf("Couldn't listen on unix socket '%s': %v\n", listenPath, err)
5768
}
5869

70+
// Every process has the ability to spawn 1 child of its own, and as many other children
71+
// as there are active serveClients goroutines. That's why we spawn (*flMaxProcesses-1)
72+
// of them.
5973
for i := 0; i < *flMaxProcesses-1; i++ {
6074
go serveClients(listener)
6175
}
@@ -121,11 +135,7 @@ var recursiveTaskLimitClient = sync.OnceValue(func() (client struct {
121135

122136
mutex.Unlock()
123137
ch := make(chan error)
124-
go func() {
125-
var b [1]byte
126-
_, err = conn.Read(b[:])
127-
ch <- err
128-
}()
138+
go func() { ch <- readOneByte(conn) }()
129139
select {
130140
case <-ctx.Done():
131141
err = ctx.Err()
@@ -153,7 +163,7 @@ var recursiveTaskLimitClient = sync.OnceValue(func() (client struct {
153163
}
154164

155165
if toClose != nil && toClose.conn != nil {
156-
_, _ = toClose.conn.Write([]byte{2})
166+
_ = writeOneByte(toClose.conn)
157167
haveToClose("connection to master gparallel", toClose.conn)
158168
toClose.conn = nil
159169
} else if toClose != nil && toClose.cancel != nil {

runner.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,7 @@ func (proc *ProcessResult) isAlive() bool {
5757
}
5858

5959
func (proc *ProcessResult) wait() error {
60-
if *flRecursiveProcessLimit {
61-
defer recursiveTaskLimitClient().del(proc)
62-
}
60+
defer recursiveTaskLimitClient().del(proc)
6361

6462
// wait for both stdout and stderr if we opened two readers
6563
<-proc.output.streamClosed
@@ -313,9 +311,7 @@ func runWithStdin(command []string, stdin io.Reader) (result *ProcessResult) {
313311
result.originalCommand = command
314312
result.exitCode = make(chan int)
315313

316-
if *flRecursiveProcessLimit {
317-
recursiveTaskLimitClient().addWait(result)
318-
}
314+
recursiveTaskLimitClient().addWait(result)
319315

320316
if stdoutIsTty {
321317
command = append([]string{executable(), "--_execute-and-flush-tty"}, command...)

0 commit comments

Comments
 (0)