@@ -29,6 +29,7 @@ import (
2929
3030 "github.com/redpanda-data/benthos/v4/public/service"
3131 "github.com/redpanda-data/benthos/v4/public/utils/netutil"
32+ "github.com/redpanda-data/common-go/authz"
3233 "github.com/redpanda-data/connect/v4/internal/gateway"
3334 "github.com/redpanda-data/connect/v4/internal/impl/otlp/otlpconv"
3435 "github.com/redpanda-data/connect/v4/internal/license"
@@ -46,6 +47,8 @@ const (
4647 defaultHTTPReadTimeout = 10 * time .Second
4748 defaultHTTPWriteTimeout = 10 * time .Second
4849 defaultHTTPMaxBodySize = 4 * 1024 * 1024 // 4MB
50+
51+ otlpHTTPPermission authz.PermissionName = "dataplane_otlp_http"
4952)
5053
5154type httpInputConfig struct {
@@ -176,10 +179,11 @@ An optional rate limit resource can be specified to throttle incoming requests.
176179// httpOTLPInput is the HTTP-specific OTLP input
177180type httpOTLPInput struct {
178181 otlpInput
179- conf httpInputConfig
180- rpJWT * gateway.RPJWTMiddleware
181- cors gateway.CORSConfig
182- server * http.Server
182+ conf httpInputConfig
183+ authzPolicy * gateway.FileWatchingAuthzResourcePolicy
184+ rpJWT * gateway.RPJWTMiddleware
185+ cors gateway.CORSConfig
186+ server * http.Server
183187}
184188
185189// HTTPInputFromParsed creates an OTLP HTTP input from a parsed config.
@@ -224,6 +228,22 @@ func HTTPInputFromParsed(pConf *service.ParsedConfig, mgr *service.Resources) (s
224228 return nil , fmt .Errorf ("parse tcp config: %w" , err )
225229 }
226230
231+ // Initialize authorization policy if configured
232+ var authzPolicy * gateway.FileWatchingAuthzResourcePolicy
233+ if authzConf , ok := gateway .ManagerAuthzConfig (mgr ); ok {
234+ authzPolicy , err = gateway .NewFileWatchingAuthzResourcePolicy (
235+ authzConf .ResourceName ,
236+ authzConf .PolicyFile ,
237+ []authz.PermissionName {otlpHTTPPermission },
238+ func (err error ) {
239+ mgr .Logger ().With ("error" , err ).Error ("Authorization policy error" )
240+ },
241+ )
242+ if err != nil {
243+ return nil , fmt .Errorf ("initialize authorization policy: %w" , err )
244+ }
245+ }
246+
227247 // Initialize HTTP-specific middleware
228248 rpJWT , err := gateway .NewRPJWTMiddleware (mgr )
229249 if err != nil {
@@ -234,11 +254,13 @@ func HTTPInputFromParsed(pConf *service.ParsedConfig, mgr *service.Resources) (s
234254 if err != nil {
235255 return nil , err
236256 }
257+
237258 return & httpOTLPInput {
238- otlpInput : otlpIn ,
239- conf : conf ,
240- rpJWT : rpJWT ,
241- cors : gateway .NewCORSConfigFromEnv (),
259+ otlpInput : otlpIn ,
260+ conf : conf ,
261+ authzPolicy : authzPolicy ,
262+ rpJWT : rpJWT ,
263+ cors : gateway .NewCORSConfigFromEnv (),
242264 }, nil
243265}
244266
@@ -262,7 +284,11 @@ func (hi *httpOTLPInput) Connect(ctx context.Context) error {
262284 }
263285
264286 h := hi .handler ()
265- h = hi .cors .WrapHandler (hi .rpJWT .Wrap (h ))
287+ if hi .authzPolicy != nil {
288+ h = gateway .AuthzMiddleware (hi .authzPolicy , otlpHTTPPermission , h )
289+ }
290+ h = hi .rpJWT .Wrap (h )
291+ h = hi .cors .WrapHandler (h )
266292 hi .server = & http.Server {
267293 Addr : hi .conf .Address ,
268294 Handler : h ,
@@ -314,7 +340,7 @@ func (hi *httpOTLPInput) Close(ctx context.Context) error {
314340 defer hi .shutSig .TriggerHasStopped ()
315341
316342 if hi .server == nil {
317- return nil
343+ return hi . authzPolicy . Close ()
318344 }
319345
320346 // Shutdown HTTP server gracefully
@@ -329,7 +355,7 @@ func (hi *httpOTLPInput) Close(ctx context.Context) error {
329355 }
330356 }
331357
332- return nil
358+ return hi . authzPolicy . Close ()
333359}
334360
335361const (
0 commit comments