Skip to content

Commit 50db6fb

Browse files
authored
Merge pull request #21 from wsc1/master
cb cleanup/test
2 parents 028c870 + 99ddce2 commit 50db6fb

File tree

6 files changed

+274
-28
lines changed

6 files changed

+274
-28
lines changed

libsio/cb.c

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#include <stdatomic.h>
55
#include <string.h>
66
#include <stdint.h>
7+
#include <time.h>
8+
#include <stdio.h>
79

810
#include "cb.h"
911

@@ -19,6 +21,10 @@ Cb * newCb(int bufSz) {
1921

2022
cb->out = 0;
2123
cb->outF = 0;
24+
cb->time.tv_sec = 0;
25+
cb->time.tv_nsec = 1000;
26+
cb->inGo = 0;
27+
fprintf(stderr, "cb c addr %p inGo c addr on new: %p\n", cb, &cb->inGo);
2228
return cb;
2329
}
2430

@@ -44,7 +50,6 @@ int getOutF(Cb *cb) {
4450

4551
// forward decl.
4652
static void toGoAndBack(Cb *cb);
47-
static int onF(Cb *cb);
4853

4954
/*
5055
From Ian Lance Taylor: in C11 stdatomic terms Go atomic.CompareAndSwap is like
@@ -86,7 +91,7 @@ void duplexCb(Cb *cb, void *out, int *onF, void *in, int inF) {
8691

8792

8893
static void toGoAndBack(Cb *cb) {
89-
_Atomic uint32_t * gp = &cb->inGo;
94+
_Atomic uint32_t * gp = &(cb->inGo);
9095
uint32_t b;
9196
for (;;) {
9297
b = atomic_load_explicit(gp, memory_order_acquire);
@@ -95,6 +100,7 @@ static void toGoAndBack(Cb *cb) {
95100
// can't happen unless the underlying API executes callbacks
96101
// on more than one thread, as anyhow the current thread
97102
// is in this function (no setjmp/longjmp).
103+
fprintf(stderr, "toGoAndBack: %d > 0, does the API guarantee one callback at a time?\n", b);
98104
continue;
99105
}
100106
if (atomic_compare_exchange_weak_explicit(gp, &b, b+1, memory_order_acq_rel, memory_order_relaxed)) {
@@ -103,9 +109,28 @@ static void toGoAndBack(Cb *cb) {
103109
}
104110
b++;
105111
uint32_t cmp;
106-
for (;;) {
112+
int i;
113+
for (i=1; i<=1000000;i++) {
107114
cmp = atomic_load_explicit(gp, memory_order_acquire);
108115
if (cmp != b) {
116+
return;
117+
}
118+
if (i >= 1000 && i%50 == 0) {
119+
// sleep is 1us, but involves syscall so system latency is involved.
120+
// avoid as much as possible without entirely eating the CPU.
121+
// TBD(wsc) make this buffer size real time dependent rather than by cycle
122+
// counts. If the buffer time is large, then we can sleep as in
123+
// cb.go, otherwise either we're in a slack time or contention is causing the
124+
// atomic to fail and it might help to back off.
125+
nanosleep(&cb->time, NULL);
126+
}
127+
}
128+
// paranoid code to reset state to as if Go was running properly
129+
// equivalent of libsio.ErrCApiLost
130+
fprintf(stderr, "atomic failed after 1000000 tries (resetting), did Go die?\n");
131+
while (b>0) {
132+
b = atomic_load_explicit(gp, memory_order_acquire);
133+
if (atomic_compare_exchange_weak_explicit(gp, &b, 0, memory_order_acq_rel, memory_order_relaxed)) {
109134
break;
110135
}
111136
}

libsio/cb.go

Lines changed: 102 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ package libsio
66
// #include "cb.h"
77
import "C"
88
import (
9+
"errors"
910
"io"
1011
"runtime"
1112
"sync/atomic"
13+
"time"
1214
"unsafe"
1315

1416
"zikichombo.org/sound"
@@ -24,7 +26,7 @@ import (
2426
// as well:
2527
//
2628
// 1. The synchronisation mechanism here assumes that at most one C callback
27-
// thread is executing a callback at a time, so as to avoid syscalls.
29+
// thread is executing a callback at a time.
2830
//
2931
// 2. The C API must accept configuration by buffer size and
3032
// never present the user with a buffer which exceeds this size.
@@ -35,7 +37,8 @@ import (
3537
// - for input, there will be latency and CPU overhead
3638
// - for output, the C API must allow the callback to inform the
3739
// underlying system of the actual number of frames provided, even
38-
// for non EOF conditions.
40+
// for non EOF conditions. Normally, this means the C API has
41+
// latency associated with alignment.
3942
//
4043
// 4. For best reliability, the Go code should be run on a thread with the same
4144
// priority as the C API.
@@ -70,18 +73,40 @@ type Cb struct {
7073
// latency and cpu overhead, there is nothing that can be done as any regular alignment
7174
// of bursts of irregular length data will have this effect.
7275
over []float64
76+
77+
// time tracking
78+
lastTime time.Time
79+
bufDur time.Duration
7380
}
7481

7582
func NewCb(v sound.Form, sco sample.Codec, b int) *Cb {
7683
return &Cb{
77-
Form: v,
78-
sco: sco,
79-
bsz: b,
80-
il: cil.New(v.Channels(), b),
81-
over: make([]float64, b),
82-
c: C.newCb(C.int(b))}
84+
Form: v,
85+
sco: sco,
86+
bsz: b,
87+
il: cil.New(v.Channels(), b),
88+
over: make([]float64, 0, b),
89+
c: C.newCb(C.int(b)),
90+
bufDur: time.Duration(b) * v.SampleRate().Period()}
8391
}
8492

93+
const (
94+
// amount of slack we give between ask for wake up and
95+
// pseudo-spin. guestimated for general OS scheduling
96+
// latency of worst case 1 preempting task + general Go GC
97+
// latency.
98+
sleepSlack = 5 * time.Millisecond
99+
100+
// nb of times to try an atomic before defaulting to
101+
// runtime.Gosched, as the later might on some systems
102+
// and some circumstances invoke a syscall.
103+
atomicTryLen = 10
104+
105+
// max number of tries before we assume something killed
106+
// the C thread
107+
atomicTryLim = 100000000
108+
)
109+
85110
func (r *Cb) Close() error {
86111
C.closeCb(r.c)
87112
C.freeCb(r.c)
@@ -106,19 +131,28 @@ func (r *Cb) Receive(d []float64) (int, error) {
106131
start = len(r.over) / nC
107132
r.over = r.over[:0]
108133
}
134+
r.maybeSleep()
109135

110136
var sl []float64 // per cb subslice of d
111137
addr := (*uint32)(unsafe.Pointer(&r.c.inGo))
112138
bps := r.sco.Bytes()
113139
var nf, onf int // frame counter and overlap frame count
114140
var cbBuf []byte // cast from C pointer callback data
141+
orgTime := r.lastTime
115142
for start < nF {
116-
r.fromC(addr)
143+
if err := r.fromC(addr); err != nil {
144+
return 0, ErrCApiLost
145+
}
146+
if orgTime == r.lastTime {
147+
r.lastTime = time.Now()
148+
}
117149

118150
nf = int(r.c.inF)
119151
if nf == 0 {
120152
r.toC(addr)
121-
break
153+
if err := r.toC(addr); err != nil {
154+
return 0, ErrCApiLost
155+
}
122156
}
123157

124158
// in case the C cb doesn't align to the buffer size
@@ -140,14 +174,17 @@ func (r *Cb) Receive(d []float64) (int, error) {
140174
r.sco.Decode(r.over, cbBuf[nf*bps*nC:])
141175
}
142176

143-
r.toC(addr)
177+
if err := r.toC(addr); err != nil {
178+
return 0, ErrCApiLost
179+
}
144180
start += nf
145181
}
146182

147183
r.il.Deinter(d[:start*nC])
148184
return start, nil
149185
}
150186

187+
// Send is as in sound.Sink.Send
151188
func (r *Cb) Send(d []float64) error {
152189
N := len(d)
153190
nC := r.Channels()
@@ -166,24 +203,34 @@ func (r *Cb) Send(d []float64) error {
166203
bps := r.sco.Bytes()
167204
var nf int
168205
var cbBuf []byte
206+
orgTime := r.lastTime
169207
for start < nF {
170-
r.fromC(addr)
208+
if err := r.fromC(addr); err != nil {
209+
return ErrCApiLost
210+
}
211+
if orgTime == r.lastTime {
212+
r.lastTime = time.Now()
213+
}
171214
// get the slice at buffer size
172215
nf = int(r.c.outF)
173216
if nf == 0 {
174-
r.toC(addr)
217+
if err := r.toC(addr); err != nil {
218+
return ErrCApiLost
219+
}
175220
return io.EOF
176221
}
177222
if start+nf > nF {
178223
nf = nF - start
179224
}
180225
sl = d[start*nC : (start+nf)*nC]
181226
// "render"
182-
cbBuf = (*[1 << 30]byte)(unsafe.Pointer(C.getIn(r.c)))[:nf*bps*nC]
227+
cbBuf = (*[1 << 30]byte)(unsafe.Pointer(r.c.out))[:nf*bps*nC]
183228
r.sco.Encode(cbBuf, sl)
184229
// tell the API about any truncation that happened.
185230
r.c.outF = C.int(nf)
186-
r.toC(addr)
231+
if err := r.toC(addr); err != nil {
232+
return ErrCApiLost
233+
}
187234
start += nf
188235
}
189236
return nil
@@ -193,29 +240,60 @@ func (r *Cb) SendReceive(out, in []float64) (int, error) {
193240
return 0, nil
194241
}
195242

196-
func (r *Cb) setOutF(nf int) {
197-
addr := (*uint32)(unsafe.Pointer(&r.c.outF))
198-
atomic.StoreUint32(addr, uint32(nf))
243+
func (r *Cb) maybeSleep() {
244+
var t time.Time
245+
if r.lastTime == t { // don't sleep on first call.
246+
return
247+
}
248+
// sleep a conservative amount of time if
249+
// latency is high enough (see sleepSlack)
250+
passed := time.Since(r.lastTime)
251+
if passed+sleepSlack < r.bufDur {
252+
time.Sleep(r.bufDur - (passed + sleepSlack))
253+
}
199254
}
200255

201-
func (r *Cb) fromC(addr *uint32) {
256+
// ErrCApiLost can be returned if the thread running the C API
257+
// is somehow killed or the hardware causes the callbacks to
258+
// block.
259+
var ErrCApiLost = errors.New("too many atomic tries, C callbacks aren't happening.")
260+
261+
func (r *Cb) fromC(addr *uint32) error {
202262
var sz uint32
263+
i := 0
203264
for {
204265
sz = atomic.LoadUint32(addr)
205266
if sz != 0 {
206-
return
267+
return nil
268+
}
269+
i++
270+
if i%atomicTryLen == 0 {
271+
if i >= atomicTryLim {
272+
return ErrCApiLost
273+
}
274+
// runtime.Gosched may invoke a syscall if many g's on m
275+
// use sparingly
276+
runtime.Gosched()
207277
}
208-
runtime.Gosched()
209278
}
210279
}
211280

212-
func (r *Cb) toC(addr *uint32) {
281+
func (r *Cb) toC(addr *uint32) error {
213282
var sz uint32
283+
i := 0
214284
for {
215285
sz = atomic.LoadUint32(addr)
216286
if atomic.CompareAndSwapUint32(addr, sz, sz-1) {
217-
return
287+
return nil
288+
}
289+
i++
290+
if i%atomicTryLen == 0 {
291+
if i >= atomicTryLim {
292+
return ErrCApiLost
293+
}
294+
// runtime.Gosched may invoke a syscall if many g's on m
295+
// use sparingly
296+
runtime.Gosched()
218297
}
219-
runtime.Gosched()
220298
}
221299
}

libsio/cb.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <stdatomic.h>
55
#include <stdint.h>
6+
#include <time.h>
67

78
typedef struct Cb {
89
int bufSz;
@@ -11,6 +12,7 @@ typedef struct Cb {
1112
int inF;
1213
void * out;
1314
int outF;
15+
struct timespec time;
1416
} Cb;
1517

1618

libsio/cb_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package libsio
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"zikichombo.org/sound"
8+
"zikichombo.org/sound/sample"
9+
)
10+
11+
func TestCbCapture(t *testing.T) {
12+
N := 1024
13+
v := sound.MonoCd()
14+
c := sample.SFloat32L
15+
b := 512
16+
cb := NewCb(v, c, b)
17+
go runcbsCapture(cb, N, b, c.Bytes())
18+
d := make([]float64, b)
19+
for i := 0; i < N; i++ {
20+
fmt.Printf("cb receive %d\n", i)
21+
n, err := cb.Receive(d)
22+
if err != nil {
23+
t.Error(err)
24+
} else if n != b {
25+
t.Errorf("expected %d got %d\n", b, n)
26+
}
27+
}
28+
}
29+
30+
func TestCbPlay(t *testing.T) {
31+
N := 1024
32+
v := sound.MonoCd()
33+
c := sample.SFloat32L
34+
b := 512
35+
cb := NewCb(v, c, b)
36+
go runcbsPlay(cb, N, b, c.Bytes())
37+
d := make([]float64, b)
38+
for i := 0; i < N; i++ {
39+
fmt.Printf("cb send %d\n", i)
40+
err := cb.Send(d)
41+
if err != nil {
42+
t.Error(err)
43+
}
44+
}
45+
}

libsio/doc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright 2018 The ZikiChombo Authors. All rights reserved. Use of this source
22
// code is governed by a license that can be found in the License file.
33

4-
// Package libsio provides support for the different ports.
4+
// Package libsio provides some support for implementing the different ports.
55
//
66
// Package libsio is part of http://zikichombo.org
77
package libsio /* import "zikichombo.org/sio/libsio" */

0 commit comments

Comments
 (0)