@@ -1132,6 +1132,7 @@ private Path toPathIfExists(File file) {
11321132
11331133 class RLMExpirationTask extends RLMTask {
11341134 private final Logger logger ;
1135+ private volatile boolean isAllSegmentsValid = false ;
11351136
11361137 public RLMExpirationTask (TopicIdPartition topicIdPartition ) {
11371138 super (topicIdPartition );
@@ -1143,6 +1144,16 @@ protected void execute(UnifiedLog log) throws InterruptedException, RemoteStorag
11431144 cleanupExpiredRemoteLogSegments ();
11441145 }
11451146
1147+ @ Override
1148+ public void cancel () {
1149+ isAllSegmentsValid = false ;
1150+ super .cancel ();
1151+ }
1152+
1153+ boolean isAllSegmentsValid () {
1154+ return isAllSegmentsValid ;
1155+ }
1156+
11461157 public void handleLogStartOffsetUpdate (TopicPartition topicPartition , long remoteLogStartOffset ) {
11471158 logger .debug ("Updating {} with remoteLogStartOffset: {}" , topicPartition , remoteLogStartOffset );
11481159 updateRemoteLogStartOffset .accept (topicPartition , remoteLogStartOffset );
@@ -1255,7 +1266,45 @@ private void updateRemoteDeleteLagWith(int segmentsLeftToDelete, long sizeOfDele
12551266 brokerTopicStats .recordRemoteDeleteLagBytes (topic , partition , sizeOfDeletableSegmentsBytes );
12561267 }
12571268
1258- /** Cleanup expired and dangling remote log segments. */
1269+ private static class RemoteLogMetadataStats {
1270+ private final Set <Integer > epochsSet ;
1271+ private final int metadataCount ;
1272+ private final long remoteLogSizeBytes ;
1273+ private final long remoteLogSizeBytesCopyFinishedSegments ;
1274+
1275+ private RemoteLogMetadataStats (Set <Integer > epochsSet , int metadataCount , long remoteLogSizeBytes , long remoteLogSizeBytesCopyFinishedSegments ) {
1276+ this .epochsSet = epochsSet ;
1277+ this .metadataCount = metadataCount ;
1278+ this .remoteLogSizeBytes = remoteLogSizeBytes ;
1279+ this .remoteLogSizeBytesCopyFinishedSegments = remoteLogSizeBytesCopyFinishedSegments ;
1280+ }
1281+ }
1282+
1283+ private RemoteLogMetadataStats calculateMetadataAndSize (Iterator <RemoteLogSegmentMetadata > segmentMetadataIter ) {
1284+ // Good to have an API from RLMM to get all the remote leader epochs of all the segments of a partition
1285+ // instead of going through all the segments and building it here.
1286+ Set <Integer > epochsSet = new HashSet <>();
1287+ int metadataCount = 0 ;
1288+ long remoteLogSizeBytes = 0 ;
1289+ long remoteLogSizeBytesCopyFinishedSegments = 0 ;
1290+ while (segmentMetadataIter .hasNext ()) {
1291+ RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter .next ();
1292+ epochsSet .addAll (segmentMetadata .segmentLeaderEpochs ().keySet ());
1293+ metadataCount ++;
1294+ RemoteLogSegmentState state = segmentMetadata .state ();
1295+ if (state == RemoteLogSegmentState .COPY_SEGMENT_FINISHED ) {
1296+ remoteLogSizeBytesCopyFinishedSegments += segmentMetadata .segmentSizeInBytes ();
1297+ remoteLogSizeBytes += segmentMetadata .segmentSizeInBytes ();
1298+ } else if (state == RemoteLogSegmentState .DELETE_SEGMENT_STARTED ) {
1299+ remoteLogSizeBytes += segmentMetadata .segmentSizeInBytes ();
1300+ }
1301+ }
1302+ return new RemoteLogMetadataStats (epochsSet , metadataCount , remoteLogSizeBytes , remoteLogSizeBytesCopyFinishedSegments );
1303+ }
1304+
1305+ /**
1306+ * Cleanup expired and dangling remote log segments.
1307+ */
12591308 void cleanupExpiredRemoteLogSegments () throws RemoteStorageException , ExecutionException , InterruptedException {
12601309 if (isCancelled ()) {
12611310 logger .info ("Returning from remote log segments cleanup as the task state is changed" );
@@ -1271,29 +1320,17 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
12711320 final UnifiedLog log = logOptional .get ();
12721321
12731322 // Cleanup remote log segments and update the log start offset if applicable.
1274- final Iterator <RemoteLogSegmentMetadata > segmentMetadataIter = remoteLogMetadataManagerPlugin .get ().listRemoteLogSegments (topicIdPartition );
1275- if (!segmentMetadataIter .hasNext ()) {
1323+ Iterator <RemoteLogSegmentMetadata > segmentMetadataIter = remoteLogMetadataManagerPlugin .get ().listRemoteLogSegments (topicIdPartition );
1324+ RemoteLogMetadataStats stats = calculateMetadataAndSize (segmentMetadataIter );
1325+ if (stats .metadataCount == 0 ) {
12761326 updateMetadataCountAndLogSizeWith (0 , 0 );
12771327 logger .debug ("No remote log segments available on remote storage for partition: {}" , topicIdPartition );
12781328 return ;
12791329 }
1280-
1281- final Set <Integer > epochsSet = new HashSet <>();
1282- int metadataCount = 0 ;
1283- long remoteLogSizeBytes = 0 ;
1284- // Good to have an API from RLMM to get all the remote leader epochs of all the segments of a partition
1285- // instead of going through all the segments and building it here.
1286- while (segmentMetadataIter .hasNext ()) {
1287- RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter .next ();
1288- epochsSet .addAll (segmentMetadata .segmentLeaderEpochs ().keySet ());
1289- metadataCount ++;
1290- remoteLogSizeBytes += segmentMetadata .segmentSizeInBytes ();
1291- }
1292-
1293- updateMetadataCountAndLogSizeWith (metadataCount , remoteLogSizeBytes );
1330+ updateMetadataCountAndLogSizeWith (stats .metadataCount , stats .remoteLogSizeBytes );
12941331
12951332 // All the leader epochs in sorted order that exists in remote storage
1296- final List <Integer > remoteLeaderEpochs = new ArrayList <>(epochsSet );
1333+ final List <Integer > remoteLeaderEpochs = new ArrayList <>(stats . epochsSet );
12971334 Collections .sort (remoteLeaderEpochs );
12981335
12991336 LeaderEpochFileCache leaderEpochCache = log .leaderEpochCache ();
@@ -1303,7 +1340,7 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
13031340 long logStartOffset = log .logStartOffset ();
13041341 long logEndOffset = log .logEndOffset ();
13051342 Optional <RetentionSizeData > retentionSizeData = buildRetentionSizeData (log .config ().retentionSize ,
1306- log .onlyLocalLogSegmentsSize (), logEndOffset , epochWithOffsets );
1343+ log .onlyLocalLogSegmentsSize (), logEndOffset , epochWithOffsets , stats . remoteLogSizeBytesCopyFinishedSegments );
13071344 Optional <RetentionTimeData > retentionTimeData = buildRetentionTimeData (log .config ().retentionMs );
13081345
13091346 RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler (retentionSizeData , retentionTimeData );
@@ -1438,13 +1475,19 @@ private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
14381475 : Optional .empty ();
14391476 }
14401477
1441- private Optional <RetentionSizeData > buildRetentionSizeData (long retentionSize ,
1442- long onlyLocalLogSegmentsSize ,
1443- long logEndOffset ,
1444- NavigableMap <Integer , Long > epochEntries ) throws RemoteStorageException {
1445- if (retentionSize > -1 ) {
1478+ Optional <RetentionSizeData > buildRetentionSizeData (long retentionSize ,
1479+ long onlyLocalLogSegmentsSize ,
1480+ long logEndOffset ,
1481+ NavigableMap <Integer , Long > epochEntries ,
1482+ long fullRemoteLogSizeBytesCopyFinishedSegments ) throws RemoteStorageException {
1483+ if (retentionSize < 0 ) {
1484+ return Optional .empty ();
1485+ }
1486+
1487+ long remoteLogSizeBytes = 0L ;
1488+ if (!isAllSegmentsValid ) {
1489+ boolean isAllValid = true ;
14461490 long startTimeMs = time .milliseconds ();
1447- long remoteLogSizeBytes = 0L ;
14481491 Set <RemoteLogSegmentId > visitedSegmentIds = new HashSet <>();
14491492 for (Integer epoch : epochEntries .navigableKeySet ()) {
14501493 // remoteLogSize(topicIdPartition, epochEntry.epoch) may not be completely accurate as the remote
@@ -1460,26 +1503,32 @@ private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
14601503 // "DELETE_SEGMENT_FINISHED" means deletion completed, so there is nothing to count.
14611504 if (segmentMetadata .state ().equals (RemoteLogSegmentState .COPY_SEGMENT_FINISHED )) {
14621505 RemoteLogSegmentId segmentId = segmentMetadata .remoteLogSegmentId ();
1463- if (!visitedSegmentIds .contains (segmentId ) && isRemoteSegmentWithinLeaderEpochs (segmentMetadata , logEndOffset , epochEntries )) {
1464- remoteLogSizeBytes += segmentMetadata .segmentSizeInBytes ();
1506+ if (!visitedSegmentIds .contains (segmentId )) {
14651507 visitedSegmentIds .add (segmentId );
1508+ boolean isValid = isRemoteSegmentWithinLeaderEpochs (segmentMetadata , logEndOffset , epochEntries );
1509+ if (isValid ) {
1510+ remoteLogSizeBytes += segmentMetadata .segmentSizeInBytes ();
1511+ } else {
1512+ isAllValid = false ;
1513+ }
14661514 }
14671515 }
14681516 }
14691517 }
1470-
1518+ this . isAllSegmentsValid = isAllValid && fullRemoteLogSizeBytesCopyFinishedSegments == remoteLogSizeBytes ;
14711519 brokerTopicStats .recordRemoteLogSizeComputationTime (topicIdPartition .topic (), topicIdPartition .partition (), time .milliseconds () - startTimeMs );
1472-
1473- // This is the total size of segments in local log that have their base-offset > local-log-start-offset
1474- // and size of the segments in remote storage which have their end-offset < local-log-start-offset.
1475- long totalSize = onlyLocalLogSegmentsSize + remoteLogSizeBytes ;
1476- if (totalSize > retentionSize ) {
1477- long remainingBreachedSize = totalSize - retentionSize ;
1478- RetentionSizeData retentionSizeData = new RetentionSizeData (retentionSize , remainingBreachedSize );
1479- return Optional .of (retentionSizeData );
1480- }
1520+ } else {
1521+ // Once all the segments are valid, then the future segments to be uploaded by this leader are also valid.
1522+ remoteLogSizeBytes = fullRemoteLogSizeBytesCopyFinishedSegments ;
1523+ }
1524+ // This is the total size of segments in local log that have their base-offset > local-log-start-offset
1525+ // and size of the segments in remote storage which have their end-offset < local-log-start-offset.
1526+ long totalSize = onlyLocalLogSegmentsSize + remoteLogSizeBytes ;
1527+ if (totalSize > retentionSize ) {
1528+ long remainingBreachedSize = totalSize - retentionSize ;
1529+ RetentionSizeData retentionSizeData = new RetentionSizeData (retentionSize , remainingBreachedSize );
1530+ return Optional .of (retentionSizeData );
14811531 }
1482-
14831532 return Optional .empty ();
14841533 }
14851534 }
@@ -2185,6 +2234,14 @@ public RetentionSizeData(long retentionSize, long remainingBreachedSize) {
21852234 this .retentionSize = retentionSize ;
21862235 this .remainingBreachedSize = remainingBreachedSize ;
21872236 }
2237+
2238+ long retentionSize () {
2239+ return retentionSize ;
2240+ }
2241+
2242+ long remainingBreachedSize () {
2243+ return remainingBreachedSize ;
2244+ }
21882245 }
21892246
21902247 // Visible for testing
0 commit comments