Skip to content

Commit 860b580

Browse files
committed
Enforce a read deadline on execread
1 parent cca9a81 commit 860b580

File tree

1 file changed

+76
-39
lines changed

1 file changed

+76
-39
lines changed

input/common/execread/execread.go

Lines changed: 76 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"os/exec"
1111
"sync"
12+
"time"
1213

1314
"github.com/noriah/catnip/input"
1415
"github.com/pkg/errors"
@@ -47,89 +48,125 @@ func (s *Session) Start(ctx context.Context, dst [][]input.Sample, kickChan chan
4748
return errors.New("invalid dst length given")
4849
}
4950

50-
// Take argv and free it soon after, since we won't be needing it again.
5151
cmd := exec.CommandContext(ctx, s.argv[0], s.argv[1:]...)
5252
cmd.Stderr = os.Stderr
53-
s.argv = nil
5453

5554
o, err := cmd.StdoutPipe()
5655
if err != nil {
5756
return errors.Wrap(err, "failed to get stdout pipe")
5857
}
5958
defer o.Close()
6059

61-
bufsz := s.samples * 4
62-
if !s.f32mode {
63-
bufsz *= 2
64-
}
65-
66-
framesz := s.cfg.FrameSize
67-
reader := floatReader{
68-
order: binary.LittleEndian,
69-
f64: !s.f32mode,
60+
// We need o as an *os.File for SetWriteDeadline.
61+
of, ok := o.(*os.File)
62+
if !ok {
63+
return errors.New("stdout pipe is not an *os.File (bug)")
7064
}
7165

72-
// Allocate 4 times the buffer. We should ensure that we can read some of
73-
// the overflow.
74-
raw := make([]byte, bufsz)
75-
7666
if err := cmd.Start(); err != nil {
77-
return errors.Wrap(err, "failed to start ffmpeg")
67+
return errors.Wrap(err, "failed to start "+s.argv[0])
7868
}
79-
defer cmd.Process.Signal(os.Interrupt)
8069

8170
if s.OnStart != nil {
8271
if err := s.OnStart(ctx, cmd); err != nil {
8372
return err
8473
}
8574
}
8675

87-
for {
88-
reader.reset(raw)
76+
framesz := s.cfg.FrameSize
77+
reader := floatReader{
78+
order: binary.LittleEndian,
79+
f64: !s.f32mode,
80+
}
8981

90-
mu.Lock()
91-
for n := 0; n < s.samples; n++ {
92-
dst[n%framesz][n/framesz] = reader.next()
93-
}
94-
mu.Unlock()
82+
bufsz := s.samples
83+
if !s.f32mode {
84+
bufsz *= 2
85+
}
9586

96-
select {
97-
case <-ctx.Done():
98-
return ctx.Err()
99-
// default:
100-
case kickChan <- true:
87+
raw := make([]byte, bufsz*4)
88+
89+
// We double this as a workaround because sampleDuration is less than the
90+
// actual time that ReadFull blocks for some reason, probably because the
91+
// process decides to discard audio when it overflows.
92+
sampleDuration := time.Duration(
93+
float64(s.cfg.SampleSize) / s.cfg.SampleRate * float64(time.Second))
94+
// We also keep track of whether the deadline was hit once so we can half
95+
// the sample duration. This smooths out the jitter.
96+
var readExpired bool
97+
98+
for {
99+
// Set us a read deadline. If the deadline is reached, we'll write zeros
100+
// to the buffer.
101+
timeout := sampleDuration
102+
if !readExpired {
103+
timeout *= 2
104+
}
105+
if err := of.SetReadDeadline(time.Now().Add(timeout)); err != nil {
106+
return errors.Wrap(err, "failed to set read deadline")
101107
}
102108

103109
_, err := io.ReadFull(o, raw)
104110
if err != nil {
105-
if errors.Is(err, io.EOF) {
111+
switch {
112+
case errors.Is(err, io.EOF):
106113
return nil
114+
case errors.Is(err, os.ErrDeadlineExceeded):
115+
readExpired = true
116+
default:
117+
return err
107118
}
108-
return err
119+
} else {
120+
readExpired = false
121+
}
122+
123+
if readExpired {
124+
mu.Lock()
125+
// We can write directly to dst just so we can avoid parsing zero
126+
// bytes to floats.
127+
for _, buf := range dst {
128+
// Go should optimize this to a memclr.
129+
for i := range buf {
130+
buf[i] = 0
131+
}
132+
}
133+
mu.Unlock()
134+
} else {
135+
reader.reset(raw)
136+
mu.Lock()
137+
for n := 0; n < s.samples; n++ {
138+
dst[n%framesz][n/framesz] = reader.next()
139+
}
140+
mu.Unlock()
141+
}
142+
143+
// Signal that we've written to dst.
144+
select {
145+
case <-ctx.Done():
146+
return ctx.Err()
147+
case kickChan <- true:
109148
}
110149
}
111150
}
112151

113152
type floatReader struct {
114153
order binary.ByteOrder
115154
buf []byte
116-
n int64
117155
f64 bool
118156
}
119157

120158
func (f *floatReader) reset(b []byte) {
121-
f.n = 0
122159
f.buf = b
123160
}
124161

125162
func (f *floatReader) next() float64 {
126-
n := f.n
127-
128163
if f.f64 {
129-
f.n += 8
130-
return math.Float64frombits(f.order.Uint64(f.buf[n:]))
164+
b := f.buf[:8]
165+
f.buf = f.buf[8:]
166+
return math.Float64frombits(f.order.Uint64(b))
131167
}
132168

133-
f.n += 4
134-
return float64(math.Float32frombits(f.order.Uint32(f.buf[n:])))
169+
b := f.buf[:4]
170+
f.buf = f.buf[4:]
171+
return float64(math.Float32frombits(f.order.Uint32(b)))
135172
}

0 commit comments

Comments
 (0)