diff --git a/changelog/unreleased/parallelizebackups.yml b/changelog/unreleased/parallelizebackups.yml
new file mode 100644
index 000000000000..0c78ea06274d
--- /dev/null
+++ b/changelog/unreleased/parallelizebackups.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: Parallelize Backup and Restore File Operations
+type: changed
+authors:
+  - name: Samuel Verstraete
+    github: elangelo
+links:
+  - name: PR#4023
+    url: https://github.com/apache/solr/pull/4023
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
index 0e07ac0ca280..ef9d9223d094 100644
--- a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
+++ b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
@@ -23,15 +23,28 @@
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.math3.util.Precision;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.client.api.model.SolrJerseyResponse;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.EnvUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
@@ -52,6 +65,15 @@
  */
 public class IncrementalShardBackup {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Maximum number of files to upload in parallel during backup. Can be configured via the system
+   * property {@code solr.backup.maxparalleluploads} or environment variable {@code
+   * SOLR_BACKUP_MAXPARALLELUPLOADS}.
+   */
+  private static final int DEFAULT_MAX_PARALLEL_UPLOADS =
+      EnvUtils.getPropertyAsInteger("solr.backup.maxparalleluploads", 1);
+
   private SolrCore solrCore;
 
   private BackupFilePaths incBackupFiles;
@@ -154,8 +176,8 @@ protected IncrementalShardSnapshotResponse backup(final IndexCommit indexCommit)
         solrCore.getSolrConfig().indexConfig.lockType);
     try {
       BackupStats stats = incrementalCopy(files, dir);
-      details.indexFileCount = stats.fileCount;
-      details.uploadedIndexFileCount = stats.uploadedFileCount;
+      details.indexFileCount = stats.fileCount.get();
+      details.uploadedIndexFileCount = stats.uploadedFileCount.get();
       details.indexSizeMB = stats.getIndexSizeMB();
       details.uploadedIndexFileMB = stats.getTotalUploadedMB();
     } finally {
@@ -191,25 +213,130 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
+    // Only use an executor for parallel uploads when parallelism > 1
+    // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+    int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+    ExecutorService executor =
+        maxParallelUploads > 1
+            ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                0,
+                maxParallelUploads,
+                60L,
+                TimeUnit.SECONDS,
+                new SynchronousQueue<>(),
+                new SolrNamedThreadFactory("IncrementalBackup"),
+                new ThreadPoolExecutor.CallerRunsPolicy())
+            : null;
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
+
+    try {
+      for (String fileName : indexFiles) {
+        // Capture variable for lambda
+        final String fileNameFinal = fileName;
+
+        Runnable uploadTask =
+            () -> {
+              try {
+                // Calculate checksum and check if file already exists in previous backup
+                Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                    oldBackupPoint.getFile(fileNameFinal);
+                Checksum originalFileCS = backupRepo.checksum(dir, fileNameFinal);
+
+                if (opBackedFile.isPresent()) {
+                  ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
+                  Checksum existedFileCS = backedFile.fileChecksum;
+                  if (existedFileCS.equals(originalFileCS)) {
+                    synchronized (currentBackupPoint) {
+                      currentBackupPoint.addBackedFile(opBackedFile.get());
+                    }
+                    backupStats.skippedUploadingFile(existedFileCS);
+                    return;
+                  }
+                }
+
+                // File doesn't exist or has changed - upload it
+                String backedFileName = UUID.randomUUID().toString();
+                backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, backedFileName);
+
+                synchronized (currentBackupPoint) {
+                  currentBackupPoint.addBackedFile(backedFileName, fileNameFinal, originalFileCS);
+                }
+                backupStats.uploadedFile(originalFileCS);
+              } catch (IOException e) {
+                throw new RuntimeException("Failed to process file: " + fileNameFinal, e);
+              }
+            };
+
+        if (executor != null) {
+          uploadFutures.add(executor.submit(uploadTask));
+        } else {
+          // Run synchronously when parallelism is 1
+          try {
+            uploadTask.run();
+          } catch (RuntimeException e) {
+            if (e.getCause() instanceof IOException) {
+              throw (IOException) e.getCause();
+            }
+            throw e;
+          }
+        }
       }
-      String backedFileName = UUID.randomUUID().toString();
-      backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+      // Wait for all uploads to complete and collect any errors (only if using executor)
+      if (executor != null) {
+        // We need to wait for ALL futures before throwing, otherwise we might exit
+        // before all successfully uploaded files are added to currentBackupPoint
+        Throwable firstError = null;
+        for (Future<?> future : uploadFutures) {
+          try {
+            future.get();
+          } catch (ExecutionException e) {
+            if (firstError == null) {
+              Throwable cause = e.getCause();
+              // Unwrap RuntimeExceptions that wrap the original IOException
+              if (cause instanceof RuntimeException && cause.getCause() != null) {
+                firstError = cause.getCause();
+              } else {
+                firstError = cause;
+              }
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            if (firstError == null) {
+              firstError = e;
+            }
+          }
+        }
 
-      currentBackupPoint.addBackedFile(backedFileName, fileName, originalFileCS);
-      backupStats.uploadedFile(originalFileCS);
+        // Now throw the first error we encountered, if any
+        if (firstError != null) {
+          if (firstError instanceof Error) {
+            // Rethrow Errors (like OutOfMemoryError) - don't try to recover
+            throw (Error) firstError;
+          } else if (firstError instanceof IOException) {
+            throw (IOException) firstError;
+          } else if (firstError instanceof RuntimeException) {
+            throw (RuntimeException) firstError;
+          } else if (firstError instanceof InterruptedException) {
+            throw new IOException("Backup interrupted", firstError);
+          } else {
+            throw new IOException("Error during parallel backup upload", firstError);
+          }
+        }
+      }
+    } finally {
+      if (executor != null) {
+        executor.shutdown();
+        try {
+          if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
+            executor.shutdownNow();
+          }
+        } catch (InterruptedException e) {
+          executor.shutdownNow();
+          Thread.currentThread().interrupt();
+        }
+      }
     }
 
     currentBackupPoint.store(backupRepo, incBackupFiles.getShardBackupMetadataDir(), shardBackupId);
@@ -217,29 +344,29 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir
   }
 
   private static class BackupStats {
-    private int fileCount;
-    private int uploadedFileCount;
-    private long indexSize;
-    private long totalUploadedBytes;
+    private final AtomicInteger fileCount = new AtomicInteger();
+    private final AtomicInteger uploadedFileCount = new AtomicInteger();
+    private final AtomicLong indexSize = new AtomicLong();
+    private final AtomicLong totalUploadedBytes = new AtomicLong();
 
     public void uploadedFile(Checksum file) {
-      fileCount++;
-      uploadedFileCount++;
-      indexSize += file.size;
-      totalUploadedBytes += file.size;
+      fileCount.incrementAndGet();
+      uploadedFileCount.incrementAndGet();
+      indexSize.addAndGet(file.size);
+      totalUploadedBytes.addAndGet(file.size);
     }
 
     public void skippedUploadingFile(Checksum existedFile) {
-      fileCount++;
-      indexSize += existedFile.size;
+      fileCount.incrementAndGet();
+      indexSize.addAndGet(existedFile.size);
     }
 
     public double getIndexSizeMB() {
-      return Precision.round(indexSize / (1024.0 * 1024), 3);
+      return Precision.round(indexSize.get() / (1024.0 * 1024), 3);
     }
 
     public double getTotalUploadedMB() {
-      return Precision.round(totalUploadedBytes / (1024.0 * 1024), 3);
+      return Precision.round(totalUploadedBytes.get() / (1024.0 * 1024), 3);
    }
  }
diff --git a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
index 7f4abc18ffdb..ac8bee61fb52 100644
--- a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
+++ b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
@@ -21,19 +21,29 @@
 import java.lang.reflect.Array;
 import java.net.URI;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.EnvUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.backup.BackupFilePaths;
@@ -48,6 +58,14 @@ public class RestoreCore implements Callable<Boolean> {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  /**
+   * Maximum number of files to download in parallel during restore. Can be configured via the
+   * system property {@code solr.backup.maxparalleldownloads} or environment variable {@code
+   * SOLR_BACKUP_MAXPARALLELDOWNLOADS}.
+   */
+  private static final int DEFAULT_MAX_PARALLEL_DOWNLOADS =
+      EnvUtils.getPropertyAsInteger("solr.backup.maxparalleldownloads", 1);
+
   private final SolrCore core;
 
   private RestoreRepository repository;
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
             DirectoryFactory.DirContext.DEFAULT,
             core.getSolrConfig().indexConfig.lockType);
     Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll()));
-    // Move all files from backupDir to restoreIndexDir
-    for (String filename : repository.listAllFiles()) {
-      checkInterrupted();
-      try {
-        if (indexDirFiles.contains(filename)) {
-          Checksum cs = repository.checksum(filename);
-          IndexFetcher.CompareResult compareResult;
-          if (cs == null) {
-            compareResult = new IndexFetcher.CompareResult();
-            compareResult.equal = false;
-          } else {
-            compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum);
+
+    // Capture directories as final for lambda access
+    final Directory finalIndexDir = indexDir;
+    final Directory finalRestoreIndexDir = restoreIndexDir;
+
+    // Only use an executor for parallel downloads when parallelism > 1
+    // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+    int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+    ExecutorService executor =
+        maxParallelDownloads > 1
+            ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                0,
+                maxParallelDownloads,
+                60L,
+                TimeUnit.SECONDS,
+                new SynchronousQueue<>(),
+                new SolrNamedThreadFactory("RestoreCore"),
+                new ThreadPoolExecutor.CallerRunsPolicy())
+            : null;
+
+    List<Future<?>> downloadFutures = new ArrayList<>();
+
+    try {
+      // Move all files from backupDir to restoreIndexDir
+      for (String filename : repository.listAllFiles()) {
+        checkInterrupted();
+
+        // Capture variables for lambda
+        final String filenameFinal = filename;
+        final boolean fileExistsLocally = indexDirFiles.contains(filename);
+
+        Runnable downloadTask =
+            () -> {
+              try {
+                if (fileExistsLocally) {
+                  Checksum cs = repository.checksum(filenameFinal);
+                  IndexFetcher.CompareResult compareResult;
+                  if (cs == null) {
+                    compareResult = new IndexFetcher.CompareResult();
+                    compareResult.equal = false;
+                  } else {
+                    compareResult =
+                        IndexFetcher.compareFile(
+                            finalIndexDir, filenameFinal, cs.size, cs.checksum);
+                  }
+                  if (!compareResult.equal
+                      || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
+                          filenameFinal, cs.size, compareResult))) {
+                    repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                  } else {
+                    // prefer local copy
+                    repository.localCopy(finalIndexDir, filenameFinal, finalRestoreIndexDir);
+                  }
+                } else {
+                  repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                }
+              } catch (Exception e) {
+                log.warn("Exception while restoring the backup index ", e);
+                throw new RuntimeException(
+                    "Exception while restoring the backup index for file: " + filenameFinal, e);
+              }
+            };
+
+        if (executor != null) {
+          downloadFutures.add(executor.submit(downloadTask));
+        } else {
+          // Run synchronously when parallelism is 1
+          try {
+            downloadTask.run();
+          } catch (RuntimeException e) {
+            if (e.getCause() instanceof IOException) {
+              throw (IOException) e.getCause();
+            }
+            throw e;
+          }
+        }
+      }
+
+      // Wait for all downloads to complete and collect any errors (only if using executor)
+      if (executor != null) {
+        // We need to wait for ALL futures to ensure all files are processed
+        Throwable firstError = null;
+        for (Future<?> future : downloadFutures) {
+          try {
+            future.get();
+          } catch (ExecutionException e) {
+            if (firstError == null) {
+              Throwable cause = e.getCause();
+              // Unwrap RuntimeExceptions that wrap the original IOException
+              if (cause instanceof RuntimeException && cause.getCause() != null) {
+                firstError = cause.getCause();
+              } else {
+                firstError = cause;
+              }
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            if (firstError == null) {
+              firstError = e;
+            }
           }
-          if (!compareResult.equal
-              || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
-                  filename, cs.size, compareResult))) {
-            repository.repoCopy(filename, restoreIndexDir);
+        }
+
+        // Now throw the first error we encountered, if any
+        if (firstError != null) {
+          if (firstError instanceof Error) {
+            // Rethrow Errors (like OutOfMemoryError) - don't try to recover
+            throw (Error) firstError;
+          } else if (firstError instanceof IOException) {
+            throw (IOException) firstError;
+          } else if (firstError instanceof RuntimeException) {
+            throw (RuntimeException) firstError;
+          } else if (firstError instanceof InterruptedException) {
+            throw new SolrException(
+                SolrException.ErrorCode.UNKNOWN, "Restore interrupted", firstError);
           } else {
-            // prefer local copy
-            repository.localCopy(indexDir, filename, restoreIndexDir);
+            throw new SolrException(
+                SolrException.ErrorCode.UNKNOWN,
+                "Error during parallel restore download",
+                firstError);
           }
-        } else {
-          repository.repoCopy(filename, restoreIndexDir);
         }
-      } catch (Exception e) {
-        log.warn("Exception while restoring the backup index ", e);
-        throw new SolrException(
-            SolrException.ErrorCode.UNKNOWN, "Exception while restoring the backup index", e);
+      }
+    } finally {
+      if (executor != null) {
+        executor.shutdown();
+        try {
+          if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
+            executor.shutdownNow();
+          }
+        } catch (InterruptedException e) {
+          executor.shutdownNow();
+          Thread.currentThread().interrupt();
+        }
      }
    }
     log.debug("Switching directories");
diff --git a/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java b/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
index 846563b929f5..ab3de1a75c2f 100644
--- a/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
+++ b/solr/modules/gcs-repository/src/test/org/apache/solr/gcs/GCSIncrementalBackupTest.java
@@ -70,6 +70,9 @@ public class GCSIncrementalBackupTest extends AbstractIncrementalBackupTest {
 
   @BeforeClass
   public static void setupClass() throws Exception {
+    // Enable parallel backup/restore for cloud storage tests
+    System.setProperty("solr.backup.maxparalleluploads", "2");
+    System.setProperty("solr.backup.maxparalleldownloads", "2");
     configureCluster(NUM_NODES) // nodes
         .addConfig("conf1", getFile("conf/solrconfig.xml").getParent())
diff --git a/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
index 80c5207505b1..fc57d3350813 100644
--- a/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
+++ b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3IncrementalBackupTest.java
@@ -87,6 +87,9 @@ public static void ensureCompatibleLocale() {
   public static void setupClass() throws Exception {
     System.setProperty("aws.accessKeyId", "foo");
     System.setProperty("aws.secretAccessKey", "bar");
+    // Enable parallel backup/restore for cloud storage tests
+    System.setProperty("solr.backup.maxparalleluploads", "2");
+    System.setProperty("solr.backup.maxparalleldownloads", "2");
     String retryMode;
     switch (random().nextInt(3)) {
       case 0:
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
index e6fa3e4d4039..18b8ae5360cb 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc
@@ -396,6 +396,39 @@ Any children under the `<repository>` tag are passed as additional configuration
 
 Information on each of the repository implementations provided with Solr is provided below.
 
+=== Parallel File Transfers
+
+Backup and restore operations can transfer multiple index files in parallel to improve throughput, especially when using cloud storage repositories like S3 or GCS where latency is higher.
+The parallelism is controlled via system properties or environment variables:
+
+`solr.backup.maxparalleluploads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to upload in parallel during backup operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELUPLOADS` environment variable.
+For cloud storage repositories (S3, GCS), consider setting this to `8` or higher to improve backup performance.
+
+`solr.backup.maxparalleldownloads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to download in parallel during restore operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELDOWNLOADS` environment variable.
+For cloud storage repositories (S3, GCS), consider setting this to `8` or higher to improve restore performance.
+
+TIP: When using cloud storage, increasing parallelism significantly improves backup and restore speed by overlapping network latency.
+However, higher parallelism also increases memory usage.
+Start with a value of `8` and adjust based on your available heap memory and network bandwidth.
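+
+One way to apply these settings is through `SOLR_OPTS` in `solr.in.sh`, shown below with illustrative values; tune them to your heap size and network bandwidth:
+
+[source,bash]
+----
+# Illustrative values only; adjust for your environment
+SOLR_OPTS="$SOLR_OPTS -Dsolr.backup.maxparalleluploads=8"
+SOLR_OPTS="$SOLR_OPTS -Dsolr.backup.maxparalleldownloads=8"
+
+# Alternatively, set the equivalent environment variables:
+# export SOLR_BACKUP_MAXPARALLELUPLOADS=8
+# export SOLR_BACKUP_MAXPARALLELDOWNLOADS=8
+----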
+
+=== Checksum Verification
+
 By default, all the repository implementations verify the integrity of the index files before they are copied to the destination.
 However, it is possible to disable this integrity check by setting the optional configuration property `verifyChecksum`.
 
 `verifyChecksum`::