Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,7 @@ private Path toPathIfExists(File file) {

class RLMExpirationTask extends RLMTask {
private final Logger logger;
private volatile boolean isAllSegmentsValid = false;

public RLMExpirationTask(TopicIdPartition topicIdPartition) {
super(topicIdPartition);
Expand All @@ -1148,6 +1149,16 @@ protected void execute(UnifiedLog log) throws InterruptedException, RemoteStorag
cleanupExpiredRemoteLogSegments();
}

@Override
public void cancel() {
// Reset the cached "all segments valid" flag before cancelling, so a future
// expiration task for this partition recomputes the remote log size from
// scratch instead of trusting state left behind by this task.
isAllSegmentsValid = false;
super.cancel();
}

// Package-private for testing. Reports whether a previous retention-size
// computation established that every copy-finished remote segment lies within
// the leader-epoch lineage, allowing the per-epoch listing to be skipped.
boolean isAllSegmentsValid() {
return isAllSegmentsValid;
}

public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
Expand Down Expand Up @@ -1260,7 +1271,47 @@ private void updateRemoteDeleteLagWith(int segmentsLeftToDelete, long sizeOfDele
brokerTopicStats.recordRemoteDeleteLagBytes(topic, partition, sizeOfDeletableSegmentsBytes);
}

/** Cleanup expired and dangling remote log segments. */
/**
 * Aggregated view built from one pass over a partition's remote log segment
 * metadata: the leader epochs seen, the number of metadata entries, and the
 * size totals used for metrics and retention-size enforcement.
 */
private static class RemoteLogMetadataStats {
// Union of all leader epochs present across the listed segments.
private final Set<Integer> epochsSet;
// Number of remote segment metadata entries listed for the partition.
private final int metadataCount;
// Total size of segments in COPY_SEGMENT_FINISHED or DELETE_SEGMENT_STARTED state.
// COPY_SEGMENT_STARTED is excluded so upload retries do not pollute the metric.
private final long sizeInBytes;
// Total size of only the COPY_SEGMENT_FINISHED segments.
private final long copyFinishedSegmentsSizeInBytes;

private RemoteLogMetadataStats(Set<Integer> epochsSet, int metadataCount, long sizeInBytes, long copyFinishedSegmentsSizeInBytes) {
this.epochsSet = epochsSet;
this.metadataCount = metadataCount;
this.sizeInBytes = sizeInBytes;
this.copyFinishedSegmentsSizeInBytes = copyFinishedSegmentsSizeInBytes;
}
}

/**
 * Walks the given segment-metadata iterator once and aggregates the leader
 * epochs, entry count, and size totals into a {@link RemoteLogMetadataStats}.
 *
 * Good to have an API from RLMM to get the RemoteLogMetadataStats instead of
 * going through all the segments and building it here.
 */
private RemoteLogMetadataStats calculateMetadataAndSize(Iterator<RemoteLogSegmentMetadata> segmentMetadataIter) {
    final Set<Integer> leaderEpochs = new HashSet<>();
    int entryCount = 0;
    long totalSizeBytes = 0L;
    long copyFinishedSizeBytes = 0L;
    while (segmentMetadataIter.hasNext()) {
        final RemoteLogSegmentMetadata metadata = segmentMetadataIter.next();
        leaderEpochs.addAll(metadata.segmentLeaderEpochs().keySet());
        entryCount++;
        final long segmentSizeBytes = metadata.segmentSizeInBytes();
        // COPY_SEGMENT_STARTED is excluded from `totalSizeBytes` as it can pollute
        // the metric during upload retries.
        switch (metadata.state()) {
            case COPY_SEGMENT_FINISHED:
                copyFinishedSizeBytes += segmentSizeBytes;
                totalSizeBytes += segmentSizeBytes;
                break;
            case DELETE_SEGMENT_STARTED:
                totalSizeBytes += segmentSizeBytes;
                break;
            default:
                // COPY_SEGMENT_STARTED and completed deletions contribute nothing.
                break;
        }
    }
    return new RemoteLogMetadataStats(leaderEpochs, entryCount, totalSizeBytes, copyFinishedSizeBytes);
}

/**
* Cleanup expired and dangling remote log segments.
*/
void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
if (isCancelled()) {
logger.info("Returning from remote log segments cleanup as the task state is changed");
Expand All @@ -1276,29 +1327,17 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
final UnifiedLog log = logOptional.get();

// Cleanup remote log segments and update the log start offset if applicable.
final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition);
if (!segmentMetadataIter.hasNext()) {
Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition);
RemoteLogMetadataStats stats = calculateMetadataAndSize(segmentMetadataIter);
if (stats.metadataCount == 0) {
updateMetadataCountAndLogSizeWith(0, 0);
logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition);
return;
}

final Set<Integer> epochsSet = new HashSet<>();
int metadataCount = 0;
long remoteLogSizeBytes = 0;
// Good to have an API from RLMM to get all the remote leader epochs of all the segments of a partition
// instead of going through all the segments and building it here.
while (segmentMetadataIter.hasNext()) {
RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
metadataCount++;
remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
}

updateMetadataCountAndLogSizeWith(metadataCount, remoteLogSizeBytes);
updateMetadataCountAndLogSizeWith(stats.metadataCount, stats.sizeInBytes);

// All the leader epochs in sorted order that exists in remote storage
final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
final List<Integer> remoteLeaderEpochs = new ArrayList<>(stats.epochsSet);
Collections.sort(remoteLeaderEpochs);

LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache();
Expand All @@ -1308,7 +1347,7 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
long logStartOffset = log.logStartOffset();
long logEndOffset = log.logEndOffset();
Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize,
log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets);
log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets, stats.copyFinishedSegmentsSizeInBytes);
Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs);

RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
Expand Down Expand Up @@ -1443,13 +1482,20 @@ private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
: Optional.empty();
}

private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
long onlyLocalLogSegmentsSize,
long logEndOffset,
NavigableMap<Integer, Long> epochEntries) throws RemoteStorageException {
if (retentionSize > -1) {
long startTimeMs = time.milliseconds();
long remoteLogSizeBytes = 0L;
Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
long onlyLocalLogSegmentsSize,
long logEndOffset,
NavigableMap<Integer, Long> epochEntries,
long fullCopyFinishedSegmentsSizeInBytes) throws RemoteStorageException {
if (retentionSize < 0 || (onlyLocalLogSegmentsSize + fullCopyFinishedSegmentsSizeInBytes) <= retentionSize) {
return Optional.empty();
}
// compute valid remote-log size in bytes for the current partition if the size of the partition exceeds
// the configured limit.
long startTimeMs = time.milliseconds();
long remoteLogSizeBytes = 0L;
if (!isAllSegmentsValid) {
boolean isAllValid = true;
Set<RemoteLogSegmentId> visitedSegmentIds = new HashSet<>();
for (Integer epoch : epochEntries.navigableKeySet()) {
// remoteLogSize(topicIdPartition, epochEntry.epoch) may not be completely accurate as the remote
Expand All @@ -1465,26 +1511,33 @@ private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
// "DELETE_SEGMENT_FINISHED" means deletion completed, so there is nothing to count.
if (segmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) {
RemoteLogSegmentId segmentId = segmentMetadata.remoteLogSegmentId();
if (!visitedSegmentIds.contains(segmentId) && isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) {
remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
if (!visitedSegmentIds.contains(segmentId)) {
visitedSegmentIds.add(segmentId);
boolean isValid = isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries);
if (isValid) {
remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
} else {
isAllValid = false;
}
}
}
}
}

brokerTopicStats.recordRemoteLogSizeComputationTime(topicIdPartition.topic(), topicIdPartition.partition(), time.milliseconds() - startTimeMs);

// This is the total size of segments in local log that have their base-offset > local-log-start-offset
// and size of the segments in remote storage which have their end-offset < local-log-start-offset.
long totalSize = onlyLocalLogSegmentsSize + remoteLogSizeBytes;
if (totalSize > retentionSize) {
long remainingBreachedSize = totalSize - retentionSize;
RetentionSizeData retentionSizeData = new RetentionSizeData(retentionSize, remainingBreachedSize);
return Optional.of(retentionSizeData);
}
this.isAllSegmentsValid = isAllValid && fullCopyFinishedSegmentsSizeInBytes == remoteLogSizeBytes;
} else {
// Once all the segments are valid, then the future segments to be uploaded by this leader are also valid.
remoteLogSizeBytes = fullCopyFinishedSegmentsSizeInBytes;
}
brokerTopicStats.recordRemoteLogSizeComputationTime(topicIdPartition.topic(), topicIdPartition.partition(),
time.milliseconds() - startTimeMs);
// This is the total size of segments in local log that have their base-offset > local-log-start-offset
// and size of the segments in remote storage which have their end-offset < local-log-start-offset.
long totalSize = onlyLocalLogSegmentsSize + remoteLogSizeBytes;
if (totalSize > retentionSize) {
long remainingBreachedSize = totalSize - retentionSize;
RetentionSizeData retentionSizeData = new RetentionSizeData(retentionSize, remainingBreachedSize);
return Optional.of(retentionSizeData);
}

return Optional.empty();
}
}
Expand Down Expand Up @@ -2190,6 +2243,14 @@ public RetentionSizeData(long retentionSize, long remainingBreachedSize) {
this.retentionSize = retentionSize;
this.remainingBreachedSize = remainingBreachedSize;
}

// The retention-size limit this breach computation was performed against.
long retentionSize() {
return retentionSize;
}

// Bytes by which the total (local + remote) log size exceeds the retention
// size, i.e. totalSize - retentionSize.
long remainingBreachedSize() {
return remainingBreachedSize;
}
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ void testRemoteLogManagerRemoteMetrics() throws Exception {
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
when(mockLog.lastStableOffset()).thenReturn(250L);
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", 1000000L);
logProps.put("retention.bytes", 5000L);
logProps.put("retention.ms", -1L);
LogConfig logConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(logConfig);
Expand Down Expand Up @@ -2177,6 +2177,80 @@ public void testRemoteSizeData() {
}
}

/**
 * Exercises {@code RLMExpirationTask#buildRetentionSizeData} end to end:
 * the early-exit paths (disabled retention, size under the limit), the breach
 * computation, the caching of the "all segments valid" flag that lets later
 * invocations skip per-epoch remote listings, and the flag reset on cancel().
 */
@Test
public void testBuildRetentionSizeData() throws RemoteStorageException {
long retentionSize = 1000L;
long onlyLocalLogSegmentsSize = 500L;
long logEndOffset = 100L;
NavigableMap<Integer, Long> epochEntries = new TreeMap<>();
epochEntries.put(0, 0L);
long fullCopyFinishedSegmentsSizeInBytes = 1600L;
RemoteLogManager.RLMExpirationTask expirationTask = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
assertFalse(expirationTask.isAllSegmentsValid());

// 1. retentionSize < 0
Optional<RemoteLogManager.RetentionSizeData> result = expirationTask
.buildRetentionSizeData(-1L, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, fullCopyFinishedSegmentsSizeInBytes);
assertFalse(result.isPresent());
assertFalse(expirationTask.isAllSegmentsValid());

// 2. When (onlyLocalLogSegmentsSize + fullCopyFinishedSegmentsSizeInBytes) <= configure-retention-size
result = expirationTask
.buildRetentionSizeData(retentionSize, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, 500L);
assertFalse(result.isPresent());
assertFalse(expirationTask.isAllSegmentsValid());

// 3. totalSize <= retentionSize
// totalSize = 500 (local) + 0 (remote, as listRemoteLogSegments returns empty) = 500. retentionSize = 1000.
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
.thenReturn(Collections.emptyIterator());
result = expirationTask
.buildRetentionSizeData(retentionSize, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, fullCopyFinishedSegmentsSizeInBytes);
assertFalse(result.isPresent());
assertFalse(expirationTask.isAllSegmentsValid());

// 4. totalSize > retentionSize
// Each remote log segment size is 1000 bytes.
// totalSize = 500 (local) + 1000 (remote) = 1500. retentionSize = 1000.
// The invocation counter tracks how many times the per-epoch listing is hit.
AtomicInteger invocationCount = new AtomicInteger(0);
RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata(0, 50, Collections.singletonMap(0, 0L));
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), eq(0)))
.thenAnswer(invocation -> {
invocationCount.incrementAndGet();
return Collections.singletonList(segmentMetadata).iterator();
});

result = expirationTask
.buildRetentionSizeData(retentionSize, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, fullCopyFinishedSegmentsSizeInBytes);
assertTrue(result.isPresent());
assertEquals(1000L, result.get().retentionSize());
assertEquals(500L, result.get().remainingBreachedSize()); // (500 + 1000) - 1000 = 500
// fullCopyFinishedSegmentsSizeInBytes (1600) != computed remote size (1000), so the flag stays false.
assertFalse(expirationTask.isAllSegmentsValid());
assertEquals(1, invocationCount.get());

// 5. Provide the valid `fullCopyFinishedSegmentsSizeInBytes` size
result = expirationTask
.buildRetentionSizeData(retentionSize, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, 1000L);
assertTrue(result.isPresent());
assertEquals(1000L, result.get().retentionSize());
assertEquals(500L, result.get().remainingBreachedSize()); // (500 + 1000) - 1000 = 500
assertTrue(expirationTask.isAllSegmentsValid());
assertEquals(2, invocationCount.get());

// Once all the segments are validated and the computed segmentSize for listRemoteLogSegments(tpId) and
// listRemoteLogSegments(tpId, epoch) are same, then the next calls to `buildRetentionSizeData` should not
// invoke listRemoteLogSegments(tpId, epoch) again.
result = expirationTask
.buildRetentionSizeData(retentionSize, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, 1000L);
assertTrue(result.isPresent());
assertEquals(500L, result.get().remainingBreachedSize());
assertEquals(2, invocationCount.get());
assertTrue(expirationTask.isAllSegmentsValid());

// Cancelling the task must drop the cached validity so a new task revalidates.
expirationTask.cancel();
assertFalse(expirationTask.isAllSegmentsValid());
}

@SuppressWarnings("unchecked")
@Test
public void testRemoteSizeTime() {
Expand Down