Skip to content

Commit 0dcc25a

Browse files
committed
Initial release commit
1 parent af47eed commit 0dcc25a

File tree

3 files changed

+329
-0
lines changed

3 files changed

+329
-0
lines changed

bayeux.go

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
package bayeux
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"io"
9+
"io/ioutil"
10+
"log"
11+
"net/http"
12+
"net/url"
13+
"os"
14+
"strings"
15+
"sync"
16+
"time"
17+
)
18+
19+
// TriggerEvent describes an event received from Bayeaux Endpoint
20+
type TriggerEvent struct {
21+
ClientID string `json:"clientId"`
22+
Data struct {
23+
Event struct {
24+
CreatedDate time.Time `json:"createdDate"`
25+
ReplayID int `json:"replayId"`
26+
Type string `json:"type"`
27+
} `json:"event"`
28+
Object json.RawMessage `json:"sobject"`
29+
} `json:"data,omitempty"`
30+
Channel string `json:"channel"`
31+
Successful bool `json:"successful,omitempty"`
32+
}
33+
34+
func (t TriggerEvent) topic() string {
35+
s := strings.Replace(t.Channel, "/topic/", "", 1)
36+
return s
37+
}
38+
39+
// Status is the state of success and subscribed channels
40+
type Status struct {
41+
connected bool
42+
clientID string
43+
channels []string
44+
}
45+
46+
type BayeuxHandshake []struct {
47+
Ext struct {
48+
Replay bool `json:"replay"`
49+
} `json:"ext"`
50+
MinimumVersion string `json:"minimumVersion"`
51+
ClientID string `json:"clientId"`
52+
SupportedConnectionTypes []string `json:"supportedConnectionTypes"`
53+
Channel string `json:"channel"`
54+
Version string `json:"version"`
55+
Successful bool `json:"successful"`
56+
}
57+
58+
type Subscription struct {
59+
ClientID string `json:"clientId"`
60+
Channel string `json:"channel"`
61+
Subscription string `json:"subscription"`
62+
Successful bool `json:"successful"`
63+
}
64+
65+
type Credentials struct {
66+
AccessToken string `json:"access_token"`
67+
InstanceURL string `json:"instance_url"`
68+
IssuedAt int
69+
ID string
70+
TokenType string `json:"token_type"`
71+
Signature string
72+
}
73+
74+
func (c Credentials) bayeuxUrl() string {
75+
return c.InstanceURL + "/cometd/38.0"
76+
}
77+
78+
type clientIDAndCookies struct {
79+
clientID string
80+
cookies []*http.Cookie
81+
}
82+
83+
// Bayeux struct allow for centralized storage of creds, ids, and cookies
84+
type Bayeux struct {
85+
creds Credentials
86+
id clientIDAndCookies
87+
}
88+
89+
var wg sync.WaitGroup
90+
var logger = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile)
91+
var status = Status{false, "", []string{}}
92+
93+
// Call is the base function for making bayeux requests
94+
func (b *Bayeux) call(body string, route string) (resp *http.Response, e error) {
95+
var jsonStr = []byte(body)
96+
req, err := http.NewRequest("POST", route, bytes.NewBuffer(jsonStr))
97+
if err != nil {
98+
logger.Fatalf("Bad Call request: %s", err)
99+
}
100+
req.Header.Add("Content-Type", "application/json")
101+
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", b.creds.AccessToken))
102+
// Per Stackexchange comment, passing back cookies is required though undocumented in Salesforce API
103+
// We were unable to get process working without passing cookies back to SF server.
104+
// SF Reference: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/intro_client_specs.htm
105+
for _, cookie := range b.id.cookies {
106+
req.AddCookie(cookie)
107+
}
108+
109+
//logger.Printf("REQUEST: %#v", req)
110+
client := &http.Client{}
111+
resp, err = client.Do(req)
112+
if err == io.EOF {
113+
// Right way to handle EOF?
114+
logger.Printf("Bad bayeuxCall io.EOF: %s\n", err)
115+
logger.Printf("Bad bayeuxCall Response: %+v\n", resp)
116+
} else if err != nil {
117+
e = errors.New(fmt.Sprintf("Unknown error: %s", err))
118+
logger.Printf("Bad unrecoverable Call: %s", err)
119+
}
120+
return resp, e
121+
}
122+
123+
func (b *Bayeux) getClientID() error {
124+
handshake := `{"channel": "/meta/handshake", "supportedConnectionTypes": ["long-polling"], "version": "1.0"}`
125+
//var id clientIDAndCookies
126+
// Stub out clientIDAndCookies for first bayeuxCall
127+
resp, err := b.call(handshake, b.creds.bayeuxUrl())
128+
if err != nil {
129+
logger.Fatalf("Cannot get client id %s", err)
130+
}
131+
defer resp.Body.Close()
132+
133+
decoder := json.NewDecoder(resp.Body)
134+
var h BayeuxHandshake
135+
if err := decoder.Decode(&h); err == io.EOF {
136+
logger.Fatal(err)
137+
} else if err != nil {
138+
logger.Fatal(err)
139+
}
140+
creds := clientIDAndCookies{h[0].ClientID, resp.Cookies()}
141+
b.id = creds
142+
return nil
143+
}
144+
145+
// ReplayAll replay for past 24 hrs
146+
const ReplayAll = -2
147+
148+
// ReplayNone start playing events at current moment
149+
const ReplayNone = -1
150+
151+
// Replay accepts the following values
152+
// Value
153+
// -2: replay all events from past 24 hrs
154+
// -1: start at current
155+
// >= 0: start from this event number
156+
type Replay struct {
157+
Value int
158+
}
159+
160+
func (b *Bayeux) subscribe(topic string, replay Replay) Subscription {
161+
handshake := fmt.Sprintf(`{
162+
"channel": "/meta/subscribe",
163+
"subscription": "/topic/%s",
164+
"clientId": "%s",
165+
"ext": {
166+
"replay": {"/topic/%s": "%d"}
167+
}
168+
}`, topic, b.id.clientID, topic, replay)
169+
resp, err := b.call(handshake, b.creds.bayeuxUrl())
170+
if err != nil {
171+
logger.Fatalf("Cannot subscribe %s", err)
172+
}
173+
174+
defer resp.Body.Close()
175+
if os.Getenv("DEBUG") != "" {
176+
logger.Printf("Response: %+v", resp)
177+
// // Read the content
178+
var b []byte
179+
if resp.Body != nil {
180+
b, _ = ioutil.ReadAll(resp.Body)
181+
}
182+
// Restore the io.ReadCloser to its original state
183+
resp.Body = ioutil.NopCloser(bytes.NewBuffer(b))
184+
// Use the content
185+
s := string(b)
186+
logger.Printf("Response Body: %s", s)
187+
}
188+
189+
if resp.StatusCode > 299 {
190+
logger.Fatalf("Received non 2XX response: HTTP_CODE %d", resp.StatusCode)
191+
}
192+
decoder := json.NewDecoder(resp.Body)
193+
var h []Subscription
194+
if err := decoder.Decode(&h); err == io.EOF {
195+
logger.Fatal(err)
196+
} else if err != nil {
197+
logger.Fatal(err)
198+
}
199+
sub := h[0]
200+
status.connected = sub.Successful
201+
status.clientID = sub.ClientID
202+
status.channels = append(status.channels, topic)
203+
logger.Printf("Established connection(s): %+v", status)
204+
return sub
205+
}
206+
207+
func (b *Bayeux) connect() chan TriggerEvent {
208+
out := make(chan TriggerEvent)
209+
go func() {
210+
// TODO: add stop chan to bring this thing to halt
211+
for {
212+
postBody := fmt.Sprintf(`{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "%s"} `, b.id.clientID)
213+
resp, err := b.call(postBody, b.creds.bayeuxUrl())
214+
if err != nil {
215+
logger.Printf("Cannot connect to bayeux %s", err)
216+
logger.Println("Trying again...")
217+
} else {
218+
defer resp.Body.Close()
219+
if os.Getenv("DEBUG") != "" {
220+
// // Read the content
221+
var b []byte
222+
if resp.Body != nil {
223+
b, _ = ioutil.ReadAll(resp.Body)
224+
}
225+
// Restore the io.ReadCloser to its original state
226+
resp.Body = ioutil.NopCloser(bytes.NewBuffer(b))
227+
// Use the content
228+
s := string(b)
229+
logger.Printf("Response Body: %s", s)
230+
}
231+
var x []TriggerEvent
232+
decoder := json.NewDecoder(resp.Body)
233+
if err := decoder.Decode(&x); err != nil && err != io.EOF {
234+
logger.Fatal(err)
235+
}
236+
for _, e := range x {
237+
out <- e
238+
}
239+
}
240+
}
241+
}()
242+
return out
243+
}
244+
245+
func GetSalesforceCredentials() Credentials {
246+
route := "https://login.salesforce.com/services/oauth2/token"
247+
clientID := mustGetEnv("SALESFORCE_CONSUMER_KEY")
248+
clientSecret := mustGetEnv("SALESFORCE_CONSUMER_SECRET")
249+
username := mustGetEnv("SALESFORCE_USER")
250+
password := mustGetEnv("SALESFORCE_PASSWORD")
251+
params := url.Values{"grant_type": {"password"},
252+
"client_id": {clientID},
253+
"client_secret": {clientSecret},
254+
"username": {username},
255+
"password": {password}}
256+
res, err := http.PostForm(route, params)
257+
if err != nil {
258+
logger.Fatal(err)
259+
}
260+
decoder := json.NewDecoder(res.Body)
261+
var creds Credentials
262+
if err := decoder.Decode(&creds); err == io.EOF {
263+
logger.Fatal(err)
264+
} else if err != nil {
265+
logger.Fatal(err)
266+
} else if creds.AccessToken == "" {
267+
logger.Fatalf("Unable to fetch access token. Check credentials in environmental variables")
268+
}
269+
return creds
270+
}
271+
272+
func mustGetEnv(s string) string {
273+
r := os.Getenv(s)
274+
if r == "" {
275+
panic(fmt.Sprintf("Could not fetch key %s", s))
276+
}
277+
return r
278+
}
279+
280+
func (b *Bayeux) TopicToChannel(creds Credentials, topic string) chan TriggerEvent {
281+
b.creds = creds
282+
err := b.getClientID()
283+
if err != nil {
284+
log.Fatal("Unable to get bayeux ClientId")
285+
}
286+
r := Replay{ReplayAll}
287+
b.subscribe(topic, r)
288+
c := b.connect()
289+
wg.Add(1)
290+
return c
291+
}

example_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package bayeux
2+
3+
import "fmt"
4+
5+
func Example() {
6+
b := Bayeux{}
7+
creds := GetSalesforceCredentials()
8+
c := b.TopicToChannel(creds, "topicName")
9+
for {
10+
select {
11+
case e := <-c:
12+
fmt.Printf("TriggerEvent Received: %+v", e)
13+
}
14+
}
15+
}

examples/main.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
bay "github.com/zph/bayeux"
7+
)
8+
9+
func Example() {
10+
b := bay.Bayeux{}
11+
creds := bay.GetSalesforceCredentials()
12+
c := b.TopicToChannel(creds, "topicName")
13+
for {
14+
select {
15+
case e := <-c:
16+
fmt.Printf("TriggerEvent Received: %+v", e)
17+
}
18+
}
19+
}
20+
21+
func main() {
22+
Example()
23+
}

0 commit comments

Comments
 (0)