Skip to content

Commit acdef69

Browse files
authored
Add status watcher for bayeux client (#10)
* add status watcher for bayeux client * Resolve review comments * update client struct * resolve comments * Add wait group for in flight messages
1 parent 76153e3 commit acdef69

File tree

1 file changed

+37
-13
lines changed

1 file changed

+37
-13
lines changed

bayeux.go

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,19 @@ type TriggerEvent struct {
4242
}
4343

4444
// Status is the state of success and subscribed channels
45-
type Status struct {
46-
connected bool
47-
clientID string
48-
channels []string
45+
type status struct {
46+
connected bool
47+
clientID string
48+
channels []string
49+
connectCount int
50+
}
51+
52+
func (st *status) connect() {
53+
st.connectCount++
54+
}
55+
56+
func (st *status) disconnect() {
57+
st.connectCount--
4958
}
5059

5160
type BayeuxHandshake []struct {
@@ -101,7 +110,7 @@ type Bayeux struct {
101110

102111
var wg sync.WaitGroup
103112
var logger = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile)
104-
var status = Status{false, "", []string{}}
113+
var st = status{false, "", []string{}, 0}
105114

106115
// newHTTPRequest is to create requests with context
107116
func (b *Bayeux) newHTTPRequest(ctx context.Context, body string, route string) (*http.Request, error) {
@@ -221,20 +230,26 @@ func (b *Bayeux) subscribe(ctx context.Context, channel string, replay string) e
221230
return err
222231
}
223232
sub := &h[0]
224-
status.connected = sub.Successful
225-
status.clientID = sub.ClientID
226-
status.channels = append(status.channels, channel)
233+
st.connected = sub.Successful
234+
st.clientID = sub.ClientID
235+
st.channels = append(st.channels, channel)
236+
st.connect()
227237
if os.Getenv("DEBUG") != "" {
228-
logger.Printf("Established connection(s): %+v", status)
238+
logger.Printf("Established connection(s): %+v", st)
229239
}
230240
return nil
231241
}
232242

233243
func (b *Bayeux) connect(ctx context.Context, out chan MaybeMsg) chan MaybeMsg {
244+
var waitMsgs sync.WaitGroup
234245
wg.Add(1)
235246
go func() {
236-
defer wg.Done()
237-
defer close(out)
247+
defer func() {
248+
waitMsgs.Wait()
249+
close(out)
250+
st.disconnect()
251+
wg.Done()
252+
}()
238253
for {
239254
select {
240255
case <-ctx.Done():
@@ -265,8 +280,12 @@ func (b *Bayeux) connect(ctx context.Context, out chan MaybeMsg) chan MaybeMsg {
265280
out <- MaybeMsg{Err: err}
266281
return
267282
}
268-
for _, e := range x {
269-
out <- MaybeMsg{Msg: e}
283+
for i := range x {
284+
waitMsgs.Add(1)
285+
go func(e TriggerEvent) {
286+
defer waitMsgs.Done()
287+
out <- MaybeMsg{Msg: e}
288+
}(x[i])
270289
}
271290
}
272291
}
@@ -275,6 +294,11 @@ func (b *Bayeux) connect(ctx context.Context, out chan MaybeMsg) chan MaybeMsg {
275294
return out
276295
}
277296

297+
// GetConnectedCount returns count of subcriptions
298+
func GetConnectedCount() int {
299+
return st.connectCount
300+
}
301+
278302
func GetSalesforceCredentials(ap AuthenticationParameters) (creds *Credentials, err error) {
279303
params := url.Values{"grant_type": {"password"},
280304
"client_id": {ap.ClientID},

0 commit comments

Comments
 (0)