@@ -25,13 +25,14 @@ type TriggerEvent struct {
2525 ReplayID int `json:"replayId"`
2626 Type string `json:"type"`
2727 } `json:"event"`
28- Object json.RawMessage `json:"sobject"`
28+ Object json.RawMessage `json:"sobject"`
29+ Payload json.RawMessage `json:"payload"`
2930 } `json:"data,omitempty"`
3031 Channel string `json:"channel"`
3132 Successful bool `json:"successful,omitempty"`
3233}
3334
34- func (t TriggerEvent ) topic () string {
35+ func (t TriggerEvent ) channel () string {
3536 s := strings .Replace (t .Channel , "/topic/" , "" , 1 )
3637 return s
3738}
@@ -157,15 +158,15 @@ type Replay struct {
157158 Value int
158159}
159160
160- func (b * Bayeux ) subscribe (topic string , replay Replay ) Subscription {
161+ func (b * Bayeux ) subscribe (channel string , replay string ) Subscription {
161162 handshake := fmt .Sprintf (`{
162163 "channel": "/meta/subscribe",
163- "subscription": "/topic/ %s",
164+ "subscription": "%s",
164165 "clientId": "%s",
165166 "ext": {
166- "replay": {"/topic/ %s": "%d "}
167+ "replay": {"%s": "%s "}
167168 }
168- }` , topic , b .id .clientID , topic , replay )
169+ }` , channel , b .id .clientID , channel , replay )
169170 resp , err := b .call (handshake , b .creds .bayeuxUrl ())
170171 if err != nil {
171172 logger .Fatalf ("Cannot subscribe %s" , err )
@@ -174,7 +175,6 @@ func (b *Bayeux) subscribe(topic string, replay Replay) Subscription {
174175 defer resp .Body .Close ()
175176 if os .Getenv ("DEBUG" ) != "" {
176177 logger .Printf ("Response: %+v" , resp )
177- // // Read the content
178178 var b []byte
179179 if resp .Body != nil {
180180 b , _ = ioutil .ReadAll (resp .Body )
@@ -199,25 +199,21 @@ func (b *Bayeux) subscribe(topic string, replay Replay) Subscription {
199199 sub := h [0 ]
200200 status .connected = sub .Successful
201201 status .clientID = sub .ClientID
202- status .channels = append (status .channels , topic )
202+ status .channels = append (status .channels , channel )
203203 logger .Printf ("Established connection(s): %+v" , status )
204204 return sub
205205}
206206
207- func (b * Bayeux ) connect () chan TriggerEvent {
208- out := make (chan TriggerEvent )
207+ func (b * Bayeux ) connect (out chan TriggerEvent ) chan TriggerEvent {
209208 go func () {
210- // TODO: add stop chan to bring this thing to halt
211209 for {
212210 postBody := fmt .Sprintf (`{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "%s"} ` , b .id .clientID )
213211 resp , err := b .call (postBody , b .creds .bayeuxUrl ())
214212 if err != nil {
215213 logger .Printf ("Cannot connect to bayeux %s" , err )
216214 logger .Println ("Trying again..." )
217215 } else {
218- defer resp .Body .Close ()
219216 if os .Getenv ("DEBUG" ) != "" {
220- // // Read the content
221217 var b []byte
222218 if resp .Body != nil {
223219 b , _ = ioutil .ReadAll (resp .Body )
@@ -277,15 +273,14 @@ func mustGetEnv(s string) string {
277273 return r
278274}
279275
280- func (b * Bayeux ) TopicToChannel ( creds Credentials , topic string ) chan TriggerEvent {
276+ func (b * Bayeux ) Channel ( out chan TriggerEvent , r string , creds Credentials , channel string ) chan TriggerEvent {
281277 b .creds = creds
282278 err := b .getClientID ()
283279 if err != nil {
284280 log .Fatal ("Unable to get bayeux ClientId" )
285281 }
286- r := Replay {ReplayAll }
287- b .subscribe (topic , r )
288- c := b .connect ()
282+ b .subscribe (channel , r )
283+ c := b .connect (out )
289284 wg .Add (1 )
290285 return c
291286}
0 commit comments