Skip to content

Commit 43640ba

Browse files
committed
Implement unsecured-jwt-info plugin
1 parent 889a018 commit 43640ba

File tree

7 files changed

+193
-25
lines changed

7 files changed

+193
-25
lines changed

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ plugin.google-id-provider:
6464
plugin.google-id-info:
6565
CGO_ENABLED=0 go build -o build/google-id-info $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-googleid-info/main.go
6666

67-
all: build plugin.auth-user plugin.auth-ldap plugin.google-id-provider plugin.google-id-info
67+
plugin.unsecured-jwt-info:
68+
CGO_ENABLED=0 go build -o build/unsecured-jwt-info $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-unsecured-jwt-info/main.go
69+
70+
all: build plugin.auth-user plugin.auth-ldap plugin.google-id-provider plugin.google-id-info plugin.unsecured-jwt-info
6871

6972
clean:
7073
@rm -rf build

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,14 @@ See:
170170
--auth-local-param "--user-attr=uid" \
171171
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"
172172
173+
make clean build plugin.unsecured-jwt-info && build/kafka-proxy server \
174+
--auth-local-enable \
175+
--auth-local-command build/unsecured-jwt-info \
176+
--auth-local-mechanism "OAUTHBEARER" \
177+
--auth-local-param "--claim-sub=alice" \
178+
--auth-local-param "--claim-sub=bob" \
179+
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"
180+
173181
### Kafka Gateway example
174182
175183
Authentication between Kafka Proxy Client and Kafka Proxy Server with Google-ID (service account JWT)
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/base64"
7+
"encoding/json"
8+
"errors"
9+
"flag"
10+
"github.com/grepplabs/kafka-proxy/pkg/apis"
11+
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
12+
"github.com/grepplabs/kafka-proxy/plugin/token-info/shared"
13+
"github.com/hashicorp/go-plugin"
14+
"github.com/sirupsen/logrus"
15+
"os"
16+
"strings"
17+
"time"
18+
)
19+
20+
const (
21+
StatusOK = 0
22+
StatusEmptyToken = 1
23+
StatusParseJWTFailed = 2
24+
StatusWrongAlgorithm = 3
25+
StatusUnauthorized = 4
26+
StatusNoIssueTimeInToken = 5
27+
StatusNoExpirationTimeInToken = 6
28+
StatusTokenTooEarly = 7
29+
StatusTokenExpired = 8
30+
31+
AlgorithmNone = "none"
32+
)
33+
34+
var (
35+
clockSkew = 1 * time.Minute
36+
)
37+
38+
type UnsecuredJWTVerifier struct {
39+
claimSub map[string]struct{}
40+
}
41+
42+
type pluginMeta struct {
43+
claimSub util.ArrayFlags
44+
}
45+
46+
func (f *pluginMeta) flagSet() *flag.FlagSet {
47+
fs := flag.NewFlagSet("unsecured-jwt-info info settings", flag.ContinueOnError)
48+
return fs
49+
}
50+
51+
// Implements apis.TokenInfo
52+
func (v UnsecuredJWTVerifier) VerifyToken(ctx context.Context, request apis.VerifyRequest) (apis.VerifyResponse, error) {
53+
if request.Token == "" {
54+
return getVerifyResponseResponse(StatusEmptyToken)
55+
}
56+
57+
header, claimSet, err := Decode(request.Token)
58+
if err != nil {
59+
return getVerifyResponseResponse(StatusParseJWTFailed)
60+
}
61+
if header.Algorithm != AlgorithmNone {
62+
return getVerifyResponseResponse(StatusWrongAlgorithm)
63+
}
64+
65+
if len(v.claimSub) != 0 {
66+
if _, ok := v.claimSub[claimSet.Sub]; !ok {
67+
return getVerifyResponseResponse(StatusUnauthorized)
68+
}
69+
}
70+
if claimSet.Iat < 1 {
71+
return getVerifyResponseResponse(StatusNoIssueTimeInToken)
72+
}
73+
if claimSet.Exp < 1 {
74+
return getVerifyResponseResponse(StatusNoExpirationTimeInToken)
75+
}
76+
77+
earliest := int64(claimSet.Iat) - int64(clockSkew.Seconds())
78+
latest := int64(claimSet.Exp) + int64(clockSkew.Seconds())
79+
unix := time.Now().Unix()
80+
81+
if unix < earliest {
82+
return getVerifyResponseResponse(StatusTokenTooEarly)
83+
}
84+
if unix > latest {
85+
return getVerifyResponseResponse(StatusTokenExpired)
86+
}
87+
return getVerifyResponseResponse(StatusOK)
88+
}
89+
90+
type Header struct {
91+
Algorithm string `json:"alg"`
92+
}
93+
94+
// kafka client sends float instead of int
95+
type ClaimSet struct {
96+
Sub string `json:"sub,omitempty"`
97+
Exp float64 `json:"exp"`
98+
Iat float64 `json:"iat"`
99+
OtherClaims map[string]interface{} `json:"-"`
100+
}
101+
102+
func Decode(token string) (*Header, *ClaimSet, error) {
103+
args := strings.Split(token, ".")
104+
if len(args) < 2 {
105+
return nil, nil, errors.New("jws: invalid token received")
106+
}
107+
decodedHeader, err := base64.RawURLEncoding.DecodeString(args[0])
108+
if err != nil {
109+
return nil, nil, err
110+
}
111+
decodedPayload, err := base64.RawURLEncoding.DecodeString(args[1])
112+
if err != nil {
113+
return nil, nil, err
114+
}
115+
116+
header := &Header{}
117+
err = json.NewDecoder(bytes.NewBuffer(decodedHeader)).Decode(header)
118+
if err != nil {
119+
return nil, nil, err
120+
}
121+
claimSet := &ClaimSet{}
122+
err = json.NewDecoder(bytes.NewBuffer(decodedPayload)).Decode(claimSet)
123+
if err != nil {
124+
return nil, nil, err
125+
}
126+
return header, claimSet, nil
127+
}
128+
129+
func getVerifyResponseResponse(status int) (apis.VerifyResponse, error) {
130+
success := status == StatusOK
131+
return apis.VerifyResponse{Success: success, Status: int32(status)}, nil
132+
}
133+
134+
func main() {
135+
pluginMeta := &pluginMeta{}
136+
fs := pluginMeta.flagSet()
137+
fs.Var(&pluginMeta.claimSub, "claim-sub", "Allowed subject claim (user name)")
138+
fs.Parse(os.Args[1:])
139+
140+
logrus.Infof("Unsecured JWT sub claims: %v", pluginMeta.claimSub)
141+
142+
unsecuredJWTVerifier := &UnsecuredJWTVerifier{
143+
claimSub: pluginMeta.claimSub.AsMap(),
144+
}
145+
146+
plugin.Serve(&plugin.ServeConfig{
147+
HandshakeConfig: shared.Handshake,
148+
Plugins: map[string]plugin.Plugin{
149+
"unsecuredJWTInfo": &shared.TokenInfoPlugin{Impl: unsecuredJWTVerifier},
150+
},
151+
// A non-nil value here enables gRPC serving for this plugin...
152+
GRPCServer: plugin.DefaultGRPCServer,
153+
})
154+
}

pkg/libs/googleid-info/factory.go

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package googleidinfo
22

33
import (
44
"flag"
5-
"fmt"
65
"github.com/grepplabs/kafka-proxy/pkg/apis"
6+
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
77
"github.com/grepplabs/kafka-proxy/pkg/registry"
88
)
99

@@ -20,27 +20,8 @@ func (f *pluginMeta) flagSet() *flag.FlagSet {
2020
type pluginMeta struct {
2121
timeout int
2222
certsRefreshInterval int
23-
audience arrayFlags
24-
emailsRegex arrayFlags
25-
}
26-
27-
type arrayFlags []string
28-
29-
func (i *arrayFlags) String() string {
30-
return fmt.Sprintf("%v", *i)
31-
}
32-
33-
func (i *arrayFlags) Set(value string) error {
34-
*i = append(*i, value)
35-
return nil
36-
}
37-
38-
func (i *arrayFlags) asMap() map[string]struct{} {
39-
result := make(map[string]struct{})
40-
for _, elem := range *i {
41-
result[elem] = struct{}{}
42-
}
43-
return result
23+
audience util.ArrayFlags
24+
emailsRegex util.ArrayFlags
4425
}
4526

4627
type Factory struct {

pkg/libs/util/flags.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package util
2+
3+
import "fmt"
4+
5+
type ArrayFlags []string
6+
7+
func (i *ArrayFlags) String() string {
8+
return fmt.Sprintf("%v", *i)
9+
}
10+
11+
func (i *ArrayFlags) Set(value string) error {
12+
*i = append(*i, value)
13+
return nil
14+
}
15+
16+
func (i *ArrayFlags) AsMap() map[string]struct{} {
17+
result := make(map[string]struct{})
18+
for _, elem := range *i {
19+
result[elem] = struct{}{}
20+
}
21+
return result
22+
}

proxy/auth.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (b *AuthServer) receiveAndSendGatewayAuth(conn DeadlineReaderWriter) error
118118
return err
119119
}
120120
if !resp.Success {
121-
return fmt.Errorf("verify token failed with status: %d", resp.Status)
121+
return fmt.Errorf("gateway server verify token failed with status: %d", resp.Status)
122122
}
123123

124124
logrus.Debugf("gateway handshake payload: %s", data)

proxy/sasl_local_auth.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (p *LocalSaslOauth) doLocalAuth(saslAuthBytes []byte) (err error) {
7070
return err
7171
}
7272
if !resp.Success {
73-
return fmt.Errorf("verify token failed with status: %d", resp.Status)
73+
return fmt.Errorf("local oauth verify token failed with status: %d", resp.Status)
7474
}
7575
return nil
7676
}

0 commit comments

Comments
 (0)