Skip to content

Commit 76ac12f

Browse files
scriberpmoore
authored andcommitted
Streaming Fix - JSDK-298 (#579)
* Fixed * Oops * Imports * Imports * Imports
1 parent 5f2b48a commit 76ac12f

File tree

10 files changed

+107
-60
lines changed

10 files changed

+107
-60
lines changed

ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.spectralogic.ds3client.commands.spectrads3.notifications.*;
2828
import com.spectralogic.ds3client.exceptions.Ds3NoMoreRetriesException;
2929
import com.spectralogic.ds3client.helpers.*;
30+
import com.spectralogic.ds3client.helpers.channelbuilders.ObjectInputStreamBuilder;
3031
import com.spectralogic.ds3client.helpers.events.FailureEvent;
3132
import com.spectralogic.ds3client.helpers.events.SameThreadEventRunner;
3233
import com.spectralogic.ds3client.helpers.options.WriteJobOptions;
@@ -57,6 +58,7 @@
5758
import java.io.BufferedInputStream;
5859
import java.io.DataOutputStream;
5960
import java.io.File;
61+
import java.io.FileInputStream;
6062
import java.io.FileOutputStream;
6163
import java.io.IOException;
6264
import java.io.InputStream;
@@ -1859,6 +1861,51 @@ public void testPutJobUsingStreamedTransferStrategy() throws IOException, URISyn
18591861
}
18601862
}
18611863

1864+
public class StreamObjectPutter extends ObjectInputStreamBuilder {
1865+
final InputStream _is;
1866+
1867+
public StreamObjectPutter(final InputStream is) {
1868+
_is = is;
1869+
}
1870+
1871+
@Override
1872+
public InputStream buildInputStream(final String key) {
1873+
return _is;
1874+
}
1875+
}
1876+
1877+
@Test
1878+
public void testPutJobUsingStreamedTransferStrategyAboveChunk() throws IOException, URISyntaxException {
1879+
final String DIR_NAME = "books/";
1880+
final String[] FILE_NAMES = new String[]{"Paw-3.1.9.zip"};
1881+
1882+
try {
1883+
final Path dirPath = ResourceUtils.loadFileResource(DIR_NAME);
1884+
1885+
final List<Ds3Object> objectsToWrite = new ArrayList<>();
1886+
for (final String book : FILE_NAMES) {
1887+
final Path objPath = ResourceUtils.loadFileResource(DIR_NAME + book);
1888+
final long bookSize = Files.size(objPath);
1889+
final Ds3Object obj = new Ds3Object(book, bookSize);
1890+
1891+
objectsToWrite.add(obj);
1892+
}
1893+
1894+
final Ds3ClientHelpers.Job writeJob = HELPERS.startWriteJobUsingStreamedBehavior(BUCKET_NAME, objectsToWrite, WriteJobOptions.create());
1895+
writeJob.transfer(new StreamObjectPutter(new FileInputStream(dirPath.resolve("Paw-3.1.9.zip").toFile())));
1896+
1897+
final Ds3ClientHelpers ds3ClientHelpers = Ds3ClientHelpers.wrap(client);
1898+
final Iterable<Contents> bucketContentsIterable = ds3ClientHelpers.listObjects(BUCKET_NAME);
1899+
1900+
for (final Contents bucketContents : bucketContentsIterable) {
1901+
assertEquals(FILE_NAMES[0], bucketContents.getKey());
1902+
}
1903+
} finally {
1904+
cancelAllJobsForBucket(client, BUCKET_NAME);
1905+
deleteAllContents(client, BUCKET_NAME);
1906+
}
1907+
}
1908+
18621909
@Test
18631910
public void testPutJobUsingRandomAccessTransferStrategy() throws IOException, URISyntaxException {
18641911
final String DIR_NAME = "books/";
14 MB
Binary file not shown.

ds3-sdk/src/main/java/com/spectralogic/ds3client/Ds3InputStreamEntity.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,14 @@ public void setBufferSize(final int bufferSize) {
3939
this.bufferSize = bufferSize;
4040
}
4141

42-
public long getBufferSize() {
43-
return bufferSize;
44-
}
45-
4642
@Override
4743
public void writeTo(final OutputStream outStream) throws IOException {
4844
final long startTime = PerformanceUtils.getCurrentTime();
4945
final long totalBytes = IOUtils.copy(this.getContent(), outStream, bufferSize, path, true);
5046
final long endTime = PerformanceUtils.getCurrentTime();
51-
52-
if (this.getContentLength() != -1 && totalBytes != this.getContentLength()) {
53-
throw new ContentLengthNotMatchException(path, this.getContentLength(), totalBytes);
47+
final long length = this.getContentLength();
48+
if (length != -1 && totalBytes != length) {
49+
throw new ContentLengthNotMatchException(path, length, totalBytes);
5450
}
5551

5652
PerformanceUtils.logMbps(startTime, endTime, totalBytes, path, true);

ds3-sdk/src/main/java/com/spectralogic/ds3client/commands/PutMultiPartUploadPartRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.spectralogic.ds3client.networking.HttpVerb;
2020
import com.spectralogic.ds3client.commands.interfaces.AbstractRequest;
2121
import java.util.UUID;
22-
import com.google.common.net.UrlEscapers;
2322
import javax.annotation.Nonnull;
2423
import com.google.common.base.Preconditions;
2524
import com.spectralogic.ds3client.utils.SeekableByteChannelInputStream;
@@ -63,7 +62,7 @@ public PutMultiPartUploadPartRequest(final String bucketName, final String objec
6362
this.stream = new SeekableByteChannelInputStream(channel);
6463
}
6564

66-
65+
6766
public PutMultiPartUploadPartRequest(final String bucketName, final String objectName, @Nonnull final SeekableByteChannel channel, final int partNumber, final long size, final String uploadId) {
6867
Preconditions.checkNotNull(channel, "Channel may not be null.");
6968
this.bucketName = bucketName;

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/channelbuilders/ReadOnlySeekableByteChannel.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
public class ReadOnlySeekableByteChannel implements SeekableByteChannel {
2626

2727
private final ReadableByteChannel channel;
28-
private int size = 0;
29-
private int position = 0;
28+
private long size = 0;
29+
private long position = 0;
3030

3131
public ReadOnlySeekableByteChannel(final ReadableByteChannel channel) {
3232
this.channel = channel;
@@ -40,7 +40,7 @@ public ReadableByteChannel getChannel() {
4040
public int read(final ByteBuffer dst) throws IOException {
4141
size = channel.read(dst);
4242
position += size;
43-
return size;
43+
return Long.valueOf(size).intValue();
4444
}
4545

4646
@Override
@@ -55,6 +55,9 @@ public long position() throws IOException {
5555

5656
@Override
5757
public SeekableByteChannel position(final long newPosition) throws IOException {
58+
if (position == 0 || newPosition == 0) {
59+
this.position = newPosition;
60+
}
5861
if (newPosition == this.position) {
5962
return this;
6063
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SeekableByteChannelDecorator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,11 @@ class SeekableByteChannelDecorator implements SeekableByteChannel {
4848
seekableByteChannel.position(blobOffset);
4949
}
5050

51-
SeekableByteChannel wrappedSeekableByteChannel() {
51+
public SeekableByteChannel wrappedSeekableByteChannel() {
5252
return seekableByteChannel;
5353
}
5454

55+
5556
@Override
5657
public int read(final ByteBuffer dst) throws IOException {
5758
synchronized (lock) {

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SequentialChannelStrategy.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import com.google.common.collect.HashMultimap;
1919
import com.google.common.collect.SetMultimap;
2020
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers;
21+
import com.spectralogic.ds3client.helpers.channelbuilders.ReadOnlySeekableByteChannel;
2122
import com.spectralogic.ds3client.models.BulkObject;
23+
import com.spectralogic.ds3client.models.JobRequestType;
24+
import com.spectralogic.ds3client.models.MasterObjectList;
2225

2326
import java.io.IOException;
2427
import java.nio.channels.SeekableByteChannel;
@@ -40,6 +43,7 @@ public class SequentialChannelStrategy implements ChannelStrategy {
4043
private final ChannelStrategy channelStrategyDelegate;
4144
private final Ds3ClientHelpers.ObjectChannelBuilder objectChannelBuilder;
4245
private final ChannelPreparable channelPreparer;
46+
private final MasterObjectList masterObjectList;
4347

4448
/**
4549
* @param channelStrategy The instance of {@link ChannelStrategy} that holds the 1 channel reference a blob needs
@@ -51,12 +55,14 @@ public class SequentialChannelStrategy implements ChannelStrategy {
5155
* either {@link TruncatingChannelPreparable} or {@link NullChannelPreparable}.
5256
*/
5357
public SequentialChannelStrategy(final ChannelStrategy channelStrategy,
54-
final Ds3ClientHelpers.ObjectChannelBuilder objectChannelBuilder,
55-
final ChannelPreparable channelPreparer)
58+
final Ds3ClientHelpers.ObjectChannelBuilder objectChannelBuilder,
59+
final ChannelPreparable channelPreparer,
60+
final MasterObjectList masterObjectList)
5661
{
5762
channelStrategyDelegate = channelStrategy;
5863
this.objectChannelBuilder = objectChannelBuilder;
5964
this.channelPreparer = channelPreparer;
65+
this.masterObjectList = masterObjectList;
6066
}
6167

6268
/**
@@ -109,10 +115,19 @@ public void releaseChannelForBlob(final SeekableByteChannel seekableByteChannel,
109115

110116
blobNameOffsetMap.remove(blobName, blob.getOffset());
111117

112-
if (blobNameOffsetMap.get(blobName).size() == 0) {
118+
final Long maximumOffset = masterObjectList.getObjects().stream()
119+
.flatMap(objects -> objects.getObjects().stream())
120+
.filter(bulkObject -> bulkObject.getName().equals(blobName))
121+
.map(bulkObject -> bulkObject.getOffset())
122+
.max(Long::compareTo).orElseGet(() -> blob.getOffset());
123+
124+
final boolean isReadOnly = ((SeekableByteChannelDecorator) seekableByteChannel).wrappedSeekableByteChannel() instanceof ReadOnlySeekableByteChannel;
125+
126+
if (blobNameOffsetMap.get(blobName).size() == 0 && (blob.getOffset() == maximumOffset || !(isReadOnly))) {
113127
blobNameChannelMap.remove(blobName);
114128
channelStrategyDelegate.releaseChannelForBlob(((SeekableByteChannelDecorator)seekableByteChannel).wrappedSeekableByteChannel(), blob);
115129
}
130+
116131
}
117132
}
118133
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/TransferStrategyBuilder.java

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -475,34 +475,21 @@ private TransferStrategy makeStreamingPutTransferStrategy() {
475475
getOrMakeTransferRetryDecorator();
476476

477477
return makeTransferStrategy(
478-
new BlobStrategyMaker() {
479-
@Override
480-
public BlobStrategy makeBlobStrategy(final Ds3Client client,
481-
final MasterObjectList masterObjectList,
482-
final EventDispatcher eventDispatcher)
483-
{
484-
return new PutSequentialBlobStrategy(ds3Client,
485-
masterObjectList,
486-
eventDispatcher,
487-
getOrMakeChunkAttemptRetryBehavior(),
488-
getOrMakeChunkAllocationRetryDelayBehavior()
489-
);
490-
}
491-
},
492-
new TransferMethodMaker() {
493-
@Override
494-
public TransferMethod makeTransferMethod() {
495-
return makePutTransferMethod();
496-
}
497-
});
478+
(client, masterObjectList, eventDispatcher) -> new PutSequentialBlobStrategy(ds3Client,
479+
masterObjectList,
480+
eventDispatcher,
481+
getOrMakeChunkAttemptRetryBehavior(),
482+
getOrMakeChunkAllocationRetryDelayBehavior()
483+
),
484+
this::makePutTransferMethod);
498485
}
499486

500487
private void maybeMakeStreamedPutChannelStrategy() {
501488
if (channelStrategy == null) {
502489
Preconditions.checkNotNull(channelBuilder, "channelBuilder my not be null");
503490

504491
channelStrategy = new SequentialChannelStrategy(new SequentialFileReaderChannelStrategy(channelBuilder),
505-
channelBuilder, new NullChannelPreparable());
492+
channelBuilder, new NullChannelPreparable(), masterObjectList);
506493
}
507494
}
508495

@@ -823,30 +810,20 @@ private TransferStrategy makeStreamingGetTransferStrategy() {
823810
getOrMakeTransferRetryDecorator();
824811

825812
return makeTransferStrategy(
826-
new BlobStrategyMaker() {
827-
@Override
828-
public BlobStrategy makeBlobStrategy(final Ds3Client client, final MasterObjectList masterObjectList, final EventDispatcher eventDispatcher) {
829-
return new GetSequentialBlobStrategy(ds3Client,
830-
masterObjectList,
831-
eventDispatcher,
832-
getOrMakeChunkAttemptRetryBehavior(),
833-
getOrMakeChunkAllocationRetryDelayBehavior());
834-
}
835-
},
836-
new TransferMethodMaker() {
837-
@Override
838-
public TransferMethod makeTransferMethod() {
839-
return makeGetTransferMethod();
840-
}
841-
});
813+
(client, masterObjectList, eventDispatcher) -> new GetSequentialBlobStrategy(ds3Client,
814+
masterObjectList,
815+
eventDispatcher,
816+
getOrMakeChunkAttemptRetryBehavior(),
817+
getOrMakeChunkAllocationRetryDelayBehavior()),
818+
this::makeGetTransferMethod);
842819
}
843820

844821
private void maybeMakeSequentialGetChannelStrategy() {
845822
if (channelStrategy == null) {
846823
Preconditions.checkNotNull(channelBuilder, "channelBuilder my not be null");
847824

848825
channelStrategy = new SequentialChannelStrategy(new SequentialFileWriterChannelStrategy(channelBuilder),
849-
channelBuilder, new TruncatingChannelPreparable());
826+
channelBuilder, new TruncatingChannelPreparable(), masterObjectList);
850827
}
851828
}
852829

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.spectralogic.ds3client.utils
2+
3+
import com.spectralogic.ds3client.helpers.channelbuilders.ReadOnlySeekableByteChannel
4+
5+
class ReadOnlySeekableByteChannelInputStream(
6+
readOnlySeekableByteChannel: ReadOnlySeekableByteChannel
7+
) : SeekableByteChannelInputStream(readOnlySeekableByteChannel) {
8+
override fun reset(): Unit = Unit
9+
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/SeekableByteChannelInputStream.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public int read() throws IOException {
3939
return -1;
4040
}
4141
}
42-
42+
4343
@Override
4444
public int read(final byte[] b, final int off, final int len) throws IOException {
4545
final ByteBuffer buffer = ByteBuffer.wrap(b);
@@ -52,18 +52,18 @@ public int read(final byte[] b, final int off, final int len) throws IOException
5252
return -1;
5353
}
5454
}
55-
55+
5656
@Override
5757
public long skip(final long n) throws IOException {
5858
this.seekableByteChannel.position(this.seekableByteChannel.position() + n);
5959
return this.seekableByteChannel.position();
6060
}
61-
61+
6262
@Override
6363
public boolean markSupported() {
6464
return true;
6565
}
66-
66+
6767
@Override
6868
public void mark(final int readlimit) {
6969
try {
@@ -72,13 +72,13 @@ public void mark(final int readlimit) {
7272
throw new RuntimeException(e);
7373
}
7474
}
75-
75+
7676
@Override
7777
public void reset() throws IOException {
7878
this.seekableByteChannel.position(this.markPosition);
7979
this.markPosition = 0;
8080
}
81-
81+
8282
@Override
8383
public void close() throws IOException {
8484
this.seekableByteChannel.close();

0 commit comments

Comments
 (0)