Skip to content

Commit f00ec6a

Browse files
committed
Move events api to its own package because its a cross-cutting concern
1 parent 338e2ab commit f00ec6a

File tree

5 files changed

+93
-74
lines changed

5 files changed

+93
-74
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"log"
7+
"net/http"
8+
9+
"github.com/google/uuid"
10+
"github.com/launchdarkly/ldcli/internal/dev_server/model"
11+
"github.com/pkg/errors"
12+
13+
"github.com/launchdarkly/ldcli/internal/dev_server/sdk"
14+
)
15+
16+
type sdkEventObserver struct {
17+
ctx context.Context
18+
debugSessionKey string
19+
updateChan chan<- sdk.Message
20+
}
21+
22+
func newSdkEventObserver(updateChan chan<- sdk.Message, ctx context.Context) sdkEventObserver {
23+
debugSessionKey := uuid.New().String()
24+
db := model.EventStoreFromContext(ctx)
25+
err := db.CreateDebugSession(ctx, debugSessionKey)
26+
if err != nil {
27+
log.Printf("sdkEventObserver: error writting debug session: %v", err)
28+
}
29+
return sdkEventObserver{
30+
debugSessionKey: debugSessionKey,
31+
ctx: ctx,
32+
updateChan: updateChan,
33+
}
34+
}
35+
36+
func (o sdkEventObserver) Handle(message interface{}) {
37+
str, ok := message.(json.RawMessage)
38+
if !ok {
39+
return
40+
}
41+
42+
event := sdk.SDKEventBase{}
43+
err := json.Unmarshal(str, &event)
44+
if err != nil {
45+
log.Printf("sdkEventObserver: error unmarshaling event: %v", err)
46+
return
47+
}
48+
49+
db := model.EventStoreFromContext(o.ctx)
50+
51+
err = db.WriteEvent(o.ctx, o.debugSessionKey, event.Kind, str)
52+
if err != nil {
53+
log.Printf("sdkEventObserver: error writting event: %v", err)
54+
return
55+
}
56+
57+
o.updateChan <- sdk.Message{Event: sdk.TYPE_PUT, Data: str}
58+
}
59+
60+
func SdkEventsTeeHandler(writer http.ResponseWriter, request *http.Request) {
61+
updateChan, errChan := sdk.OpenStream(
62+
writer,
63+
request.Context().Done(),
64+
sdk.Message{Event: sdk.TYPE_PUT, Data: []byte{}},
65+
)
66+
defer close(updateChan)
67+
observers := model.GetObserversFromContext(request.Context())
68+
69+
observerId := observers.RegisterObserver(newSdkEventObserver(updateChan, request.Context()))
70+
defer func() {
71+
ok := observers.DeregisterObserver(observerId)
72+
if !ok {
73+
log.Printf("unable to remove observer")
74+
}
75+
}()
76+
77+
err := <-errChan
78+
if err != nil {
79+
sdk.WriteError(request.Context(), writer, errors.Wrap(err, "stream failure"))
80+
return
81+
}
82+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package events
2+
3+
import "github.com/gorilla/mux"
4+
5+
func BindRoutes(router *mux.Router) {
6+
router.HandleFunc("/events/tee", SdkEventsTeeHandler)
7+
}

internal/dev_server/dev_server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package dev_server
33
import (
44
"context"
55
"fmt"
6-
"github.com/launchdarkly/ldcli/internal/dev_server/events_db"
76
"log"
87
"net/http"
98
"os"
@@ -15,7 +14,9 @@ import (
1514
"github.com/launchdarkly/ldcli/internal/client"
1615
"github.com/launchdarkly/ldcli/internal/dev_server/adapters"
1716
"github.com/launchdarkly/ldcli/internal/dev_server/api"
17+
"github.com/launchdarkly/ldcli/internal/dev_server/api/events"
1818
"github.com/launchdarkly/ldcli/internal/dev_server/db"
19+
"github.com/launchdarkly/ldcli/internal/dev_server/events_db"
1920
"github.com/launchdarkly/ldcli/internal/dev_server/model"
2021
"github.com/launchdarkly/ldcli/internal/dev_server/sdk"
2122
"github.com/launchdarkly/ldcli/internal/dev_server/ui"
@@ -77,6 +78,7 @@ func (c LDClient) RunServer(ctx context.Context, serverParams ServerParams) {
7778
ui.AssetHandler.ServeHTTP(w, r)
7879
})
7980
sdk.BindRoutes(r)
81+
events.BindRoutes(r)
8082
handler := api.HandlerFromMux(apiServer, r)
8183
handler = api.CorsHeadersWithConfig(serverParams.CorsEnabled, serverParams.CorsOrigin)(handler)
8284
handler = handlers.CombinedLoggingHandler(os.Stdout, handler)

internal/dev_server/sdk/routes.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ func BindRoutes(router *mux.Router) {
1212
// events
1313
router.HandleFunc("/bulk", SdkEventsReceiveHandler)
1414
router.HandleFunc("/diagnostic", DevNull)
15-
router.HandleFunc("/events/tee", SdkEventsTeeHandler)
1615
router.Handle("/events/bulk/{envId}", EventsCorsHeaders(DevNull))
1716
router.Handle("/events/diagnostic/{envId}", EventsCorsHeaders(DevNull))
1817
router.HandleFunc("/mobile", DevNull)
Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,25 @@
11
package sdk
22

33
import (
4-
"context"
54
"encoding/json"
65
"io"
76
"log"
87
"net/http"
98

10-
"github.com/google/uuid"
119
"github.com/launchdarkly/ldcli/internal/dev_server/model"
12-
"github.com/pkg/errors"
1310
)
1411

15-
func newSdkEventObserver(updateChan chan<- Message, ctx context.Context) sdkEventObserver {
16-
debugSessionKey := uuid.New().String()
17-
db := model.EventStoreFromContext(ctx)
18-
err := db.CreateDebugSession(ctx, debugSessionKey)
19-
if err != nil {
20-
log.Printf("sdkEventObserver: error writting debug session: %v", err)
21-
}
22-
return sdkEventObserver{
23-
debugSessionKey: debugSessionKey,
24-
ctx: ctx,
25-
updateChan: updateChan,
26-
}
27-
}
28-
2912
type SDKEventBase struct {
3013
Kind string `json:"kind"`
3114
}
3215

33-
type sdkEventObserver struct {
34-
ctx context.Context
35-
debugSessionKey string
36-
updateChan chan<- Message
37-
}
38-
39-
func (o sdkEventObserver) Handle(message interface{}) {
40-
str, ok := message.(json.RawMessage)
41-
if !ok {
42-
return
43-
}
44-
45-
event := SDKEventBase{}
46-
err := json.Unmarshal(str, &event)
47-
if err != nil {
48-
log.Printf("sdkEventObserver: error unmarshaling event: %v", err)
49-
return
50-
}
51-
52-
db := model.EventStoreFromContext(o.ctx)
53-
54-
err = db.WriteEvent(o.ctx, o.debugSessionKey, event.Kind, str)
55-
if err != nil {
56-
log.Printf("sdkEventObserver: error writting event: %v", err)
57-
return
58-
}
59-
60-
o.updateChan <- Message{Event: TYPE_PUT, Data: str}
61-
}
62-
63-
var observers *model.Observers = model.NewObservers()
64-
6516
func SdkEventsReceiveHandler(writer http.ResponseWriter, request *http.Request) {
6617
bodyStr, err := io.ReadAll(request.Body)
6718
if err != nil {
6819
log.Printf("SdkEventsReceiveHandler: error reading request body: %v", err)
6920
return
7021
}
22+
observers := model.GetObserversFromContext(request.Context())
7123

7224
var arr []json.RawMessage
7325
err = json.Unmarshal(bodyStr, &arr)
@@ -83,26 +35,3 @@ func SdkEventsReceiveHandler(writer http.ResponseWriter, request *http.Request)
8335
writer.Header().Set("Content-Type", "application/json")
8436
writer.WriteHeader(http.StatusAccepted)
8537
}
86-
87-
func SdkEventsTeeHandler(writer http.ResponseWriter, request *http.Request) {
88-
updateChan, errChan := OpenStream(
89-
writer,
90-
request.Context().Done(),
91-
Message{Event: TYPE_PUT, Data: []byte{}},
92-
)
93-
defer close(updateChan)
94-
95-
observerId := observers.RegisterObserver(newSdkEventObserver(updateChan, request.Context()))
96-
defer func() {
97-
ok := observers.DeregisterObserver(observerId)
98-
if !ok {
99-
log.Printf("unable to remove observer")
100-
}
101-
}()
102-
103-
err := <-errChan
104-
if err != nil {
105-
WriteError(request.Context(), writer, errors.Wrap(err, "stream failure"))
106-
return
107-
}
108-
}

0 commit comments

Comments
 (0)