Skip to content

Commit ee5be4e

Browse files
authored
refactor: standardize ingest http handler [OM-87] (#3675)
1 parent 18b337f commit ee5be4e

File tree

13 files changed

+301
-232
lines changed

13 files changed

+301
-232
lines changed

app/common/openmeter_server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConf
107107
func NewIngestService(
108108
collector ingest.Collector,
109109
logger *slog.Logger,
110-
) (*ingest.Service, error) {
111-
return &ingest.Service{
110+
) (ingest.Service, error) {
111+
return ingest.NewService(ingest.Config{
112112
Collector: collector,
113113
Logger: logger,
114-
}, nil
114+
})
115115
}

cmd/server/main.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@ import (
1919
"github.com/openmeterio/openmeter/app/common"
2020
"github.com/openmeterio/openmeter/app/config"
2121
"github.com/openmeterio/openmeter/openmeter/debug"
22-
"github.com/openmeterio/openmeter/openmeter/ingest/ingestdriver"
2322
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest"
2423
"github.com/openmeterio/openmeter/openmeter/namespace"
25-
"github.com/openmeterio/openmeter/openmeter/namespace/namespacedriver"
2624
"github.com/openmeterio/openmeter/openmeter/server"
2725
"github.com/openmeterio/openmeter/openmeter/server/router"
2826
"github.com/openmeterio/openmeter/pkg/errorsx"
@@ -112,14 +110,6 @@ func main() {
112110
os.Exit(1)
113111
}
114112

115-
// Initialize HTTP Ingest handler
116-
ingestHandler := ingestdriver.NewIngestEventsHandler(
117-
app.IngestService.IngestEvents,
118-
namespacedriver.StaticNamespaceDecoder(app.NamespaceManager.GetDefaultNamespace()),
119-
nil,
120-
errorsx.NewSlogHandler(logger),
121-
)
122-
123113
// Initialize debug connector
124114
debugConnector := debug.NewDebugConnector(app.StreamingConnector)
125115

@@ -165,7 +155,7 @@ func main() {
165155
FeatureConnector: app.EntitlementRegistry.Feature,
166156
GrantConnector: app.EntitlementRegistry.Grant,
167157
GrantRepo: app.EntitlementRegistry.GrantRepo,
168-
IngestHandler: ingestHandler,
158+
IngestService: app.IngestService,
169159
Logger: logger,
170160
MeterManageService: app.MeterManageService,
171161
MeterEventService: app.MeterEventService,

cmd/server/wire.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ type Application struct {
5656
FeatureConnector feature.FeatureConnector
5757
FeatureFlags ffx.Service
5858
IngestCollector ingest.Collector
59-
IngestService *ingest.Service
59+
IngestService ingest.Service
6060
KafkaProducer *kafka.Producer
6161
KafkaMetrics *kafkametrics.Metrics
6262
KafkaIngestNamespaceHandler *kafkaingest.NamespaceHandler

cmd/server/wire_gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package httpdriver
2+
3+
import (
4+
"context"
5+
"net/http"
6+
7+
"github.com/openmeterio/openmeter/pkg/framework/commonhttp"
8+
"github.com/openmeterio/openmeter/pkg/framework/transport/httptransport/encoder"
9+
)
10+
11+
type ErrorInvalidContentType struct {
12+
ContentType string
13+
}
14+
15+
func (e ErrorInvalidContentType) Error() string {
16+
// return "invalid content type"
17+
18+
return "invalid content type: " + e.ContentType
19+
}
20+
21+
func (e ErrorInvalidContentType) Message() string {
22+
return "invalid content type: " + e.ContentType
23+
}
24+
25+
func (e ErrorInvalidContentType) Details() map[string]any {
26+
return map[string]any{
27+
"contentType": e.ContentType,
28+
}
29+
}
30+
31+
type ErrorInvalidEvent struct {
32+
Err error
33+
}
34+
35+
func (e ErrorInvalidEvent) Error() string {
36+
// return "invalid event"
37+
38+
return "invalid event: " + e.Err.Error()
39+
}
40+
41+
func (e ErrorInvalidEvent) Message() string {
42+
return "invalid event: " + e.Err.Error()
43+
}
44+
45+
func errorEncoder() encoder.ErrorEncoder {
46+
return func(ctx context.Context, err error, w http.ResponseWriter, r *http.Request) bool {
47+
return commonhttp.HandleErrorIfTypeMatches[ErrorInvalidContentType](ctx, http.StatusBadRequest, err, w) ||
48+
commonhttp.HandleErrorIfTypeMatches[ErrorInvalidEvent](ctx, http.StatusBadRequest, err, w)
49+
}
50+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package httpdriver
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
8+
"github.com/openmeterio/openmeter/openmeter/ingest"
9+
"github.com/openmeterio/openmeter/openmeter/namespace/namespacedriver"
10+
"github.com/openmeterio/openmeter/pkg/framework/commonhttp"
11+
"github.com/openmeterio/openmeter/pkg/framework/transport/httptransport"
12+
)
13+
14+
type Handler interface {
15+
IngestHandler
16+
}
17+
18+
type IngestHandler interface {
19+
IngestEvents() IngestEventsHandler
20+
}
21+
22+
type handler struct {
23+
service ingest.Service
24+
namespaceDecoder namespacedriver.NamespaceDecoder
25+
options []httptransport.HandlerOption
26+
}
27+
28+
func (h *handler) resolveNamespace(ctx context.Context) (string, error) {
29+
ns, ok := h.namespaceDecoder.GetNamespace(ctx)
30+
if !ok {
31+
return "", commonhttp.NewHTTPError(http.StatusInternalServerError, errors.New("internal server error"))
32+
}
33+
34+
return ns, nil
35+
}
36+
37+
func New(
38+
namespaceDecoder namespacedriver.NamespaceDecoder,
39+
service ingest.Service,
40+
options ...httptransport.HandlerOption,
41+
) Handler {
42+
return &handler{
43+
namespaceDecoder: namespaceDecoder,
44+
service: service,
45+
options: options,
46+
}
47+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package httpdriver
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"net/http"
8+
9+
"github.com/cloudevents/sdk-go/v2/event"
10+
11+
"github.com/openmeterio/openmeter/api"
12+
"github.com/openmeterio/openmeter/openmeter/ingest"
13+
"github.com/openmeterio/openmeter/pkg/framework/commonhttp"
14+
"github.com/openmeterio/openmeter/pkg/framework/transport/httptransport"
15+
)
16+
17+
type (
18+
IngestEventsRequest = ingest.IngestEventsRequest
19+
IngestEventsResponse = struct{}
20+
IngestEventsHandler httptransport.Handler[IngestEventsRequest, IngestEventsResponse]
21+
)
22+
23+
func (h *handler) IngestEvents() IngestEventsHandler {
24+
return httptransport.NewHandler(
25+
func(ctx context.Context, r *http.Request) (IngestEventsRequest, error) {
26+
var req ingest.IngestEventsRequest
27+
28+
namespace, err := h.resolveNamespace(ctx)
29+
if err != nil {
30+
return req, err
31+
}
32+
33+
req.Namespace = namespace
34+
35+
contentType := r.Header.Get("Content-Type")
36+
37+
switch contentType {
38+
case "application/json":
39+
var apiRequest api.IngestEventsBody
40+
41+
err := json.NewDecoder(r.Body).Decode(&apiRequest)
42+
if err != nil {
43+
return req, ErrorInvalidEvent{
44+
Err: err,
45+
}
46+
}
47+
48+
// Try to parse as a single event
49+
e, err := apiRequest.AsEvent()
50+
if err == nil {
51+
req.Events = []event.Event{e}
52+
} else {
53+
// Try to parse as a batch of events
54+
e, err := apiRequest.AsIngestEventsBody1()
55+
if err == nil {
56+
req.Events = e
57+
}
58+
}
59+
60+
// If we still don't have any events, return an error
61+
if len(req.Events) == 0 {
62+
return req, ErrorInvalidEvent{
63+
Err: errors.New("no events found"),
64+
}
65+
}
66+
case "application/cloudevents+json":
67+
var apiRequest api.IngestEventsApplicationCloudeventsPlusJSONRequestBody
68+
69+
err := json.NewDecoder(r.Body).Decode(&apiRequest)
70+
if err != nil {
71+
return req, ErrorInvalidEvent{
72+
Err: err,
73+
}
74+
}
75+
76+
req.Events = []event.Event{apiRequest}
77+
case "application/cloudevents-batch+json":
78+
var apiRequest api.IngestEventsApplicationCloudeventsBatchPlusJSONBody
79+
80+
err := json.NewDecoder(r.Body).Decode(&apiRequest)
81+
if err != nil {
82+
return req, ErrorInvalidEvent{
83+
Err: err,
84+
}
85+
}
86+
87+
req.Events = apiRequest
88+
default:
89+
return req, ErrorInvalidContentType{ContentType: contentType}
90+
}
91+
92+
return req, nil
93+
},
94+
func(ctx context.Context, params IngestEventsRequest) (IngestEventsResponse, error) {
95+
_, err := h.service.IngestEvents(ctx, params)
96+
if err != nil {
97+
return IngestEventsResponse{}, err
98+
}
99+
100+
return IngestEventsResponse{}, nil
101+
},
102+
commonhttp.EmptyResponseEncoder[IngestEventsResponse](http.StatusNoContent),
103+
httptransport.AppendOptions(
104+
h.options,
105+
httptransport.WithOperationName("IngestEvents"),
106+
httptransport.WithErrorEncoder(errorEncoder()),
107+
)...,
108+
)
109+
}

openmeter/ingest/ingestdriver/http_transport_test.go renamed to openmeter/ingest/httpdriver/ingest_test.go

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package ingestdriver_test
1+
package httpdriver_test
22

33
import (
44
"bytes"
@@ -16,27 +16,25 @@ import (
1616
"github.com/stretchr/testify/require"
1717

1818
"github.com/openmeterio/openmeter/openmeter/ingest"
19-
"github.com/openmeterio/openmeter/openmeter/ingest/ingestdriver"
19+
"github.com/openmeterio/openmeter/openmeter/ingest/httpdriver"
2020
"github.com/openmeterio/openmeter/openmeter/namespace/namespacedriver"
21-
"github.com/openmeterio/openmeter/pkg/errorsx"
2221
)
2322

2423
func TestIngestEvents(t *testing.T) {
2524
collector := ingest.NewInMemoryCollector()
2625

27-
service := ingest.Service{
26+
ingestSvc, err := ingest.NewService(ingest.Config{
2827
Collector: collector,
2928
Logger: slog.Default(),
30-
}
29+
})
30+
require.NoError(t, err)
3131

32-
handler := ingestdriver.NewIngestEventsHandler(
33-
service.IngestEvents,
32+
handler := httpdriver.New(
3433
namespacedriver.StaticNamespaceDecoder("test"),
35-
nil,
36-
errorsx.NewNopHandler(),
34+
ingestSvc,
3735
)
3836

39-
server := httptest.NewServer(handler)
37+
server := httptest.NewServer(handler.IngestEvents())
4038
client := server.Client()
4139

4240
now := time.Date(2023, 0o6, 15, 14, 33, 0o0, 0o0, time.UTC)
@@ -49,7 +47,7 @@ func TestIngestEvents(t *testing.T) {
4947

5048
var buf bytes.Buffer
5149

52-
err := json.NewEncoder(&buf).Encode(ev)
50+
err = json.NewEncoder(&buf).Encode(ev)
5351
require.NoError(t, err)
5452

5553
resp, err := client.Post(server.URL, "application/cloudevents+json", &buf)
@@ -74,19 +72,18 @@ func TestIngestEvents(t *testing.T) {
7472
func TestIngestEvents_InvalidEvent(t *testing.T) {
7573
collector := ingest.NewInMemoryCollector()
7674

77-
service := ingest.Service{
75+
ingestSvc, err := ingest.NewService(ingest.Config{
7876
Collector: collector,
7977
Logger: slog.Default(),
80-
}
78+
})
79+
require.NoError(t, err)
8180

82-
handler := ingestdriver.NewIngestEventsHandler(
83-
service.IngestEvents,
81+
handler := httpdriver.New(
8482
namespacedriver.StaticNamespaceDecoder("test"),
85-
nil,
86-
errorsx.NewNopHandler(),
83+
ingestSvc,
8784
)
8885

89-
server := httptest.NewServer(handler)
86+
server := httptest.NewServer(handler.IngestEvents())
9087
client := server.Client()
9188

9289
resp, err := client.Post(server.URL, "application/cloudevents+json", bytes.NewBuffer([]byte(`invalid`)))
@@ -100,19 +97,18 @@ func TestIngestEvents_InvalidEvent(t *testing.T) {
10097
func TestBatchHandler(t *testing.T) {
10198
collector := ingest.NewInMemoryCollector()
10299

103-
service := ingest.Service{
100+
ingestSvc, err := ingest.NewService(ingest.Config{
104101
Collector: collector,
105102
Logger: slog.Default(),
106-
}
103+
})
104+
require.NoError(t, err)
107105

108-
handler := ingestdriver.NewIngestEventsHandler(
109-
service.IngestEvents,
106+
handler := httpdriver.New(
110107
namespacedriver.StaticNamespaceDecoder("test"),
111-
nil,
112-
errorsx.NewNopHandler(),
108+
ingestSvc,
113109
)
114110

115-
server := httptest.NewServer(handler)
111+
server := httptest.NewServer(handler.IngestEvents())
116112
client := server.Client()
117113

118114
now := time.Date(2023, 0o6, 15, 14, 33, 0o0, 0o0, time.UTC)
@@ -130,7 +126,7 @@ func TestBatchHandler(t *testing.T) {
130126
}
131127

132128
var buf bytes.Buffer
133-
err := json.NewEncoder(&buf).Encode(events)
129+
err = json.NewEncoder(&buf).Encode(events)
134130
require.NoError(t, err)
135131

136132
resp, err := client.Post(server.URL, "application/cloudevents-batch+json", &buf)

0 commit comments

Comments
 (0)