Skip to content

Commit 85ef98d

Browse files
committed
Code fix to reconnect when 'event: auth_revoked' occurs (#229)
1 parent 077a017 commit 85ef98d

File tree

1 file changed

+32
-11
lines changed

1 file changed

+32
-11
lines changed

db/event_ref.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,13 @@ func (r *Ref) Listen(ctx context.Context) (*SnapshotIterator, error) {
4141
return &SnapshotIterator{active: false}, err
4242
}
4343

44-
scanner := bufio.NewScanner(resp.Body)
45-
46-
snapshot, err := getInitialNodeSnapshot(scanner, resp)
44+
snapshot, err := getInitialNodeSnapshot(resp)
4745

4846
if err != nil {
4947
return &SnapshotIterator{active: false}, err
5048
}
5149

52-
go startListening(scanner, resp, sseDataChan)
50+
go r.startListeningWithReconnect(ctx, opts, resp, sseDataChan)
5351

5452
return &SnapshotIterator{
5553
Snapshot: snapshot,
@@ -61,10 +59,12 @@ func (r *Ref) Listen(ctx context.Context) (*SnapshotIterator, error) {
6159
} // Listen()
6260

6361
// return initial snapshot (JSON-encoded string) from Ref.Path node location
64-
func getInitialNodeSnapshot(scanner *bufio.Scanner, resp *http.Response) (string, error) {
62+
func getInitialNodeSnapshot(resp *http.Response) (string, error) {
6563

6664
var b []byte
6765

66+
scanner := bufio.NewScanner(resp.Body)
67+
6868
if scanner.Scan() == true {
6969

7070
b = scanner.Bytes()
@@ -104,7 +104,9 @@ func getInitialNodeSnapshot(scanner *bufio.Scanner, resp *http.Response) (string
104104
}
105105

106106
// called with goroutine
107-
func startListening(scanner *bufio.Scanner, resp *http.Response, sseDataChan chan<- string) { // sseDataChan send only
107+
func (r *Ref) startListeningWithReconnect(ctx context.Context, opts []internal.HTTPOption, resp *http.Response, sseDataChan chan<- string) {
108+
109+
scanner := bufio.NewScanner(resp.Body)
108110

109111
var b []byte
110112

@@ -126,17 +128,36 @@ func startListening(scanner *bufio.Scanner, resp *http.Response, sseDataChan cha
126128
if s[:5] == "data:" {
127129
// trim 'data: '
128130
sseDataChan <- s[6:] // {"path":"/","data":{"test3":{"test4":4}}}
129-
130131
}
131132
}
132-
}
133+
} else if "event: auth_revoked" == string(b) {
133134

135+
// reconnect to re-establish authentication every hour
136+
resp, err := r.sendListen(ctx, "GET", opts...)
137+
138+
if err == nil {
139+
// not part of existing continuing listening events, so we don't send to the listening channel
140+
snapshot, err := getInitialNodeSnapshot(resp)
141+
_ = snapshot
142+
_ = err
143+
}
144+
145+
scanner = bufio.NewScanner(resp.Body)
146+
}
134147
} else {
148+
// attemp to reconnect for other connection problems
149+
resp, err := r.sendListen(ctx, "GET", opts...)
150+
151+
if err == nil {
152+
// not part of existing continuing listening events, so we don't send to the listening channel
153+
snapshot, err := getInitialNodeSnapshot(resp)
154+
_ = snapshot
155+
_ = err
156+
}
135157

136-
break
158+
scanner = bufio.NewScanner(resp.Body)
137159
}
138-
139-
} // for
160+
}
140161
}
141162

142163
// returns path and snapshot

0 commit comments

Comments
 (0)