9 changes: 9 additions & 0 deletions changelog/unreleased/parallelizebackups.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: Parallelize Backup and Restore File Operations
+type: changed
+authors:
+  - name: Samuel Verstraete
+    github: elangelo
+links:
+  - name: PR#4023
+    url: https://github.com/apache/solr/pull/4023
185 changes: 156 additions & 29 deletions solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
@@ -23,15 +23,28 @@
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.math3.util.Precision;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.api.model.SolrJerseyResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.EnvUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
@@ -52,6 +65,15 @@
*/
public class IncrementalShardBackup {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

+/**
+ * Maximum number of files to upload in parallel during backup. Can be configured via the system
+ * property {@code solr.backup.maxparalleluploads} or environment variable {@code
+ * SOLR_BACKUP_MAXPARALLELUPLOADS}.
+ */
+private static final int DEFAULT_MAX_PARALLEL_UPLOADS =
+    EnvUtils.getPropertyAsInteger("solr.backup.maxparalleluploads", 1);

private SolrCore solrCore;

private BackupFilePaths incBackupFiles;
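Reviewer note: the new knob is read once, at class load. EnvUtils resolves the system property and, failing that, the matching environment variable, so either launch form below should work; the value 4 is only an example, not a shipped default, and the flags are assumed to be passed through to the JVM in the usual way:

// Illustrative only; assumes the usual Solr launch paths (values are examples):
//   bin/solr start -Dsolr.backup.maxparalleluploads=4
//   SOLR_BACKUP_MAXPARALLELUPLOADS=4 bin/solr start
//
// Both end up visible through the same lookup the new constant uses:
int maxParallelUploads = EnvUtils.getPropertyAsInteger("solr.backup.maxparalleluploads", 1); // -> 4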
@@ -154,8 +176,8 @@ protected IncrementalShardSnapshotResponse backup(final IndexCommit indexCommit)
solrCore.getSolrConfig().indexConfig.lockType);
try {
BackupStats stats = incrementalCopy(files, dir);
-details.indexFileCount = stats.fileCount;
-details.uploadedIndexFileCount = stats.uploadedFileCount;
+details.indexFileCount = stats.fileCount.get();
+details.uploadedIndexFileCount = stats.uploadedFileCount.get();
details.indexSizeMB = stats.getIndexSizeMB();
details.uploadedIndexFileMB = stats.getTotalUploadedMB();
} finally {
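Why the counters above sprout .get(): with uploads fanned out across threads, BackupStats is updated concurrently, and a plain ++ is a read-modify-write that can silently drop increments, while the atomic replacements (shown further down in this diff) cannot. A self-contained sketch of the failure mode, with an invented class name:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class LostUpdateDemo {
  static int plain = 0;
  static final AtomicInteger atomic = new AtomicInteger();

  public static void main(String[] args) {
    IntStream.range(0, 100_000)
        .parallel()
        .forEach(
            i -> {
              plain++; // racy: two threads can read the same old value
              atomic.incrementAndGet(); // never loses an increment
            });
    System.out.println("plain  = " + plain); // typically < 100000
    System.out.println("atomic = " + atomic.get()); // always 100000
  }
}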
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();

-for (String fileName : indexFiles) {
-  Optional<ShardBackupMetadata.BackedFile> opBackedFile = oldBackupPoint.getFile(fileName);
-  Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-  if (opBackedFile.isPresent()) {
-    ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-    Checksum existedFileCS = backedFile.fileChecksum;
-    if (existedFileCS.equals(originalFileCS)) {
-      currentBackupPoint.addBackedFile(opBackedFile.get());
-      backupStats.skippedUploadingFile(existedFileCS);
-      continue;
-    }
-  }
+// Only use an executor for parallel uploads when parallelism > 1
+// When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+ExecutorService executor =
+    maxParallelUploads > 1
+        ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+            0,
+            maxParallelUploads,
+            60L,
+            TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new SolrNamedThreadFactory("IncrementalBackup"),
+            new ThreadPoolExecutor.CallerRunsPolicy())
+        : null;

+List<Future<?>> uploadFutures = new ArrayList<>();

+try {
+  for (String fileName : indexFiles) {
+    // Capture variable for lambda
+    final String fileNameFinal = fileName;

+    Runnable uploadTask =
+        () -> {
+          try {
+            // Calculate checksum and check if file already exists in previous backup
+            Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                oldBackupPoint.getFile(fileNameFinal);
+            Checksum originalFileCS = backupRepo.checksum(dir, fileNameFinal);

+            if (opBackedFile.isPresent()) {
+              ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
+              Checksum existedFileCS = backedFile.fileChecksum;
+              if (existedFileCS.equals(originalFileCS)) {
+                synchronized (currentBackupPoint) {
+                  currentBackupPoint.addBackedFile(opBackedFile.get());
+                }
+                backupStats.skippedUploadingFile(existedFileCS);
+                return;
+              }
+            }

+            // File doesn't exist or has changed - upload it
+            String backedFileName = UUID.randomUUID().toString();
+            backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, backedFileName);

+            synchronized (currentBackupPoint) {
+              currentBackupPoint.addBackedFile(backedFileName, fileNameFinal, originalFileCS);
+            }
+            backupStats.uploadedFile(originalFileCS);
+          } catch (IOException e) {
+            throw new RuntimeException("Failed to process file: " + fileNameFinal, e);
+          }
+        };

+    if (executor != null) {
+      uploadFutures.add(executor.submit(uploadTask));
+    } else {
+      // Run synchronously when parallelism is 1
+      try {
+        uploadTask.run();
+      } catch (RuntimeException e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw e;
+      }
+    }
+  }

-  String backedFileName = UUID.randomUUID().toString();
-  backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+  // Wait for all uploads to complete and collect any errors (only if using executor)
+  if (executor != null) {
+    // We need to wait for ALL futures before throwing, otherwise we might exit
+    // before all successfully uploaded files are added to currentBackupPoint
+    Throwable firstError = null;
+    for (Future<?> future : uploadFutures) {
+      try {
+        future.get();
+      } catch (ExecutionException e) {
+        if (firstError == null) {
+          Throwable cause = e.getCause();
+          // Unwrap RuntimeExceptions that wrap the original IOException
+          if (cause instanceof RuntimeException && cause.getCause() != null) {
+            firstError = cause.getCause();
+          } else {
+            firstError = cause;
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        if (firstError == null) {
+          firstError = e;
+        }
+      }
+    }

-  currentBackupPoint.addBackedFile(backedFileName, fileName, originalFileCS);
-  backupStats.uploadedFile(originalFileCS);
-}
+    // Now throw the first error we encountered, if any
+    if (firstError != null) {
+      if (firstError instanceof Error) {
+        // Rethrow Errors (like OutOfMemoryError) - don't try to recover
+        throw (Error) firstError;
+      } else if (firstError instanceof IOException) {
+        throw (IOException) firstError;
+      } else if (firstError instanceof RuntimeException) {
+        throw (RuntimeException) firstError;
+      } else if (firstError instanceof InterruptedException) {
+        throw new IOException("Backup interrupted", firstError);
+      } else {
+        throw new IOException("Error during parallel backup upload", firstError);
+      }
+    }
+  }
+} finally {
+  if (executor != null) {
+    executor.shutdown();
+    try {
+      if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
+        executor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+}

currentBackupPoint.store(backupRepo, incBackupFiles.getShardBackupMetadataDir(), shardBackupId);
return backupStats;
}

private static class BackupStats {
-  private int fileCount;
-  private int uploadedFileCount;
-  private long indexSize;
-  private long totalUploadedBytes;
+  private final AtomicInteger fileCount = new AtomicInteger();
+  private final AtomicInteger uploadedFileCount = new AtomicInteger();
+  private final AtomicLong indexSize = new AtomicLong();
+  private final AtomicLong totalUploadedBytes = new AtomicLong();

  public void uploadedFile(Checksum file) {
-    fileCount++;
-    uploadedFileCount++;
-    indexSize += file.size;
-    totalUploadedBytes += file.size;
+    fileCount.incrementAndGet();
+    uploadedFileCount.incrementAndGet();
+    indexSize.addAndGet(file.size);
+    totalUploadedBytes.addAndGet(file.size);
  }

  public void skippedUploadingFile(Checksum existedFile) {
-    fileCount++;
-    indexSize += existedFile.size;
+    fileCount.incrementAndGet();
+    indexSize.addAndGet(existedFile.size);
  }

  public double getIndexSizeMB() {
-    return Precision.round(indexSize / (1024.0 * 1024), 3);
+    return Precision.round(indexSize.get() / (1024.0 * 1024), 3);
  }

  public double getTotalUploadedMB() {
-    return Precision.round(totalUploadedBytes / (1024.0 * 1024), 3);
+    return Precision.round(totalUploadedBytes.get() / (1024.0 * 1024), 3);
  }
}

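A closing note on the thread-pool shape chosen in incrementalCopy: zero core threads plus a SynchronousQueue means a submitted task is handed directly to a worker or rejected, and CallerRunsPolicy converts that rejection into running the task on the submitting thread. The net effect is at most maxParallelUploads copies in flight with no unbounded queue, the submitter throttles itself when the pool is saturated, and waiting on every Future before rethrowing guarantees all finished uploads are recorded in the metadata even when one file fails. A stripped-down sketch of the same pattern with a plain ThreadPoolExecutor (class name, pool size, and the simulated copy are invented for the demo):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedUploadDemo {
  public static void main(String[] args) throws Exception {
    int maxParallel = 3; // stand-in for solr.backup.maxparalleluploads
    ExecutorService executor =
        new ThreadPoolExecutor(
            0, // no idle core threads between backups
            maxParallel, // hard cap on concurrent "uploads"
            60L,
            TimeUnit.SECONDS,
            new SynchronousQueue<>(), // no buffering: hand off directly or reject
            new ThreadPoolExecutor.CallerRunsPolicy()); // rejected -> run on the caller

    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      final int fileNo = i;
      futures.add(
          executor.submit(
              () -> {
                try {
                  Thread.sleep(100); // simulated file copy
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                }
                System.out.println(
                    "copied file " + fileNo + " on " + Thread.currentThread().getName());
              }));
    }

    // As in incrementalCopy: wait on every future so partially completed work
    // is fully accounted for before any failure is surfaced.
    for (Future<?> f : futures) {
      f.get(); // would throw ExecutionException if a task failed
    }
    executor.shutdown();
  }
}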