Skip to content

Commit 7937d34

Browse files
JoshVanLItalyPaleAleyaron2berndverst
authored
Pubsub pulsar authentication ~OIDC~ OAuth2 (#3026)
Signed-off-by: joshvanl <[email protected]> Signed-off-by: Alessandro (Ale) Segala <[email protected]> Co-authored-by: Alessandro (Ale) Segala <[email protected]> Co-authored-by: Yaron Schneider <[email protected]> Co-authored-by: Bernd Verst <[email protected]>
1 parent 566c7fd commit 7937d34

File tree

22 files changed

+874
-80
lines changed

22 files changed

+874
-80
lines changed

.github/workflows/certification.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ jobs:
258258
set +e
259259
gotestsum --jsonfile ${{ env.TEST_OUTPUT_FILE_PREFIX }}_certification.json \
260260
--junitfile ${{ env.TEST_OUTPUT_FILE_PREFIX }}_certification.xml --format standard-quiet -- \
261-
-coverprofile=cover.out -covermode=set -tags=certtests -coverpkg=${{ matrix.source-pkg }}
261+
-coverprofile=cover.out -covermode=set -tags=certtests -timeout=30m -coverpkg=${{ matrix.source-pkg }}
262262
status=$?
263263
echo "Completed certification tests for ${{ matrix.component }} ... "
264264
if test $status -ne 0; then
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
Copyright 2021 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package oauth2
15+
16+
import (
17+
"context"
18+
"crypto/tls"
19+
"crypto/x509"
20+
"errors"
21+
"fmt"
22+
"net/http"
23+
"net/url"
24+
"sync"
25+
"time"
26+
27+
"golang.org/x/oauth2"
28+
ccreds "golang.org/x/oauth2/clientcredentials"
29+
30+
"github.com/dapr/kit/logger"
31+
)
32+
33+
// ClientCredentialsMetadata is the metadata fields which can be used by a
34+
// component to configure an OIDC client_credentials token source.
35+
type ClientCredentialsMetadata struct {
36+
TokenCAPEM string `mapstructure:"oauth2TokenCAPEM"`
37+
TokenURL string `mapstructure:"oauth2TokenURL"`
38+
ClientID string `mapstructure:"oauth2ClientID"`
39+
ClientSecret string `mapstructure:"oauth2ClientSecret"`
40+
Audiences []string `mapstructure:"oauth2Audiences"`
41+
Scopes []string `mapstructure:"oauth2Scopes"`
42+
}
43+
44+
type ClientCredentialsOptions struct {
45+
Logger logger.Logger
46+
TokenURL string
47+
ClientID string
48+
ClientSecret string
49+
Scopes []string
50+
Audiences []string
51+
CAPEM []byte
52+
}
53+
54+
// ClientCredentials is an OAuth2 Token Source that uses the client_credentials
55+
// grant type to fetch a token.
56+
type ClientCredentials struct {
57+
log logger.Logger
58+
currentToken *oauth2.Token
59+
httpClient *http.Client
60+
fetchTokenFn func(context.Context) (*oauth2.Token, error)
61+
62+
lock sync.RWMutex
63+
}
64+
65+
func NewClientCredentials(ctx context.Context, opts ClientCredentialsOptions) (*ClientCredentials, error) {
66+
conf, httpClient, err := opts.toConfig()
67+
if err != nil {
68+
return nil, err
69+
}
70+
71+
token, err := conf.Token(context.WithValue(ctx, oauth2.HTTPClient, httpClient))
72+
if err != nil {
73+
return nil, fmt.Errorf("error fetching initial oauth2 client_credentials token: %w", err)
74+
}
75+
76+
opts.Logger.Info("Fetched initial oauth2 client_credentials token")
77+
78+
return &ClientCredentials{
79+
log: opts.Logger,
80+
currentToken: token,
81+
httpClient: httpClient,
82+
fetchTokenFn: conf.Token,
83+
}, nil
84+
}
85+
86+
func (c *ClientCredentialsOptions) toConfig() (*ccreds.Config, *http.Client, error) {
87+
if len(c.Scopes) == 0 {
88+
return nil, nil, errors.New("oauth2 client_credentials token source requires at least one scope")
89+
}
90+
91+
if len(c.Audiences) == 0 {
92+
return nil, nil, errors.New("oauth2 client_credentials token source requires at least one audience")
93+
}
94+
95+
_, err := url.Parse(c.TokenURL)
96+
if err != nil {
97+
return nil, nil, fmt.Errorf("error parsing token URL: %w", err)
98+
}
99+
100+
conf := &ccreds.Config{
101+
ClientID: c.ClientID,
102+
ClientSecret: c.ClientSecret,
103+
TokenURL: c.TokenURL,
104+
Scopes: c.Scopes,
105+
EndpointParams: url.Values{"audience": c.Audiences},
106+
}
107+
108+
// If caPool is nil, then the Go TLS library will use the system's root CA.
109+
var caPool *x509.CertPool
110+
if len(c.CAPEM) > 0 {
111+
caPool = x509.NewCertPool()
112+
if !caPool.AppendCertsFromPEM(c.CAPEM) {
113+
return nil, nil, errors.New("failed to parse CA PEM")
114+
}
115+
}
116+
117+
return conf, &http.Client{
118+
Timeout: time.Second * 30,
119+
Transport: &http.Transport{
120+
TLSClientConfig: &tls.Config{
121+
MinVersion: tls.VersionTLS12,
122+
RootCAs: caPool,
123+
},
124+
},
125+
}, nil
126+
}
127+
128+
func (c *ClientCredentials) Token() (string, error) {
129+
c.lock.RLock()
130+
defer c.lock.RUnlock()
131+
132+
if !c.currentToken.Valid() {
133+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
134+
defer cancel()
135+
if err := c.renewToken(ctx); err != nil {
136+
return "", err
137+
}
138+
}
139+
140+
return c.currentToken.AccessToken, nil
141+
}
142+
143+
func (c *ClientCredentials) renewToken(ctx context.Context) error {
144+
c.lock.Lock()
145+
defer c.lock.Unlock()
146+
147+
// We need to check if the current token is valid because we might have lost
148+
// the mutex lock race from the caller and we don't want to double-fetch a
149+
// token unnecessarily!
150+
if c.currentToken.Valid() {
151+
return nil
152+
}
153+
154+
token, err := c.fetchTokenFn(context.WithValue(ctx, oauth2.HTTPClient, c.httpClient))
155+
if err != nil {
156+
return err
157+
}
158+
159+
if !c.currentToken.Valid() {
160+
return errors.New("oauth2 client_credentials token source returned an invalid token")
161+
}
162+
163+
c.currentToken = token
164+
return nil
165+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
Copyright 2021 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package oauth2
15+
16+
import (
17+
"net/url"
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
ccreds "golang.org/x/oauth2/clientcredentials"
22+
)
23+
24+
func Test_toConfig(t *testing.T) {
25+
tests := map[string]struct {
26+
opts ClientCredentialsOptions
27+
expConfig *ccreds.Config
28+
expErr bool
29+
}{
30+
"no scopes should error": {
31+
opts: ClientCredentialsOptions{
32+
TokenURL: "https://localhost:8080",
33+
ClientID: "client-id",
34+
ClientSecret: "client-secret",
35+
Audiences: []string{"audience"},
36+
},
37+
expErr: true,
38+
},
39+
"bad URL endpoint should error": {
40+
opts: ClientCredentialsOptions{
41+
TokenURL: "&&htp:/f url",
42+
ClientID: "client-id",
43+
ClientSecret: "client-secret",
44+
Audiences: []string{"audience"},
45+
Scopes: []string{"foo"},
46+
},
47+
expErr: true,
48+
},
49+
"bad CA certificate should error": {
50+
opts: ClientCredentialsOptions{
51+
TokenURL: "http://localhost:8080",
52+
ClientID: "client-id",
53+
ClientSecret: "client-secret",
54+
Audiences: []string{"audience"},
55+
Scopes: []string{"foo"},
56+
CAPEM: []byte("ca-pem"),
57+
},
58+
expErr: true,
59+
},
60+
"no audiences should error": {
61+
opts: ClientCredentialsOptions{
62+
TokenURL: "http://localhost:8080",
63+
ClientID: "client-id",
64+
ClientSecret: "client-secret",
65+
Scopes: []string{"foo"},
66+
},
67+
expErr: true,
68+
},
69+
"should default scope": {
70+
opts: ClientCredentialsOptions{
71+
TokenURL: "http://localhost:8080",
72+
ClientID: "client-id",
73+
ClientSecret: "client-secret",
74+
Audiences: []string{"audience"},
75+
Scopes: []string{"foo", "bar"},
76+
},
77+
expConfig: &ccreds.Config{
78+
ClientID: "client-id",
79+
ClientSecret: "client-secret",
80+
TokenURL: "http://localhost:8080",
81+
Scopes: []string{"foo", "bar"},
82+
EndpointParams: url.Values{"audience": []string{"audience"}},
83+
},
84+
expErr: false,
85+
},
86+
}
87+
88+
for name, test := range tests {
89+
t.Run(name, func(t *testing.T) {
90+
config, _, err := test.opts.toConfig()
91+
assert.Equalf(t, test.expErr, err != nil, "%v", err)
92+
assert.Equal(t, test.expConfig, config)
93+
})
94+
}
95+
}

pubsub/pulsar/metadata.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ limitations under the License.
1313

1414
package pulsar
1515

16-
import "time"
16+
import (
17+
"time"
18+
19+
"github.com/dapr/components-contrib/internal/authentication/oauth2"
20+
)
1721

1822
type pulsarMetadata struct {
1923
Host string `mapstructure:"host"`
@@ -26,12 +30,14 @@ type pulsarMetadata struct {
2630
Tenant string `mapstructure:"tenant"`
2731
Namespace string `mapstructure:"namespace"`
2832
Persistent bool `mapstructure:"persistent"`
29-
Token string `mapstructure:"token"`
3033
RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"`
3134
internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"`
3235
PublicKey string `mapstructure:"publicKey"`
3336
PrivateKey string `mapstructure:"privateKey"`
3437
Keys string `mapstructure:"keys"`
38+
39+
Token string `mapstructure:"token"`
40+
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
3541
}
3642

3743
type schemaMetadata struct {

pubsub/pulsar/pulsar.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ import (
2525
"sync/atomic"
2626
"time"
2727

28-
"github.com/hamba/avro/v2"
29-
3028
"github.com/apache/pulsar-client-go/pulsar"
3129
"github.com/apache/pulsar-client-go/pulsar/crypto"
30+
"github.com/hamba/avro/v2"
3231
lru "github.com/hashicorp/golang-lru/v2"
3332

33+
"github.com/dapr/components-contrib/internal/authentication/oauth2"
3434
"github.com/dapr/components-contrib/metadata"
3535
"github.com/dapr/components-contrib/pubsub"
3636
"github.com/dapr/kit/logger"
@@ -157,7 +157,7 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
157157
return &m, nil
158158
}
159159

160-
func (p *Pulsar) Init(_ context.Context, metadata pubsub.Metadata) error {
160+
func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error {
161161
m, err := parsePulsarMetadata(metadata)
162162
if err != nil {
163163
return err
@@ -173,9 +173,28 @@ func (p *Pulsar) Init(_ context.Context, metadata pubsub.Metadata) error {
173173
ConnectionTimeout: 30 * time.Second,
174174
TLSAllowInsecureConnection: !m.EnableTLS,
175175
}
176-
if m.Token != "" {
176+
177+
switch {
178+
case len(m.Token) > 0:
177179
options.Authentication = pulsar.NewAuthenticationToken(m.Token)
180+
case len(m.ClientCredentialsMetadata.TokenURL) > 0:
181+
var cc *oauth2.ClientCredentials
182+
cc, err = oauth2.NewClientCredentials(ctx, oauth2.ClientCredentialsOptions{
183+
Logger: p.logger,
184+
TokenURL: m.ClientCredentialsMetadata.TokenURL,
185+
CAPEM: []byte(m.ClientCredentialsMetadata.TokenCAPEM),
186+
ClientID: m.ClientCredentialsMetadata.ClientID,
187+
ClientSecret: m.ClientCredentialsMetadata.ClientSecret,
188+
Scopes: m.ClientCredentialsMetadata.Scopes,
189+
Audiences: m.ClientCredentialsMetadata.Audiences,
190+
})
191+
if err != nil {
192+
return fmt.Errorf("could not instantiate oauth2 token provider: %w", err)
193+
}
194+
195+
options.Authentication = pulsar.NewAuthenticationTokenFromSupplier(cc.Token)
178196
}
197+
179198
client, err := pulsar.NewClient(options)
180199
if err != nil {
181200
return fmt.Errorf("could not instantiate pulsar client: %v", err)

0 commit comments

Comments
 (0)