Skip to content

Commit 171833f

Browse files
committed
Implementing read retries.
Moved logic for doing retries into JobImpl, as both reads and writes can use the same logic.
1 parent 08df9f8 commit 171833f

File tree

7 files changed

+226
-72
lines changed

7 files changed

+226
-72
lines changed

ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,50 @@
1717

1818
import com.google.common.collect.Lists;
1919
import com.spectralogic.ds3client.Ds3Client;
20+
import com.spectralogic.ds3client.Ds3ClientImpl;
2021
import com.spectralogic.ds3client.commands.GetObjectRequest;
2122
import com.spectralogic.ds3client.commands.GetObjectResponse;
2223
import com.spectralogic.ds3client.commands.PutObjectRequest;
2324
import com.spectralogic.ds3client.commands.spectrads3.GetJobSpectraS3Request;
2425
import com.spectralogic.ds3client.commands.spectrads3.GetJobSpectraS3Response;
2526
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers;
27+
import com.spectralogic.ds3client.helpers.FileObjectGetter;
28+
import com.spectralogic.ds3client.helpers.FileObjectPutter;
2629
import com.spectralogic.ds3client.helpers.options.ReadJobOptions;
2730
import com.spectralogic.ds3client.integration.test.helpers.ABMTestHelper;
31+
import com.spectralogic.ds3client.integration.test.helpers.Ds3ClientShim;
2832
import com.spectralogic.ds3client.integration.test.helpers.TempStorageIds;
2933
import com.spectralogic.ds3client.integration.test.helpers.TempStorageUtil;
3034
import com.spectralogic.ds3client.models.ChecksumType;
3135
import com.spectralogic.ds3client.models.Priority;
3236
import com.spectralogic.ds3client.models.bulk.Ds3Object;
3337
import com.spectralogic.ds3client.utils.ResourceUtils;
38+
import org.apache.commons.io.FileUtils;
3439
import org.junit.AfterClass;
3540
import org.junit.BeforeClass;
3641
import org.junit.Test;
3742
import org.slf4j.Logger;
3843
import org.slf4j.LoggerFactory;
3944

45+
import java.io.File;
4046
import java.io.IOException;
47+
import java.lang.reflect.InvocationTargetException;
4148
import java.net.URISyntaxException;
4249
import java.nio.channels.SeekableByteChannel;
4350
import java.nio.channels.WritableByteChannel;
4451
import java.nio.file.Files;
4552
import java.nio.file.Path;
53+
import java.nio.file.Paths;
54+
import java.util.ArrayList;
55+
import java.util.Arrays;
56+
import java.util.List;
4657
import java.util.UUID;
4758

4859
import static com.spectralogic.ds3client.integration.Util.RESOURCE_BASE_NAME;
4960
import static com.spectralogic.ds3client.integration.Util.deleteAllContents;
5061
import static org.hamcrest.Matchers.is;
5162
import static org.junit.Assert.assertThat;
63+
import static org.junit.Assert.assertTrue;
5264

5365
public class GetJobManagement_Test {
5466

@@ -128,6 +140,74 @@ public void createReadJob() throws IOException, InterruptedException, URISyntaxE
128140
assertThat(jobSpectraS3Response.getStatusCode(), is(200));
129141
}
130142

143+
private void putBigFile() throws IOException, URISyntaxException {
144+
final String DIR_NAME = "largeFiles/";
145+
final String[] FILE_NAMES = new String[] { "lesmis-copies.txt" };
146+
147+
final Path dirPath = ResourceUtils.loadFileResource(DIR_NAME);
148+
149+
final List<String> bookTitles = new ArrayList<>();
150+
final List<Ds3Object> objects = new ArrayList<>();
151+
for (final String book : FILE_NAMES) {
152+
final Path objPath = ResourceUtils.loadFileResource(DIR_NAME + book);
153+
final long bookSize = Files.size(objPath);
154+
final Ds3Object obj = new Ds3Object(book, bookSize);
155+
156+
bookTitles.add(book);
157+
objects.add(obj);
158+
}
159+
160+
final int maxNumBlockAllocationRetries = 1;
161+
final int maxNumObjectTransferAttempts = 3;
162+
final Ds3ClientHelpers ds3ClientHelpers = Ds3ClientHelpers.wrap(client,
163+
maxNumBlockAllocationRetries,
164+
maxNumObjectTransferAttempts);
165+
166+
final Ds3ClientHelpers.Job writeJob = ds3ClientHelpers.startWriteJob(BUCKET_NAME, objects);
167+
writeJob.transfer(new FileObjectPutter(dirPath));
168+
}
169+
170+
@Test
171+
public void createReadJobWithBigFile() throws IOException, URISyntaxException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
172+
putBigFile();
173+
174+
final String tempPathPrefix = null;
175+
final Path tempDirectory = Files.createTempDirectory(Paths.get("."), tempPathPrefix);
176+
177+
try {
178+
final String DIR_NAME = "largeFiles/";
179+
final String FILE_NAME = "lesmis-copies.txt";
180+
181+
final Path objPath = ResourceUtils.loadFileResource(DIR_NAME + FILE_NAME);
182+
final long bookSize = Files.size(objPath);
183+
final Ds3Object obj = new Ds3Object(FILE_NAME, bookSize);
184+
185+
final Ds3ClientShim ds3ClientShim = new Ds3ClientShim((Ds3ClientImpl)client);
186+
187+
final int maxNumBlockAllocationRetries = 1;
188+
final int maxNumObjectTransferAttempts = 3;
189+
final Ds3ClientHelpers ds3ClientHelpers = Ds3ClientHelpers.wrap(ds3ClientShim,
190+
maxNumBlockAllocationRetries,
191+
maxNumObjectTransferAttempts);
192+
193+
final Ds3ClientHelpers.Job readJob = ds3ClientHelpers.startReadJob(BUCKET_NAME, Arrays.asList(obj));
194+
195+
final GetJobSpectraS3Response jobSpectraS3Response = ds3ClientShim
196+
.getJobSpectraS3(new GetJobSpectraS3Request(readJob.getJobId()));
197+
198+
assertThat(jobSpectraS3Response.getStatusCode(), is(200));
199+
200+
readJob.transfer(new FileObjectGetter(tempDirectory));
201+
202+
final File originalFile = ResourceUtils.loadFileResource(DIR_NAME + FILE_NAME).toFile();
203+
final File fileCopiedFromBP = Paths.get(tempDirectory.toString(), FILE_NAME).toFile();
204+
assertTrue(FileUtils.contentEquals(originalFile, fileCopiedFromBP));
205+
206+
} finally {
207+
FileUtils.deleteDirectory(tempDirectory.toFile());
208+
}
209+
}
210+
131211
@Test
132212
public void createReadJobWithPriorityOption() throws IOException,
133213
InterruptedException, URISyntaxException {
@@ -165,4 +245,4 @@ public void createReadJobWithNameAndPriorityOptions() throws IOException,
165245
assertThat(jobSpectraS3Response.getMasterObjectListResult().getName(), is("test_job"));
166246
assertThat(jobSpectraS3Response.getMasterObjectListResult().getPriority(), is(Priority.LOW));
167247
}
168-
}
248+
}

ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java

Lines changed: 2 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@
3232
import com.spectralogic.ds3client.integration.test.helpers.TempStorageUtil;
3333
import com.spectralogic.ds3client.models.*;
3434
import com.spectralogic.ds3client.models.bulk.Ds3Object;
35-
import com.spectralogic.ds3client.networking.ConnectionDetails;
3635
import com.spectralogic.ds3client.networking.FailedRequestException;
37-
import com.spectralogic.ds3client.networking.NetworkClient;
38-
import com.spectralogic.ds3client.networking.NetworkClientImpl;
3936
import com.spectralogic.ds3client.utils.ByteArraySeekableByteChannel;
4037
import com.spectralogic.ds3client.utils.ResourceUtils;
4138
import org.apache.commons.io.FileUtils;
@@ -45,7 +42,6 @@
4542
import java.io.File;
4643
import java.io.IOException;
4744
import java.lang.reflect.InvocationTargetException;
48-
import java.lang.reflect.Method;
4945
import java.net.URISyntaxException;
5046
import java.nio.ByteBuffer;
5147
import java.nio.channels.SeekableByteChannel;
@@ -56,6 +52,7 @@
5652
import java.util.Date;
5753
import java.util.List;
5854
import java.util.UUID;
55+
import com.spectralogic.ds3client.integration.test.helpers.Ds3ClientShim;
5956

6057
import static com.spectralogic.ds3client.integration.Util.RESOURCE_BASE_NAME;
6158
import static com.spectralogic.ds3client.integration.Util.deleteAllContents;
@@ -907,53 +904,6 @@ public void objectCompleted(final String name) {
907904
}
908905
}
909906

910-
private static class Ds3ClientShim extends Ds3ClientImpl {
911-
private static Method getNetClientMethod = null;
912-
913-
int numRetries = 0;
914-
915-
static {
916-
try {
917-
getNetClientMethod = Ds3ClientImpl.class.getDeclaredMethod("getNetClient");
918-
} catch (final NoSuchMethodException e) {
919-
fail("Could not find Ds3ClientImpl method getNetClient.");
920-
}
921-
922-
getNetClientMethod.setAccessible(true);
923-
}
924-
925-
public Ds3ClientShim(final NetworkClient netClient) {
926-
super(netClient);
927-
}
928-
929-
public Ds3ClientShim(final Ds3ClientImpl ds3ClientImpl) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
930-
this((NetworkClient)getNetClientMethod.invoke(ds3ClientImpl));
931-
}
932-
933-
@Override
934-
public PutObjectResponse putObject(final PutObjectRequest request) throws IOException {
935-
if(numRetries++ >= 1) {
936-
return super.putObject(request);
937-
}
938-
939-
throw new Ds3NoMoreRetriesException(1);
940-
}
941-
942-
@Override
943-
public Ds3Client newForNode(final JobNode node) {
944-
final ConnectionDetails newConnectionDetails;
945-
try {
946-
newConnectionDetails = ((NetworkClient)getNetClientMethod.invoke(this)).getConnectionDetails();
947-
final NetworkClient newNetClient = new NetworkClientImpl(newConnectionDetails);
948-
return new Ds3ClientShim(newNetClient);
949-
} catch (final IllegalAccessException | InvocationTargetException e) {
950-
fail("Failure trying to create Ds3Client used in verifying putObject retries: " + e.getMessage());
951-
}
952-
953-
return null;
954-
}
955-
}
956-
957907
private interface ObjectTransferExceptionHandler {
958908
boolean handleException(final Throwable t);
959909
}
@@ -975,4 +925,4 @@ public boolean handleException(final Throwable t) {
975925
}
976926
});
977927
}
978-
}
928+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* ****************************************************************************
3+
* Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved.
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
5+
* this file except in compliance with the License. A copy of the License is located at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* or in the "license" file accompanying this file.
10+
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
* specific language governing permissions and limitations under the License.
13+
* ****************************************************************************
14+
*/
15+
16+
package com.spectralogic.ds3client.integration.test.helpers;
17+
18+
import com.spectralogic.ds3client.Ds3Client;
19+
import com.spectralogic.ds3client.Ds3ClientImpl;
20+
import com.spectralogic.ds3client.commands.GetObjectRequest;
21+
import com.spectralogic.ds3client.commands.GetObjectResponse;
22+
import com.spectralogic.ds3client.commands.PutObjectRequest;
23+
import com.spectralogic.ds3client.commands.PutObjectResponse;
24+
import com.spectralogic.ds3client.exceptions.Ds3NoMoreRetriesException;
25+
import com.spectralogic.ds3client.models.JobNode;
26+
import com.spectralogic.ds3client.networking.ConnectionDetails;
27+
import com.spectralogic.ds3client.networking.NetworkClient;
28+
import com.spectralogic.ds3client.networking.NetworkClientImpl;
29+
30+
import java.io.IOException;
31+
import java.lang.reflect.InvocationTargetException;
32+
import java.lang.reflect.Method;
33+
34+
import static org.junit.Assert.fail;
35+
36+
public class Ds3ClientShim extends Ds3ClientImpl {
37+
private static Method getNetClientMethod = null;
38+
39+
int numRetries = 0;
40+
41+
static {
42+
try {
43+
getNetClientMethod = Ds3ClientImpl.class.getDeclaredMethod("getNetClient");
44+
} catch (final NoSuchMethodException e) {
45+
fail("Could not find Ds3ClientImpl method getNetClient.");
46+
}
47+
48+
getNetClientMethod.setAccessible(true);
49+
}
50+
51+
public Ds3ClientShim(final NetworkClient netClient) {
52+
super(netClient);
53+
}
54+
55+
public Ds3ClientShim(final Ds3ClientImpl ds3ClientImpl) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
56+
this((NetworkClient)getNetClientMethod.invoke(ds3ClientImpl));
57+
}
58+
59+
@Override
60+
public PutObjectResponse putObject(final PutObjectRequest request) throws IOException {
61+
if (numRetries++ >= 1) {
62+
return super.putObject(request);
63+
}
64+
65+
throw new Ds3NoMoreRetriesException(1);
66+
}
67+
68+
@Override
69+
public GetObjectResponse getObject(final GetObjectRequest request) throws IOException {
70+
if (numRetries++ >= 1) {
71+
return super.getObject(request);
72+
}
73+
74+
throw new Ds3NoMoreRetriesException(1);
75+
}
76+
77+
@Override
78+
public Ds3Client newForNode(final JobNode node) {
79+
final ConnectionDetails newConnectionDetails;
80+
try {
81+
newConnectionDetails = ((NetworkClient)getNetClientMethod.invoke(this)).getConnectionDetails();
82+
final NetworkClient newNetClient = new NetworkClientImpl(newConnectionDetails);
83+
return new Ds3ClientShim(newNetClient);
84+
} catch (final IllegalAccessException | InvocationTargetException e) {
85+
fail("Failure trying to create Ds3Client used in verifying putObject retries: " + e.getMessage());
86+
}
87+
88+
return null;
89+
}
90+
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpersImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public Ds3ClientHelpersImpl(final Ds3Client client, final int retryAfter, final
7979
public Ds3ClientHelpersImpl(final Ds3Client client, final int retryAfter, final int objectTransferAttempts, final int retryDelay) {
8080
this(client, retryAfter, objectTransferAttempts, retryDelay, new SameThreadEventRunner());
8181
}
82+
8283
public Ds3ClientHelpersImpl(final Ds3Client client, final int retryAfter, final int objectTransferAttempts, final int retryDelay, final EventRunner eventRunner) {
8384
this.client = client;
8485
this.retryAfter = retryAfter;
@@ -148,7 +149,14 @@ private Ds3ClientHelpers.Job innerStartReadJob(final String bucket, final Iterab
148149

149150
final ImmutableMultimap<String, Range> partialRanges = PartialObjectHelpers.getPartialObjectsRanges(objects);
150151

151-
return new ReadJobImpl(this.client, prime.getResult(), partialRanges, this.retryAfter, this.retryDelay, this.eventRunner);
152+
return new ReadJobImpl(
153+
this.client,
154+
prime.getResult(),
155+
partialRanges,
156+
this.objectTransferAttempts,
157+
this.retryAfter,
158+
this.retryDelay,
159+
this.eventRunner);
152160
}
153161

154162
@Override
@@ -207,6 +215,7 @@ public Ds3ClientHelpers.Job recoverReadJob(final UUID jobId) throws IOException,
207215
this.client,
208216
jobResponse.getMasterObjectListResult(),
209217
ImmutableMultimap.<String, Range>of(),
218+
this.objectTransferAttempts,
210219
this.retryAfter,
211220
this.retryDelay,
212221
this.eventRunner);

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobImpl.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,23 @@
1717

1818
import com.spectralogic.ds3client.Ds3Client;
1919
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.Job;
20+
import com.spectralogic.ds3client.models.BulkObject;
2021
import com.spectralogic.ds3client.models.MasterObjectList;
2122

23+
import java.io.IOException;
2224
import java.util.UUID;
2325

2426
abstract class JobImpl implements Job {
2527
protected final Ds3Client client;
2628
protected final MasterObjectList masterObjectList;
2729
protected boolean running = false;
2830
protected int maxParallelRequests = 10;
31+
private final int objectTransferAttempts;
2932

30-
public JobImpl(final Ds3Client client, final MasterObjectList masterObjectList) {
33+
public JobImpl(final Ds3Client client, final MasterObjectList masterObjectList, final int objectTransferAttempts) {
3134
this.client = client;
3235
this.masterObjectList = masterObjectList;
36+
this.objectTransferAttempts = objectTransferAttempts;
3337
}
3438

3539
@Override
@@ -58,4 +62,23 @@ protected void checkRunning() {
5862
if (running) throw new IllegalStateException("You cannot modify a job after calling transfer");
5963
}
6064

65+
protected void transferItem(
66+
final Ds3Client client,
67+
final BulkObject ds3Object,
68+
final ChunkTransferrer.ItemTransferrer itemTransferrer)
69+
throws IOException
70+
{
71+
int objectTransfersAttempted = 0;
72+
73+
while(true) {
74+
try {
75+
itemTransferrer.transferItem(client, ds3Object);
76+
break;
77+
} catch (final Throwable t) {
78+
if (ExceptionClassifier.isUnrecoverableException(t) || ++objectTransfersAttempted >= objectTransferAttempts) {
79+
throw t;
80+
}
81+
}
82+
}
83+
}
6184
}

0 commit comments

Comments
 (0)