Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions internal/gateway/authz.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@
package gateway

import (
"context"
"fmt"
"net/http"
"sync/atomic"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/redpanda-data/benthos/v4/public/service"
"github.com/redpanda-data/common-go/authz"
"github.com/redpanda-data/common-go/authz/loader"
Expand Down Expand Up @@ -126,3 +131,35 @@ func AuthzMiddleware(
next.ServeHTTP(w, req)
})
}

// GRPCUnaryAuthzInterceptor returns a gRPC unary interceptor that enforces
// authorization checks for the given permission before invoking the handler.
// If the principal is missing or unauthorized, it returns PermissionDenied.
func GRPCUnaryAuthzInterceptor(
policy *FileWatchingAuthzResourcePolicy,
perm authz.PermissionName,
) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
principal, ok := ValidatedPrincipalIDFromContext(ctx)
if !ok || !policy.Authorizer(perm).Check(principal) {
return nil, status.Error(codes.PermissionDenied, "permission denied")
}
return handler(ctx, req)
}
}

// GRPCStreamAuthzInterceptor returns a gRPC stream interceptor that enforces
// authorization checks for the given permission before invoking the handler.
// If the principal is missing or unauthorized, it returns PermissionDenied.
func GRPCStreamAuthzInterceptor(
policy *FileWatchingAuthzResourcePolicy,
perm authz.PermissionName,
) grpc.StreamServerInterceptor {
return func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
principal, ok := ValidatedPrincipalIDFromContext(ss.Context())
if !ok || !policy.Authorizer(perm).Check(principal) {
return status.Error(codes.PermissionDenied, "permission denied")
}
return handler(srv, ss)
}
}
205 changes: 205 additions & 0 deletions internal/gateway/authz_grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Licensed as a Redpanda Enterprise file under the Redpanda Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md

package gateway_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/redpanda-data/common-go/authz"
"github.com/redpanda-data/connect/v4/internal/gateway"
)

// testUnaryHandler is a simple gRPC unary handler for testing
func testUnaryHandler(_ context.Context, _ any) (any, error) {
return "OK", nil
}

// testStreamHandler is a simple gRPC stream handler for testing
func testStreamHandler(_ any, _ grpc.ServerStream) error {
return nil
}

// mockServerStream implements grpc.ServerStream for testing
type mockServerStream struct {
grpc.ServerStream
ctx context.Context
}

func (m *mockServerStream) Context() context.Context {
return m.ctx
}

func TestGRPCUnaryAuthzInterceptorAllowAll(t *testing.T) {
t.Log("Given: Policy file granting all permissions")
policy := setupPolicy(t, "testdata/policies/allow_all.yaml")
defer policy.Close()

t.Log("And: Unary interceptor with read permission")
interceptor := gateway.GRPCUnaryAuthzInterceptor(policy, authzTestPermRead)

t.Log("When: Request with valid principal in context")
ctx := gateway.ContextWithValidatedPrincipalID(context.Background(), authzTestPrincipal)
result, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{}, testUnaryHandler)

t.Log("Then: Request succeeds")
require.NoError(t, err)
assert.Equal(t, "OK", result)
}

func TestGRPCUnaryAuthzInterceptorDenyAll(t *testing.T) {
t.Log("Given: Policy file denying all permissions")
policy := setupPolicy(t, "testdata/policies/deny_all.yaml")
defer policy.Close()

t.Log("And: Unary interceptor with read permission")
interceptor := gateway.GRPCUnaryAuthzInterceptor(policy, authzTestPermRead)

t.Log("When: Request with valid principal but no permissions")
ctx := gateway.ContextWithValidatedPrincipalID(context.Background(), authzTestPrincipal)
_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{}, testUnaryHandler)

t.Log("Then: Request fails with PermissionDenied")
require.Error(t, err)
assert.Equal(t, codes.PermissionDenied, status.Code(err))
}

func TestGRPCUnaryAuthzInterceptorNoPrincipal(t *testing.T) {
t.Log("Given: Policy file granting all permissions")
policy := setupPolicy(t, "testdata/policies/allow_all.yaml")
defer policy.Close()

t.Log("And: Unary interceptor with read permission")
interceptor := gateway.GRPCUnaryAuthzInterceptor(policy, authzTestPermRead)

t.Log("When: Request without principal in context")
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{}, testUnaryHandler)

t.Log("Then: Request fails with PermissionDenied")
require.Error(t, err)
assert.Equal(t, codes.PermissionDenied, status.Code(err))
}

func TestGRPCUnaryAuthzInterceptorSelective(t *testing.T) {
t.Log("Given: Policy file granting only read permission")
policy := setupPolicy(t, "testdata/policies/selective.yaml")
defer policy.Close()

tests := []struct {
name string
perm string
wantErr bool
wantCode codes.Code
}{
{
name: "allowed_read",
perm: string(authzTestPermRead),
wantErr: false,
},
{
name: "denied_write",
perm: string(authzTestPermWrite),
wantErr: true,
wantCode: codes.PermissionDenied,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Logf("When: Request requires %s permission", tc.perm)
interceptor := gateway.GRPCUnaryAuthzInterceptor(policy, authz.PermissionName(tc.perm))
ctx := gateway.ContextWithValidatedPrincipalID(context.Background(), authzTestPrincipal)
_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{}, testUnaryHandler)

if tc.wantErr {
t.Log("Then: Request fails with PermissionDenied")
require.Error(t, err)
assert.Equal(t, tc.wantCode, status.Code(err))
} else {
t.Log("Then: Request succeeds")
require.NoError(t, err)
}
})
}
}

func TestGRPCUnaryAuthzInterceptorWrongPrincipal(t *testing.T) {
t.Log("Given: Policy file granting permissions to specific principal")
policy := setupPolicy(t, "testdata/policies/allow_all.yaml")
defer policy.Close()

t.Log("And: Unary interceptor with read permission")
interceptor := gateway.GRPCUnaryAuthzInterceptor(policy, authzTestPermRead)

t.Log("When: Request with different principal not in policy")
ctx := gateway.ContextWithValidatedPrincipalID(context.Background(), authzOtherPrincipal)
_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{}, testUnaryHandler)

t.Log("Then: Request fails with PermissionDenied")
require.Error(t, err)
assert.Equal(t, codes.PermissionDenied, status.Code(err))
}

func TestGRPCStreamAuthzInterceptorAllowAll(t *testing.T) {
t.Log("Given: Policy file granting all permissions")
policy := setupPolicy(t, "testdata/policies/allow_all.yaml")
defer policy.Close()

t.Log("And: Stream interceptor with read permission")
interceptor := gateway.GRPCStreamAuthzInterceptor(policy, authzTestPermRead)

t.Log("When: Stream request with valid principal in context")
ctx := gateway.ContextWithValidatedPrincipalID(context.Background(), authzTestPrincipal)
ss := &mockServerStream{ctx: ctx}
err := interceptor(nil, ss, &grpc.StreamServerInfo{}, testStreamHandler)

t.Log("Then: Request succeeds")
require.NoError(t, err)
}

func TestGRPCStreamAuthzInterceptorDenyAll(t *testing.T) {
t.Log("Given: Policy file denying all permissions")
policy := setupPolicy(t, "testdata/policies/deny_all.yaml")
defer policy.Close()

t.Log("And: Stream interceptor with read permission")
interceptor := gateway.GRPCStreamAuthzInterceptor(policy, authzTestPermRead)

t.Log("When: Stream request with valid principal but no permissions")
ctx := gateway.ContextWithValidatedPrincipalID(context.Background(), authzTestPrincipal)
ss := &mockServerStream{ctx: ctx}
err := interceptor(nil, ss, &grpc.StreamServerInfo{}, testStreamHandler)

t.Log("Then: Request fails with PermissionDenied")
require.Error(t, err)
assert.Equal(t, codes.PermissionDenied, status.Code(err))
}

func TestGRPCStreamAuthzInterceptorNoPrincipal(t *testing.T) {
t.Log("Given: Policy file granting all permissions")
policy := setupPolicy(t, "testdata/policies/allow_all.yaml")
defer policy.Close()

t.Log("And: Stream interceptor with read permission")
interceptor := gateway.GRPCStreamAuthzInterceptor(policy, authzTestPermRead)

t.Log("When: Stream request without principal in context")
ss := &mockServerStream{ctx: context.Background()}
err := interceptor(nil, ss, &grpc.StreamServerInfo{}, testStreamHandler)

t.Log("Then: Request fails with PermissionDenied")
require.Error(t, err)
assert.Equal(t, codes.PermissionDenied, status.Code(err))
}
104 changes: 104 additions & 0 deletions internal/gateway/gatewaytest/mockoidc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package gatewaytest provides test utilities for gateway components.
package gatewaytest

import (
"encoding/json"
"testing"
"time"

"github.com/golang-jwt/jwt/v5"
"github.com/oauth2-proxy/mockoidc"
"github.com/stretchr/testify/require"
)

// RedpandaUser implements mockoidc.User with Redpanda custom claims.
type RedpandaUser struct {
Subject string
Email string
OrgID string
}

// ID returns the user's subject identifier.
func (u *RedpandaUser) ID() string {
return u.Subject
}

// Userinfo returns the user info claims as JSON.
func (u *RedpandaUser) Userinfo(_ []string) ([]byte, error) {
info := map[string]any{
"sub": u.Subject,
"email": u.Email,
}
return json.Marshal(info)
}

// Claims returns JWT claims with Redpanda custom claims.
func (u *RedpandaUser) Claims(_ []string, claims *mockoidc.IDTokenClaims) (jwt.Claims, error) {
claims.Subject = u.Subject

cc := map[string]any{
"iss": claims.Issuer,
"sub": u.Subject,
"aud": claims.Audience,
"exp": claims.ExpiresAt.Unix(),
"iat": claims.IssuedAt.Unix(),
"https://cloud.redpanda.com/organization_id": u.OrgID,
"account_info": map[string]any{
"email": u.Email,
},
}
return jwt.MapClaims(cc), nil
}

// SetupMockOIDC creates a mockoidc server with Redpanda custom claims support.
// The server is automatically shut down when the test completes.
func SetupMockOIDC(t *testing.T) (*mockoidc.MockOIDC, string) {
t.Helper()

m, err := mockoidc.Run()
require.NoError(t, err)

t.Cleanup(func() {
if err := m.Shutdown(); err != nil {
t.Log(err)
}
})

return m, m.Issuer()
}

// AccessToken performs OAuth flow with mockoidc to get a valid access token.
func AccessToken(t *testing.T, m *mockoidc.MockOIDC, user mockoidc.User) string {
t.Helper()

m.QueueUser(user)
claims, err := user.Claims([]string{"openid", "email"}, &mockoidc.IDTokenClaims{
RegisteredClaims: &jwt.RegisteredClaims{
Issuer: m.Issuer(),
Subject: user.ID(),
Audience: jwt.ClaimStrings{"test-audience"},
IssuedAt: jwt.NewNumericDate(m.Now()),
ExpiresAt: jwt.NewNumericDate(m.Now().Add(time.Hour)),
},
})
require.NoError(t, err)

token, err := m.Keypair.SignJWT(claims)
require.NoError(t, err)

return token
}
Loading