Skip to content

Commit 713db44

Browse files
ahuang98cmccabe
authored andcommitted
KAFKA-19411: Fix deleteAcls bug which allows more deletions than max records per user op (apache#19974)
If there are more deletion filters after we initially hit the `MAX_RECORDS_PER_USER_OP` bound, we will add an additional deletion record ontop of that for each additional filter. The current error message returned to the client is not useful either, adding logic so client doesn't just get `UNKNOWN_SERVER_EXCEPTION` with no details returned. Conflicts: - In AclControlManagerTest.java, use !isPresent instead of isEmpty so that this will work with java 8.
1 parent 6e62e0b commit 713db44

File tree

2 files changed

+67
-4
lines changed

2 files changed

+67
-4
lines changed

metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ ControllerResult<List<AclDeleteResult>> deleteAcls(List<AclBindingFilter> filter
176176
validateFilter(filter);
177177
AclDeleteResult result = deleteAclsForFilter(filter, records);
178178
results.add(result);
179+
} catch (BoundedListTooLongException e) {
180+
// we do not return partial results here because the fact that only a portion of the deletions
181+
// succeeded can be easily missed due to response size. instead fail the entire response
182+
throw new InvalidRequestException(e.getMessage(), e);
179183
} catch (Throwable e) {
180184
results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception()));
181185
}
@@ -191,13 +195,14 @@ AclDeleteResult deleteAclsForFilter(AclBindingFilter filter,
191195
StandardAcl acl = entry.getValue();
192196
AclBinding binding = acl.toBinding();
193197
if (filter.matches(binding)) {
194-
deleted.add(new AclBindingDeleteResult(binding));
195-
records.add(new ApiMessageAndVersion(
196-
new RemoveAccessControlEntryRecord().setId(id), (short) 0));
197-
if (records.size() > MAX_RECORDS_PER_USER_OP) {
198+
// check size limitation first before adding additional records
199+
if (records.size() >= MAX_RECORDS_PER_USER_OP) {
198200
throw new BoundedListTooLongException("Cannot remove more than " +
199201
MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.");
200202
}
203+
deleted.add(new AclBindingDeleteResult(binding));
204+
records.add(new ApiMessageAndVersion(
205+
new RemoveAccessControlEntryRecord().setId(id), (short) 0));
201206
}
202207
}
203208
return new AclDeleteResult(deleted);

metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.kafka.server.authorizer.AuthorizationResult;
4949
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
5050
import org.apache.kafka.server.common.ApiMessageAndVersion;
51+
import org.apache.kafka.server.mutable.BoundedListTooLongException;
5152
import org.apache.kafka.timeline.SnapshotRegistry;
5253

5354
import org.junit.jupiter.api.Test;
@@ -71,6 +72,7 @@
7172
import static org.apache.kafka.common.resource.PatternType.LITERAL;
7273
import static org.apache.kafka.common.resource.PatternType.MATCH;
7374
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
75+
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
7476
import static org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS;
7577
import static org.junit.jupiter.api.Assertions.assertEquals;
7678
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -366,4 +368,60 @@ public void testDeleteDedupe() {
366368
assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsBothFilters.records().get(0).message()).id());
367369
assertEquals(2, deleteAclResultsBothFilters.response().size());
368370
}
371+
372+
@Test
373+
public void testDeleteExceedsMaxRecords() {
374+
AclControlManager manager = new AclControlManager.Builder().build();
375+
MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer();
376+
authorizer.loadSnapshot(manager.idToAcl());
377+
378+
List<AclBinding> firstCreate = new ArrayList<>();
379+
List<AclBinding> secondCreate = new ArrayList<>();
380+
381+
// create MAX_RECORDS_PER_USER_OP + 2 ACLs
382+
for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) {
383+
StandardAclWithId acl = new StandardAclWithId(Uuid.randomUuid(),
384+
new StandardAcl(
385+
ResourceType.TOPIC,
386+
"mytopic_" + i,
387+
PatternType.LITERAL,
388+
"User:alice",
389+
"127.0.0.1",
390+
AclOperation.READ,
391+
AclPermissionType.ALLOW));
392+
393+
// split acl creations between two create requests
394+
if (i % 2 == 0) {
395+
firstCreate.add(acl.toBinding());
396+
} else {
397+
secondCreate.add(acl.toBinding());
398+
}
399+
}
400+
ControllerResult<List<AclCreateResult>> firstCreateResult = manager.createAcls(firstCreate);
401+
assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, firstCreateResult.response().size());
402+
for (AclCreateResult result : firstCreateResult.response()) {
403+
assertTrue(!result.exception().isPresent());
404+
}
405+
406+
ControllerResult<List<AclCreateResult>> secondCreateResult = manager.createAcls(secondCreate);
407+
assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, secondCreateResult.response().size());
408+
for (AclCreateResult result : secondCreateResult.response()) {
409+
assertTrue(!result.exception().isPresent());
410+
}
411+
412+
RecordTestUtils.replayAll(manager, firstCreateResult.records());
413+
RecordTestUtils.replayAll(manager, secondCreateResult.records());
414+
assertFalse(manager.idToAcl().isEmpty());
415+
416+
ArrayList<AclBindingFilter> filters = new ArrayList<>();
417+
for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) {
418+
filters.add(new AclBindingFilter(
419+
new ResourcePatternFilter(ResourceType.TOPIC, "mytopic_" + i, PatternType.LITERAL),
420+
AccessControlEntryFilter.ANY));
421+
}
422+
423+
Exception exception = assertThrows(InvalidRequestException.class, () -> manager.deleteAcls(filters));
424+
assertEquals(BoundedListTooLongException.class, exception.getCause().getClass());
425+
assertEquals("Cannot remove more than " + MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.", exception.getCause().getMessage());
426+
}
369427
}

0 commit comments

Comments
 (0)