Skip to content

Commit 5a57473

Browse files
KAFKA-18484 [1/N]; Handle exceptions from deferred events in coordinator (apache#18661)
Guard against the coordinator getting stuck due to deferred events throwing exceptions. Reviewers: David Jacot <[email protected]>
1 parent 9da516b commit 5a57473

File tree

2 files changed

+216
-10
lines changed

2 files changed

+216
-10
lines changed

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ public int size() {
460460
* A simple container class to hold all the attributes
461461
* related to a pending batch.
462462
*/
463-
private static class CoordinatorBatch {
463+
private class CoordinatorBatch {
464464
/**
465465
* The base (or first) offset of the batch. If the batch fails
466466
* for any reason, the state machine is rolled back to it.
@@ -500,9 +500,9 @@ private static class CoordinatorBatch {
500500
final Optional<TimerTask> lingerTimeoutTask;
501501

502502
/**
503-
* The list of deferred events associated with the batch.
503+
* The deferred events associated with the batch.
504504
*/
505-
final List<DeferredEvent> deferredEvents;
505+
final DeferredEventCollection deferredEvents;
506506

507507
/**
508508
* The next offset. This is updated when records
@@ -527,7 +527,7 @@ private static class CoordinatorBatch {
527527
this.buffer = buffer;
528528
this.builder = builder;
529529
this.lingerTimeoutTask = lingerTimeoutTask;
530-
this.deferredEvents = new ArrayList<>();
530+
this.deferredEvents = new DeferredEventCollection();
531531
}
532532
}
533533

@@ -806,9 +806,7 @@ private void flushCurrentBatch() {
806806
}
807807

808808
// Add all the pending deferred events to the deferred event queue.
809-
for (DeferredEvent event : currentBatch.deferredEvents) {
810-
deferredEventQueue.add(offset, event);
811-
}
809+
deferredEventQueue.add(offset, currentBatch.deferredEvents);
812810

813811
// Free up the current batch.
814812
freeCurrentBatch();
@@ -839,9 +837,7 @@ private void maybeFlushCurrentBatch(long currentTimeMs) {
839837
private void failCurrentBatch(Throwable t) {
840838
if (currentBatch != null) {
841839
coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
842-
for (DeferredEvent event : currentBatch.deferredEvents) {
843-
event.complete(t);
844-
}
840+
currentBatch.deferredEvents.complete(t);
845841
freeCurrentBatch();
846842
}
847843
}
@@ -1157,6 +1153,38 @@ public void run() {
11571153
}
11581154
}
11591155

1156+
/**
1157+
* A collection of {@link DeferredEvent}. When completed, completes all the events in the collection
1158+
* and logs any exceptions thrown.
1159+
*/
1160+
class DeferredEventCollection implements DeferredEvent {
1161+
private final List<DeferredEvent> events = new ArrayList<>();
1162+
1163+
@Override
1164+
public void complete(Throwable t) {
1165+
for (DeferredEvent event : events) {
1166+
try {
1167+
event.complete(t);
1168+
} catch (Throwable e) {
1169+
log.error("Completion of event {} failed due to {}.", event, e.getMessage(), e);
1170+
}
1171+
}
1172+
}
1173+
1174+
public boolean add(DeferredEvent event) {
1175+
return events.add(event);
1176+
}
1177+
1178+
public int size() {
1179+
return events.size();
1180+
}
1181+
1182+
@Override
1183+
public String toString() {
1184+
return "DeferredEventCollection(events=" + events + ")";
1185+
}
1186+
}
1187+
11601188
/**
11611189
* A coordinator write operation.
11621190
*

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,10 @@
9494
import static org.junit.jupiter.api.Assertions.assertThrows;
9595
import static org.junit.jupiter.api.Assertions.assertTrue;
9696
import static org.mockito.ArgumentMatchers.any;
97+
import static org.mockito.ArgumentMatchers.anyLong;
9798
import static org.mockito.ArgumentMatchers.argThat;
9899
import static org.mockito.ArgumentMatchers.eq;
100+
import static org.mockito.Mockito.doThrow;
99101
import static org.mockito.Mockito.mock;
100102
import static org.mockito.Mockito.spy;
101103
import static org.mockito.Mockito.times;
@@ -1116,6 +1118,105 @@ public void testScheduleUnloadingWithStalePartitionEpoch() {
11161118
assertEquals(10, ctx.epoch);
11171119
}
11181120

1121+
@Test
1122+
public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException {
1123+
MockTimer timer = new MockTimer();
1124+
MockPartitionWriter writer = new MockPartitionWriter();
1125+
MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
1126+
MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
1127+
MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
1128+
CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class);
1129+
1130+
// All operations will throw an exception when completed.
1131+
doThrow(new KafkaException("error")).when(metrics).recordEventPurgatoryTime(anyLong());
1132+
1133+
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
1134+
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
1135+
.withTime(timer.time())
1136+
.withTimer(timer)
1137+
.withDefaultWriteTimeOut(Duration.ofMillis(20))
1138+
.withLoader(new MockCoordinatorLoader())
1139+
.withEventProcessor(new DirectEventProcessor())
1140+
.withPartitionWriter(writer)
1141+
.withCoordinatorShardBuilderSupplier(supplier)
1142+
.withCoordinatorRuntimeMetrics(metrics)
1143+
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
1144+
.withSerializer(new StringSerializer())
1145+
.withAppendLingerMs(10)
1146+
.withExecutorService(mock(ExecutorService.class))
1147+
.build();
1148+
1149+
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
1150+
when(builder.withLogContext(any())).thenReturn(builder);
1151+
when(builder.withTime(any())).thenReturn(builder);
1152+
when(builder.withTimer(any())).thenReturn(builder);
1153+
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
1154+
when(builder.withTopicPartition(any())).thenReturn(builder);
1155+
when(builder.withExecutor(any())).thenReturn(builder);
1156+
when(builder.build()).thenReturn(coordinator);
1157+
when(supplier.get()).thenReturn(builder);
1158+
1159+
// Load the coordinator.
1160+
runtime.scheduleLoadOperation(TP, 10);
1161+
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
1162+
1163+
// Get the max batch size.
1164+
int maxBatchSize = writer.config(TP).maxMessageSize();
1165+
1166+
// Create records with three quarters of the max batch size each, so that it is not
1167+
// possible to have more than one record in a single batch.
1168+
List<String> records = Stream.of('1', '2', '3').map(c -> {
1169+
char[] payload = new char[maxBatchSize * 3 / 4];
1170+
Arrays.fill(payload, c);
1171+
return new String(payload);
1172+
}).collect(Collectors.toList());
1173+
1174+
// Write #1.
1175+
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
1176+
state -> new CoordinatorResult<>(List.of(records.get(0)), "response1")
1177+
);
1178+
1179+
// Write #2.
1180+
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
1181+
state -> new CoordinatorResult<>(List.of(records.get(1)), "response2")
1182+
);
1183+
1184+
// Write #3, to force the flush of write #2.
1185+
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
1186+
state -> new CoordinatorResult<>(List.of(records.get(1)), "response3")
1187+
);
1188+
1189+
// Records have been written to the log.
1190+
assertEquals(List.of(
1191+
records(timer.time().milliseconds(), records.get(0)),
1192+
records(timer.time().milliseconds(), records.get(1))
1193+
), writer.entries(TP));
1194+
1195+
// Verify that no writes are committed yet.
1196+
assertFalse(write1.isDone());
1197+
assertFalse(write2.isDone());
1198+
assertFalse(write3.isDone());
1199+
1200+
// Schedule the unloading.
1201+
runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
1202+
assertEquals(CLOSED, ctx.state);
1203+
1204+
// All write completions throw exceptions after completing their futures.
1205+
// Despite the exceptions, the unload should still complete.
1206+
assertTrue(write1.isDone());
1207+
assertTrue(write2.isDone());
1208+
assertTrue(write3.isDone());
1209+
assertFutureThrows(write1, NotCoordinatorException.class);
1210+
assertFutureThrows(write2, NotCoordinatorException.class);
1211+
assertFutureThrows(write3, NotCoordinatorException.class);
1212+
1213+
// Verify that onUnloaded is called.
1214+
verify(coordinator, times(1)).onUnloaded();
1215+
1216+
// Getting the coordinator context fails because it no longer exists.
1217+
assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
1218+
}
1219+
11191220
@Test
11201221
public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
11211222
MockTimer timer = new MockTimer();
@@ -3080,6 +3181,83 @@ public void testHighWatermarkUpdate() {
30803181
assertTrue(write2.isDone());
30813182
}
30823183

3184+
@Test
3185+
public void testHighWatermarkUpdateWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException {
3186+
MockTimer timer = new MockTimer();
3187+
MockPartitionWriter writer = new MockPartitionWriter();
3188+
CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class);
3189+
3190+
// All operations will throw an exception when completed.
3191+
doThrow(new KafkaException("error")).when(metrics).recordEventPurgatoryTime(anyLong());
3192+
3193+
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
3194+
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
3195+
.withTime(timer.time())
3196+
.withTimer(timer)
3197+
.withDefaultWriteTimeOut(Duration.ofMillis(20))
3198+
.withLoader(new MockCoordinatorLoader())
3199+
.withEventProcessor(new DirectEventProcessor())
3200+
.withPartitionWriter(writer)
3201+
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
3202+
.withCoordinatorRuntimeMetrics(metrics)
3203+
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
3204+
.withSerializer(new StringSerializer())
3205+
.withAppendLingerMs(10)
3206+
.withExecutorService(mock(ExecutorService.class))
3207+
.build();
3208+
3209+
// Load the coordinator.
3210+
runtime.scheduleLoadOperation(TP, 10);
3211+
3212+
// Get the max batch size.
3213+
int maxBatchSize = writer.config(TP).maxMessageSize();
3214+
3215+
// Create records with three quarters of the max batch size each, so that it is not
3216+
// possible to have more than one record in a single batch.
3217+
List<String> records = Stream.of('1', '2', '3').map(c -> {
3218+
char[] payload = new char[maxBatchSize * 3 / 4];
3219+
Arrays.fill(payload, c);
3220+
return new String(payload);
3221+
}).collect(Collectors.toList());
3222+
3223+
// Write #1.
3224+
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
3225+
state -> new CoordinatorResult<>(List.of(records.get(0)), "response1")
3226+
);
3227+
3228+
// Write #2.
3229+
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
3230+
state -> new CoordinatorResult<>(List.of(records.get(1)), "response2")
3231+
);
3232+
3233+
// Write #3, to force the flush of write #2.
3234+
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
3235+
state -> new CoordinatorResult<>(List.of(records.get(1)), "response3")
3236+
);
3237+
3238+
// Records have been written to the log.
3239+
assertEquals(List.of(
3240+
records(timer.time().milliseconds(), records.get(0)),
3241+
records(timer.time().milliseconds(), records.get(1))
3242+
), writer.entries(TP));
3243+
3244+
// Verify that no writes are committed yet.
3245+
assertFalse(write1.isDone());
3246+
assertFalse(write2.isDone());
3247+
assertFalse(write3.isDone());
3248+
3249+
// Commit the first and second record.
3250+
writer.commit(TP, 2);
3251+
3252+
// Write #1 and write #2's completions throw exceptions after completing their futures.
3253+
// Despite the exception from write #1, write #2 should still be completed.
3254+
assertTrue(write1.isDone());
3255+
assertTrue(write2.isDone());
3256+
assertFalse(write3.isDone());
3257+
assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
3258+
assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
3259+
}
3260+
30833261
@Test
30843262
public void testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated() {
30853263
MockTimer timer = new MockTimer();

0 commit comments

Comments (0)