@@ -9,13 +9,17 @@ import (
99 "os"
1010 "os/exec"
1111 "sync"
12+ "time"
1213
1314 "github.com/noriah/catnip/input"
1415 "github.com/pkg/errors"
1516)
1617
1718// Session is a session that reads floating-point audio values from a Cmd.
1819type Session struct {
20+ // OnStart is called when the session starts. Nil by default.
21+ OnStart func (ctx context.Context , cmd * exec.Cmd ) error
22+
1923 argv []string
2024 cfg input.SessionConfig
2125
@@ -26,38 +30,47 @@ type Session struct {
2630}
2731
2832// NewSession creates a new execread session. It never returns an error.
29- func NewSession (argv []string , f32mode bool , cfg input.SessionConfig ) ( * Session , error ) {
33+ func NewSession (argv []string , f32mode bool , cfg input.SessionConfig ) * Session {
3034 if len (argv ) < 1 {
31- return nil , errors . New ("argv has no arg0" )
35+ panic ("argv has no arg0" )
3236 }
3337
3438 return & Session {
3539 argv : argv ,
3640 cfg : cfg ,
3741 f32mode : f32mode ,
3842 samples : cfg .SampleSize * cfg .FrameSize ,
39- }, nil
43+ }
4044}
4145
4246func (s * Session ) Start (ctx context.Context , dst [][]input.Sample , kickChan chan bool , mu * sync.Mutex ) error {
4347 if ! input .EnsureBufferLen (s .cfg , dst ) {
4448 return errors .New ("invalid dst length given" )
4549 }
4650
47- // Take argv and free it soon after, since we won't be needing it again.
4851 cmd := exec .CommandContext (ctx , s .argv [0 ], s .argv [1 :]... )
4952 cmd .Stderr = os .Stderr
50- s .argv = nil
5153
5254 o , err := cmd .StdoutPipe ()
5355 if err != nil {
5456 return errors .Wrap (err , "failed to get stdout pipe" )
5557 }
5658 defer o .Close ()
5759
58- bufsz := s .samples * 4
59- if ! s .f32mode {
60- bufsz *= 2
60+ // We need o as an *os.File for SetWriteDeadline.
61+ of , ok := o .(* os.File )
62+ if ! ok {
63+ return errors .New ("stdout pipe is not an *os.File (bug)" )
64+ }
65+
66+ if err := cmd .Start (); err != nil {
67+ return errors .Wrap (err , "failed to start " + s .argv [0 ])
68+ }
69+
70+ if s .OnStart != nil {
71+ if err := s .OnStart (ctx , cmd ); err != nil {
72+ return err
73+ }
6174 }
6275
6376 framesz := s .cfg .FrameSize
@@ -66,60 +79,94 @@ func (s *Session) Start(ctx context.Context, dst [][]input.Sample, kickChan chan
6679 f64 : ! s .f32mode ,
6780 }
6881
69- // Allocate 4 times the buffer. We should ensure that we can read some of
70- // the overflow.
71- raw := make ([]byte , bufsz )
72-
73- if err := cmd .Start (); err != nil {
74- return errors .Wrap (err , "failed to start ffmpeg" )
82+ bufsz := s .samples
83+ if ! s .f32mode {
84+ bufsz *= 2
7585 }
7686
77- for {
78- reader .reset (raw )
87+ raw := make ([]byte , bufsz * 4 )
7988
80- mu .Lock ()
81- for n := 0 ; n < s .samples ; n ++ {
82- dst [n % framesz ][n / framesz ] = reader .next ()
83- }
84- mu .Unlock ()
89+ // We double this as a workaround because sampleDuration is less than the
90+ // actual time that ReadFull blocks for some reason, probably because the
91+ // process decides to discard audio when it overflows.
92+ sampleDuration := time .Duration (
93+ float64 (s .cfg .SampleSize ) / s .cfg .SampleRate * float64 (time .Second ))
94+ // We also keep track of whether the deadline was hit once so we can half
95+ // the sample duration. This smooths out the jitter.
96+ var readExpired bool
8597
86- select {
87- case <- ctx .Done ():
88- return ctx .Err ()
89- // default:
90- case kickChan <- true :
98+ for {
99+ // Set us a read deadline. If the deadline is reached, we'll write zeros
100+ // to the buffer.
101+ timeout := sampleDuration
102+ if ! readExpired {
103+ timeout *= 2
104+ }
105+ if err := of .SetReadDeadline (time .Now ().Add (timeout )); err != nil {
106+ return errors .Wrap (err , "failed to set read deadline" )
91107 }
92108
93109 _ , err := io .ReadFull (o , raw )
94110 if err != nil {
95- if errors .Is (err , io .EOF ) {
111+ switch {
112+ case errors .Is (err , io .EOF ):
96113 return nil
114+ case errors .Is (err , os .ErrDeadlineExceeded ):
115+ readExpired = true
116+ default :
117+ return err
97118 }
98- return err
119+ } else {
120+ readExpired = false
121+ }
122+
123+ if readExpired {
124+ mu .Lock ()
125+ // We can write directly to dst just so we can avoid parsing zero
126+ // bytes to floats.
127+ for _ , buf := range dst {
128+ // Go should optimize this to a memclr.
129+ for i := range buf {
130+ buf [i ] = 0
131+ }
132+ }
133+ mu .Unlock ()
134+ } else {
135+ reader .reset (raw )
136+ mu .Lock ()
137+ for n := 0 ; n < s .samples ; n ++ {
138+ dst [n % framesz ][n / framesz ] = reader .next ()
139+ }
140+ mu .Unlock ()
141+ }
142+
143+ // Signal that we've written to dst.
144+ select {
145+ case <- ctx .Done ():
146+ return ctx .Err ()
147+ case kickChan <- true :
99148 }
100149 }
101150}
102151
103152type floatReader struct {
104153 order binary.ByteOrder
105154 buf []byte
106- n int64
107155 f64 bool
108156}
109157
110158func (f * floatReader ) reset (b []byte ) {
111- f .n = 0
112159 f .buf = b
113160}
114161
115162func (f * floatReader ) next () float64 {
116- n := f .n
117-
118163 if f .f64 {
119- f .n += 8
120- return math .Float64frombits (f .order .Uint64 (f .buf [n :]))
164+ b := f .buf [:8 ]
165+ f .buf = f .buf [8 :]
166+ return math .Float64frombits (f .order .Uint64 (b ))
121167 }
122168
123- f .n += 4
124- return float64 (math .Float32frombits (f .order .Uint32 (f .buf [n :])))
169+ b := f .buf [:4 ]
170+ f .buf = f .buf [4 :]
171+ return float64 (math .Float32frombits (f .order .Uint32 (b )))
125172}
0 commit comments