|
1 | 1 | package sdk |
2 | 2 |
|
3 | 3 | import ( |
4 | | - "github.com/pkg/errors" |
5 | 4 | "io" |
6 | 5 | "log" |
7 | 6 | "net/http" |
| 7 | + "sync" |
| 8 | + |
| 9 | + "github.com/launchdarkly/ldcli/internal/dev_server/model" |
| 10 | + "github.com/pkg/errors" |
8 | 11 | ) |
9 | 12 |
|
| 13 | +type sdkEventObserver struct { |
| 14 | + updateChan chan<- Message |
| 15 | +} |
| 16 | + |
| 17 | +func (o sdkEventObserver) Handle(message interface{}) { |
| 18 | + str, ok := message.(string) |
| 19 | + if !ok { |
| 20 | + return |
| 21 | + } |
| 22 | + o.updateChan <- Message{Event: TYPE_PUT, Data: []byte(str)} |
| 23 | +} |
| 24 | + |
| 25 | +var observers *model.Observers |
| 26 | +var once sync.Once |
| 27 | + |
10 | 28 | func SdkEventsReceiveHandler(writer http.ResponseWriter, request *http.Request) { |
11 | | - log.Println(request.URL.Path) |
| 29 | + once.Do(func() { |
| 30 | + observers = model.NewObservers() |
| 31 | + }) |
| 32 | + |
12 | 33 | bodyStr, err := io.ReadAll(request.Body) |
13 | 34 | if err != nil { |
14 | 35 | log.Printf("SdkEventsReceiveHandler: error reading request body: %v", err) |
15 | 36 | return |
16 | 37 | } |
17 | | - log.Println(string(bodyStr)) |
| 38 | + if observers != nil { |
| 39 | + observers.Notify(string(bodyStr)) |
| 40 | + } |
18 | 41 |
|
19 | 42 | writer.Header().Set("Content-Type", "application/json") |
20 | 43 | writer.WriteHeader(http.StatusAccepted) |
21 | 44 | } |
22 | 45 |
|
23 | 46 | func SdkEventsTeeHandler(writer http.ResponseWriter, request *http.Request) { |
24 | | - // Initialize SSE |
| 47 | + once.Do(func() { |
| 48 | + observers = model.NewObservers() |
| 49 | + }) |
| 50 | + |
25 | 51 | updateChan, errChan := OpenStream( |
26 | 52 | writer, |
27 | 53 | request.Context().Done(), |
28 | | - Message{Event: TYPE_PUT, Data: []byte("start")}, |
| 54 | + Message{Event: TYPE_PUT, Data: []byte{}}, |
29 | 55 | ) |
30 | 56 | defer close(updateChan) |
31 | 57 |
|
32 | | - // Use updateChan to continually send messages back to the client. OpenStream, above, |
33 | | - // takes care of flushing the data. |
34 | | - // |
35 | | - // If the client cancels the request, OpenStream will notice via request.Context.Done(). |
36 | | - // Otherwise, this connection is never explicitly closed by us. |
37 | | - updateChan <- Message{Event: TYPE_PUT, Data: []byte("data1")} |
38 | | - updateChan <- Message{Event: TYPE_PUT, Data: []byte("data2")} |
| 58 | + observerId := observers.RegisterObserver(sdkEventObserver{updateChan}) |
| 59 | + defer func() { |
| 60 | + ok := observers.DeregisterObserver(observerId) |
| 61 | + if !ok { |
| 62 | + log.Printf("unable to remove observer") |
| 63 | + } |
| 64 | + }() |
39 | 65 |
|
40 | 66 | err := <-errChan |
41 | 67 | if err != nil { |
|
0 commit comments