diff --git a/cmd/listen/main.go b/cmd/listen/main.go new file mode 100644 index 00000000..a98213b9 --- /dev/null +++ b/cmd/listen/main.go @@ -0,0 +1,115 @@ +// Copyright 2019 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "log" + "os" + + "golang.org/x/net/context" + "google.golang.org/api/option" + + firebase "firebase.google.com/go" + "firebase.google.com/go/db" +) + +func main() { + + // opt := option.WithCredentialsFile("c:/users/username/.firebase/firebase.json") // Windows + // + opt := option.WithCredentialsFile("/home/username/.firebase/firebase.json") // Linux, edit 1. + + config := &firebase.Config{ + DatabaseURL: "https://mydb.firebaseio.com", // edit 2. + } + + app, err := firebase.NewApp(context.Background(), config, opt) + if err != nil { + log.Fatal(err) + } + + ctx := context.Background() + + // DatabaseWithURL + client, err := app.Database(ctx) + + if err != nil { + log.Fatal(err) + } + + testpath := "user1/path1" + ref := client.NewRef(testpath) + + args := os.Args + if len(args) > 1 { + triggerEvent(ctx, client, testpath, args[1]) + return // exit app + } + + // SnapshotIterator + iter, err := ref.Listen(ctx) + if err != nil { + fmt.Printf(" Error: failed to create Listener %v\n", err) + return + } + + fmt.Printf("client app | Ref Path: %s | iter.Snapshot = %v\n", ref.Path, iter.Snapshot) + fmt.Printf(" | Ref Key: %s \n", ref.Key) + + defer iter.Stop() + + go func() { + for { + + if iter.Done() { + break + } + + event, err := iter.Next() + + if err != nil { + // Handle error here based on specific usecase + // We can continue Listening + log.Printf("%v\n", err) + continue // go back to beginning of for loop + } + + fmt.Printf("client app | Ref Path: %s | event.Path %s | event.Snapshot() = %v\n", ref.Path, event.Path, event.Snapshot()) + fmt.Printf("\n") + } + }() + + fmt.Printf("\n >>> edit value of any key from %s in firebase console to trigger event\n\n", testpath) + fmt.Printf("\n >>> press to close http connection\n\n") + fmt.Printf("Waiting for events...\n\n") + + fmt.Scanln() + iter.Stop() + + fmt.Printf("\n >>> press to exit app\n\n\n") + fmt.Scanln() +} + +func triggerEvent(ctx context.Context, client *db.Client, testpath string, val string) { + + ref := client.NewRef(testpath + "/key1") + + if err := ref.Set(ctx, val); err != nil { + log.Fatal(err) + } else { + fmt.Printf("OK - Set %s to %v\n", testpath+"/key1", val) + } +} diff --git a/cmd/listen2/main.go b/cmd/listen2/main.go new file mode 100644 index 00000000..d0a70fff --- /dev/null +++ b/cmd/listen2/main.go @@ -0,0 +1,182 @@ +// Copyright 2019 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "log" + "os" + + "golang.org/x/net/context" + "google.golang.org/api/option" + + firebase "firebase.google.com/go" + "firebase.google.com/go/db" +) + +// Key is a json-serializable type. +type Key struct { + Key1 string `json:"key1"` +} + +func main() { + + // opt := option.WithCredentialsFile("c:/users/username/.firebase/firebase.json") // Windows + // + opt := option.WithCredentialsFile("/home/username/.firebase/firebase.json") // Linux, edit 1. + + config := &firebase.Config{ + DatabaseURL: "https://databaseName.firebaseio.com", // edit 2. + } + + app, err := firebase.NewApp(context.Background(), config, opt) + if err != nil { + log.Fatal(err) + } + + ctx := context.Background() + + // DatabaseWithURL + client, err := app.Database(ctx) + + if err != nil { + log.Fatal(err) + } + + // https://firebase.google.com/docs/reference/js/firebase.database.Reference.html#key + // + // key = The last part of the Reference's path. + + testpath := "user1/path1" + ref := client.NewRef(testpath) + + args := os.Args + if len(args) > 1 { + triggerEvent(ctx, client, testpath, args[1]) + return // exit app + } + + // SnapshotIterator + iter, err := ref.Listen(ctx) + if err != nil { + fmt.Printf(" Error: failed to create Listener %v\n", err) + return // exit app + } + + fmt.Printf("Initial snapshots:\n") + + fmt.Printf("1st Listener | Ref Path: %s | iter.Snapshot = %v\n", ref.Path, iter.Snapshot) + fmt.Printf(" | Ref Key: %s \n", ref.Key) + + defer iter.Stop() + + var key Key + + go func() { + for { + + if iter.Done() { + break + } + + event, err := iter.Next() + + if err != nil { + // Handle error here based on specific usecase + // We can continue Listening + log.Printf("%v\n", err) + continue // go back to beginning of for loop + } + + err = event.Unmarshal(&key) + + if err != nil { + fmt.Printf("1st Listener | Error: Unmarshal %v\n", err) + } else { + fmt.Printf("1st Listener | Ref Path: %s | event.Path %s | event.Unmarshal(&key) key.Key1 = %s\n", ref.Path, event.Path, key.Key1) + fmt.Printf("1st Listener | Ref Path: %s | event.Path %s | event.Unmarshal(&key) key = %v\n", ref.Path, event.Path, key) + } + + fmt.Printf("1st Listener | Ref Path: %s | event.Path %s | event.Snapshot() = %v\n", ref.Path, event.Path, event.Snapshot()) + fmt.Printf("\n") + } + }() + + // 2nd listener + testpath2 := "user1/path1/path2" + ref2 := client.NewRef(testpath2) + + iter2, err := ref2.Listen(ctx) + if err != nil { + fmt.Printf(" Error: failed to create Listener %v\n", err) + return + } + + fmt.Printf("2nd Listener | Ref Path: %s | iter.Snapshot = %v\n", ref2.Path, iter2.Snapshot) + fmt.Printf(" | Ref Key: %s \n", ref2.Key) + + defer iter2.Stop() + + go func() { + for { + + if iter2.Done() { + break + } + + event, err := iter2.Next() + + if err != nil { + // Handle error here based on specific usecase + // We can continue Listening + log.Printf("%v\n", err) + continue // go back to beginning of for loop + } + + fmt.Printf("2nd Listener | Ref Path: %s | event.Path %s | event.Snapshot() = %v\n", ref2.Path, event.Path, event.Snapshot()) + fmt.Printf("\n") + } + }() + + fmt.Printf("\n >>> open a new separate command line terminal, to trigger events, run: go run . anyvalue\n") + fmt.Printf("\n >>> OR edit value of any key from %s in firebase console to trigger events\n\n", testpath) + fmt.Printf("\n >>> press to stop 1st Listener and close http connection\n\n") + fmt.Printf("Waiting for events...\n\n") + + fmt.Scanln() + iter.Stop() + + fmt.Printf("\n >>> press to stop 2nd Listener and close http connection\n\n") + fmt.Scanln() + iter2.Stop() + + fmt.Printf("\n >>> press to exit app\n\n\n") + fmt.Scanln() +} + +func triggerEvent(ctx context.Context, client *db.Client, testpath string, val string) { + + var key Key + + key.Key1 = val + + ref := client.NewRef(testpath + "/path2/path3") + + if err := ref.Set(ctx, key); err != nil { + log.Fatal(err) + } else { + fmt.Printf("OK - Set %s to key.Key1=%v\n", testpath+"/path2/path3", val) + } +} diff --git a/db/event.go b/db/event.go new file mode 100644 index 00000000..549e9551 --- /dev/null +++ b/db/event.go @@ -0,0 +1,98 @@ +// Copyright 2019 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package db + +import ( + "encoding/json" + "errors" + "net/http" +) + +// SSE = Sever-Sent Events = ssevent + +// EventType specific event type changes +type EventType uint + +// EventType ... +const ( + ChildChanged EventType = iota + ChildAdded // to be implemented + ChildRemoved // to be implemented + ValueChanged // to be implemented +) + +// Event Sever-Sent Event object +type Event struct { + EventType EventType // ChildChanged, ValueChanged, ChildAdded, ChildRemoved + + Data string // JSON-encoded snapshot + Path string // snapshot path +} + +// SnapshotIterator iterator for continuous events +type SnapshotIterator struct { + Snapshot string // initial snapshot, JSON-encoded, returned from http Respoonse, server sent event, data part + SSEDataChan <-chan string // continuous event snapshot, channel receive only, directional channel + done *bool // Done listening to events, also used to prevent channel block + resp *http.Response // http connection keep alive +} + +// Snapshot ssevent data, data part +func (e *Event) Snapshot() string { + + return e.Data // ssevent data (snapshot), snapshot only, data part of ssevent data +} + +// Unmarshal current snapshot Event.Data +func (e *Event) Unmarshal(v interface{}) error { + + return json.Unmarshal([]byte(e.Data), v) +} + +// Next realtime event +func (it *SnapshotIterator) Next() (*Event, error) { + + // prevent channel block + if *it.done == true { + return nil, errors.New("SnapshotIterator is done or no longer active") + } + + sseDataString := <-it.SSEDataChan + + // todo: determine EventType + + path, snapshot, err := splitSSEData(sseDataString) + + return &Event{ + EventType: ChildChanged, + Data: snapshot, + Path: path, + }, err + +} // Next() + +// Stop listening for realtime events +// close http connection +func (it *SnapshotIterator) Stop() { + *it.done = true + if it.resp != nil { + it.resp.Body.Close() + } +} + +// Done can be used to check if Stop() have been called +func (it *SnapshotIterator) Done() bool { + return *it.done +} diff --git a/db/event_ref.go b/db/event_ref.go new file mode 100644 index 00000000..cd0ba16f --- /dev/null +++ b/db/event_ref.go @@ -0,0 +1,251 @@ +// Copyright 2019 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package db + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "strings" + "time" + + "firebase.google.com/go/internal" +) + +// Listen returns an iterator that listens to realtime events +func (r *Ref) Listen(ctx context.Context) (*SnapshotIterator, error) { + + sseDataChan := make(chan string) // server-sent event data channel + + var opts []internal.HTTPOption + opts = append(opts, internal.WithHeader("Cache-Control", "no-cache")) + opts = append(opts, internal.WithHeader("Accept", "text/event-stream")) + opts = append(opts, internal.WithHeader("Connection", "keep-alive")) + + resp, err := r.sendListen(ctx, "GET", opts...) + + // This is temporary true in case initialization fails + done := true + + if err != nil { + return &SnapshotIterator{done: &done}, err + } + + snapshot, err := getInitialNodeSnapshot(resp) + + if err != nil { + return &SnapshotIterator{done: &done}, err + } + + // Initialization passed, we can continue with Listening + done = false + go r.startListeningWithReconnect(ctx, opts, resp, &done, sseDataChan) + + return &SnapshotIterator{ + Snapshot: snapshot, + SSEDataChan: sseDataChan, + done: &done, + resp: resp, // *http.Response + }, err + +} // Listen() + +// return initial snapshot (JSON-encoded string) from Ref.Path node location +func getInitialNodeSnapshot(resp *http.Response) (string, error) { + + var scannerText string + + scanner := bufio.NewScanner(resp.Body) + + if scanner != nil { + + if scanner.Scan() == true { + + scannerText = scanner.Text() + + if "event: put" == scannerText { + + if scanner.Scan() == true { + + s := scanner.Text() + + // sample sse data + // s = 'data: {"path":"/","data":{"test3":{"test4":4}}}' + + var snapshotMap map[string]interface{} + + // We only want the well formed json payload + // exclude or trim the first 6 chars 'data: ' + err := json.Unmarshal([]byte(s[6:]), &snapshotMap) + if err != nil { + return "", err + } + snapshotBytes, err := json.Marshal(snapshotMap["data"]) + if err != nil { + return "", err + } + + return string(snapshotBytes), nil + } + } + + } // if scanner.Scan() == true + + } // if scanner != nil + + return "", errors.New("sse data json error, event: put") +} + +// called with goroutine +func (r *Ref) startListeningWithReconnect( + ctx context.Context, + opts []internal.HTTPOption, + resp *http.Response, + done *bool, + sseDataChan chan<- string) { + + // We'll use this flag to simplify the code and have reconnect code in one block + reconnectState := false + + scanner := bufio.NewScanner(resp.Body) + + var scannerText string + + for { + + if *done { + break + } + + if reconnectState == true { + + var err error + resp, err = r.sendListen(ctx, "GET", opts...) + if err != nil { + time.Sleep(time.Second) + continue // try again + } else { + // Not part of existing continuing listening events, so we don't send to the listening channel + _, err := getInitialNodeSnapshot(resp) + + if err != nil { + time.Sleep(time.Second) + continue // try again + } + } + + scanner = bufio.NewScanner(resp.Body) + } + + if scanner == nil { + time.Sleep(time.Second) + continue // try again + } + + reconnectState = false + + if scanner != nil { + + if scanner.Scan() == true { + + scannerText = scanner.Text() + + if "event: put" == scannerText { + + if scanner.Scan() == true { + s := scanner.Text() + + // sample data + // s = 'data: {"path":"/","data":{"test3":{"test4":4}}}' + + // sse data = path + snapshot + if s[:5] == "data:" { + // trim 'data: ' + sseDataChan <- s[6:] // {"path":"/","data":{"test3":{"test4":4}}} + } + } + } else if "event: auth_revoked" == scannerText { + reconnectState = true + continue // go back to beginning of for loop + } + } else { // if scanner.Scan() == true + reconnectState = true + continue // go back to beginning of for loop + } + } else { // if scanner != nil + reconnectState = true + } + } // for +} + +// returns path and snapshot +func splitSSEData(sseData string) (path string, snapshot string, err error) { + + var sseDataMap map[string]interface{} + + err = json.Unmarshal([]byte(sseData), &sseDataMap) + if err != nil { + return "", "", err + } + + pathByte, err := json.Marshal(sseDataMap["path"]) + if err != nil { + return "", "", err + } + + snapshotByte, err := json.Marshal(sseDataMap["data"]) + if err != nil { + return "", "", err + } + + path = string(pathByte) + snapshot = string(snapshotByte) + + return +} + +func (c *Client) sendListen( + ctx context.Context, + method, path string, + body internal.HTTPEntity, + opts ...internal.HTTPOption) (*http.Response, error) { + + if strings.ContainsAny(path, invalidChars) { + return nil, fmt.Errorf("invalid path with illegal characters: %q", path) + } + if c.authOverride != "" { + opts = append(opts, internal.WithQueryParam(authVarOverride, c.authOverride)) + } + + resp, err := c.hc.DoListen(ctx, &internal.Request{ + Method: method, + URL: fmt.Sprintf("%s%s.json", c.url, path), + Body: body, + Opts: opts, + }) + + return resp, err +} + +func (r *Ref) sendListen( + ctx context.Context, + method string, + opts ...internal.HTTPOption) (*http.Response, error) { + + return r.client.sendListen(ctx, method, r.Path, nil, opts...) +} diff --git a/db/event_test.go b/db/event_test.go new file mode 100644 index 00000000..9e083041 --- /dev/null +++ b/db/event_test.go @@ -0,0 +1,50 @@ +// Copyright 2019 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package db + +import ( + "context" + "testing" +) + +func TestListen(t *testing.T) { + mock := &mockServer{} + srv := mock.Start(client) + defer srv.Close() + + // mostly just a place holder, currenty test for crash only + + iter, err := testref.Listen(context.Background()) + _ = iter + _ = err + + defer iter.Stop() + + event, err := iter.Next() + _ = event + _ = err + + if iter.Done() { + // break + } + + if err == nil { + iter.Stop() + } + + b := iter.Done() + _ = b + +} diff --git a/internal/http_client_event.go b/internal/http_client_event.go new file mode 100644 index 00000000..3333b5d1 --- /dev/null +++ b/internal/http_client_event.go @@ -0,0 +1,36 @@ +// Copyright 2019 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "net/http" +) + +// DoListen executes the given Request, and returns a Response. +func (c *HTTPClient) DoListen(ctx context.Context, r *Request) (*http.Response, error) { + req, err := r.buildHTTPRequest() + if err != nil { + return nil, err + } + + resp, err := c.Client.Do(req.WithContext(ctx)) + + if err != nil { + return nil, err + } + + return resp, nil +}