Skip to content

Commit 80efbad

Browse files
committed
Fixed bug, SnapshotIterator.Stop() not working from reconnect code fix (#229)
added SnapshotIterator.Done() updated test apps fixed variable scope for resp *http.Response
1 parent cb252b2 commit 80efbad

File tree

5 files changed

+123
-69
lines changed

5 files changed

+123
-69
lines changed

cmd/listen/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ func main() {
7373

7474
go func() {
7575
for {
76+
77+
if iter.Done() {
78+
break
79+
}
80+
7681
event, err := iter.Next()
7782

7883
if err != nil {

cmd/listen2/main.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ func main() {
8686

8787
go func() {
8888
for {
89+
90+
if iter.Done() {
91+
break
92+
}
93+
8994
event, err := iter.Next()
9095

9196
if err != nil {
@@ -123,6 +128,11 @@ func main() {
123128

124129
go func() {
125130
for {
131+
132+
if iter2.Done() {
133+
break
134+
}
135+
126136
event, err := iter2.Next()
127137

128138
if err != nil {
@@ -136,12 +146,16 @@ func main() {
136146

137147
fmt.Printf("\n >>> open a new separate command line terminal, to trigger events, run: go run . anyvalue\n")
138148
fmt.Printf("\n >>> OR edit value of any key from %s in firebase console to trigger events\n\n", testpath)
139-
fmt.Printf("\n >>> press <enter> to close http connection\n\n")
149+
fmt.Printf("\n >>> press <enter> to stop 1st Listener and close http connection\n\n")
140150
fmt.Printf("Waiting for events...\n\n")
141151

142152
fmt.Scanln()
143153
iter.Stop()
144154

155+
fmt.Printf("\n >>> press <enter> to stop 2nd Listener and close http connection\n\n")
156+
fmt.Scanln()
157+
iter2.Stop()
158+
145159
fmt.Printf("\n >>> press <enter> to exit app\n\n\n")
146160
fmt.Scanln()
147161
}

db/event.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222

2323
// SSE = Sever-Sent Events = ssevent
2424

25-
// EventType ...
25+
// EventType specific event type changes
2626
type EventType uint
2727

2828
// EventType ...
@@ -41,16 +41,17 @@ type Event struct {
4141
Path string // snapshot path
4242
}
4343

44-
// SnapshotIterator iterator for continuous event
44+
// SnapshotIterator iterator for continuous events
4545
type SnapshotIterator struct {
4646
Snapshot string // initial snapshot, JSON-encoded, returned from http Respoonse, server sent event, data part
4747
SSEDataChan <-chan string // continuous event snapshot, channel receive only, directional channel
48+
done *bool // Done listening to events, also used to prevent channel block
4849
resp *http.Response // http connection keep alive
49-
active bool // false when resp is closed, used to prevent channel block, defaults to false
5050
}
5151

5252
// Snapshot ssevent data, data part
5353
func (e *Event) Snapshot() string {
54+
5455
return e.Data // ssevent data (snapshot), snapshot only, data part of ssevent data
5556
}
5657

@@ -60,32 +61,38 @@ func (e *Event) Unmarshal(v interface{}) error {
6061
return json.Unmarshal([]byte(e.Data), v)
6162
}
6263

63-
// Next ...
64-
func (i *SnapshotIterator) Next() (*Event, error) {
64+
// Next realtime event
65+
func (it *SnapshotIterator) Next() (*Event, error) {
6566

6667
// prevent channel block
67-
if i.active == false {
68-
return nil, errors.New("SnapshotIterator is not active")
68+
if *it.done == true {
69+
return nil, errors.New("SnapshotIterator is done or no longer active")
6970
}
7071

71-
sseDataString := <-i.SSEDataChan
72+
sseDataString := <-it.SSEDataChan
7273

7374
// todo: determine EventType
7475

75-
path, ss, err := splitSSEData(sseDataString)
76+
path, snapshot, err := splitSSEData(sseDataString)
7677

7778
return &Event{
7879
EventType: ChildChanged,
79-
Data: ss, // snapshot
80+
Data: snapshot,
8081
Path: path,
8182
}, err
8283

8384
} // Next()
8485

85-
// Stop ...
86-
func (i *SnapshotIterator) Stop() {
87-
i.active = false
88-
if i.resp != nil {
89-
i.resp.Body.Close()
86+
// Stop listening for realtime events
87+
// close http connection
88+
func (it *SnapshotIterator) Stop() {
89+
*it.done = true
90+
if it.resp != nil {
91+
it.resp.Body.Close()
9092
}
9193
}
94+
95+
// Done can be used to check if Stop() have been called
96+
func (it *SnapshotIterator) Done() bool {
97+
return *it.done
98+
}

db/event_ref.go

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"firebase.google.com/go/internal"
2727
)
2828

29-
// Listen ...
29+
// Listen returns an iterator that listens to realtime events
3030
func (r *Ref) Listen(ctx context.Context) (*SnapshotIterator, error) {
3131

3232
sseDataChan := make(chan string) // server-sent event data channel
@@ -38,23 +38,26 @@ func (r *Ref) Listen(ctx context.Context) (*SnapshotIterator, error) {
3838

3939
resp, err := r.sendListen(ctx, "GET", opts...)
4040

41+
done := true
42+
4143
if err != nil {
42-
return &SnapshotIterator{active: false}, err
44+
return &SnapshotIterator{done: &done}, err
4345
}
4446

4547
snapshot, err := getInitialNodeSnapshot(resp)
4648

4749
if err != nil {
48-
return &SnapshotIterator{active: false}, err
50+
return &SnapshotIterator{done: &done}, err
4951
}
5052

51-
go r.startListeningWithReconnect(ctx, opts, resp, sseDataChan)
53+
done = false
54+
go r.startListeningWithReconnect(ctx, opts, resp, &done, sseDataChan)
5255

5356
return &SnapshotIterator{
5457
Snapshot: snapshot,
5558
SSEDataChan: sseDataChan,
56-
resp: resp, //*http.Response
57-
active: true,
59+
done: &done,
60+
resp: resp, // *http.Response
5861
}, err
5962

6063
} // Listen()
@@ -76,19 +79,24 @@ func getInitialNodeSnapshot(resp *http.Response) (string, error) {
7679
b = scanner.Bytes()
7780
s := string(b)
7881

82+
// sample sse data
83+
// s = 'data: {"path":"/","data":{"test3":{"test4":4}}}'
84+
7985
var snapshotMap map[string]interface{}
8086

87+
// We only want the well formed json payload
88+
// exclude or trim the first 6 chars 'data: '
8189
err := json.Unmarshal([]byte(s[6:]), &snapshotMap)
8290
if err != nil {
8391
return "", err
8492
}
93+
8594
snapshotByte, err := json.Marshal(snapshotMap["data"])
8695
if err != nil {
8796
return "", err
8897
}
8998

9099
return string(snapshotByte), nil
91-
92100
}
93101
}
94102

@@ -98,13 +106,23 @@ func getInitialNodeSnapshot(resp *http.Response) (string, error) {
98106
}
99107

100108
// called with goroutine
101-
func (r *Ref) startListeningWithReconnect(ctx context.Context, opts []internal.HTTPOption, resp *http.Response, sseDataChan chan<- string) {
109+
func (r *Ref) startListeningWithReconnect(
110+
ctx context.Context,
111+
opts []internal.HTTPOption,
112+
resp *http.Response,
113+
done *bool,
114+
sseDataChan chan<- string) {
102115

103116
scanner := bufio.NewScanner(resp.Body)
104117

105118
var b []byte
106119

107120
for {
121+
122+
if *done {
123+
break
124+
}
125+
108126
if scanner.Scan() == true {
109127

110128
b = scanner.Bytes()
@@ -127,7 +145,8 @@ func (r *Ref) startListeningWithReconnect(ctx context.Context, opts []internal.H
127145
} else if "event: auth_revoked" == string(b) {
128146

129147
// reconnect to re-establish authentication every hour
130-
resp, err := r.sendListen(ctx, "GET", opts...)
148+
var err error
149+
resp, err = r.sendListen(ctx, "GET", opts...)
131150

132151
if err == nil {
133152
// not part of existing continuing listening events, so we don't send to the listening channel
@@ -139,8 +158,10 @@ func (r *Ref) startListeningWithReconnect(ctx context.Context, opts []internal.H
139158
scanner = bufio.NewScanner(resp.Body)
140159
}
141160
} else {
161+
142162
// attemp to reconnect for other connection problems
143-
resp, err := r.sendListen(ctx, "GET", opts...)
163+
var err error
164+
resp, err = r.sendListen(ctx, "GET", opts...)
144165

145166
if err == nil {
146167
// not part of existing continuing listening events, so we don't send to the listening channel

db/event_test.go

Lines changed: 50 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,50 @@
1-
// Copyright 2019 Google Inc. All Rights Reserved.
2-
//
3-
// Licensed under the Apache License, Version 2.0 (the "License");
4-
// you may not use this file except in compliance with the License.
5-
// You may obtain a copy of the License at
6-
//
7-
// http://www.apache.org/licenses/LICENSE-2.0
8-
//
9-
// Unless required by applicable law or agreed to in writing, software
10-
// distributed under the License is distributed on an "AS IS" BASIS,
11-
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12-
// See the License for the specific language governing permissions and
13-
// limitations under the License.
14-
15-
package db
16-
17-
import (
18-
"context"
19-
"testing"
20-
)
21-
22-
func TestListen(t *testing.T) {
23-
mock := &mockServer{}
24-
srv := mock.Start(client)
25-
defer srv.Close()
26-
27-
// mostly just a place holder, currenty test for crash only
28-
29-
iter, err := testref.Listen(context.Background())
30-
_ = iter
31-
_ = err
32-
33-
defer iter.Stop()
34-
35-
event, err := iter.Next()
36-
_ = event
37-
_ = err
38-
39-
if err == nil {
40-
iter.Stop()
41-
}
42-
43-
}
1+
// Copyright 2019 Google Inc. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package db
16+
17+
import (
18+
"context"
19+
"testing"
20+
)
21+
22+
func TestListen(t *testing.T) {
23+
mock := &mockServer{}
24+
srv := mock.Start(client)
25+
defer srv.Close()
26+
27+
// mostly just a place holder, currenty test for crash only
28+
29+
iter, err := testref.Listen(context.Background())
30+
_ = iter
31+
_ = err
32+
33+
defer iter.Stop()
34+
35+
event, err := iter.Next()
36+
_ = event
37+
_ = err
38+
39+
if iter.Done() {
40+
// break
41+
}
42+
43+
if err == nil {
44+
iter.Stop()
45+
}
46+
47+
b := iter.Done()
48+
_ = b
49+
50+
}

0 commit comments

Comments
 (0)