Skip to content

Commit cf0f252

Browse files
committed
BlobAnalyzeAction: rough first cut at copy
The control flow is a bit of a mess.
1 parent 23ac1bd commit cf0f252

File tree

4 files changed

+162
-8
lines changed

4 files changed

+162
-8
lines changed

x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import static org.hamcrest.Matchers.anyOf;
7575
import static org.hamcrest.Matchers.containsString;
7676
import static org.hamcrest.Matchers.equalTo;
77+
import static org.hamcrest.Matchers.instanceOf;
7778
import static org.hamcrest.Matchers.nullValue;
7879

7980
public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
@@ -172,6 +173,21 @@ public byte[] onRead(byte[] actualContents, long position, long length) {
172173
assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage());
173174
}
174175

176+
public void testFailsOnCopyAfterWrite() {
177+
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
178+
request.maxBlobSize(ByteSizeValue.ofBytes(10L));
179+
request.abortWritePermitted(false);
180+
181+
blobStore.setDisruption(new Disruption() {
182+
@Override
183+
public void onCopy() throws IOException {
184+
throw new IOException("simulated");
185+
}
186+
});
187+
188+
assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage());
189+
}
190+
175191
public void testFailsOnChecksumMismatch() {
176192
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
177193
request.maxBlobSize(ByteSizeValue.ofBytes(10L));
@@ -593,6 +609,8 @@ default byte[] onRead(byte[] actualContents, long position, long length) throws
593609

594610
default void onWrite() throws IOException {}
595611

612+
default void onCopy() throws IOException {}
613+
596614
default Map<String, BlobMetadata> onList(Map<String, BlobMetadata> actualListing) throws IOException {
597615
return actualListing;
598616
}
@@ -751,6 +769,25 @@ private void writeBlobAtomic(String blobName, InputStream inputStream, boolean f
751769
blobs.put(blobName, contents);
752770
}
753771

772+
@Override
773+
public void copyBlob(
774+
OperationPurpose purpose,
775+
BlobContainer sourceBlobContainer,
776+
String sourceBlobName,
777+
String blobName,
778+
long blobSize
779+
) throws IOException {
780+
assertThat(sourceBlobContainer, instanceOf(DisruptableBlobContainer.class));
781+
assertPurpose(purpose);
782+
final var source = (DisruptableBlobContainer) sourceBlobContainer;
783+
final var sourceBlob = source.blobs.get(sourceBlobName);
784+
if (sourceBlob == null) {
785+
throw new FileNotFoundException(sourceBlobName + " not found");
786+
}
787+
disruption.onCopy();
788+
blobs.put(blobName, sourceBlob);
789+
}
790+
754791
@Override
755792
public DeleteResult delete(OperationPurpose purpose) throws IOException {
756793
assertPurpose(purpose);

x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisSuccessIT.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import static org.hamcrest.Matchers.equalTo;
7171
import static org.hamcrest.Matchers.greaterThan;
7272
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
73+
import static org.hamcrest.Matchers.instanceOf;
7374
import static org.hamcrest.Matchers.lessThanOrEqualTo;
7475
import static org.hamcrest.Matchers.nullValue;
7576
import static org.hamcrest.Matchers.startsWith;
@@ -469,6 +470,24 @@ private void writeBlobAtomic(String blobName, InputStream inputStream, long blob
469470
}
470471
}
471472

473+
@Override
474+
public void copyBlob(
475+
OperationPurpose purpose,
476+
BlobContainer sourceBlobContainer,
477+
String sourceBlobName,
478+
String blobName,
479+
long blobSize
480+
) throws IOException {
481+
assertPurpose(purpose);
482+
assertThat(sourceBlobContainer, instanceOf(AssertingBlobContainer.class));
483+
final var source = (AssertingBlobContainer) sourceBlobContainer;
484+
final var sourceBlob = source.blobs.get(sourceBlobName);
485+
if (sourceBlob == null) {
486+
throw new FileNotFoundException(sourceBlobName + " not found");
487+
}
488+
blobs.put(blobName, sourceBlob);
489+
}
490+
472491
@Override
473492
public DeleteResult delete(OperationPurpose purpose) {
474493
assertPurpose(purpose);

x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/BlobAnalyzeAction.java

Lines changed: 97 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@
4747
import org.elasticsearch.xcontent.ToXContentObject;
4848
import org.elasticsearch.xcontent.XContentBuilder;
4949

50+
import java.io.FileNotFoundException;
5051
import java.io.IOException;
52+
import java.nio.file.NoSuchFileException;
5153
import java.util.ArrayList;
5254
import java.util.Collection;
5355
import java.util.Collections;
@@ -70,6 +72,11 @@
7072
* version of the blob, but again must not yield partial data). Usually, however, we write once and only read after the write completes, and
7173
* in this case we insist that the read succeeds.
7274
*
75+
* The writer may also attempt to copy the blob, either just before the write completes (which may fail with not found)
76+
* or after (which should not fail). The writer may overwrite the source while the copy is in progress. If a copy is attempted,
77+
* readers will read the copy instead of the original. As above, if the copy succeeds, then readers should see a complete copy.
78+
* If the source is overwritten while the copy is in progress, readers may see either the original blob or the new one but no
79+
* mixture or partial result.
7380
*
7481
* <pre>
7582
*
@@ -83,6 +90,12 @@
8390
* | Write blob with random content | |
8491
* |-----------------------------------→| |
8592
* | | |
93+
* | Copy blob during write (rarely) | |
94+
* |-----------------------------------→| |
95+
* | | |
96+
* | Copy complete | |
97+
* |←-----------------------------------| |
98+
* | | |
8699
* | Read range during write (rarely) | |
87100
* |----------------------------------------------------------------------------→|
88101
* | | |
@@ -106,6 +119,18 @@
106119
* |-| Read phase | | |
107120
* | |------------| | |
108121
* | | |
122+
* | Copy blob (rarely) | |
123+
* |-----------------------------------→| |
124+
* | | |
125+
* | TODO: Overwrite source (rarely) | |
126+
* |-----------------------------------→| |
127+
* | | |
128+
* | Overwrite complete | |
129+
* |←-----------------------------------| |
130+
* | | |
131+
* | Copy complete | |
132+
* |←-----------------------------------| |
133+
* | | |
109134
* | Read range [a,b) | |
110135
* |----------------------------------------------------------------------------→|
111136
* | | |
@@ -199,6 +224,9 @@ private static class BlobAnalysis {
199224
private final boolean checksumWholeBlob;
200225
private final long checksumStart;
201226
private final long checksumEnd;
227+
// If a copy is requested, do exactly one so that the number of blobs created is controlled by RepositoryAnalyzeAction.
228+
// Doing the copy in step 1 exercises copy before read completes. Step 2 exercises copy after read completes or the happy path.
229+
private final boolean doEarlyCopy;
202230
private final List<DiscoveryNode> earlyReadNodes;
203231
private final List<DiscoveryNode> readNodes;
204232
private final GroupedActionListener<NodeResponse> readNodesListener;
@@ -230,6 +258,7 @@ private static class BlobAnalysis {
230258
checksumStart = randomLongBetween(0L, request.targetLength);
231259
checksumEnd = randomLongBetween(checksumStart + 1, request.targetLength + 1);
232260
}
261+
doEarlyCopy = random.nextBoolean();
233262

234263
final ArrayList<DiscoveryNode> nodes = new ArrayList<>(request.nodes); // copy for shuffling purposes
235264
if (request.readEarly) {
@@ -368,11 +397,38 @@ public StreamInput streamInput() throws IOException {
368397
}
369398

370399
private void onLastReadForInitialWrite() {
400+
var readBlobName = request.blobName;
401+
if (request.doCopy && doEarlyCopy) {
402+
try {
403+
final var copyName = request.blobName + "_copy";
404+
blobContainer.copyBlob(
405+
OperationPurpose.REPOSITORY_ANALYSIS,
406+
blobContainer,
407+
request.blobName,
408+
copyName,
409+
request.targetLength
410+
);
411+
readBlobName = copyName;
412+
} catch (UnsupportedOperationException uoe) {
413+
// not all repositories support copy
414+
} catch (NoSuchFileException | FileNotFoundException ignored) {
415+
// assume this is due to copy starting before the source was finished
416+
logger.trace("copy FNF before write completed: {}", request.blobName);
417+
} catch (IOException e) {
418+
if (request.getAbortWrite() == false) {
419+
throw new RepositoryVerificationException(
420+
request.getRepositoryName(),
421+
"failed to copy blob before write: [" + request.blobName + "]",
422+
e
423+
);
424+
}
425+
}
426+
}
371427
if (earlyReadNodes.isEmpty() == false) {
372428
if (logger.isTraceEnabled()) {
373429
logger.trace("sending read request to [{}] for [{}] before write complete", earlyReadNodes, request.getDescription());
374430
}
375-
readOnNodes(earlyReadNodes, true);
431+
readOnNodes(earlyReadNodes, readBlobName, true);
376432
}
377433
if (request.getAbortWrite()) {
378434
throw new BlobWriteAbortedException();
@@ -383,10 +439,37 @@ private void doReadAfterWrite() {
383439
if (logger.isTraceEnabled()) {
384440
logger.trace("sending read request to [{}] for [{}] after write complete", readNodes, request.getDescription());
385441
}
386-
readOnNodes(readNodes, false);
442+
var readBlobName = request.blobName;
443+
if (request.doCopy && (doEarlyCopy == false) && (request.getAbortWrite() == false)) {
444+
try {
445+
final var copyName = request.blobName + "_copy";
446+
blobContainer.copyBlob(
447+
OperationPurpose.REPOSITORY_ANALYSIS,
448+
blobContainer,
449+
request.blobName,
450+
copyName,
451+
request.targetLength
452+
);
453+
readBlobName = copyName;
454+
} catch (UnsupportedOperationException uoe) {
455+
// not all repositories support copy
456+
} catch (IOException e) {
457+
for (int i = 0; i < readNodes.size(); i++) {
458+
readNodesListener.onFailure(
459+
new RepositoryVerificationException(
460+
request.getRepositoryName(),
461+
"failed to copy blob after write: [" + request.blobName + "]",
462+
e
463+
)
464+
);
465+
}
466+
return;
467+
}
468+
}
469+
readOnNodes(readNodes, readBlobName, false);
387470
}
388471

389-
private void readOnNodes(List<DiscoveryNode> nodes, boolean beforeWriteComplete) {
472+
private void readOnNodes(List<DiscoveryNode> nodes, String blobName, boolean beforeWriteComplete) {
390473
for (DiscoveryNode node : nodes) {
391474
if (task.isCancelled()) {
392475
// record dummy response since we're already on the path to failure
@@ -396,7 +479,7 @@ private void readOnNodes(List<DiscoveryNode> nodes, boolean beforeWriteComplete)
396479
} else {
397480
// no need for extra synchronization after checking if we were cancelled a couple of lines ago -- we haven't notified
398481
// the outer listener yet so any bans on the children are still in place
399-
final GetBlobChecksumAction.Request blobChecksumRequest = getBlobChecksumRequest();
482+
final GetBlobChecksumAction.Request blobChecksumRequest = getBlobChecksumRequest(blobName);
400483
transportService.sendChildRequest(
401484
node,
402485
GetBlobChecksumAction.NAME,
@@ -432,11 +515,11 @@ public void onFailure(Exception e) {
432515
}
433516
}
434517

435-
private GetBlobChecksumAction.Request getBlobChecksumRequest() {
518+
private GetBlobChecksumAction.Request getBlobChecksumRequest(String blobName) {
436519
return new GetBlobChecksumAction.Request(
437520
request.getRepositoryName(),
438521
request.getBlobPath(),
439-
request.getBlobName(),
522+
blobName,
440523
checksumStart,
441524
checksumWholeBlob ? 0L : checksumEnd
442525
);
@@ -650,6 +733,7 @@ static class Request extends ActionRequest {
650733
private final boolean readEarly;
651734
private final boolean writeAndOverwrite;
652735
private final boolean abortWrite;
736+
private final boolean doCopy;
653737

654738
Request(
655739
String repositoryName,
@@ -662,7 +746,8 @@ static class Request extends ActionRequest {
662746
int earlyReadNodeCount,
663747
boolean readEarly,
664748
boolean writeAndOverwrite,
665-
boolean abortWrite
749+
boolean abortWrite,
750+
boolean doCopy
666751
) {
667752
assert 0 < targetLength;
668753
assert targetLength <= MAX_ATOMIC_WRITE_SIZE || (readEarly == false && writeAndOverwrite == false) : "oversized atomic write";
@@ -678,6 +763,7 @@ static class Request extends ActionRequest {
678763
this.readEarly = readEarly;
679764
this.writeAndOverwrite = writeAndOverwrite;
680765
this.abortWrite = abortWrite;
766+
this.doCopy = doCopy;
681767
}
682768

683769
Request(StreamInput in) throws IOException {
@@ -693,6 +779,8 @@ static class Request extends ActionRequest {
693779
readEarly = in.readBoolean();
694780
writeAndOverwrite = in.readBoolean();
695781
abortWrite = in.readBoolean();
782+
// BWC
783+
doCopy = in.readBoolean();
696784
}
697785

698786
@Override
@@ -709,6 +797,8 @@ public void writeTo(StreamOutput out) throws IOException {
709797
out.writeBoolean(readEarly);
710798
out.writeBoolean(writeAndOverwrite);
711799
out.writeBoolean(abortWrite);
800+
// BWC
801+
out.writeBoolean(doCopy);
712802
}
713803

714804
@Override

x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,13 @@ public void run() {
521521
final long targetLength = blobSizes.get(i);
522522
final boolean smallBlob = targetLength <= MAX_ATOMIC_WRITE_SIZE; // avoid the atomic API for larger blobs
523523
final boolean abortWrite = smallBlob && request.isAbortWritePermitted() && rarely(random);
524+
final boolean doCopy = random.nextBoolean();
525+
if (doCopy) {
526+
i++;
527+
if (i >= request.getBlobCount()) {
528+
break;
529+
}
530+
}
524531
final BlobAnalyzeAction.Request blobAnalyzeRequest = new BlobAnalyzeAction.Request(
525532
request.getRepositoryName(),
526533
blobPath,
@@ -532,7 +539,8 @@ public void run() {
532539
request.getEarlyReadNodeCount(),
533540
smallBlob && rarely(random),
534541
repository.supportURLRepo() && repository.hasAtomicOverwrites() && smallBlob && rarely(random) && abortWrite == false,
535-
abortWrite
542+
abortWrite,
543+
doCopy
536544
);
537545
final DiscoveryNode node = nodes.get(random.nextInt(nodes.size()));
538546
queue.add(ref -> runBlobAnalysis(ref, blobAnalyzeRequest, node));

0 commit comments

Comments
 (0)