forked from luno/reflex
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmocks_test.go
More file actions
63 lines (54 loc) · 1.06 KB
/
mocks_test.go
File metadata and controls
63 lines (54 loc) · 1.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package reflex_test
import (
"context"
"time"
"github.com/luno/reflex"
)
// newMockStreamer builds a mockStreamer that serves the given events and,
// when endError is non-nil, reports that error to clients once they have
// consumed every event.
func newMockStreamer(events []*reflex.Event, endError error) *mockStreamer {
	streamer := new(mockStreamer)
	streamer.events = events
	streamer.endError = endError
	return streamer
}
// mockStreamer is an in-memory event source for tests. Its Stream method
// hands out clients that replay the stored events.
type mockStreamer struct {
// events holds the events served to clients, in the order they are returned.
events []*reflex.Event
// endError, when non-nil, is returned by a client's Recv once no unread
// events remain; when nil, Recv keeps waiting for more events instead.
endError error
}
// AddEvents appends the given events to the end of the streamer's event list.
// Clients created by Stream hold a pointer to the streamer, so they observe
// the appended events on subsequent reads.
func (m *mockStreamer) AddEvents(events ...*reflex.Event) {
	if len(events) == 0 {
		// Nothing to append; the list stays exactly as it was.
		return
	}
	extended := append(m.events, events...)
	m.events = extended
}
// Stream returns a client positioned on the first stored event that comes
// after the given cursor. The scan stops at the first event whose ID is
// greater than after, so events are expected to be stored in ascending ID
// order. An empty cursor precedes every non-empty ID, which makes the client
// start from the first event. opts is accepted but not used by this mock, and
// the returned error is always nil.
//
// When the event ID and the cursor are both plain decimal numbers they are
// compared numerically, so that "10" comes after "9". A byte-wise string
// comparison orders those the other way round and would resume the stream at
// the wrong event as soon as IDs reach two digits. IDs that are not plain
// decimal numbers are still compared as strings.
func (m *mockStreamer) Stream(ctx context.Context, after string, opts ...reflex.StreamOption) (reflex.StreamClient, error) {
	// isDecimal reports whether s is a non-empty run of ASCII digits.
	isDecimal := func(s string) bool {
		for i := 0; i < len(s); i++ {
			if s[i] < '0' || s[i] > '9' {
				return false
			}
		}
		return s != ""
	}
	// trimZeros drops leading zeros while keeping at least one digit.
	trimZeros := func(s string) string {
		for len(s) > 1 && s[0] == '0' {
			s = s[1:]
		}
		return s
	}
	// isAfter reports whether id sorts strictly after cursor.
	isAfter := func(id, cursor string) bool {
		if isDecimal(id) && isDecimal(cursor) {
			id, cursor = trimZeros(id), trimZeros(cursor)
			if len(id) != len(cursor) {
				// Without leading zeros, the longer number is the larger one.
				return len(id) > len(cursor)
			}
			// Equal lengths: string order matches numeric order.
		}
		return id > cursor
	}

	// index ends up one past the last event that is not after the cursor.
	index := 0
	for i, e := range m.events {
		if isAfter(e.ID, after) {
			break
		}
		index = i + 1
	}

	return &sc{
		mockStreamer: m,
		ctx:          ctx,
		index:        index,
	}, nil
}
// sc is the stream client handed out by mockStreamer.Stream.
type sc struct {
// The embedded streamer pointer gives the client live access to the
// streamer's events and endError rather than a copy of them.
*mockStreamer
// ctx is the context given to Stream; Recv returns its error if it is done
// while Recv is waiting for more events.
ctx context.Context
// index is the position in events of the next event Recv will return.
index int
}
// Recv returns the next unread event and advances the client's position.
//
// When no unread event is available it returns endError if that is set.
// Otherwise it polls every 10ms until an event has been added or the client's
// context is done, in which case the context's error is returned. endError is
// only consulted once all events have been consumed.
//
// NOTE(review): events is read here without any synchronization while
// AddEvents may append to it; confirm tests do not call the two from
// different goroutines concurrently.
func (c *sc) Recv() (*reflex.Event, error) {
	const pollInterval = 10 * time.Millisecond

	// A single timer is created lazily and reused across iterations, and is
	// stopped on return so no pending timer is left behind on cancellation.
	var poll *time.Timer
	for len(c.events) <= c.index {
		if c.endError != nil {
			return nil, c.endError
		}

		if poll == nil {
			poll = time.NewTimer(pollInterval)
			defer poll.Stop()
		} else {
			// The previous tick was received in the select below, so the
			// channel is drained and Reset is safe.
			poll.Reset(pollInterval)
		}

		select {
		case <-c.ctx.Done():
			return nil, c.ctx.Err()
		case <-poll.C:
		}
	}

	e := c.events[c.index]
	c.index++
	return e, nil
}