Skip to content

Commit 5ae9877

Browse files
[feat]PIP 167: Make it Configurable to Require Subscription Permission for Consumer (#246)
1 parent c3db468 commit 5ae9877

File tree

9 files changed

+284
-5
lines changed

9 files changed

+284
-5
lines changed

conf/broker.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,12 @@ authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorization
837837
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
838838
authorizationAllowWildcardsMatching=false
839839

840+
# If a namespace has no roles configured in the subscription permission for a given subscription name,
841+
# allow all roles that have permission to consume a the topic to consume from the subscription.
842+
# See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
843+
# permission.
844+
grantImplicitPermissionOnSubscription=true
845+
840846
# Role names that are treated as "super-user", meaning they will be able to do all admin
841847
# operations and publish/consume from all topics
842848
superUserRoles=

conf/standalone.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,12 @@ authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorization
527527
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
528528
authorizationAllowWildcardsMatching=false
529529

530+
# If a namespace has no roles configured in the subscription permission for a given subscription name,
531+
# allow all roles that have permission to consume a the topic to consume from the subscription.
532+
# See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
533+
# permission.
534+
grantImplicitPermissionOnSubscription=true
535+
530536
# Role names that are treated as "super-user", meaning they will be able to do all admin
531537
# operations and publish/consume from all topics
532538
superUserRoles=

conf/websocket.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorization
9898
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
9999
authorizationAllowWildcardsMatching=false
100100

101+
# If a namespace has no roles configured in the subscription permission for a given subscription name,
102+
# allow all roles that have permission to consume a the topic to consume from the subscription.
103+
# See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
104+
# permission.
105+
grantImplicitPermissionOnSubscription=true
106+
101107
# Role names that are treated as "super-user", meaning they will be able to do all admin
102108
# operations and publish/consume from all topics
103109
superUserRoles=

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1576,6 +1576,16 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
15761576
+ " or last position eg: *.pulsar.service, pulsar.service.*)")
15771577
private boolean authorizationAllowWildcardsMatching = false;
15781578

1579+
@FieldContext(
1580+
category = CATEGORY_AUTHORIZATION,
1581+
doc = """
1582+
If a namespace has no roles configured in the subscription permission for a given subscription name,
1583+
allow all roles that have permission to consume a the topic to consume from the subscription.
1584+
See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
1585+
permission.
1586+
""")
1587+
private boolean grantImplicitPermissionOnSubscription = true;
1588+
15791589
@FieldContext(
15801590
category = CATEGORY_AUTHORIZATION,
15811591
doc = "When this parameter is not empty, unauthenticated users perform as anonymousUserRole"

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,18 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
111111
}
112112
} else {
113113
if (isNotBlank(subscription)) {
114-
// validate if role is authorized to access subscription. (skip validation if authorization
115-
// list is empty)
114+
// Reject request if role is unauthorized to access subscription.
115+
// If isGrantImplicitPermissionOnSubscription is true, role must be in the set of roles.
116+
// Otherwise, set of roles must be null or empty, or role must be in set of roles.
116117
Set<String> roles = policies.get().auth_policies
117118
.getSubscriptionAuthentication().get(subscription);
118-
if (roles != null && !roles.isEmpty() && !roles.contains(role)) {
119+
boolean isUnauthorized;
120+
if (roles == null || roles.isEmpty()) {
121+
isUnauthorized = !conf.isGrantImplicitPermissionOnSubscription();
122+
} else {
123+
isUnauthorized = !roles.contains(role);
124+
}
125+
if (isUnauthorized) {
119126
log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription);
120127
return CompletableFuture.completedFuture(false);
121128
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,83 @@ public void testSubscriberPermission() throws Exception {
367367
log.info("-- Exiting {} test --", methodName);
368368
}
369369

370+
@Test
371+
public void testSubscriberPermissionRequired() throws Exception {
372+
log.info("-- Starting {} test --", methodName);
373+
374+
// Simplify test by skipping configuration of topic level policies
375+
conf.setTopicLevelPoliciesEnabled(false);
376+
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
377+
conf.setGrantImplicitPermissionOnSubscription(false);
378+
setup();
379+
380+
final String tenantRole = "tenant-role";
381+
final String subscriptionRole = "sub-role";
382+
final String subscriptionName = "sub";
383+
final String namespace = "my-property/ns-sub-auth-req";
384+
final String topicName = "persistent://" + namespace + "/my-topic";
385+
Authentication adminAuthentication = new ClientAuthentication("superUser");
386+
387+
clientAuthProviderSupportedRoles.add(subscriptionRole);
388+
389+
@Cleanup
390+
PulsarAdmin superAdmin = spy(
391+
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());
392+
393+
Authentication tenantAdminAuthentication = new ClientAuthentication(tenantRole);
394+
@Cleanup
395+
PulsarAdmin tenantAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
396+
.authentication(tenantAdminAuthentication).build());
397+
398+
Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole);
399+
@Cleanup
400+
PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
401+
.authentication(subAdminAuthentication).build());
402+
403+
Authentication authentication = new ClientAuthentication(subscriptionRole);
404+
405+
superAdmin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
406+
407+
// Initialize cluster and create namespace and topic
408+
superAdmin.tenants().createTenant("my-property",
409+
new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
410+
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
411+
tenantAdmin.topics().createNonPartitionedTopic(topicName);
412+
tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
413+
Collections.singleton(AuthAction.consume));
414+
assertNull(superAdmin.namespaces().getPublishRate(namespace));
415+
replacePulsarClient(PulsarClient.builder()
416+
.serviceUrl(pulsar.getBrokerServiceUrl())
417+
.authentication(authentication));
418+
419+
// Cluster is initialized; the subscriptionRole has permission consume on the topic, but doesn't have
420+
// explicit subscription permission. Verify that several operations which rely on subscription permission fail.
421+
try {
422+
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);
423+
fail("should have failed with authorization exception");
424+
} catch (Exception e) {
425+
assertTrue(e.getMessage().startsWith(
426+
"Unauthorized to validateTopicOperation for operation [RESET_CURSOR]"));
427+
}
428+
try {
429+
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
430+
fail("should have failed with authorization exception");
431+
} catch (Exception e) {
432+
assertTrue(e.getMessage().contains("Client is not authorized to subscribe"), e.getMessage());
433+
}
434+
435+
// Grant the role permission.
436+
tenantAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName, Set.of(subscriptionRole));
437+
438+
// Verify the role now has permission to consume (reset cursor second to avoid 404 on subscription)
439+
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
440+
.subscribe();
441+
consumer.close();
442+
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);
443+
444+
log.info("-- Exiting {} test --", methodName);
445+
}
446+
370447
@Test
371448
public void testClearBacklogPermission() throws Exception {
372449
log.info("-- Starting {} test --", methodName);

pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,18 @@ public void test() throws Exception {
106106
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
107107
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
108108
assertFalse(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
109-
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, null));
110-
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,null));
109+
// Include a subscription name. The subscription doesn't need to exist for the purpose of this test, but this
110+
// tests the case when service.getConfig().isGrantImplicitPermissionOnSubscription() is true because we
111+
// have not granted permission for this role on the subscription named "sub".
112+
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));
113+
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,"sub"));
114+
115+
// Grant permission to a different role for sub and expect failure
116+
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub", Set.of("no-ones-role"));
117+
// Even though other-role has permission to consume from the topic, the "sub" subscription is locked down and
118+
// only roles with permission granted via grantPermissionOnSubscription have permission to consume from that
119+
// subscription.
120+
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));
111121

112122
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "no-access-role", null));
113123

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.websocket.proxy;
20+
21+
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
22+
import static org.mockito.ArgumentMatchers.anyBoolean;
23+
import static org.mockito.ArgumentMatchers.anyInt;
24+
import static org.mockito.ArgumentMatchers.anyString;
25+
import static org.mockito.Mockito.doReturn;
26+
import static org.testng.Assert.assertFalse;
27+
import static org.testng.Assert.assertTrue;
28+
import com.google.common.collect.Sets;
29+
import java.util.EnumSet;
30+
import java.util.Optional;
31+
import java.util.Set;
32+
import org.apache.pulsar.broker.authorization.AuthorizationService;
33+
import org.apache.pulsar.client.api.ProducerConsumerBase;
34+
import org.apache.pulsar.common.naming.TopicName;
35+
import org.apache.pulsar.common.policies.data.AuthAction;
36+
import org.apache.pulsar.common.policies.data.ClusterData;
37+
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
38+
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
39+
import org.apache.pulsar.websocket.WebSocketService;
40+
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
43+
import org.testng.annotations.AfterMethod;
44+
import org.testng.annotations.BeforeClass;
45+
import org.testng.annotations.Test;
46+
47+
/**
48+
* Class that initializes the WebSocketService disabling {@link WebSocketProxyConfiguration#setGrantImplicitPermissionOnSubscription(boolean)}.
49+
* We must have this class on its own because the WebSocketProxyConfiguration is converted to the ServiceConfiguration
50+
* on start up, so it is not a dynamic property that we can change after the service has started.
51+
*/
52+
53+
@Test(groups = "websocket")
54+
public class ProxyAuthorizationWithoutImplicitPermissionOnSubscriptionTest extends ProducerConsumerBase {
55+
private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
56+
private WebSocketService service;
57+
private final String configClusterName = "c1";
58+
59+
@BeforeClass
60+
@Override
61+
protected void setup() throws Exception {
62+
conf.setClusterName(configClusterName);
63+
internalSetup();
64+
65+
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
66+
Set<String> superUser = Sets.newHashSet("");
67+
config.setAuthorizationEnabled(true);
68+
config.setSuperUserRoles(superUser);
69+
config.setClusterName("c1");
70+
config.setWebServicePort(Optional.of(0));
71+
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
72+
config.setGrantImplicitPermissionOnSubscription(false);
73+
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
74+
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
75+
.createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
76+
service.start();
77+
}
78+
79+
@AfterMethod(alwaysRun = true)
80+
protected void cleanup() throws Exception {
81+
super.internalCleanup();
82+
if (service != null) {
83+
service.close();
84+
}
85+
log.info("Finished Cleaning Up Test setup");
86+
}
87+
88+
89+
@Test
90+
public void testAuthorizationServiceDirectly() throws Exception {
91+
AuthorizationService auth = service.getAuthorizationService();
92+
93+
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
94+
95+
admin.clusters().createCluster(configClusterName, ClusterData.builder().build());
96+
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
97+
waitForChange();
98+
admin.namespaces().createNamespace("p1/c1/ns1");
99+
waitForChange();
100+
101+
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
102+
103+
admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.of(AuthAction.produce));
104+
waitForChange();
105+
106+
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
107+
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
108+
109+
admin.topics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role",
110+
EnumSet.of(AuthAction.consume));
111+
waitForChange();
112+
113+
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
114+
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
115+
assertFalse(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
116+
117+
// Expect false because we disabled the implicit permission on subscription
118+
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));
119+
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,"sub"));
120+
121+
// Grant permission
122+
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub", Set.of("other-role"));
123+
124+
// Expect only true for "other-role" because we granted permission for only that one
125+
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, "sub"));
126+
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null,"sub"));
127+
128+
129+
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "no-access-role", null));
130+
131+
admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.allOf(AuthAction.class));
132+
waitForChange();
133+
134+
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
135+
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null));
136+
137+
admin.namespaces().deleteNamespace("p1/c1/ns1");
138+
admin.tenants().deleteTenant("p1");
139+
admin.clusters().deleteCluster("c1");
140+
}
141+
142+
private static void waitForChange() {
143+
try {
144+
Thread.sleep(100);
145+
} catch (InterruptedException e) {
146+
}
147+
}
148+
}

pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,15 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
120120
+ "presents at first or last position. For example: *.pulsar.service,pulsar.service.*)")
121121
private boolean authorizationAllowWildcardsMatching = false;
122122

123+
@FieldContext(
124+
doc = """
125+
If a namespace has no roles configured in the subscription permission for a given subscription name,
126+
allow all roles that have permission to consume a the topic to consume from the subscription.
127+
See Namespaces#grantPermissionOnSubscription in the Java Admin API Client for details on granting
128+
permission.
129+
""")
130+
private boolean grantImplicitPermissionOnSubscription = true;
131+
123132
@FieldContext(doc = "Proxy authentication settings used to connect to brokers")
124133
private String brokerClientAuthenticationPlugin;
125134

0 commit comments

Comments
 (0)