Skip to content

Commit a48cf95

Browse files
authored
Merge pull request #472 from GraciesPadre/fix_live_lock_bug
Fix live lock bug
2 parents ba7cd87 + 929f739 commit a48cf95

File tree

11 files changed

+291
-35
lines changed

11 files changed

+291
-35
lines changed

ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,6 @@ public void dataTransferred(final long size) {
730730
}
731731
}
732732

733-
734733
@Test
735734
public void testFiringFailureHandlerWhenGettingChunks()
736735
throws URISyntaxException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, IOException

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpersImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,8 @@ private Ds3ClientHelpers.Job innerStartReadJob(final List<Ds3Object> objects,
244244

245245
final MasterObjectList masterObjectList = getBulkJobSpectraS3Response.getMasterObjectList();
246246

247-
transferStrategyBuilder.withMasterObjectList(masterObjectList)
247+
transferStrategyBuilder
248+
.withMasterObjectList(masterObjectList)
248249
.withRangesForBlobs(PartialObjectHelpers.mapRangesToBlob(masterObjectList.getObjects(), partialRanges));
249250

250251
return new ReadJobImpl(transferStrategyBuilder);

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobPart.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,26 @@
2323
*/
2424
public class JobPart {
2525
private final Ds3Client client;
26-
private final BulkObject bulkObject;
26+
private final BulkObject blob;
2727

28-
public JobPart(final Ds3Client client, final BulkObject bulkObject) {
28+
public JobPart(final Ds3Client client, final BulkObject blob) {
2929
this.client = client;
30-
this.bulkObject = bulkObject;
30+
this.blob = blob;
3131
}
3232

3333
public Ds3Client getClient() {
3434
return client;
3535
}
3636

37-
public BulkObject getBulkObject() {
38-
return bulkObject;
37+
public BulkObject getBlob() {
38+
return blob;
3939
}
4040

4141
@Override
4242
public String toString() {
4343
return "JobPart{" +
4444
"client=" + client +
45-
", bulkObject=" + bulkObject +
45+
", blob=" + blob +
4646
'}';
4747
}
4848
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobState.java

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,113 @@
1515

1616
package com.spectralogic.ds3client.helpers;
1717

18+
import com.google.common.collect.Sets;
1819
import com.spectralogic.ds3client.models.BulkObject;
1920
import com.spectralogic.ds3client.models.Objects;
21+
import com.spectralogic.ds3client.models.PhysicalPlacement;
2022

2123
import java.util.Collection;
24+
import java.util.Set;
25+
import java.util.UUID;
2226

2327
public class JobState {
24-
final int numBlobsToTransfer;
28+
private final Set<BlobIdentityDecorator> blobs = Sets.newConcurrentHashSet();
29+
30+
private final int numBlobsInJob;
2531

2632
public JobState(final Collection<Objects> chunksThatContainBlobs) {
2733
int numBlobsInChunks = 0;
2834

2935
for (final Objects chunk : chunksThatContainBlobs) {
30-
for (final BulkObject ignored : chunk.getObjects()) {
36+
for (final BulkObject blob : chunk.getObjects()) {
37+
blobs.add(new BlobIdentityDecorator(blob));
3138
++numBlobsInChunks;
3239
}
3340
}
3441

35-
numBlobsToTransfer = numBlobsInChunks;
42+
numBlobsInJob = numBlobsInChunks;
43+
}
44+
45+
public boolean blobTransferredOrFailed(final BulkObject blob) {
46+
return blobs.remove(new BlobIdentityDecorator(blob));
47+
}
48+
49+
public boolean contains(final BulkObject blob) {
50+
return blobs.contains(new BlobIdentityDecorator(blob));
51+
}
52+
53+
public int numBlobsInJob() {
54+
return numBlobsInJob;
3655
}
3756

38-
public int numBlobsToTransfer() {
39-
return numBlobsToTransfer;
57+
/**
58+
* This class is used to know whether we have attempted to transfer a blob. In getting the master object list from
59+
* a call to either getBulkJobSpectraS3 or putBulkJobSpectraS3, the resulting blobs will have their inCache property
60+
* marked false. In the master object list returned from a call to either allocateChunk or
61+
* getJobChunksReadyForClientProcessingSpectraS3, the resulting blobs may or may not have the same value for
62+
* their inCache property. Since the BlackPearl may continually send us a blob that will always fail to transfer,
63+
* we check to see if a blob in the master object list we get during a transfer is in the master object list we
64+
* get when initiating a job. If not, we can assume that we have previously tried and failed to transfer that
65+
* blob and skip it.
66+
*/
67+
private static class BlobIdentityDecorator {
68+
private final String bucket;
69+
70+
private final UUID id;
71+
72+
private final boolean latest;
73+
74+
private final long length;
75+
76+
private final String name;
77+
78+
private final long offset;
79+
80+
private final PhysicalPlacement physicalPlacement;
81+
82+
private final long version;
83+
84+
private BlobIdentityDecorator(final BulkObject blob) {
85+
this.bucket = blob.getBucket();
86+
this.id = blob.getId();
87+
this.latest = blob.getLatest();
88+
this.length = blob.getLength();
89+
this.name = blob.getName();
90+
this.offset = blob.getOffset();
91+
this.physicalPlacement = blob.getPhysicalPlacement();
92+
this.version = blob.getVersion();
93+
}
94+
95+
@Override
96+
public boolean equals(final Object o) {
97+
if (this == o) return true;
98+
if (!(o instanceof BlobIdentityDecorator)) return false;
99+
100+
final BlobIdentityDecorator that = (BlobIdentityDecorator) o;
101+
102+
if (latest != that.latest) return false;
103+
if (length != that.length) return false;
104+
if (offset != that.offset) return false;
105+
if (version != that.version) return false;
106+
if (bucket != null ? !bucket.equals(that.bucket) : that.bucket != null) return false;
107+
if (id != null ? !id.equals(that.id) : that.id != null) return false;
108+
if (name != null ? !name.equals(that.name) : that.name != null) return false;
109+
return physicalPlacement != null ? physicalPlacement.equals(that.physicalPlacement) : that.physicalPlacement == null;
110+
}
111+
112+
@Override
113+
public int hashCode() {
114+
return java.util.Objects.hash(bucket, id, latest, length, name, offset, physicalPlacement, version);
115+
}
116+
117+
@Override
118+
public String toString() {
119+
return "BlobIdentityDecorator{" +
120+
"bucket='" + bucket + '\'' +
121+
", length=" + length +
122+
", name='" + name + '\'' +
123+
", offset=" + offset +
124+
'}';
125+
}
40126
}
41127
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/blobstrategy/GetSequentialBlobStrategy.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
* A subclass of {@link BlobStrategy} used in get transfers.
3636
*/
3737
public class GetSequentialBlobStrategy extends AbstractBlobStrategy {
38-
private static final Logger LOG = LoggerFactory.getLogger(GetSequentialBlobStrategy.class);
39-
4038
public GetSequentialBlobStrategy(final Ds3Client client,
4139
final MasterObjectList masterObjectList,
4240
final EventDispatcher eventDispatcher,

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/AbstractTransferStrategy.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package com.spectralogic.ds3client.helpers.strategy.transferstrategy;
1717

18+
import com.google.common.base.Predicate;
19+
import com.google.common.collect.FluentIterable;
1820
import com.google.common.collect.Iterables;
1921
import com.google.common.util.concurrent.ListeningExecutorService;
2022
import com.spectralogic.ds3client.Ds3Client;
@@ -29,14 +31,18 @@
2931
import com.spectralogic.ds3client.models.Objects;
3032

3133
import java.io.IOException;
34+
import java.util.NoSuchElementException;
3235
import java.util.concurrent.Callable;
3336
import java.util.concurrent.CountDownLatch;
3437
import java.util.concurrent.atomic.AtomicInteger;
3538
import java.util.concurrent.atomic.AtomicReference;
3639

40+
import com.spectralogic.ds3client.networking.FailedRequestException;
3741
import org.slf4j.Logger;
3842
import org.slf4j.LoggerFactory;
3943

44+
import javax.annotation.Nullable;
45+
4046
/**
4147
* An implementation of {@link TransferStrategy} that provides a default implementation {@code transfer}
4248
* implementation.
@@ -45,11 +51,11 @@ abstract class AbstractTransferStrategy implements TransferStrategy {
4551
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransferStrategy.class);
4652

4753
private final BlobStrategy blobStrategy;
54+
private final JobState jobState;
4855
private final ListeningExecutorService executorService;
4956
private final EventDispatcher eventDispatcher;
5057
private final MasterObjectList masterObjectList;
5158
private final FailureEvent.FailureActivity failureActivity;
52-
private final AtomicInteger totalNumBlobsToTransfer;
5359

5460
private final AtomicReference<IOException> cachedException = new AtomicReference<>();
5561

@@ -77,7 +83,7 @@ public AbstractTransferStrategy(final BlobStrategy blobStrategy,
7783
final FailureEvent.FailureActivity failureActivity)
7884
{
7985
this.blobStrategy = blobStrategy;
80-
totalNumBlobsToTransfer = new AtomicInteger(jobState.numBlobsToTransfer());
86+
this.jobState = jobState;
8187
this.executorService = executorService;
8288
this.eventDispatcher = eventDispatcher;
8389
this.masterObjectList = masterObjectList;
@@ -103,20 +109,27 @@ public AbstractTransferStrategy withTransferMethod(final TransferMethod transfer
103109
public void transfer() throws IOException {
104110
cachedException.set(null);
105111

106-
while (totalNumBlobsToTransfer.get() > 0 && ! Thread.currentThread().isInterrupted()) {
112+
final AtomicInteger numBlobsRemaining = new AtomicInteger(jobState.numBlobsInJob());
113+
114+
while ( ! Thread.currentThread().isInterrupted() && numBlobsRemaining.get() > 0) {
107115
try {
108-
final Iterable<JobPart> jobParts = blobStrategy.getWork();
109-
final CountDownLatch countDownLatch = new CountDownLatch(Iterables.size(jobParts));
110-
transferJobParts(jobParts, countDownLatch);
116+
final Iterable<JobPart> jobParts = jobPartsNotAlreadyTransferred();
117+
118+
final int numJobParts = Iterables.size(jobParts);
119+
120+
if (numJobParts <= 0) {
121+
break;
122+
}
123+
124+
final CountDownLatch countDownLatch = new CountDownLatch(numJobParts);
125+
transferJobParts(jobParts, countDownLatch, numBlobsRemaining);
111126
countDownLatch.await();
112-
} catch (final Ds3NoMoreRetriesException e) {
127+
} catch (final Ds3NoMoreRetriesException | FailedRequestException e) {
113128
emitFailureEvent(makeFailureEvent(failureActivity, e, firstChunk()));
114129
throw e;
115-
} catch (final InterruptedException e) {
116-
LOG.info("Error getting entries from work queue.", e);
130+
} catch (final InterruptedException | NoSuchElementException e) {
117131
Thread.currentThread().interrupt();
118-
} catch (final Throwable t) {
119-
totalNumBlobsToTransfer.decrementAndGet();
132+
} catch (final Throwable t) {
120133
emitFailureAndSetCachedException(t);
121134
}
122135
}
@@ -126,6 +139,17 @@ public void transfer() throws IOException {
126139
}
127140
}
128141

142+
private Iterable<JobPart> jobPartsNotAlreadyTransferred() throws IOException, InterruptedException {
143+
return FluentIterable
144+
.from(blobStrategy.getWork())
145+
.filter(new Predicate<JobPart>() {
146+
@Override
147+
public boolean apply(@Nullable final JobPart jobPart) {
148+
return jobState.contains(jobPart.getBlob());
149+
}
150+
});
151+
}
152+
129153
private void emitFailureAndSetCachedException(final Throwable t) {
130154
emitFailureEvent(makeFailureEvent(failureActivity, t, firstChunk()));
131155
maybeSetCachedException(t);
@@ -141,7 +165,9 @@ private synchronized void maybeSetCachedException(final Throwable t) {
141165
}
142166
}
143167

144-
private void transferJobParts(final Iterable<JobPart> jobParts, final CountDownLatch countDownLatch) throws IOException {
168+
private void transferJobParts(final Iterable<JobPart> jobParts,
169+
final CountDownLatch countDownLatch,
170+
final AtomicInteger numBlobsTransferred) throws IOException {
145171
for (final JobPart jobPart : jobParts) {
146172
executorService.submit(new Callable<Void>() {
147173
@Override
@@ -155,7 +181,8 @@ public Void call() throws Exception {
155181
emitFailureAndSetCachedException(e);
156182
throw new RuntimeException(e);
157183
} finally {
158-
totalNumBlobsToTransfer.decrementAndGet();
184+
jobState.blobTransferredOrFailed(jobPart.getBlob());
185+
numBlobsTransferred.decrementAndGet();
159186
countDownLatch.countDown();
160187
return null;
161188
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/GetJobNetworkFailureRetryDecorator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ private TransferState makeInitialTransferState() {
9696
private synchronized void makeTransferMethodForPartialRetry(final JobPart jobPart, final long numBytesTransferred) {
9797
final TransferState existingTransferState = transferMethodMap.get(jobPart);
9898

99-
ImmutableCollection<Range> ranges = initializeRanges(jobPart.getBulkObject(), existingTransferState);
99+
ImmutableCollection<Range> ranges = initializeRanges(jobPart.getBlob(), existingTransferState);
100100
final Long numBytesToTransfer = initializeNumBytesToTransfer(existingTransferState, ranges);
101101
ranges = updateRanges(ranges, numBytesTransferred, numBytesToTransfer);
102102
final long destinationChannelOffset = existingTransferState.getDestinationChannelOffset() + numBytesTransferred;

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/GetJobPartialBlobTransferMethod.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public GetJobPartialBlobTransferMethod(final ChannelStrategy channelStrategy,
6565
*/
6666
@Override
6767
public void transferJobPart(final JobPart jobPart) throws IOException {
68-
final BulkObject blob = jobPart.getBulkObject();
68+
final BulkObject blob = jobPart.getBlob();
6969

7070
final SeekableByteChannel seekableByteChannel = channelStrategy.acquireChannelForBlob(blob);
7171

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/GetJobTransferMethod.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,11 @@ public GetJobTransferMethod(final ChannelStrategy channelStrategy,
6161
*/
6262
@Override
6363
public void transferJobPart(final JobPart jobPart) throws IOException {
64-
final SeekableByteChannel seekableByteChannel = channelStrategy.acquireChannelForBlob(jobPart.getBulkObject());
64+
final SeekableByteChannel seekableByteChannel = channelStrategy.acquireChannelForBlob(jobPart.getBlob());
6565

6666
final GetObjectResponse getObjectResponse = jobPart.getClient().getObject(makeGetObjectRequest(seekableByteChannel, jobPart));
6767

68-
final BulkObject blob = jobPart.getBulkObject();
68+
final BulkObject blob = jobPart.getBlob();
6969

7070
channelStrategy.releaseChannelForBlob(seekableByteChannel, blob);
7171

@@ -77,7 +77,7 @@ public void transferJobPart(final JobPart jobPart) throws IOException {
7777
}
7878

7979
private GetObjectRequest makeGetObjectRequest(final SeekableByteChannel seekableByteChannel, final JobPart jobPart) {
80-
final BulkObject blob = jobPart.getBulkObject();
80+
final BulkObject blob = jobPart.getBlob();
8181

8282
final GetObjectRequest getObjectRequest = new GetObjectRequest(
8383
bucketName,

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/PutJobTransferMethod.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public PutJobTransferMethod(final ChannelStrategy channelStrategy,
7171
*/
7272
@Override
7373
public void transferJobPart(final JobPart jobPart) throws IOException {
74-
final BulkObject blob = jobPart.getBulkObject();
74+
final BulkObject blob = jobPart.getBlob();
7575

7676
final SeekableByteChannel seekableByteChannel = channelStrategy.acquireChannelForBlob(blob);
7777

@@ -84,7 +84,7 @@ public void transferJobPart(final JobPart jobPart) throws IOException {
8484
}
8585

8686
private PutObjectRequest makePutObjectRequest(final SeekableByteChannel seekableByteChannel, final JobPart jobPart) {
87-
final BulkObject blob = jobPart.getBulkObject();
87+
final BulkObject blob = jobPart.getBlob();
8888

8989
final PutObjectRequest putObjectRequest = new PutObjectRequest(
9090
bucketName,

0 commit comments

Comments
 (0)