Skip to content

Commit 76153e3

Browse files
authored
Bugfix/resolve err handling for subscribe and channel closing (#9)
* Improve bayeux to accept and set signal based channel closing * Add err handler for subscribe method * Update error handling * resolve err handling in suscribe function and channel closing
1 parent c0aef94 commit 76153e3

File tree

1 file changed

+14
-7
lines changed

1 file changed

+14
-7
lines changed

bayeux.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ type Replay struct {
182182
Value int
183183
}
184184

185-
func (b *Bayeux) subscribe(ctx context.Context, channel string, replay string) (*Subscription, error) {
185+
func (b *Bayeux) subscribe(ctx context.Context, channel string, replay string) error {
186186
handshake := fmt.Sprintf(`{
187187
"channel": "/meta/subscribe",
188188
"subscription": "%s",
@@ -193,7 +193,7 @@ func (b *Bayeux) subscribe(ctx context.Context, channel string, replay string) (
193193
}`, channel, b.id.clientID, channel, replay)
194194
resp, err := b.call(ctx, handshake, b.creds.bayeuxUrl())
195195
if err != nil {
196-
return nil, fmt.Errorf("cannot subscribe: %w", err)
196+
return fmt.Errorf("cannot subscribe: %w", err)
197197
}
198198

199199
defer resp.Body.Close()
@@ -211,14 +211,14 @@ func (b *Bayeux) subscribe(ctx context.Context, channel string, replay string) (
211211
}
212212

213213
if resp.StatusCode > 299 {
214-
return nil, fmt.Errorf("received non 2XX response: %w", err)
214+
return fmt.Errorf("received non 2XX response: %w", err)
215215
}
216216
decoder := json.NewDecoder(resp.Body)
217217
var h []Subscription
218218
if err := decoder.Decode(&h); err == io.EOF {
219-
return nil, err
219+
return err
220220
} else if err != nil {
221-
return nil, err
221+
return err
222222
}
223223
sub := &h[0]
224224
status.connected = sub.Successful
@@ -227,7 +227,7 @@ func (b *Bayeux) subscribe(ctx context.Context, channel string, replay string) (
227227
if os.Getenv("DEBUG") != "" {
228228
logger.Printf("Established connection(s): %+v", status)
229229
}
230-
return sub, nil
230+
return nil
231231
}
232232

233233
func (b *Bayeux) connect(ctx context.Context, out chan MaybeMsg) chan MaybeMsg {
@@ -301,8 +301,15 @@ func (b *Bayeux) Channel(ctx context.Context, out chan MaybeMsg, r string, creds
301301
err := b.getClientID(ctx)
302302
if err != nil {
303303
out <- MaybeMsg{Err: err}
304+
close(out)
305+
return out
306+
}
307+
err = b.subscribe(ctx, channel, r)
308+
if err != nil {
309+
out <- MaybeMsg{Err: err}
310+
close(out)
311+
return out
304312
}
305-
b.subscribe(ctx, channel, r)
306313
c := b.connect(ctx, out)
307314
return c
308315
}

0 commit comments

Comments
 (0)