Skip to content

Commit b75dbdf

Browse files
authored
Merge pull request #522 from SpectraLogic/3_5_3_a
Merging into v3_5_3
2 parents 2d91316 + 974b766 commit b75dbdf

File tree

11 files changed

+247
-21
lines changed

11 files changed

+247
-21
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ The SDK can also be included directly into a Maven or Gradle build. There is als
4040
<dependency>
4141
<groupId>com.spectralogic.ds3</groupId>
4242
<artifactId>ds3-sdk</artifactId>
43-
<version>3.5.2</version>
43+
<version>3.5.3</version>
4444
<!-- <classifier>all</classifier> -->
4545
</dependency>
4646
...
@@ -63,8 +63,8 @@ repositories {
6363
6464
dependencies {
6565
...
66-
compile 'com.spectralogic.ds3:ds3-sdk:3.5.2'
67-
// compile 'com.spectralogic.ds3:ds3-sdk:3.5.2:all'
66+
compile 'com.spectralogic.ds3:ds3-sdk:3.5.3'
67+
// compile 'com.spectralogic.ds3:ds3-sdk:3.5.3:all'
6868
...
6969
}
7070

SETUP.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ If using Eclipse:
1313
* In the results that are returned install `Gradle Integration for Eclipse (4.4) 3.7.1.RELEASE` or current version. Clicking `Install` will take you to the 'Confirm Selected Features' dialog. (De-select the two optional Spring checkboxes on older versions). `Gradle IDE` and `org.gradle.toolingapi.feature` should both be selected. Accept any dialogs that popup.
1414
* After Gradle has been installed into eclipse and git has been installed, you should be able to clone the repo and import that project into eclipse
1515

16-
If using Intelllij:
16+
If using Intellij:
1717
* Open Intellij and select `Import Project`
1818
* Find the `build.gradle` file contained at the root of the project and select it
1919
* Accept the defaults

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ buildscript {
3131

3232
allprojects {
3333
group = 'com.spectralogic.ds3'
34-
version = '3.5.2'
34+
version = '3.5.3'
3535
}
3636

3737
subprojects {

ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/helpers/FileSystemHelper_Test.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.spectralogic.ds3client.helpers;
1717

1818
import com.spectralogic.ds3client.Ds3Client;
19-
import com.spectralogic.ds3client.commands.PutObjectResponse;
2019
import com.spectralogic.ds3client.helpers.events.SameThreadEventRunner;
2120
import com.spectralogic.ds3client.integration.Util;
2221
import com.spectralogic.ds3client.integration.test.helpers.TempStorageIds;

ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/GetJobManagement_Test.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.spectralogic.ds3client.helpers.events.FailureEvent;
4242
import com.spectralogic.ds3client.helpers.events.SameThreadEventRunner;
4343
import com.spectralogic.ds3client.helpers.options.ReadJobOptions;
44+
import com.spectralogic.ds3client.helpers.options.WriteJobOptions;
4445
import com.spectralogic.ds3client.helpers.strategy.blobstrategy.BlobStrategy;
4546
import com.spectralogic.ds3client.helpers.strategy.blobstrategy.ChunkAttemptRetryBehavior;
4647
import com.spectralogic.ds3client.helpers.strategy.blobstrategy.ChunkAttemptRetryDelayBehavior;
@@ -83,7 +84,9 @@
8384
import org.slf4j.LoggerFactory;
8485

8586
import java.io.BufferedReader;
87+
import java.io.DataOutputStream;
8688
import java.io.File;
89+
import java.io.FileOutputStream;
8790
import java.io.IOException;
8891
import java.io.InputStream;
8992
import java.io.InputStreamReader;
@@ -101,11 +104,15 @@
101104
import java.util.ArrayList;
102105
import java.util.Arrays;
103106
import java.util.Collection;
107+
import java.util.Collections;
104108
import java.util.List;
105109
import java.util.UUID;
110+
import java.util.concurrent.CountDownLatch;
106111
import java.util.concurrent.atomic.AtomicBoolean;
107112
import java.util.concurrent.atomic.AtomicInteger;
113+
import java.util.concurrent.atomic.AtomicLong;
108114

115+
import static com.spectralogic.ds3client.commands.spectrads3.PutBulkJobSpectraS3Request.MIN_UPLOAD_SIZE_IN_BYTES;
109116
import static com.spectralogic.ds3client.integration.Util.RESOURCE_BASE_NAME;
110117
import static com.spectralogic.ds3client.integration.Util.deleteAllContents;
111118
import static com.spectralogic.ds3client.integration.Util.deleteBucketContents;
@@ -1449,4 +1456,68 @@ public void testThatFifoIsNotProcessed() throws IOException, InterruptedExceptio
14491456

14501457
assertTrue(caughtException.get());
14511458
}
1459+
1460+
@Test
1461+
public void testStreamedGetJobWithBlobbedFile() throws Exception {
1462+
final int chunkSize = MIN_UPLOAD_SIZE_IN_BYTES;
1463+
final long biggerThanAChunkSize = chunkSize * 2L + 1024;
1464+
1465+
final int numIntsInBiggerThanAChunkSize = (int)biggerThanAChunkSize / 4;
1466+
1467+
final String originalFileName = "Gracie.bin";
1468+
final String movedFileName = "Gracie.bak";
1469+
1470+
try {
1471+
final DataOutputStream originalFileStream = new DataOutputStream(new FileOutputStream(originalFileName));
1472+
1473+
byte[] bytes = new byte[4];
1474+
1475+
for (int i = 0; i < numIntsInBiggerThanAChunkSize; ++i) {
1476+
bytes[0] = (byte)i;
1477+
bytes[1] = (byte)(i >> 8);
1478+
bytes[2] = (byte)(i >> 16);
1479+
bytes[3] = (byte)(i >> 24);
1480+
originalFileStream.write(bytes);
1481+
}
1482+
1483+
originalFileStream.close();
1484+
1485+
final Ds3Object ds3Object = new Ds3Object();
1486+
ds3Object.setName(originalFileName);
1487+
ds3Object.setSize(biggerThanAChunkSize);
1488+
1489+
final AtomicLong numBytesTransferred = new AtomicLong(0);
1490+
1491+
final WriteJobOptions writeJobOptions = WriteJobOptions.create();
1492+
writeJobOptions.withMaxUploadSize(chunkSize);
1493+
1494+
final Ds3ClientHelpers.Job writeJob = HELPERS.startWriteJob(BUCKET_NAME, Collections.singletonList(ds3Object), writeJobOptions);
1495+
writeJob.attachDataTransferredListener(numBytesTransferred::addAndGet);
1496+
1497+
final CountDownLatch writeCountDownLatch = new CountDownLatch(1);
1498+
1499+
writeJob.attachObjectCompletedListener(name -> writeCountDownLatch.countDown());
1500+
1501+
writeJob.transfer(new FileObjectPutter(Paths.get(".")));
1502+
1503+
writeCountDownLatch.await();
1504+
1505+
assertEquals(biggerThanAChunkSize, numBytesTransferred.get());
1506+
1507+
Files.move(Paths.get(originalFileName), Paths.get(movedFileName));
1508+
1509+
final CountDownLatch readCountdownLatch = new CountDownLatch(1);
1510+
1511+
final Ds3ClientHelpers.Job readJob = HELPERS.startReadJobUsingStreamedBehavior(BUCKET_NAME, Collections.singletonList(ds3Object));
1512+
readJob.attachObjectCompletedListener(name -> readCountdownLatch.countDown());
1513+
readJob.transfer(new FileObjectGetter(Paths.get(".")));
1514+
1515+
readCountdownLatch.await();
1516+
1517+
assertTrue(FileUtils.contentEquals(new File(movedFileName), new File(originalFileName)));
1518+
} finally {
1519+
Files.deleteIfExists(Paths.get(originalFileName));
1520+
Files.deleteIfExists(Paths.get(movedFileName));
1521+
}
1522+
}
14521523
}

ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@
5252
import org.slf4j.LoggerFactory;
5353

5454
import java.io.BufferedInputStream;
55+
import java.io.DataOutputStream;
5556
import java.io.File;
57+
import java.io.FileOutputStream;
5658
import java.io.IOException;
5759
import java.io.InputStream;
5860
import java.lang.reflect.InvocationTargetException;
@@ -66,9 +68,12 @@
6668
import java.nio.file.Path;
6769
import java.nio.file.Paths;
6870
import java.util.*;
71+
import java.util.concurrent.CountDownLatch;
6972
import java.util.concurrent.atomic.AtomicBoolean;
7073
import java.util.concurrent.atomic.AtomicInteger;
74+
import java.util.concurrent.atomic.AtomicLong;
7175

76+
import static com.spectralogic.ds3client.commands.spectrads3.PutBulkJobSpectraS3Request.MIN_UPLOAD_SIZE_IN_BYTES;
7277
import static com.spectralogic.ds3client.integration.Util.RESOURCE_BASE_NAME;
7378
import static com.spectralogic.ds3client.integration.Util.deleteAllContents;
7479
import static org.hamcrest.Matchers.*;
@@ -2059,4 +2064,70 @@ public void testThatNonExistentFileDoesNotStopPutJob() throws IOException {
20592064
assertTrue(caughtNoSuchFileException.get());
20602065
assertTrue(getJobRan.get());
20612066
}
2067+
2068+
@Test
2069+
public void testStreamedPutJobWithBlobbedFile() throws Exception {
2070+
final int chunkSize = MIN_UPLOAD_SIZE_IN_BYTES;
2071+
final long biggerThanAChunkSize = chunkSize * 2L + 1024;
2072+
2073+
final int numIntsInBiggerThanAChunkSize = (int)biggerThanAChunkSize / 4;
2074+
2075+
final String originalFileName = "Gracie.bin";
2076+
final String movedFileName = "Gracie.bak";
2077+
2078+
try {
2079+
final DataOutputStream originalFileStream = new DataOutputStream(new FileOutputStream(originalFileName));
2080+
2081+
byte[] bytes = new byte[4];
2082+
2083+
for (int i = 0; i < numIntsInBiggerThanAChunkSize; ++i) {
2084+
bytes[0] = (byte)i;
2085+
bytes[1] = (byte)(i >> 8);
2086+
bytes[2] = (byte)(i >> 16);
2087+
bytes[3] = (byte)(i >> 24);
2088+
originalFileStream.write(bytes);
2089+
}
2090+
2091+
originalFileStream.close();
2092+
2093+
final Ds3Object ds3Object = new Ds3Object();
2094+
ds3Object.setName(originalFileName);
2095+
ds3Object.setSize(biggerThanAChunkSize);
2096+
2097+
final AtomicLong numBytesTransferred = new AtomicLong(0);
2098+
2099+
final WriteJobOptions writeJobOptions = WriteJobOptions.create();
2100+
writeJobOptions.withMaxUploadSize(chunkSize);
2101+
2102+
final Ds3ClientHelpers.Job writeJob = HELPERS.startWriteJobUsingStreamedBehavior(BUCKET_NAME, Collections.singletonList(ds3Object), writeJobOptions);
2103+
writeJob.attachDataTransferredListener(numBytesTransferred::addAndGet);
2104+
2105+
final CountDownLatch writeCountDownLatch = new CountDownLatch(1);
2106+
2107+
writeJob.attachObjectCompletedListener(name -> writeCountDownLatch.countDown());
2108+
2109+
writeJob.transfer(new FileObjectPutter(Paths.get(".")));
2110+
2111+
writeCountDownLatch.await();
2112+
2113+
assertEquals(biggerThanAChunkSize, numBytesTransferred.get());
2114+
2115+
Files.move(Paths.get(originalFileName), Paths.get(movedFileName));
2116+
2117+
final CountDownLatch readCountdownLatch = new CountDownLatch(1);
2118+
2119+
final Ds3ClientHelpers.Job readJob = HELPERS.startReadJob(BUCKET_NAME, Collections.singletonList(ds3Object));
2120+
readJob.attachObjectCompletedListener(name -> readCountdownLatch.countDown());
2121+
readJob.transfer(new FileObjectGetter(Paths.get(".")));
2122+
2123+
readCountdownLatch.await();
2124+
2125+
assertTrue(FileUtils.contentEquals(new File(movedFileName), new File(originalFileName)));
2126+
} finally {
2127+
deleteAllContents(client, BUCKET_NAME);
2128+
2129+
Files.deleteIfExists(Paths.get(originalFileName));
2130+
Files.deleteIfExists(Paths.get(movedFileName));
2131+
}
2132+
}
20622133
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/exceptions/ContentLengthNotMatchException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class ContentLengthNotMatchException extends IOException {
2222
private final long contentLength;
2323
private final long totalBytes;
2424
public ContentLengthNotMatchException(final String fileName, final long contentLength, final long totalBytes) {
25-
super(String.format("The Content length for %s (%d) not match the number of byte read (%d)", fileName, contentLength, totalBytes));
25+
super(String.format("The Content length for %s (%d) does not match the number of bytes read (%d)", fileName, contentLength, totalBytes));
2626

2727
this.fileName = fileName;
2828
this.contentLength = contentLength;

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.google.common.base.Function;
1919
import com.google.common.collect.FluentIterable;
2020
import com.spectralogic.ds3client.Ds3Client;
21-
import com.spectralogic.ds3client.commands.PutObjectResponse;
2221
import com.spectralogic.ds3client.helpers.options.ReadJobOptions;
2322
import com.spectralogic.ds3client.helpers.options.WriteJobOptions;
2423
import com.spectralogic.ds3client.helpers.pagination.FileSystemKey;

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/channelstrategy/SeekableByteChannelDecorator.java

Lines changed: 83 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,52 +25,124 @@
2525
* An instance of {@link SeekableByteChannel} used to decorate another SeekableByteChannel in the
2626
* situation where we re-use the same channel for more than 1 blob. This subclass prevents closing
2727
* a channel when there are other blobs still referencing the shared channel.
28+
*
29+
* This class positions the content of a blob within the bounds of a channel that may be capable
30+
* of containing more than one blob.
2831
*/
2932
class SeekableByteChannelDecorator implements SeekableByteChannel {
30-
private final SeekableByteChannel seekableByteChannel;
33+
private final Object lock = new Object();
3134

32-
SeekableByteChannelDecorator(final SeekableByteChannel seekableByteChannel) {
33-
Preconditions.checkNotNull(seekableByteChannel, "seekableByteChannel may not be null");
35+
private final SeekableByteChannel seekableByteChannel;
36+
private final long blobOffset;
37+
private final long blobLength;
38+
private long nextAvailableByteOffset = 0;
39+
40+
SeekableByteChannelDecorator(final SeekableByteChannel seekableByteChannel, final long blobOffset, final long blobLength) throws IOException {
41+
Preconditions.checkNotNull(seekableByteChannel, "seekableByteChannel may not be null.");
42+
Preconditions.checkArgument(blobOffset >= 0, "blobOffset must be >= 0.");
43+
Preconditions.checkArgument(blobLength >= 0, "blobLength must be >= 0.");
3444
this.seekableByteChannel = seekableByteChannel;
45+
this.blobOffset = blobOffset;
46+
this.blobLength = blobLength;
47+
48+
seekableByteChannel.position(blobOffset);
3549
}
3650

37-
protected SeekableByteChannel wrappedSeekableByteChannel() {
51+
SeekableByteChannel wrappedSeekableByteChannel() {
3852
return seekableByteChannel;
3953
}
4054

4155
@Override
4256
public int read(final ByteBuffer dst) throws IOException {
43-
return seekableByteChannel.read(dst);
57+
synchronized (lock) {
58+
final long remainingInWindow = blobLength - nextAvailableByteOffset;
59+
final long numBytesWeCanRead = Math.min(dst.remaining(), remainingInWindow);
60+
61+
if (numBytesWeCanRead <= 0) {
62+
return 0;
63+
}
64+
65+
final int numBytesRead;
66+
67+
if (numBytesWeCanRead != dst.remaining()) {
68+
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[(int) numBytesWeCanRead]);
69+
numBytesRead = seekableByteChannel.read(byteBuffer);
70+
byteBuffer.flip();
71+
dst.put(byteBuffer);
72+
} else {
73+
numBytesRead = seekableByteChannel.read(dst);
74+
}
75+
76+
nextAvailableByteOffset += numBytesRead;
77+
78+
return numBytesRead;
79+
}
4480
}
4581

4682
@Override
4783
public int write(final ByteBuffer src) throws IOException {
48-
return seekableByteChannel.write(src);
84+
synchronized (lock) {
85+
final long remainingInWindow = blobLength - nextAvailableByteOffset;
86+
final long numBytesWeCanWrite = Math.min(src.remaining(), remainingInWindow);
87+
88+
if (numBytesWeCanWrite <= 0) {
89+
return 0;
90+
}
91+
92+
final int numBytesWritten;
93+
94+
if (numBytesWeCanWrite != src.remaining()) {
95+
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[(int) numBytesWeCanWrite]);
96+
byteBuffer.put(src);
97+
byteBuffer.flip();
98+
numBytesWritten = seekableByteChannel.write(byteBuffer);
99+
} else {
100+
numBytesWritten = seekableByteChannel.write(src);
101+
}
102+
103+
nextAvailableByteOffset += numBytesWritten;
104+
105+
return numBytesWritten;
106+
}
49107
}
50108

51109
@Override
52110
public long position() throws IOException {
53-
return seekableByteChannel.position();
111+
synchronized (lock) {
112+
return seekableByteChannel.position();
113+
}
54114
}
55115

56116
@Override
57117
public SeekableByteChannel position(final long newPosition) throws IOException {
58-
return seekableByteChannel.position(newPosition);
118+
synchronized (lock) {
119+
final long greatestPossiblePosition = blobLength - 1;
120+
nextAvailableByteOffset = Math.min(newPosition, greatestPossiblePosition);
121+
seekableByteChannel.position(blobOffset + nextAvailableByteOffset);
122+
123+
return this;
124+
}
59125
}
60126

61127
@Override
62128
public long size() throws IOException {
63-
return seekableByteChannel.size();
129+
synchronized (lock) {
130+
return seekableByteChannel.size();
131+
}
64132
}
65133

66134
@Override
67135
public SeekableByteChannel truncate(final long size) throws IOException {
68-
return seekableByteChannel.truncate(size);
136+
synchronized (lock) {
137+
return seekableByteChannel.truncate(size);
138+
}
69139
}
70140

71141
@Override
72142
public boolean isOpen() {
73-
return seekableByteChannel.isOpen();
143+
synchronized (lock) {
144+
return seekableByteChannel.isOpen();
145+
}
74146
}
75147

76148
@Override

0 commit comments

Comments
 (0)