Skip to content

Commit c067284

Browse files
Omar YasinÓmar Yasinlhotari
authored andcommitted
[improve][admin] Opt-out of topic-existence check (apache#23709)
Co-authored-by: Ómar Yasin <[email protected]> Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit f571aa1) (cherry picked from commit ef57c2a)
1 parent 799230c commit c067284

File tree

3 files changed

+57
-8
lines changed

3 files changed

+57
-8
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3312,6 +3312,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
33123312
)
33133313
private int transactionPendingAckBatchedWriteMaxDelayInMillis = 1;
33143314

3315+
@FieldContext(
3316+
category = CATEGORY_SERVER,
3317+
doc = "Opt-out of topic-existence check when setting permissions"
3318+
)
3319+
private boolean allowAclChangesOnNonExistentTopics = false;
3320+
33153321
@FieldContext(
33163322
category = CATEGORY_SERVER,
33173323
doc = "The class name of the factory that implements the topic compaction service."

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,16 @@ protected CompletableFuture<List<String>> internalGetPartitionedTopicListAsync()
221221

222222
protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() {
223223
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
224-
return validateAdminAccessForTenantAsync(namespaceName.getTenant())
225-
.thenCompose(__ -> internalCheckTopicExists(topicName))
224+
CompletableFuture<Void> validateAccessForTenantCf =
225+
validateAdminAccessForTenantAsync(namespaceName.getTenant());
226+
227+
var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
228+
if (checkIfTopicExists) {
229+
validateAccessForTenantCf = validateAccessForTenantCf
230+
.thenCompose(__ -> internalCheckTopicExists(topicName));
231+
}
232+
233+
return validateAccessForTenantCf
226234
.thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName));
227235
}
228236

@@ -273,9 +281,16 @@ private CompletableFuture<Void> grantPermissionsAsync(TopicName topicUri, String
273281
protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse, String role,
274282
Set<AuthAction> actions) {
275283
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
276-
validateAdminAccessForTenantAsync(namespaceName.getTenant())
277-
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
278-
.thenCompose(__ -> internalCheckTopicExists(topicName))
284+
CompletableFuture<Void> validateAccessForTenantCf = validateAdminAccessForTenantAsync(namespaceName.getTenant())
285+
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
286+
287+
var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
288+
if (checkIfTopicExists) {
289+
validateAccessForTenantCf = validateAccessForTenantCf
290+
.thenCompose(__ -> internalCheckTopicExists(topicName));
291+
}
292+
293+
validateAccessForTenantCf
279294
.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions))
280295
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))
281296
.exceptionally(ex -> {
@@ -288,9 +303,17 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse
288303

289304
protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
290305
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
291-
validateAdminAccessForTenantAsync(namespaceName.getTenant())
292-
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
293-
.thenCompose(__ -> internalCheckTopicExists(topicName))
306+
CompletableFuture<Void> validateAccessForTenantCf =
307+
validateAdminAccessForTenantAsync(namespaceName.getTenant())
308+
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
309+
310+
var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
311+
if (checkIfTopicExists) {
312+
validateAccessForTenantCf = validateAccessForTenantCf
313+
.thenCompose(__ -> internalCheckTopicExists(topicName));
314+
}
315+
316+
validateAccessForTenantCf
294317
.thenCompose(unused1 -> getPartitionedTopicMetadataAsync(topicName, true, false)
295318
.thenCompose(metadata -> {
296319
int numPartitions = metadata.partitions;

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.admin;
2020

21+
import static org.assertj.core.api.Assertions.assertThat;
2122
import static org.mockito.Mockito.doReturn;
2223
import static org.mockito.Mockito.spy;
2324
import static org.mockito.Mockito.times;
@@ -3624,4 +3625,23 @@ public void testPermissions() {
36243625
assertThrows(NotFoundException.class, () -> admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce)));
36253626
assertThrows(NotFoundException.class, () -> admin.topics().revokePermissions(topic, subject));
36263627
}
3628+
3629+
@Test
3630+
@SneakyThrows
3631+
public void testPermissionsAllowAclChangesOnNonExistentTopics() {
3632+
pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(true);
3633+
try {
3634+
String namespace = "prop-xyz/ns1/";
3635+
final String random = UUID.randomUUID().toString();
3636+
final String topic = "persistent://" + namespace + random;
3637+
final String subject = UUID.randomUUID().toString();
3638+
admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce));
3639+
assertThat(admin.topics().getPermissions(topic).get(subject)).containsExactly(AuthAction.produce);
3640+
admin.topics().revokePermissions(topic, subject);
3641+
assertThat(admin.topics().getPermissions(topic).get(subject)).isNullOrEmpty();
3642+
} finally {
3643+
// reset config
3644+
pulsar.getConfiguration().setAllowAclChangesOnNonExistentTopics(false);
3645+
}
3646+
}
36273647
}

0 commit comments

Comments
 (0)