Skip to content

Commit b697140

Browse files
dao-junmukesh-ctds
authored andcommitted
[fix][admin] Verify is policies read only before revoke permissions on topic (apache#23730)
(cherry picked from commit 069cc3d) (cherry picked from commit 7df87f0)
1 parent cd8c2d9 commit b697140

File tree

2 files changed

+31
-1
lines changed

2 files changed

+31
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str
305305
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
306306
CompletableFuture<Void> validateAccessForTenantCf =
307307
validateAdminAccessForTenantAsync(namespaceName.getTenant())
308-
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
308+
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
309309

310310
var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
311311
if (checkIfTopicExists) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,6 +1024,36 @@ public void testRevokePartitionedTopic() {
10241024
}
10251025
}
10261026

1027+
@Test
1028+
public void testRevokePartitionedTopicWithReadonlyPolicies() throws Exception {
1029+
final String partitionedTopicName = "testRevokePartitionedTopicWithReadonlyPolicies-topic";
1030+
final int numPartitions = 5;
1031+
AsyncResponse response = mock(AsyncResponse.class);
1032+
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
1033+
persistentTopics.createPartitionedTopic(
1034+
response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
1035+
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
1036+
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
1037+
String role = "role";
1038+
Set<AuthAction> expectActions = new HashSet<>();
1039+
expectActions.add(AuthAction.produce);
1040+
response = mock(AsyncResponse.class);
1041+
responseCaptor = ArgumentCaptor.forClass(Response.class);
1042+
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role,
1043+
expectActions);
1044+
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
1045+
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
1046+
response = mock(AsyncResponse.class);
1047+
doReturn(CompletableFuture.failedFuture(
1048+
new RestException(Response.Status.FORBIDDEN, "Broker is forbidden to do read-write operations"))
1049+
).when(persistentTopics).validatePoliciesReadOnlyAccessAsync();
1050+
persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role);
1051+
ArgumentCaptor<RestException> exceptionCaptor = ArgumentCaptor.forClass(RestException.class);
1052+
verify(response, timeout(5000).times(1)).resume(exceptionCaptor.capture());
1053+
Assert.assertEquals(exceptionCaptor.getValue().getResponse().getStatus(),
1054+
Response.Status.FORBIDDEN.getStatusCode());
1055+
}
1056+
10271057
@Test
10281058
public void testTriggerCompactionTopic() {
10291059
final String partitionTopicName = "test-part";

0 commit comments

Comments
 (0)