Skip to content

Commit 72b4818

Browse files
authored
Merge pull request #363 from rpmoore/streaming_gets
Streaming gets
2 parents c01b1f0 + 458ba93 commit 72b4818

File tree

11 files changed

+562
-272
lines changed

11 files changed

+562
-272
lines changed

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ChunkTransferrer.java

Lines changed: 36 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -15,100 +15,79 @@
1515

1616
package com.spectralogic.ds3client.helpers;
1717

18-
import com.google.common.collect.ImmutableMap;
1918
import com.google.common.util.concurrent.Futures;
2019
import com.google.common.util.concurrent.ListenableFuture;
2120
import com.google.common.util.concurrent.ListeningExecutorService;
2221
import com.google.common.util.concurrent.MoreExecutors;
2322
import com.spectralogic.ds3client.Ds3Client;
2423
import com.spectralogic.ds3client.models.BulkObject;
25-
import com.spectralogic.ds3client.models.Objects;
26-
import com.spectralogic.ds3client.models.JobNode;
24+
import com.spectralogic.ds3client.helpers.strategy.BlobStrategy;
2725
import org.slf4j.Logger;
2826
import org.slf4j.LoggerFactory;
2927

28+
import java.io.Closeable;
3029
import java.io.IOException;
31-
import java.util.*;
30+
import java.util.ArrayList;
31+
import java.util.List;
3232
import java.util.concurrent.Callable;
3333
import java.util.concurrent.ExecutionException;
3434
import java.util.concurrent.Executors;
3535

36-
class ChunkTransferrer {
36+
class ChunkTransferrer implements Closeable {
3737
private final static Logger LOG = LoggerFactory.getLogger(ChunkTransferrer.class);
38+
3839
private final ItemTransferrer itemTransferrer;
39-
private final Ds3Client mainClient;
4040
private final JobPartTracker partTracker;
41-
private final int maxParallelRequests;
41+
private final ListeningExecutorService executor;
4242

4343
public interface ItemTransferrer {
4444
void transferItem(Ds3Client client, BulkObject ds3Object) throws IOException;
4545
}
4646

4747
public ChunkTransferrer(
4848
final ItemTransferrer transferrer,
49-
final Ds3Client mainClient,
5049
final JobPartTracker partTracker,
5150
final int maxParallelRequests) {
5251
this.itemTransferrer = transferrer;
53-
this.mainClient = mainClient;
5452
this.partTracker = partTracker;
55-
this.maxParallelRequests = maxParallelRequests;
56-
}
57-
58-
public void transferChunks(
59-
final Iterable<JobNode> nodes,
60-
final Iterable<Objects> chunks)
61-
throws IOException {
62-
LOG.debug("Getting ready to process chunks");
63-
final ImmutableMap<UUID, JobNode> nodeMap = buildNodeMap(nodes);
6453
LOG.debug("Starting executor service");
65-
final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxParallelRequests));
54+
executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxParallelRequests));
6655
LOG.debug("Executor service started");
67-
try {
68-
final List<ListenableFuture<?>> tasks = new ArrayList<>();
69-
for (final Objects chunk : chunks) {
70-
LOG.debug("Processing parts for chunk: {}", chunk.getChunkId().toString());
71-
final Ds3Client client = getClient(nodeMap, chunk.getNodeId(), mainClient);
72-
for (final BulkObject ds3Object : chunk.getObjects()) {
73-
final ObjectPart part = new ObjectPart(ds3Object.getOffset(), ds3Object.getLength());
74-
if (this.partTracker.containsPart(ds3Object.getName(), part)) {
75-
LOG.debug("Adding {} to executor for processing", ds3Object.getName());
76-
tasks.add(executor.submit(new Callable<Object>() {
77-
@Override
78-
public Object call() throws Exception {
79-
LOG.debug("Processing {}", ds3Object.getName());
80-
ChunkTransferrer.this.itemTransferrer.transferItem(client, ds3Object);
81-
ChunkTransferrer.this.partTracker.completePart(ds3Object.getName(), part);
82-
return null;
83-
}
84-
}));
85-
}
86-
}
87-
}
88-
executeWithExceptionHandling(tasks);
89-
} finally {
90-
LOG.debug("Shutting down executor");
91-
executor.shutdown();
92-
}
9356
}
9457

95-
private static Ds3Client getClient(final ImmutableMap<UUID, JobNode> nodeMap, final UUID nodeId, final Ds3Client mainClient) {
96-
final JobNode jobNode = nodeMap.get(nodeId);
58+
public void transferChunks(
59+
final BlobStrategy blobStrategy)
60+
throws IOException, InterruptedException {
61+
final List<ListenableFuture<?>> tasks = new ArrayList<>();
62+
63+
final Iterable<JobPart> work = blobStrategy.getWork();
64+
65+
for (final JobPart jobPart : work) {
66+
final BulkObject blob = jobPart.getBulkObject();
67+
final ObjectPart part = new ObjectPart(blob.getOffset(), blob.getLength());
9768

98-
if (jobNode == null) {
99-
LOG.warn("The jobNode was not found, returning the existing client");
100-
return mainClient;
69+
if (this.partTracker.containsPart(blob.getName(), part)) {
70+
LOG.debug("Adding {} offset {} to executor for processing", blob.getName(), blob.getOffset());
71+
tasks.add(executor.submit(new Callable<Object>() {
72+
@Override
73+
public Object call() throws Exception {
74+
LOG.debug("Processing {} offset {}", blob.getName(), blob.getOffset());
75+
ChunkTransferrer.this.itemTransferrer.transferItem(jobPart.getClient(), blob);
76+
blobStrategy.blobCompleted(blob);
77+
ChunkTransferrer.this.partTracker.completePart(blob.getName(), part);
78+
return null;
79+
}
80+
}));
81+
}
10182
}
10283

103-
return mainClient.newForNode(jobNode);
84+
executeWithExceptionHandling(tasks);
10485
}
10586

106-
private static ImmutableMap<UUID, JobNode> buildNodeMap(final Iterable<JobNode> nodes) {
107-
final ImmutableMap.Builder<UUID, JobNode> nodeMap = ImmutableMap.builder();
108-
for (final JobNode node: nodes) {
109-
nodeMap.put(node.getId(), node);
110-
}
111-
return nodeMap.build();
87+
@Override
88+
public void close() throws IOException {
89+
LOG.debug("Shutting down executor");
90+
executor.shutdown();
11291
}
11392

11493
private static void executeWithExceptionHandling(final List<ListenableFuture<?>> tasks)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* ****************************************************************************
3+
* Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved.
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
5+
* this file except in compliance with the License. A copy of the License is located at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* or in the "license" file accompanying this file.
10+
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
* specific language governing permissions and limitations under the License.
13+
* ****************************************************************************
14+
*/
15+
16+
package com.spectralogic.ds3client.helpers;
17+
18+
import com.spectralogic.ds3client.Ds3Client;
19+
import com.spectralogic.ds3client.models.BulkObject;
20+
21+
public class JobPart {
22+
private final Ds3Client client;
23+
private final BulkObject bulkObject;
24+
25+
public JobPart(final Ds3Client client, final BulkObject bulkObject) {
26+
this.client = client;
27+
this.bulkObject = bulkObject;
28+
}
29+
30+
public Ds3Client getClient() {
31+
return client;
32+
}
33+
34+
public BulkObject getBulkObject() {
35+
return bulkObject;
36+
}
37+
38+
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java

Lines changed: 28 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@
1919
import com.spectralogic.ds3client.Ds3Client;
2020
import com.spectralogic.ds3client.commands.GetObjectRequest;
2121
import com.spectralogic.ds3client.commands.GetObjectResponse;
22-
import com.spectralogic.ds3client.commands.spectrads3.GetJobChunksReadyForClientProcessingSpectraS3Request;
23-
import com.spectralogic.ds3client.commands.spectrads3.GetJobChunksReadyForClientProcessingSpectraS3Response;
24-
import com.spectralogic.ds3client.exceptions.Ds3NoMoreRetriesException;
2522
import com.spectralogic.ds3client.helpers.ChunkTransferrer.ItemTransferrer;
2623
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.ObjectChannelBuilder;
2724
import com.spectralogic.ds3client.helpers.events.EventRunner;
25+
import com.spectralogic.ds3client.helpers.strategy.BlobStrategy;
26+
import com.spectralogic.ds3client.helpers.strategy.GetStreamerStrategy;
2827
import com.spectralogic.ds3client.helpers.util.PartialObjectHelpers;
2928
import com.spectralogic.ds3client.models.*;
3029
import com.spectralogic.ds3client.models.common.Range;
@@ -47,8 +46,6 @@ class ReadJobImpl extends JobImpl {
4746
private final int retryDelay; // Negative value represents default
4847
private final EventRunner eventRunner;
4948

50-
private int retryAfterLeft; // The number of retries left
51-
5249
public ReadJobImpl(
5350
final Ds3Client client,
5451
final MasterObjectList masterObjectList,
@@ -69,7 +66,7 @@ public ReadJobImpl(
6966
this.waitingForChunksListeners = Sets.newIdentityHashSet();
7067
this.eventRunner = eventRunner;
7168

72-
this.retryAfter = this.retryAfterLeft = retryAfter;
69+
this.retryAfter = retryAfter;
7370
this.retryDelay = retryDelay;
7471
}
7572

@@ -155,18 +152,38 @@ public Ds3ClientHelpers.Job withChecksum(final ChecksumFunction checksumFunction
155152
public void transfer(final ObjectChannelBuilder channelBuilder)
156153
throws IOException {
157154
running = true;
155+
156+
final BlobStrategy strategy = new GetStreamerStrategy(
157+
client,
158+
this.masterObjectList,
159+
retryAfter,
160+
retryDelay,
161+
new GetStreamerStrategy.ChunkEventHandler() {
162+
@Override
163+
public void emitWaitingForChunksEvents(final int secondsToDelay) {
164+
for (final WaitingForChunksListener waitingForChunksListener : waitingForChunksListeners) {
165+
eventRunner.emitEvent(new Runnable() {
166+
@Override
167+
public void run() {
168+
waitingForChunksListener.waiting(secondsToDelay);
169+
}
170+
});
171+
}
172+
}
173+
});
174+
158175
try (final JobState jobState = new JobState(
159176
channelBuilder,
160177
this.masterObjectList.getObjects(),
161178
partTracker, blobToRanges)) {
162-
final ChunkTransferrer chunkTransferrer = new ChunkTransferrer(
179+
try (final ChunkTransferrer chunkTransferrer = new ChunkTransferrer(
163180
new GetObjectTransferrerRetryDecorator(jobState),
164-
this.client,
165181
jobState.getPartTracker(),
166182
this.maxParallelRequests
167-
);
168-
while (jobState.hasObjects()) {
169-
transferNextChunks(chunkTransferrer);
183+
)) {
184+
while (jobState.hasObjects()) {
185+
chunkTransferrer.transferChunks(strategy);
186+
}
170187
}
171188
} catch (final RuntimeException | IOException e) {
172189
throw e;
@@ -175,51 +192,6 @@ public void transfer(final ObjectChannelBuilder channelBuilder)
175192
}
176193
}
177194

178-
private void transferNextChunks(final ChunkTransferrer chunkTransferrer)
179-
throws IOException, InterruptedException {
180-
final GetJobChunksReadyForClientProcessingSpectraS3Response availableJobChunks =
181-
this.client.getJobChunksReadyForClientProcessingSpectraS3(new GetJobChunksReadyForClientProcessingSpectraS3Request(this.masterObjectList.getJobId().toString()));
182-
switch(availableJobChunks.getStatus()) {
183-
case AVAILABLE: {
184-
final MasterObjectList availableMol = availableJobChunks.getMasterObjectListResult();
185-
chunkTransferrer.transferChunks(availableMol.getNodes(), availableMol.getObjects());
186-
retryAfterLeft = retryAfter; // Reset the number of retries to the initial value
187-
break;
188-
}
189-
case RETRYLATER: {
190-
if (retryAfterLeft == 0) {
191-
throw new Ds3NoMoreRetriesException(this.retryAfter);
192-
}
193-
retryAfterLeft--;
194-
final int secondsToDelay = computeDelay(availableJobChunks.getRetryAfterSeconds());
195-
emitWaitingForChunksEvents(secondsToDelay);
196-
Thread.sleep(secondsToDelay * 1000);
197-
break;
198-
}
199-
default:
200-
assert false : "This line of code should be impossible to hit.";
201-
}
202-
}
203-
204-
private int computeDelay(final int retryAfterSeconds) {
205-
if (retryDelay == -1) {
206-
return retryAfterSeconds;
207-
} else {
208-
return retryDelay;
209-
}
210-
}
211-
212-
private void emitWaitingForChunksEvents(final int secondsToRetry) {
213-
for (final WaitingForChunksListener waitingForChunksListener : waitingForChunksListeners) {
214-
eventRunner.emitEvent(new Runnable() {
215-
@Override
216-
public void run() {
217-
waitingForChunksListener.waiting(secondsToRetry);
218-
}
219-
});
220-
}
221-
}
222-
223195
private final class GetObjectTransferrerRetryDecorator implements ItemTransferrer {
224196
private final GetObjectTransferrer getObjectTransferrer;
225197

0 commit comments

Comments
 (0)