Skip to content

Commit 730614a

Browse files
authored
Merge pull request #2 from kafkaesque-io/cognito
AWS Cognito auth plugin
2 parents fdce20f + b66ad6f commit 730614a

File tree

8 files changed

+280
-9
lines changed

8 files changed

+280
-9
lines changed

README.md

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
11
# pulsar-client-plugin
2-
Pulsar client plugin for auth0, aws, and etc.
3-
4-
## auth0 integration
5-
Integration of auth0 enables Pulsar client authenticated against [auth0](https://www.auth0.com) backend instead of the default Pulsar token. The authentication follows [the recommended M2M flow](https://auth0.com/blog/using-m2m-authorization/).
6-
7-
Auth0 integration consists of the client side plugin and a broker auth plugin. The client plugin generates an auth0 JWT, which in turn can be authenticated and authorized by the broker side. The broker plugin has to be configured on Pulsar and is not part of this repo. Please contact [Kafkaesque](https://kafkaesque.io/contact/#) to enable the broker side plugin.
2+
Pulsar client plugin for auth0 and AWS Cognito authentication.
83

94
The Jar artifact is loaded on GitHub package registry.
105

@@ -36,6 +31,11 @@ In pom.xml,
3631

3732
```
3833

34+
## auth0 integration
35+
Integration of auth0 enables Pulsar client authenticated against [auth0](https://www.auth0.com) backend instead of the default Pulsar token. The authentication follows [the recommended M2M flow](https://auth0.com/blog/using-m2m-authorization/).
36+
37+
Auth0 integration consists of the client side plugin and a broker auth plugin. The client plugin generates an auth0 JWT, which in turn can be authenticated and authorized by the broker side. The broker plugin has to be configured on Pulsar and is not part of this repo. Please contact [Kafkaesque](https://kafkaesque.io/contact/#) to enable the broker side plugin.
38+
3939
Java Client example:
4040
``` example.java
4141
String domain = "https://<your auth0 domain>.auth0.com/oauth/token";
@@ -51,3 +51,28 @@ PulsarClient client = PulsarClient.builder()
5151
)
5252
.build();
5353
```
54+
55+
### AWS Cognito integration
56+
Integration of AWS Cognito enables Pulsar client authenticated against [AWS Cognito](https://aws.amazon.com/cognito/). The authentication flow requires creation of Cognito user pool and App client. The App client must allow `Client credential` OAuth flow, and specifies custome scopes for OAuth 2.0 grants. Here is [a good example](https://lobster1234.github.io/2018/05/31/server-to-server-auth-with-amazon-cognito/) explaining machine to machine authentication with Cognito.
57+
58+
The client plugin enables client credential to exchange an access token following [the Cognito deverloper's guide](https://docs.aws.amazon.com/cognito/latest/developerguide/token-endpoint.html). Under the hood, we will use `client_credentials` as grant_type. Scope must be preconfigured under the a User Pool's resource server and enabled by checking off `App client`'s OAuth2 Allowed Custom Scopes. This can be done via AWS CLI or console. The scope name will be used for authorization.
59+
60+
Resource server's identifier and client Id, that becomes `sub` in the Cognito JWT, can be optionally used for verification on the Pulsar broker side's authentication.
61+
62+
Cognito integration consists of the client side plugin and a broker auth plugin. The client plugin generates an access token, which in turn can be authenticated and authorized by the broker side. The broker plugin has to be configured on Pulsar and is not part of this repo. Please contact [Kafkaesque](https://kafkaesque.io/contact/#) to enable the broker side plugin.
63+
64+
Java Client example:
65+
``` example.java
66+
String domain = "https://<your domain>.auth.us-east-2.amazoncognito.com/oauth2/token";
67+
String clientId = "";
68+
String clientSecret = "";
69+
String scope = "kafkaesque.io/ming.pulsar";
70+
71+
// Create client object
72+
PulsarClient client = PulsarClient.builder()
73+
.serviceUrl(SERVICE_URL)
74+
.authentication(
75+
AuthFactory.cognito(domain, clientId, clientSecret, scope)
76+
)
77+
.build();
78+
```

java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kafkaesque.pulsar.client.auth;
22

33
import io.kafkaesque.pulsar.client.auth.auth0.Auth0JWT;
4+
import io.kafkaesque.pulsar.client.auth.cognito.CognitoJWT;
45

56
import org.apache.pulsar.client.api.Authentication;
67

@@ -38,4 +39,17 @@ public static Authentication auth0(String domain, String clientId, String client
3839
return auth0;
3940
}
4041

42+
/**
43+
* Request JWT from AWS Cognito and pass the JWT to pulsar broker.
44+
* @param domain
45+
* @param clientId
46+
* @param clientSecret
47+
* @param scope
48+
* @return
49+
*/
50+
public static Authentication cognito(String domain, String clientId, String clientSecret, String scope) {
51+
return new AuthenticationCognito(CognitoJWT.create(domain, clientId, clientSecret, scope).generateAndCheck());
52+
53+
}
54+
4155
}

java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthMethod.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ public final class AuthMethod {
88
public final static String TOKEN = "token";
99

1010
public final static String AUTH0 = "auth0";
11+
12+
public final static String COGNITO = "cognito";
1113

12-
public final static List<String> supportedMethods = Arrays.asList(TOKEN, AUTH0);
14+
public final static List<String> supportedMethods = Arrays.asList(TOKEN, AUTH0, COGNITO);
1315
}

java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthenticationAuth0.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class AuthenticationAuth0 implements Authentication, EncodedAuthenticatio
2323
*
2424
*/
2525
private static final long serialVersionUID = 1L;
26-
private Supplier<String> tokenSupplier;
26+
protected Supplier<String> tokenSupplier;
2727

2828
private String authMethod = AuthMethod.AUTH0;
2929

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.kafkaesque.pulsar.client.auth;
2+
3+
import com.google.common.base.Charsets;
4+
5+
import java.io.IOException;
6+
import java.net.URI;
7+
import java.nio.file.Files;
8+
import java.nio.file.Paths;
9+
import java.util.Map;
10+
import java.util.function.Supplier;
11+
12+
import org.apache.pulsar.client.api.AuthenticationDataProvider;
13+
import org.apache.pulsar.client.api.PulsarClientException;
14+
15+
/**
16+
* Cognito JWT based authentication provider.
17+
*/
18+
public class AuthenticationCognito extends AuthenticationAuth0 {
19+
20+
/**
21+
*
22+
*/
23+
private static final long serialVersionUID = 1L;
24+
25+
private String authMethod = AuthMethod.COGNITO;
26+
27+
public AuthenticationCognito(String token) {
28+
super(() -> token);
29+
}
30+
31+
public AuthenticationCognito(Supplier<String> tokenSupplier) {
32+
super(tokenSupplier);
33+
}
34+
35+
@Override
36+
public String getAuthMethodName() {
37+
return authMethod;
38+
}
39+
40+
@Override
41+
public AuthenticationDataProvider getAuthData() throws PulsarClientException {
42+
return new AuthenticationDataCognito(tokenSupplier);
43+
}
44+
45+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package io.kafkaesque.pulsar.client.auth;
2+
3+
import java.util.Map;
4+
5+
import java.util.Set;
6+
import java.util.function.Supplier;
7+
8+
import javax.naming.AuthenticationException;
9+
10+
import org.apache.pulsar.common.api.AuthData;
11+
12+
import org.apache.pulsar.client.api.AuthenticationDataProvider;
13+
14+
import static java.nio.charset.StandardCharsets.UTF_8;
15+
/**
16+
* This plugin is for AWS Cognito JWT Pulsar authentication data.
17+
*/
18+
public class AuthenticationDataCognito implements AuthenticationDataProvider {
19+
20+
/**
21+
*
22+
*/
23+
private static final long serialVersionUID = 1L;
24+
25+
private final Supplier<String> token;
26+
27+
public AuthenticationDataCognito(Supplier<String> token) {
28+
this.token = token;
29+
}
30+
31+
/*
32+
* HTTP
33+
*/
34+
35+
/**
36+
* Check if data for HTTP are available.
37+
*
38+
* @return true if this authentication data contain data for HTTP
39+
*/
40+
public boolean hasDataForHttp() {
41+
return true;
42+
}
43+
44+
/**
45+
*
46+
* @return a authentication scheme, or {@code null} if the request will not be authenticated.
47+
*/
48+
public String getHttpAuthType() {
49+
return null;
50+
}
51+
52+
/**
53+
*
54+
* @return an enumeration of all the header names
55+
*/
56+
public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
57+
return null;
58+
}
59+
60+
/*
61+
* Command
62+
*/
63+
64+
/**
65+
* Check if data from Pulsar protocol are available.
66+
*
67+
* @return true if this authentication data contain data from Pulsar protocol
68+
*/
69+
public boolean hasDataFromCommand() {
70+
return token.get() != null;
71+
}
72+
73+
/**
74+
*
75+
* @return authentication data which will be stored in a command
76+
*/
77+
public String getCommandData() {
78+
return token.get();
79+
}
80+
81+
/**
82+
* For mutual authentication, This method use passed in `data` to evaluate and challenge,
83+
* then returns null if authentication has completed;
84+
* returns authenticated data back to server side, if authentication has not completed.
85+
*
86+
* <p>Mainly used for mutual authentication like sasl.
87+
*/
88+
public AuthData authenticate(AuthData data) throws AuthenticationException {
89+
byte[] bytes = (hasDataFromCommand() ? this.getCommandData() : "").getBytes(UTF_8);
90+
return AuthData.of(bytes);
91+
}
92+
93+
}

java/src/main/java/io/kafkaesque/pulsar/client/auth/auth0/Auth0JWT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public JSONObject generate() throws UnsupportedEncodingException {
6666
int statusCode = response.getStatus();
6767
//TODO: add retry-after 503, 429, 301
6868
if (statusCode != 200) {
69-
throw new JWTVerificationException("invalide auth0.com status code " + statusCode);
69+
throw new JWTVerificationException("invalid auth0.com status code " + statusCode);
7070
}
7171

7272
JSONObject jsonObj = response.getBody().getObject();
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package io.kafkaesque.pulsar.client.auth.cognito;
2+
3+
import java.io.UnsupportedEncodingException;
4+
5+
import com.auth0.jwt.exceptions.JWTVerificationException;
6+
7+
import org.apache.pulsar.shade.io.netty.util.internal.StringUtil;
8+
import org.apache.pulsar.shade.org.apache.commons.lang3.Validate;
9+
10+
import kong.unirest.HttpResponse;
11+
import kong.unirest.JsonNode;
12+
import kong.unirest.Unirest;
13+
import kong.unirest.json.JSONObject;
14+
15+
/**
16+
* Generate a cognito JWT with basic scope validation.
17+
*/
18+
public class CognitoJWT {
19+
20+
String tokenServerUrl;
21+
String clientId;
22+
String clientSecret;
23+
String scope;
24+
25+
private CognitoJWT(String domain, String clientId, String clientSecret, String scope) {
26+
this.tokenServerUrl = Validate.notEmpty(domain);
27+
if (!domain.endsWith("/oauth2/token")) {
28+
this.tokenServerUrl = domain + "/oauth2/token";
29+
}
30+
this.clientId = Validate.notEmpty(clientId);
31+
this.clientSecret = Validate.notEmpty(clientSecret);
32+
this.scope = Validate.notEmpty(scope);
33+
}
34+
35+
/**
36+
* Create a CognitoJWT object.
37+
* @param domain
38+
* @param clientId
39+
* @param clientSecret
40+
* @param scope
41+
* @return
42+
*/
43+
public static CognitoJWT create(String domain, String clientId, String clientSecret, String scope) {
44+
return new CognitoJWT(domain, clientId, clientSecret, scope);
45+
}
46+
47+
/**
48+
*
49+
* @return
50+
* @throws UnsupportedEncodingException
51+
*/
52+
public JSONObject generate() throws UnsupportedEncodingException {
53+
54+
Unirest.config().enableCookieManagement(false);
55+
String reqBody = "grant_type=client_credentials&scope=" + this.scope;
56+
57+
HttpResponse<JsonNode> response = Unirest.post(this.tokenServerUrl)
58+
.header("content-type", "application/x-www-form-urlencoded")
59+
.basicAuth(this.clientId, this.clientSecret)
60+
.body(reqBody).asJson();
61+
62+
Unirest.config().reset();
63+
int statusCode = response.getStatus();
64+
JSONObject jsonObj = response.getBody().getObject();
65+
//TODO: may retry with some 400 or 500 code
66+
if (statusCode != 200) {
67+
throw new JWTVerificationException("invalid aws cognito status code " + statusCode);
68+
}
69+
70+
return jsonObj;
71+
}
72+
73+
/**
74+
* Generate and returns a Cognito JWT.
75+
* @return
76+
* @throws JWTVerificationException
77+
*/
78+
public String generateAndCheck() throws JWTVerificationException{
79+
JSONObject resp;
80+
try {
81+
resp = generate();
82+
} catch (UnsupportedEncodingException e) {
83+
throw new JWTVerificationException(e.getMessage());
84+
}
85+
String token = resp.getString("access_token");
86+
if (StringUtil.isNullOrEmpty(token)) {
87+
throw new JWTVerificationException("Cognito JWT is empty");
88+
}
89+
return token;
90+
}
91+
92+
}

0 commit comments

Comments
 (0)