Skip to content

Commit a7f2688

Browse files
authored
Implement AuthenticationProviderMTls (#1441)
1 parent 9817118 commit a7f2688

File tree

13 files changed

+982
-0
lines changed

13 files changed

+982
-0
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.oidc.broker.common;
15+
16+
/**
17+
* Constant values related to Apache Pulsar broker OIDC options.
18+
*/
19+
public final class OIDCConstants {
20+
21+
/**
22+
* Timeout value, in seconds, for metadata resource synchronization operations.
23+
*/
24+
public static final int RESOURCE_SYNC_OPERATION_TIMEOUT_SEC = 30;
25+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.oidc.broker.common;
15+
16+
import static io.streamnative.oidc.broker.common.OIDCConstants.RESOURCE_SYNC_OPERATION_TIMEOUT_SEC;
17+
import com.fasterxml.jackson.core.type.TypeReference;
18+
import io.streamnative.oidc.broker.common.pojo.Pool;
19+
import io.streamnative.oidc.broker.common.utils.Paths;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Optional;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.stream.Collectors;
25+
import javax.validation.constraints.NotNull;
26+
import org.apache.pulsar.broker.resources.BaseResources;
27+
import org.apache.pulsar.common.util.FutureUtil;
28+
import org.apache.pulsar.metadata.api.MetadataStore;
29+
import org.apache.pulsar.metadata.api.MetadataStoreException;
30+
31+
32+
@SuppressWarnings("UnstableApiUsage")
33+
public final class OIDCPoolResources extends BaseResources<Pool> {
34+
private static final String BASE_PATH = "/sn-oidc/pools";
35+
36+
public OIDCPoolResources(@NotNull MetadataStore metadataStore) {
37+
super(metadataStore, new TypeReference<>() { }, RESOURCE_SYNC_OPERATION_TIMEOUT_SEC);
38+
}
39+
40+
public @NotNull Optional<Pool> getPool(@NotNull String poolName) throws MetadataStoreException {
41+
return get(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName)));
42+
}
43+
44+
public @NotNull CompletableFuture<Optional<Pool>> getPoolAsync(@NotNull String poolName) {
45+
return getAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName)));
46+
}
47+
48+
public void createPool(@NotNull Pool pool) throws MetadataStoreException {
49+
create(joinPath(BASE_PATH, Paths.getUrlEncodedPath(pool.name())), pool);
50+
}
51+
52+
public @NotNull CompletableFuture<Void> createPoolAsync(@NotNull Pool pool) {
53+
return createAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(pool.name())), pool);
54+
}
55+
56+
public @NotNull CompletableFuture<Boolean> existsAsync(@NotNull String poolName) {
57+
return super.existsAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName)));
58+
}
59+
60+
public void deletePool(@NotNull String poolName) throws MetadataStoreException {
61+
super.delete(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName)));
62+
}
63+
64+
public @NotNull CompletableFuture<Void> deletePoolAsync(@NotNull String poolName) {
65+
return super.deleteIfExistsAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName)));
66+
}
67+
68+
public @NotNull CompletableFuture<Void> updatePoolAsync(@NotNull Pool pool) {
69+
return super.setAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(pool.name())), __ -> pool);
70+
}
71+
72+
public @NotNull CompletableFuture<List<String>> listPoolNamesAsync() {
73+
return super.getChildrenAsync(joinPath(BASE_PATH));
74+
}
75+
76+
public @NotNull CompletableFuture<List<Pool>> listPoolsAsync() {
77+
return super.getChildrenAsync(joinPath(BASE_PATH))
78+
.thenCompose(poolNames -> {
79+
List<CompletableFuture<Optional<Pool>>> pools = new ArrayList<>();
80+
for (String name : poolNames) {
81+
pools.add(getAsync(joinPath(BASE_PATH, name)));
82+
}
83+
return FutureUtil.waitForAll(pools)
84+
.thenApply(__ -> pools.stream().map(f -> f.join())
85+
.filter(f -> f.isPresent())
86+
.map(f -> f.get())
87+
.collect(Collectors.toList()));
88+
});
89+
}
90+
91+
public static boolean pathIsFromPool(String path) {
92+
return path.startsWith(BASE_PATH + "/");
93+
}
94+
95+
public static String poolFromPath(String path) {
96+
return path.substring(BASE_PATH.length() + 1);
97+
}
98+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
// Copy from sn-pulsar-plugins, only used to compile stage
15+
package io.streamnative.oidc.broker.common;
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.oidc.broker.common.pojo;
15+
16+
import com.fasterxml.jackson.annotation.JsonProperty;
17+
import java.util.Objects;
18+
import javax.validation.constraints.NotNull;
19+
20+
public record Pool(@JsonProperty(value = "name", required = true) @NotNull String name,
21+
@JsonProperty(value = "auth_type", defaultValue = AUTH_TYPE_TOKEN) @NotNull String authType,
22+
@JsonProperty(value = "description", required = true) @NotNull String description,
23+
@JsonProperty(value = "provider_name") @NotNull String providerName,
24+
@JsonProperty(value = "expression", required = true) @NotNull String expression) {
25+
26+
public static final String AUTH_TYPE_TOKEN = "token";
27+
public static final String AUTH_TYPE_MTLS = "mtls";
28+
29+
@Override
30+
public boolean equals(Object o) {
31+
if (this == o) {
32+
return true;
33+
}
34+
if (o == null || getClass() != o.getClass()) {
35+
return false;
36+
}
37+
Pool pool = (Pool) o;
38+
return Objects.equals(name, pool.name);
39+
}
40+
41+
@Override
42+
public int hashCode() {
43+
return Objects.hash(name);
44+
}
45+
46+
public String authType() {
47+
return (authType == null || authType.isEmpty()) ? AUTH_TYPE_TOKEN : authType;
48+
}
49+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.oidc.broker.common.utils;
15+
16+
import java.net.URLDecoder;
17+
import java.net.URLEncoder;
18+
import java.nio.charset.StandardCharsets;
19+
import javax.validation.constraints.NotNull;
20+
import lombok.experimental.UtilityClass;
21+
22+
@UtilityClass
23+
public final class Paths {
24+
25+
public String getUrlEncodedPath(@NotNull String name) {
26+
return URLEncoder.encode(name, StandardCharsets.UTF_8);
27+
}
28+
29+
public String getUrlDecodedPath(@NotNull String name) {
30+
return URLDecoder.decode(name, StandardCharsets.UTF_8);
31+
}
32+
}

0 commit comments

Comments
 (0)