Skip to content

Commit 8433ac4

Browse files
KAFKA-19221 Propagate IOException on LogSegment#close (#20072)
Log segment closure results in right sizing the segment on disk along with the associated index files. This is specially important for TimeIndexes where a failure to right size may eventually cause log roll failures leading to under replication and log cleaner failures. This change uses `Utils.closeAll` which propagates exceptions, resulting in an "unclean" shutdown. That would then cause the broker to attempt to recover the log segment and the index on next startup, thereby avoiding the failures described above. Reviewers: Omnia Ibrahim <[email protected]>, Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 45327fd commit 8433ac4

File tree

8 files changed

+99
-10
lines changed

8 files changed

+99
-10
lines changed

clients/src/main/java/org/apache/kafka/common/record/FileRecords.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ public void flush() throws IOException {
201201
* Close this record set
202202
*/
203203
public void close() throws IOException {
204+
if (!channel.isOpen()) {
205+
return;
206+
}
207+
204208
flush();
205209
trim();
206210
channel.close();

core/src/test/java/kafka/server/LogManagerIntegrationTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@
3434
import org.apache.kafka.common.test.ClusterInstance;
3535
import org.apache.kafka.common.test.api.ClusterTest;
3636
import org.apache.kafka.common.test.api.Type;
37+
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
3738
import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile;
3839
import org.apache.kafka.test.TestUtils;
3940

41+
import java.io.File;
4042
import java.io.IOException;
4143
import java.time.Duration;
4244
import java.util.ArrayList;
@@ -59,6 +61,70 @@ public LogManagerIntegrationTest(ClusterInstance cluster) {
5961
this.cluster = cluster;
6062
}
6163

64+
@ClusterTest(types = {Type.KRAFT})
65+
public void testIOExceptionOnLogSegmentCloseResultsInRecovery() throws IOException, InterruptedException, ExecutionException {
66+
try (Admin admin = cluster.admin()) {
67+
admin.createTopics(List.of(new NewTopic("foo", 1, (short) 1))).all().get();
68+
}
69+
cluster.waitForTopic("foo", 1);
70+
71+
// Produce some data into the topic
72+
Map<String, Object> producerConfigs = Map.of(
73+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
74+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
75+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
76+
);
77+
78+
try (Producer<String, String> producer = new KafkaProducer<>(producerConfigs)) {
79+
producer.send(new ProducerRecord<>("foo", 0, null, "bar")).get();
80+
producer.flush();
81+
}
82+
83+
var broker = cluster.brokers().get(0);
84+
85+
File timeIndexFile = broker.logManager()
86+
.getLog(new TopicPartition("foo", 0), false).get()
87+
.activeSegment()
88+
.timeIndexFile();
89+
90+
// Set read only so that we throw an IOException on shutdown
91+
assertTrue(timeIndexFile.exists());
92+
assertTrue(timeIndexFile.setReadOnly());
93+
94+
broker.shutdown();
95+
96+
assertEquals(1, broker.config().logDirs().size());
97+
String logDir = broker.config().logDirs().head();
98+
CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir);
99+
assertFalse(cleanShutdownFileHandler.exists(), "Did not expect the clean shutdown file to exist");
100+
101+
// Ensure we have a corrupt index on broker shutdown
102+
long maxIndexSize = broker.config().logIndexSizeMaxBytes();
103+
long expectedIndexSize = 12 * (maxIndexSize / 12);
104+
assertEquals(expectedIndexSize, timeIndexFile.length());
105+
106+
// Allow write permissions before startup
107+
assertTrue(timeIndexFile.setWritable(true));
108+
109+
broker.startup();
110+
// make sure there is no error during load logs
111+
assertTrue(cluster.firstFatalException().isEmpty());
112+
try (Admin admin = cluster.admin()) {
113+
TestUtils.waitForCondition(() -> {
114+
List<TopicPartitionInfo> partitionInfos = admin.describeTopics(List.of("foo"))
115+
.topicNameValues().get("foo").get().partitions();
116+
return partitionInfos.get(0).leader().id() == 0;
117+
}, "Partition does not have a leader assigned");
118+
}
119+
120+
// Ensure that sanity check does not fail
121+
broker.logManager()
122+
.getLog(new TopicPartition("foo", 0), false).get()
123+
.activeSegment()
124+
.timeIndex()
125+
.sanityCheck();
126+
}
127+
62128
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)
63129
public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException {
64130

storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,9 @@ public boolean deleteIfExists() throws IOException {
263263
public void trimToValidSize() throws IOException {
264264
lock.lock();
265265
try {
266-
resize(entrySize() * entries);
266+
if (mmap != null) {
267+
resize(entrySize() * entries);
268+
}
267269
} finally {
268270
lock.unlock();
269271
}

storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -763,10 +763,7 @@ public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(long times
763763
public void close() throws IOException {
764764
if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN)
765765
Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar(), true));
766-
Utils.closeQuietly(lazyOffsetIndex, "offsetIndex", LOGGER);
767-
Utils.closeQuietly(lazyTimeIndex, "timeIndex", LOGGER);
768-
Utils.closeQuietly(log, "log", LOGGER);
769-
Utils.closeQuietly(txnIndex, "txnIndex", LOGGER);
766+
Utils.closeAll(lazyOffsetIndex, lazyTimeIndex, log, txnIndex);
770767
}
771768

772769
/**

storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.storage.internals.log;
1818

1919
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.utils.Utils;
2021

2122
import java.io.Closeable;
2223
import java.io.File;
@@ -105,8 +106,7 @@ public void clear() {
105106
*/
106107
@Override
107108
public void close() throws IOException {
108-
for (LogSegment s : values())
109-
s.close();
109+
Utils.closeAll(values().toArray(new LogSegment[0]));
110110
}
111111

112112
/**

storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void reset() throws IOException {
110110

111111
public void close() throws IOException {
112112
FileChannel channel = channelOrNull();
113-
if (channel != null)
113+
if (channel != null && channel.isOpen())
114114
channel.close();
115115
maybeChannel = Optional.empty();
116116
}

storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@
4040
import static org.junit.jupiter.api.Assertions.assertFalse;
4141
import static org.junit.jupiter.api.Assertions.assertThrows;
4242
import static org.junit.jupiter.api.Assertions.assertTrue;
43+
import static org.mockito.Mockito.doThrow;
4344
import static org.mockito.Mockito.mock;
45+
import static org.mockito.Mockito.spy;
46+
import static org.mockito.Mockito.verify;
4447
import static org.mockito.Mockito.when;
4548

4649
public class LogSegmentsTest {
@@ -49,7 +52,7 @@ public class LogSegmentsTest {
4952

5053
/* create a segment with the given base offset */
5154
private static LogSegment createSegment(Long offset) throws IOException {
52-
return LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM);
55+
return spy(LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM));
5356
}
5457

5558
@BeforeEach
@@ -276,4 +279,22 @@ public void testUpdateDir() throws IOException {
276279
}
277280
}
278281

282+
@Test
283+
public void testCloseClosesAllLogSegmentsOnExceptionWhileClosingOne() throws IOException {
284+
LogSegment seg1 = createSegment(0L);
285+
LogSegment seg2 = createSegment(100L);
286+
LogSegment seg3 = createSegment(200L);
287+
LogSegments segments = new LogSegments(topicPartition);
288+
segments.add(seg1);
289+
segments.add(seg2);
290+
segments.add(seg3);
291+
292+
doThrow(new IOException("Failure")).when(seg2).close();
293+
294+
assertThrows(IOException.class, segments::close, "Expected IOException to be thrown");
295+
verify(seg1).close();
296+
verify(seg2).close();
297+
verify(seg3).close();
298+
}
299+
279300
}

storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,6 @@ public void forceUnmapTest() throws IOException {
225225
idx.forceUnmap();
226226
// mmap should be null after unmap causing lookup to throw a NPE
227227
assertThrows(NullPointerException.class, () -> idx.lookup(1));
228-
assertThrows(NullPointerException.class, idx::close);
229228
}
230229

231230
@Test

0 commit comments

Comments
 (0)