 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -922,7 +923,7 @@ public void close() {
         localLog.checkIfMemoryMappedBufferClosed();
         producerExpireCheck.cancel(true);
         maybeHandleIOException(
-            "Error while renaming dir for " + topicPartition() + " in dir " + dir().getParent(),
+            () -> "Error while renaming dir for " + topicPartition() + " in dir " + dir().getParent(),
             () -> {
                 // We take a snapshot at the last written offset to hopefully avoid the need to scan the log
                 // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
@@ -945,7 +946,7 @@ public void close() {
     public void renameDir(String name, boolean shouldReinitialize) {
         synchronized (lock) {
             maybeHandleIOException(
-                "Error while renaming dir for " + topicPartition() + " in log dir " + dir().getParent(),
+                () -> "Error while renaming dir for " + topicPartition() + " in log dir " + dir().getParent(),
                 () -> {
                     // Flush partitionMetadata file before initializing again
                     maybeFlushMetadataFile();
@@ -1087,7 +1088,7 @@ private LogAppendInfo append(MemoryRecords records,
         // they are valid, insert them in the log
         synchronized (lock) {
             return maybeHandleIOException(
-                "Error while appending records to " + topicPartition() + " in dir " + dir().getParent(),
+                () -> "Error while appending records to " + topicPartition() + " in dir " + dir().getParent(),
                 () -> {
                     MemoryRecords validRecords = trimmedRecords;
                     localLog.checkIfMemoryMappedBufferClosed();
@@ -1300,7 +1301,7 @@ public boolean maybeIncrementLogStartOffset(long newLogStartOffset, LogStartOffs
         // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
         // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
         return maybeHandleIOException(
-            "Exception while increasing log start offset for " + topicPartition() + " to " + newLogStartOffset + " in dir " + dir().getParent(),
+            () -> "Exception while increasing log start offset for " + topicPartition() + " to " + newLogStartOffset + " in dir " + dir().getParent(),
             () -> {
                 synchronized (lock) {
                     if (newLogStartOffset > highWatermark()) {
@@ -1613,7 +1614,7 @@ public List<AbortedTxn> collectAbortedTransactions(long startOffset, long upperB
      */
     public OffsetResultHolder fetchOffsetByTimestamp(long targetTimestamp, Optional<AsyncOffsetReader> remoteOffsetReader) {
         return maybeHandleIOException(
-            "Error while fetching offset by timestamp for " + topicPartition() + " in dir " + dir().getParent(),
+            () -> "Error while fetching offset by timestamp for " + topicPartition() + " in dir " + dir().getParent(),
             () -> {
                 logger.debug("Searching offset for timestamp {}.", targetTimestamp);
 
@@ -1831,7 +1832,7 @@ private static <T> Optional<T> nextOption(Iterator<T> iterator) {
     }
 
     private int deleteSegments(List<LogSegment> deletable, SegmentDeletionReason reason) {
-        return maybeHandleIOException("Error while deleting segments for " + topicPartition() + " in dir " + dir().getParent(),
+        return maybeHandleIOException(() -> "Error while deleting segments for " + topicPartition() + " in dir " + dir().getParent(),
             () -> {
                 int numToDelete = deletable.size();
                 if (numToDelete > 0) {
@@ -2138,7 +2139,7 @@ private void flush(long offset, boolean includingOffset) {
         long flushOffset = includingOffset ? offset + 1 : offset;
         String includingOffsetStr = includingOffset ? "inclusive" : "exclusive";
         maybeHandleIOException(
-            "Error while flushing log for " + topicPartition() + " in dir " + dir().getParent() + " with offset " + offset +
+            () -> "Error while flushing log for " + topicPartition() + " in dir " + dir().getParent() + " with offset " + offset +
                 " (" + includingOffsetStr + ") and recovery point " + offset,
             () -> {
                 if (flushOffset > localLog.recoveryPoint()) {
@@ -2158,7 +2159,7 @@ private void flush(long offset, boolean includingOffset) {
      */
     public void delete() {
         maybeHandleIOException(
-            "Error while deleting log for " + topicPartition() + " in dir " + dir().getParent(),
+            () -> "Error while deleting log for " + topicPartition() + " in dir " + dir().getParent(),
             () -> {
                 synchronized (lock) {
                     localLog.checkIfMemoryMappedBufferClosed();
@@ -2204,7 +2205,7 @@ public long latestProducerStateEndOffset() {
     // visible for testing
     public void flushProducerStateSnapshot(Path snapshot) {
         maybeHandleIOException(
-            "Error while deleting producer state snapshot " + snapshot + " for " + topicPartition() + " in dir " + dir().getParent(),
+            () -> "Error while deleting producer state snapshot " + snapshot + " for " + topicPartition() + " in dir " + dir().getParent(),
             () -> {
                 Utils.flushFileIfExists(snapshot);
                 return null;
@@ -2219,7 +2220,7 @@ public void flushProducerStateSnapshot(Path snapshot) {
      */
     public boolean truncateTo(long targetOffset) {
         return maybeHandleIOException(
-            "Error while truncating log to offset " + targetOffset + " for " + topicPartition() + " in dir " + dir().getParent(),
+            () -> "Error while truncating log to offset " + targetOffset + " for " + topicPartition() + " in dir " + dir().getParent(),
             () -> {
                 if (targetOffset < 0) {
                     throw new IllegalArgumentException("Cannot truncate partition " + topicPartition() + " to a negative offset (" + targetOffset + ").");
@@ -2263,7 +2264,7 @@ public boolean truncateTo(long targetOffset) {
      */
     public void truncateFullyAndStartAt(long newOffset, Optional<Long> logStartOffsetOpt) {
         maybeHandleIOException(
-            "Error while truncating the entire log for " + topicPartition() + " in dir " + dir().getParent(),
+            () -> "Error while truncating the entire log for " + topicPartition() + " in dir " + dir().getParent(),
             () -> {
                 logger.debug("Truncate and start at offset {}, logStartOffset: {}", newOffset, logStartOffsetOpt.orElse(newOffset));
                 synchronized (lock) {
@@ -2370,8 +2371,8 @@ public void removeLogMetrics() {
         metricNames.clear();
     }
 
-    private <T> T maybeHandleIOException(String msg, StorageAction<T, IOException> fun) throws KafkaStorageException {
-        return LocalLog.maybeHandleIOException(logDirFailureChannel(), parentDir(), () -> msg, fun);
+    private <T> T maybeHandleIOException(Supplier<String> msg, StorageAction<T, IOException> fun) throws KafkaStorageException {
+        return LocalLog.maybeHandleIOException(logDirFailureChannel(), parentDir(), msg, fun);
     }
 
     public List<LogSegment> splitOverflowedSegment(LogSegment segment) throws IOException {
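
For context on the pattern above: the overload change from String to Supplier<String> defers the error-message concatenation (calls like topicPartition() and dir().getParent()) until an IOException is actually raised. Note that the old overload already wrapped the message as () -> msg before delegating to LocalLog.maybeHandleIOException, so the string was still built eagerly at every call site; moving the lambda into the call sites is what makes it lazy. Below is a minimal, self-contained sketch of the idea, assuming a simplified ThrowingAction in place of Kafka's StorageAction and a plain RuntimeException in place of KafkaStorageException:

import java.io.IOException;
import java.util.function.Supplier;

public class LazyErrorMessageDemo {

    // Simplified stand-in for Kafka's StorageAction<T, IOException>.
    @FunctionalInterface
    interface ThrowingAction<T> {
        T run() throws IOException;
    }

    // The Supplier is invoked only when an IOException is actually thrown,
    // so the success path never pays for building the message string.
    static <T> T maybeHandleIOException(Supplier<String> msg, ThrowingAction<T> fun) {
        try {
            return fun.run();
        } catch (IOException e) {
            throw new RuntimeException(msg.get(), e); // message built lazily, on failure only
        }
    }

    // Hypothetical stand-in for calls like topicPartition() and dir().getParent().
    static String expensiveDescription() {
        return "partition-0 in dir /tmp/kafka-logs";
    }

    public static void main(String[] args) {
        // On success the message lambda is never evaluated, so no concatenation happens.
        String result = maybeHandleIOException(
            () -> "Error while reading " + expensiveDescription(),
            () -> "ok");
        System.out.println(result);
    }
}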