Skip to content

Commit 72d0416

Browse files
authored
fix: Bug report: The latest version is unable to back up or restore large data sets (#789)
- Use the correct index when looking up file metadata by IDs
- Index archived file metadata by file metadata IDs
- Close the backup pipeline only after completing the whole backup

Resolves #786 {patch}
Signed-off-by: Esta Nagy <nagyesta@gmail.com>
1 parent 2f0ed97 commit 72d0416

File tree

10 files changed

+124
-82
lines changed

10 files changed

+124
-82
lines changed

file-barj-core/src/main/java/com/github/nagyesta/filebarj/core/backup/pipeline/BackupController.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ private void calculateBackupDelta() {
222222
}
223223
}
224224

225+
@SuppressWarnings("java:S4087") //the pipeline must be closed at the right moment to finalize the process
225226
private void executeBackup(final int threads) {
226227
final var startTimeMillis = System.currentTimeMillis();
227228
final var fileMetadataSetRepository = dataStore().fileMetadataSetRepository();
@@ -245,6 +246,8 @@ private void executeBackup(final int threads) {
245246
hashAlgorithm,
246247
threadPool,
247248
batch -> processScope(batch, pipeline));
249+
//calling close explicitly to trigger finalization of the index and to know a final list of files
250+
pipeline.close();
248251
final var dataFiles = pipeline.getDataFilesWritten().stream()
249252
.map(Path::getFileName)
250253
.map(Path::toString)
@@ -286,7 +289,6 @@ private void processScope(
286289
.flatMap(Collection::stream)
287290
.forEach(metadata -> fileMetadataSetRepository
288291
.updateArchiveMetadataId(backupFileSet, metadata.getId(), metadata.getArchiveMetadataId()));
289-
pipeline.close();
290292
} catch (final Exception e) {
291293
throw new ArchivalException("Failed to store files: " + e.getMessage(), e);
292294
}

file-barj-core/src/main/java/com/github/nagyesta/filebarj/core/persistence/inmemory/InMemoryArchivedFileMetadataSetRepository.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public class InMemoryArchivedFileMetadataSetRepository
1818
extends InMemoryBaseFileSetRepository<ArchivedFileMetadataSetId, ArchivedFileMetadata>
1919
implements ArchivedFileMetadataSetRepository {
2020

21+
private final Map<UUID, Map<UUID, ArchivedFileMetadata>> metadataByFileSetAndArchiveFileId = new ConcurrentHashMap<>();
2122
private final Map<UUID, Map<UUID, ArchivedFileMetadata>> metadataByFileSetAndFileId = new ConcurrentHashMap<>();
2223
private final Map<UUID, Map<ArchiveEntryLocator, Set<ArchivedFileMetadata>>> metadataByFileSetAndLocator = new ConcurrentHashMap<>();
2324

@@ -31,8 +32,10 @@ public void appendTo(
3132
@NotNull final ArchivedFileMetadataSetId id,
3233
@NotNull final Collection<ArchivedFileMetadata> values) {
3334
super.appendTo(id, values);
35+
final var mapByArchiveFileId = metadataByFileSetAndArchiveFileId.computeIfAbsent(id.id(), k -> new ConcurrentHashMap<>());
36+
values.forEach(metadata -> mapByArchiveFileId.put(metadata.getId(), metadata));
3437
final var mapByFileId = metadataByFileSetAndFileId.computeIfAbsent(id.id(), k -> new ConcurrentHashMap<>());
35-
values.forEach(metadata -> mapByFileId.put(metadata.getId(), metadata));
38+
values.forEach(metadata -> metadata.getFiles().forEach(fileId -> mapByFileId.put(fileId, metadata)));
3639
final var mapByLocator = metadataByFileSetAndLocator.computeIfAbsent(id.id(), k -> new ConcurrentHashMap<>());
3740
values.forEach(metadata -> mapByLocator
3841
.computeIfAbsent(metadata.getArchiveLocation(), k -> new ConcurrentSkipListSet<>()).add(metadata));
@@ -41,13 +44,15 @@ public void appendTo(
4144
@Override
4245
public void removeFileSet(@NotNull final ArchivedFileMetadataSetId id) {
4346
super.removeFileSet(id);
47+
metadataByFileSetAndArchiveFileId.remove(id.id());
4448
metadataByFileSetAndFileId.remove(id.id());
4549
metadataByFileSetAndLocator.remove(id.id());
4650
}
4751

4852
@Override
4953
public void close() {
5054
super.close();
55+
metadataByFileSetAndArchiveFileId.clear();
5156
metadataByFileSetAndFileId.clear();
5257
metadataByFileSetAndLocator.clear();
5358
}
@@ -77,10 +82,14 @@ public Set<UUID> containsFileMetadataIds(
7782
public Map<UUID, ArchivedFileMetadata> findByFileMetadataIds(
7883
@NotNull final ArchivedFileMetadataSetId id,
7984
@NotNull final Collection<UUID> fileMetadataIds) {
85+
final var map = metadataByFileSetAndFileId.get(id.id());
86+
if (map == null) {
87+
return Collections.emptyMap();
88+
}
8089
final var result = new ConcurrentHashMap<UUID, ArchivedFileMetadata>();
81-
getFileSetById(id).forEach(metadata -> fileMetadataIds.stream()
82-
.filter(metadata.getFiles()::contains)
83-
.forEach(fileMetadataId -> result.put(fileMetadataId, metadata)));
90+
fileMetadataIds.stream()
91+
.filter(map::containsKey)
92+
.forEach(fileMetadataId -> result.put(fileMetadataId, map.get(fileMetadataId)));
8493
return result;
8594
}
8695

@@ -89,7 +98,7 @@ public ArchivedFileMetadataSetId intersectWithFileMetadata(
8998
@NotNull final ArchivedFileMetadataSetId id,
9099
@NotNull final FileMetadataSetId fileMetadataSetId) {
91100
final var result = createFileSet();
92-
final var archivedMetadataByFileId = metadataByFileSetAndFileId.get(id.id());
101+
final var archivedMetadataByFileId = metadataByFileSetAndArchiveFileId.get(id.id());
93102
dataStore().fileMetadataSetRepository()
94103
.forEach(fileMetadataSetId,
95104
dataStore().singleThreadedPool(),

file-barj-core/src/main/java/com/github/nagyesta/filebarj/core/persistence/inmemory/InMemoryFileMetadataSetRepository.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,13 @@ public FileMetadataSetId keepChangedMetadata(
271271
public SortedSet<FileMetadata> findFilesByIds(
272272
@NotNull final FileMetadataSetId id,
273273
@NotNull final Set<UUID> files) {
274-
return getFileSetById(id).stream()
275-
.filter(file -> files.contains(file.getId()))
274+
final var fileMetadataMap = metadataByFileSetAndFileId.get(id.id());
275+
if (fileMetadataMap == null) {
276+
return Collections.emptySortedSet();
277+
}
278+
return files.stream()
279+
.filter(fileMetadataMap::containsKey)
280+
.map(fileMetadataMap::get)
276281
.collect(Collectors.toCollection(TreeSet::new));
277282
}
278283

file-barj-job/src/main/java/com/github/nagyesta/filebarj/job/Controller.java

Lines changed: 74 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.github.nagyesta.filebarj.core.inspect.pipeline.InspectParameters;
1414
import com.github.nagyesta.filebarj.core.merge.MergeController;
1515
import com.github.nagyesta.filebarj.core.merge.MergeParameters;
16+
import com.github.nagyesta.filebarj.core.model.AppVersion;
1617
import com.github.nagyesta.filebarj.core.restore.pipeline.RestoreController;
1718
import com.github.nagyesta.filebarj.core.restore.pipeline.RestoreParameters;
1819
import com.github.nagyesta.filebarj.io.stream.crypto.EncryptionUtil;
@@ -124,51 +125,54 @@ public void run() throws Exception {
124125
protected void doInspectContent(final InspectIncrementContentsProperties properties) {
125126
final var kek = getPrivateKey(properties.getKeyProperties());
126127
final var startTimeMillis = System.currentTimeMillis();
127-
log.info("Bootstrapping inspect content operation...");
128+
log.info("Bootstrapping inspect content operation with version {}...", AppVersion.DEFAULT_VERSION);
128129
final var pointInTimeEpochSeconds = properties.getPointInTimeEpochSeconds();
129-
new IncrementInspectionController(
130-
InspectParameters.builder()
131-
.backupDirectory(properties.getBackupSource())
132-
.fileNamePrefix(properties.getPrefix())
133-
.kek(kek)
134-
.build())
135-
.inspectContent(pointInTimeEpochSeconds, properties.getOutputFile());
136-
final var endTimeMillis = System.currentTimeMillis();
137-
final var durationMillis = (endTimeMillis - startTimeMillis);
138-
log.info("Increment content inspection operation completed. Total time: {}", toProcessSummary(durationMillis));
130+
final var parameters = InspectParameters.builder()
131+
.backupDirectory(properties.getBackupSource())
132+
.fileNamePrefix(properties.getPrefix())
133+
.kek(kek)
134+
.build();
135+
try (var controller = new IncrementInspectionController(parameters)) {
136+
controller.inspectContent(pointInTimeEpochSeconds, properties.getOutputFile());
137+
final var endTimeMillis = System.currentTimeMillis();
138+
final var durationMillis = (endTimeMillis - startTimeMillis);
139+
log.info("Increment content inspection operation completed. Total time: {}", toProcessSummary(durationMillis));
140+
}
139141
}
140142

141143
@SuppressWarnings("java:S106")
142144
protected void doInspectIncrements(final InspectIncrementsProperties properties) {
143145
final var kek = getPrivateKey(properties.getKeyProperties());
144146
final var startTimeMillis = System.currentTimeMillis();
145-
log.info("Bootstrapping inspect increments operation...");
146-
new IncrementInspectionController(
147-
InspectParameters.builder()
148-
.backupDirectory(properties.getBackupSource())
149-
.fileNamePrefix(properties.getPrefix())
150-
.kek(kek)
151-
.build())
152-
.inspectIncrements(System.out);
153-
final var endTimeMillis = System.currentTimeMillis();
154-
final var durationMillis = (endTimeMillis - startTimeMillis);
155-
log.info("Backup increments inspection operation completed. Total time: {}", toProcessSummary(durationMillis));
147+
log.info("Bootstrapping inspect increments operation with version {}...", AppVersion.DEFAULT_VERSION);
148+
final var parameters = InspectParameters.builder()
149+
.backupDirectory(properties.getBackupSource())
150+
.fileNamePrefix(properties.getPrefix())
151+
.kek(kek)
152+
.build();
153+
try (var controller = new IncrementInspectionController(parameters)) {
154+
controller.inspectIncrements(System.out);
155+
final var endTimeMillis = System.currentTimeMillis();
156+
final var durationMillis = (endTimeMillis - startTimeMillis);
157+
log.info("Backup increments inspection operation completed. Total time: {}", toProcessSummary(durationMillis));
158+
}
156159
}
157160

158161
protected void doDeleteIncrements(final DeleteIncrementsProperties properties) {
159162
final var kek = getPrivateKey(properties.getKeyProperties());
160163
final var startTimeMillis = System.currentTimeMillis();
161-
log.info("Bootstrapping delete increments operation...");
162-
new IncrementDeletionController(
163-
IncrementDeletionParameters.builder()
164-
.backupDirectory(properties.getBackupSource())
165-
.fileNamePrefix(properties.getPrefix())
166-
.kek(kek)
167-
.build())
168-
.deleteIncrementsUntilNextFullBackupAfter(properties.getAfterEpochSeconds());
169-
final var endTimeMillis = System.currentTimeMillis();
170-
final var durationMillis = (endTimeMillis - startTimeMillis);
171-
log.info("Increment deletion operation completed. Total time: {}", toProcessSummary(durationMillis));
164+
log.info("Bootstrapping delete increments operation with version {}...", AppVersion.DEFAULT_VERSION);
165+
final var parameters = IncrementDeletionParameters.builder()
166+
.backupDirectory(properties.getBackupSource())
167+
.fileNamePrefix(properties.getPrefix())
168+
.kek(kek)
169+
.build();
170+
try (var controller = new IncrementDeletionController(parameters)) {
171+
controller.deleteIncrementsUntilNextFullBackupAfter(properties.getAfterEpochSeconds());
172+
final var endTimeMillis = System.currentTimeMillis();
173+
final var durationMillis = (endTimeMillis - startTimeMillis);
174+
log.info("Increment deletion operation completed. Total time: {}", toProcessSummary(durationMillis));
175+
}
172176
}
173177

174178
protected void doGenerateKey(final KeyStoreProperties properties) {
@@ -189,7 +193,7 @@ protected void doRestore(final RestoreProperties properties) {
189193
.collect(Collectors.toSet())
190194
);
191195
final var startTimeMillis = System.currentTimeMillis();
192-
log.info("Bootstrapping restore operation...");
196+
log.info("Bootstrapping restore operation with version {}...", AppVersion.DEFAULT_VERSION);
193197
final var restoreTask = RestoreTask.builder()
194198
.restoreTargets(restoreTargets)
195199
.threads(properties.getThreads())
@@ -198,51 +202,54 @@ protected void doRestore(final RestoreProperties properties) {
198202
.includedPath(properties.getIncludedPath())
199203
.permissionComparisonStrategy(properties.getPermissionComparisonStrategy())
200204
.build();
201-
new RestoreController(
202-
RestoreParameters.builder()
203-
.backupDirectory(properties.getBackupSource())
204-
.fileNamePrefix(properties.getPrefix())
205-
.kek(kek)
206-
.atPointInTime(properties.getPointInTimeEpochSeconds())
207-
.build())
208-
.execute(restoreTask);
209-
final var endTimeMillis = System.currentTimeMillis();
210-
final var durationMillis = (endTimeMillis - startTimeMillis);
211-
log.info("Restore operation completed. Total time: {}", toProcessSummary(durationMillis));
205+
final var parameters = RestoreParameters.builder()
206+
.backupDirectory(properties.getBackupSource())
207+
.fileNamePrefix(properties.getPrefix())
208+
.kek(kek)
209+
.atPointInTime(properties.getPointInTimeEpochSeconds())
210+
.build();
211+
try (var controller = new RestoreController(parameters)) {
212+
controller.execute(restoreTask);
213+
final var endTimeMillis = System.currentTimeMillis();
214+
final var durationMillis = (endTimeMillis - startTimeMillis);
215+
log.info("Restore operation completed. Total time: {}", toProcessSummary(durationMillis));
216+
}
212217
}
213218

214219
protected void doMerge(final MergeProperties properties) {
215220
final var kek = getPrivateKey(properties.getKeyProperties());
216221
final var startTimeMillis = System.currentTimeMillis();
217-
log.info("Bootstrapping merge operation...");
218-
new MergeController(
219-
MergeParameters.builder()
220-
.backupDirectory(properties.getBackupSource())
221-
.fileNamePrefix(properties.getPrefix())
222-
.kek(kek)
223-
.rangeStartEpochSeconds(properties.getFromTimeEpochSeconds())
224-
.rangeEndEpochSeconds(properties.getToTimeEpochSeconds())
225-
.build())
226-
.execute(properties.isDeleteObsoleteFiles());
227-
final var endTimeMillis = System.currentTimeMillis();
228-
final var durationMillis = (endTimeMillis - startTimeMillis);
229-
log.info("Merge operation completed. Total time: {}", toProcessSummary(durationMillis));
222+
log.info("Bootstrapping merge operation with version {}...", AppVersion.DEFAULT_VERSION);
223+
final var parameters = MergeParameters.builder()
224+
.backupDirectory(properties.getBackupSource())
225+
.fileNamePrefix(properties.getPrefix())
226+
.kek(kek)
227+
.rangeStartEpochSeconds(properties.getFromTimeEpochSeconds())
228+
.rangeEndEpochSeconds(properties.getToTimeEpochSeconds())
229+
.build();
230+
try (var controller = new MergeController(parameters)) {
231+
controller.execute(properties.isDeleteObsoleteFiles());
232+
final var endTimeMillis = System.currentTimeMillis();
233+
final var durationMillis = (endTimeMillis - startTimeMillis);
234+
log.info("Merge operation completed. Total time: {}", toProcessSummary(durationMillis));
235+
}
230236
}
231237

232238
protected void doBackup(final BackupProperties properties) throws IOException {
233239
final var config = new ObjectMapper().reader()
234240
.readValue(properties.getConfig().toFile(), BackupJobConfiguration.class);
235241
final var startTimeMillis = System.currentTimeMillis();
236242
log.info("Bootstrapping backup operation...");
237-
new BackupController(
238-
BackupParameters.builder()
239-
.job(config)
240-
.forceFull(properties.isForceFullBackup())
241-
.build())
242-
.execute(properties.getThreads());
243-
final var endTimeMillis = System.currentTimeMillis();
244-
final var durationMillis = (endTimeMillis - startTimeMillis);
245-
log.info("Backup operation completed. Total time: {}", toProcessSummary(durationMillis));
243+
final var parameters = BackupParameters.builder()
244+
.job(config)
245+
.forceFull(properties.isForceFullBackup())
246+
.build();
247+
try (var controller = new BackupController(parameters)) {
248+
controller.execute(properties.getThreads());
249+
final var endTimeMillis = System.currentTimeMillis();
250+
final var durationMillis = (endTimeMillis - startTimeMillis);
251+
log.info("Backup operation completed. Total time: {}", toProcessSummary(durationMillis));
252+
}
246253
}
247254

248255
@SuppressWarnings("java:S106")

file-barj-job/src/main/resources/logback.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
<pattern>%white(%d{HH:mm:ss.SSS}) %boldYellow([%8.8thread{8}]) %highlight(%-5level) %highlight(%25.25logger{0}) - %msg%n%ex{3}</pattern>
1111
</encoder>
1212
</appender>
13+
14+
<logger name="com.github.nagyesta" level="INFO"/>
1315
<root level="INFO">
1416
<appender-ref ref="FILE"/>
1517
<appender-ref ref="STDOUT"/>

file-barj-stream-io/src/main/java/com/github/nagyesta/filebarj/io/stream/BarjCargoArchiverFileOutputStream.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ protected void doOnEntityClosed(final @Nullable BarjCargoEntityIndex entityToInd
8888

8989
private void writeEntityToIndex(final @NotNull BarjCargoEntityIndex entityIndex) throws IOException {
9090
try {
91-
final var prefix = entryIndexPrefix(entryCount());
91+
final var entryIndex = entryCount();
92+
log.trace("Writing entry index: {}", entryIndex);
93+
final var prefix = entryIndexPrefix(entryIndex);
9294
this.indexStreamWriter.write(entityIndex.toProperties(prefix));
9395
this.indexStreamWriter.flush();
9496
} catch (final IllegalArgumentException e) {
@@ -102,6 +104,7 @@ private void writeIndexFileHeader() throws IOException {
102104

103105
private void writeIndexFileFooter() throws IOException {
104106
final var lastChunk = getCurrentFilePath();
107+
log.debug("Writing last chunk of index: {}", lastChunk);
105108
final var footer = ArchiveIndexV2.builder()
106109
.numberOfChunks(getCurrentChunkIndex())
107110
.lastChunkSizeInBytes(lastChunk.toFile().length())

0 commit comments

Comments (0)