
Commit eefee6d

mingyen066 authored and chia7712 committed
KAFKA-19427 Allow the coordinator to grow its buffer dynamically (#20040)
* The Coordinator starts with a smaller buffer, which can grow as needed.
* In freeCurrentBatch, release the appropriate buffer: the Coordinator recycles the expanded buffer (`currentBatch.builder.buffer()`), not `currentBatch.buffer`, because `MemoryRecordsBuilder` may allocate a new `ByteBuffer` if the existing one isn't large enough.
* There are two cases in which the buffer may exceed `maxMessageSize`:
  1. If a single record's size exceeds `maxMessageSize` (which, so far, is derived from `max.message.bytes`) and the write is in non-atomic mode, the buffer can still grow beyond `maxMessageSize`. In this case, the Coordinator should revert to a smaller buffer afterward.
  2. The Coordinator does not recycle buffers larger than `maxMessageSize`. If the user dynamically reduces `maxMessageSize` to a value smaller than `INITIAL_BUFFER_SIZE`, the Coordinator must avoid recycling any buffer larger than `maxMessageSize`, so that it can allocate the smaller buffer in the next round.
* Add tests to verify the above scenarios.

Reviewers: David Jacot <[email protected]>, Sean Quah <[email protected]>, Ken Huang <[email protected]>, PoAn Yang <[email protected]>, TaiJuWu <[email protected]>, Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent d95857a commit eefee6d
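The key subtlety in the commit message is why freeCurrentBatch must recycle currentBatch.builder.buffer() rather than currentBatch.buffer. MemoryRecordsBuilder writes through Kafka's ByteBufferOutputStream, which replaces the backing buffer when a write outgrows it. A minimal sketch (not part of the patch) of that growth behavior:

import java.nio.ByteBuffer;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

public class BufferGrowthSketch {
    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.allocate(16);
        ByteBufferOutputStream out = new ByteBufferOutputStream(original);

        // Writing past the 16-byte capacity forces a reallocation.
        out.write(new byte[64], 0, 64);

        // The stream now owns a new, larger buffer; `original` is stale.
        System.out.println(out.buffer() == original);      // false
        System.out.println(out.buffer().capacity() >= 64); // true
    }
}

Releasing `original` back to the pool would recycle the stale small buffer and discard the grown one, which is why the patch releases builder.buffer() instead.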

File tree

2 files changed: +222, −10 lines


coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java

Lines changed: 12 additions & 5 deletions
@@ -70,6 +70,7 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.lang.Math.min;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorWriteEvent.NOT_QUEUED;
 
 /**
@@ -758,8 +759,14 @@ private void freeCurrentBatch() {
             // Cancel the linger timeout.
             currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
 
-            // Release the buffer.
-            bufferSupplier.release(currentBatch.buffer);
+            // Release the buffer only if it is not larger than the maxBatchSize.
+            int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+
+            if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
+                bufferSupplier.release(currentBatch.builder.buffer());
+            } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
+                bufferSupplier.release(currentBatch.buffer);
+            }
 
             currentBatch = null;
         }
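The ctx.bufferSupplier.get(1) assertions in the tests further down rely on the recycling behavior of the runtime's buffer supplier, which caches a released buffer and returns it for any request it can satisfy. A minimal sketch, assuming the runtime uses BufferSupplier.GrowableBufferSupplier (a single-slot cache) as its supplier:

import java.nio.ByteBuffer;
import org.apache.kafka.common.utils.BufferSupplier;

public class BufferSupplierSketch {
    public static void main(String[] args) {
        BufferSupplier supplier = new BufferSupplier.GrowableBufferSupplier();

        ByteBuffer first = supplier.get(512 * 1024); // nothing cached yet: fresh 512KB allocation
        supplier.release(first);                     // hand it back for reuse

        // A cached buffer satisfies any request up to its capacity,
        // so get(1) is a cheap way to observe what was recycled.
        ByteBuffer second = supplier.get(1);
        System.out.println(second.capacity());       // 524288, not 1
    }
}

This is also why the hunk above skips release() for oversized buffers: once cached, a too-large buffer would keep being handed back out.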
@@ -859,7 +866,7 @@ private void maybeAllocateNewBatch(
             LogConfig logConfig = partitionWriter.config(tp);
             int maxBatchSize = logConfig.maxMessageSize();
             long prevLastWrittenOffset = coordinator.lastWrittenOffset();
-            ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+            ByteBuffer buffer = bufferSupplier.get(min(INITIAL_BUFFER_SIZE, maxBatchSize));
 
             MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
                 buffer,
@@ -1888,9 +1895,9 @@ public void onHighWatermarkUpdated(
     }
 
    /**
-    * 16KB. Used for initial buffer size for write operations.
+    * 512KB. Used for initial buffer size for write operations.
     */
-    static final int MIN_BUFFER_SIZE = 16384;
+    static final int INITIAL_BUFFER_SIZE = 512 * 1024;
 
    /**
     * The log prefix.
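Taken together, the new constant and the min(...) cap bound the first allocation: writes start from at most 512KB and grow only when a batch needs it. Illustrative arithmetic (1048588 is Kafka's default max.message.bytes; the 16KB limit is a hypothetical shrunken config):

public class InitialBufferSizing {
    static final int INITIAL_BUFFER_SIZE = 512 * 1024; // 524288, as in the patch

    public static void main(String[] args) {
        // Default limit is above 512KB: start small and grow on demand.
        System.out.println(Math.min(INITIAL_BUFFER_SIZE, 1048588)); // 524288
        // A limit below 512KB caps the initial allocation instead.
        System.out.println(Math.min(INITIAL_BUFFER_SIZE, 16 * 1024)); // 16384
    }
}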

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java

Lines changed: 210 additions & 5 deletions
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -65,6 +66,7 @@
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalInt;
 import java.util.Set;
@@ -84,7 +86,7 @@
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.HighWatermarkListener.NO_OFFSET;
-import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE;
+import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.INITIAL_BUFFER_SIZE;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -3486,11 +3488,11 @@ public void testAppendRecordBatchSize() {
         assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
 
         int maxBatchSize = writer.config(TP).maxMessageSize();
-        assertTrue(maxBatchSize > MIN_BUFFER_SIZE);
+        assertTrue(maxBatchSize > INITIAL_BUFFER_SIZE);
 
-        // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize
+        // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
         List<String> records = new ArrayList<>();
-        for (int i = 0; i < 3000; i++) {
+        for (int i = 0; i < 50000; i++) {
             records.add("record-" + i);
         }
 
@@ -3504,7 +3506,210 @@ public void testAppendRecordBatchSize() {
         assertFalse(write1.isCompletedExceptionally());
 
         int batchSize = writer.entries(TP).get(0).sizeInBytes();
-        assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
+        assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize < maxBatchSize);
+    }
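(Side note on the updated test above: the loop count rises from 3,000 to 50,000 because the batch must now outgrow the 512KB INITIAL_BUFFER_SIZE, rather than the old 16KB MIN_BUFFER_SIZE, for the assertion to hold.)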
+
+    @Test
+    public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate a record larger than the maxBatchSize.
+        List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024));
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(largeRecords, "response1", null, true, false)
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        // Verify that the next buffer retrieved from the bufferSupplier is the initial small one, not the large buffer.
+        assertEquals(INITIAL_BUFFER_SIZE, ctx.bufferSupplier.get(1).capacity());
+    }
+
+    @Test
+    public void testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024 * 1024) // 1GB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+        assertTrue(INITIAL_BUFFER_SIZE < batchSize && batchSize <= maxBatchSize);
+
+        // Verify that the next buffer retrieved from the bufferSupplier is the expanded buffer.
+        assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE);
+    }
+
+    @Test
+    public void testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
+        MockTimer timer = new MockTimer();
+        var mockWriter = new InMemoryPartitionWriter(false) {
+            private LogConfig config = new LogConfig(Map.of(
+                TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+            ));
+
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return config;
+            }
+
+            public void updateConfig(LogConfig newConfig) {
+                this.config = newConfig;
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+        assertTrue(batchSize <= INITIAL_BUFFER_SIZE && INITIAL_BUFFER_SIZE <= maxBatchSize);
+
+        ByteBuffer cachedBuffer = ctx.bufferSupplier.get(1);
+        assertEquals(INITIAL_BUFFER_SIZE, cachedBuffer.capacity());
+        // bufferSupplier.get(1) removes the cached buffer from the supplier, so release it to put it back.
+        ctx.bufferSupplier.release(cachedBuffer);
+
+        // Reduce max message size below the initial buffer size.
+        mockWriter.updateConfig(new LogConfig(
+            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(INITIAL_BUFFER_SIZE - 66))));
+        assertEquals(INITIAL_BUFFER_SIZE - 66, mockWriter.config(TP).maxMessageSize());
+
+        // Write #2.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response2")
+        );
+        assertFalse(write2.isCompletedExceptionally());
+
+        // Verify that there is no cached buffer, since the previously cached buffer was larger than the new maxMessageSize.
+        assertEquals(1, ctx.bufferSupplier.get(1).capacity());
+
+        // Write #3.
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response3")
+        );
+        assertFalse(write3.isCompletedExceptionally());
+
+        // Verify that the cached buffer size equals the new maxMessageSize, which is less than INITIAL_BUFFER_SIZE.
+        assertEquals(mockWriter.config(TP).maxMessageSize(), ctx.bufferSupplier.get(1).capacity());
     }
 
     @Test
