Skip to content

Commit dc59ff7

Browse files
authored
Merge pull request #540 from GraciesPadre/memory
Fix a thread leak when Job.start<Read | Write>Job is called in a loop.
2 parents 7adf24b + 2cd7174 commit dc59ff7

File tree

5 files changed

+250
-40
lines changed

5 files changed

+250
-40
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* ******************************************************************************
3+
* Copyright 2014-2017 Spectra Logic Corporation. All Rights Reserved.
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
5+
* this file except in compliance with the License. A copy of the License is located at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* or in the "license" file accompanying this file.
10+
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
* specific language governing permissions and limitations under the License.
13+
* ****************************************************************************
14+
*/
15+
16+
package com.spectralogic.ds3client.helpers.channelbuilders;
17+
18+
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers;
19+
20+
import java.nio.ByteBuffer;
21+
import java.nio.channels.SeekableByteChannel;
22+
23+
public class RepeatStringObjectChannelBuilder implements Ds3ClientHelpers.ObjectChannelBuilder {
24+
private final int bufferSize;
25+
private final long sizeOfFiles;
26+
private String inputDataHeader;
27+
28+
public RepeatStringObjectChannelBuilder(final String inputDataHeader, final int bufferSize, final long sizeOfFile) {
29+
this.bufferSize = bufferSize;
30+
this.sizeOfFiles = sizeOfFile;
31+
this.inputDataHeader = inputDataHeader;
32+
}
33+
34+
@Override
35+
public SeekableByteChannel buildChannel(final String key) {
36+
return new RepeatStringObjectChannelBuilder.RepeatStringByteChannel(inputDataHeader + key, this.bufferSize, this.sizeOfFiles);
37+
}
38+
39+
private static class RepeatStringByteChannel implements SeekableByteChannel {
40+
final private byte[] backingArray;
41+
final private int bufferSize;
42+
final private long limit;
43+
private boolean isOpen;
44+
private int position;
45+
46+
RepeatStringByteChannel(final String inputData, final int bufferSize, final long size) {
47+
this.bufferSize = bufferSize;
48+
final byte[] bytes = new byte[bufferSize];
49+
final byte[] stringBytes = inputData.getBytes();
50+
int stringPosition = 0;
51+
for (int i = 0; i < bufferSize; i++) {
52+
bytes[i] = stringBytes[stringPosition];
53+
if (++stringPosition >= inputData.length()) {
54+
stringPosition = 0;
55+
}
56+
}
57+
58+
backingArray = bytes;
59+
60+
this.position = 0;
61+
this.limit = size;
62+
this.isOpen = true;
63+
}
64+
65+
public void close() {
66+
this.isOpen = false;
67+
}
68+
69+
public boolean isOpen() {
70+
return this.isOpen;
71+
}
72+
73+
public long position() {
74+
return (long) this.position;
75+
}
76+
77+
public SeekableByteChannel position(final long newPosition) {
78+
this.position = (int) newPosition;
79+
return this;
80+
}
81+
82+
public int read(final ByteBuffer dst) {
83+
final int amountToRead = Math.min(dst.remaining(), this.bufferSize);
84+
dst.put(this.backingArray, 0, amountToRead);
85+
return amountToRead;
86+
}
87+
88+
public long size() {
89+
return this.limit;
90+
}
91+
92+
public SeekableByteChannel truncate(final long size) {
93+
return this;
94+
}
95+
96+
public int write(final ByteBuffer src) {
97+
final int amountToWrite = Math.min(src.remaining(), this.bufferSize);
98+
src.get(this.backingArray, 0, amountToWrite);
99+
return amountToWrite;
100+
}
101+
}
102+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* ******************************************************************************
3+
* Copyright 2014-2017 Spectra Logic Corporation. All Rights Reserved.
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
5+
* this file except in compliance with the License. A copy of the License is located at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* or in the "license" file accompanying this file.
10+
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
* specific language governing permissions and limitations under the License.
13+
* ****************************************************************************
14+
*/
15+
16+
package com.spectralogic.ds3client.integration
17+
18+
import com.spectralogic.ds3client.commands.spectrads3.PutBucketSpectraS3Request
19+
import com.spectralogic.ds3client.helpers.DeleteBucket
20+
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers
21+
import com.spectralogic.ds3client.helpers.channelbuilders.RepeatStringObjectChannelBuilder
22+
import com.spectralogic.ds3client.integration.test.helpers.TempStorageIds
23+
import com.spectralogic.ds3client.integration.test.helpers.TempStorageUtil
24+
import com.spectralogic.ds3client.models.ChecksumType
25+
import com.spectralogic.ds3client.models.bulk.Ds3Object
26+
import org.junit.AfterClass
27+
import org.junit.BeforeClass
28+
import org.junit.Test
29+
import java.io.IOException
30+
import org.hamcrest.MatcherAssert.assertThat
31+
import org.hamcrest.Matchers.lessThan
32+
33+
class ResourceTest {
34+
private companion object {
35+
private val numBuckets = 100
36+
private val numObjects = 1024
37+
private val numThreads = numObjects
38+
private val objectSize = 1024 * 10
39+
private val baseBucketName = "putLotsaBuckets"
40+
private val baseFileName = "file"
41+
private val objectData = "Gracie"
42+
private val dataBufferSize = 1024
43+
private val ds3Client = Util.fromEnv()
44+
private val policyPrefix = ResourceTest::class.java.simpleName
45+
private var envStorageIds : TempStorageIds? = null
46+
47+
@BeforeClass
48+
@JvmStatic
49+
@Throws(IOException::class)
50+
fun startup() {
51+
val envDataPolicyId = TempStorageUtil.setupDataPolicy(policyPrefix, false, ChecksumType.Type.MD5, ds3Client)
52+
envStorageIds = TempStorageUtil.setup(policyPrefix, envDataPolicyId, ds3Client)
53+
}
54+
55+
@AfterClass
56+
@JvmStatic
57+
@Throws(IOException::class)
58+
fun teardown() {
59+
TempStorageUtil.teardown(policyPrefix, envStorageIds, ds3Client)
60+
ds3Client.close()
61+
}
62+
}
63+
64+
@Test
65+
fun putLotsaBuckets() {
66+
val bucketNames = generateBucketNames()
67+
68+
try {
69+
createBuckets(bucketNames)
70+
populateBuckets(bucketNames)
71+
assertThat(Thread.activeCount(), lessThan(numThreads * 2))
72+
} finally {
73+
deleteBuckets(bucketNames)
74+
}
75+
}
76+
77+
private fun generateBucketNames() : List<String> {
78+
return (1 .. numBuckets)
79+
.map { index -> baseBucketName + index }
80+
.toList()
81+
}
82+
83+
private fun createBuckets(bucketNames: List<String>) {
84+
bucketNames.forEach({ bucketName -> ds3Client.putBucketSpectraS3(PutBucketSpectraS3Request(bucketName)) })
85+
}
86+
87+
@Throws(IOException::class)
88+
private fun populateBuckets(bucketNames: List<String>) {
89+
bucketNames
90+
.forEach({ bucketName ->
91+
Ds3ClientHelpers
92+
.wrap(ds3Client)
93+
.startWriteJob(bucketName, generateDs3Objects())
94+
.withMaxParallelRequests(numThreads)
95+
.transfer(RepeatStringObjectChannelBuilder(objectData, objectData.length * 512, dataBufferSize.toLong()))
96+
})
97+
98+
}
99+
100+
private fun generateDs3Objects(): List<Ds3Object> {
101+
return (1 .. numObjects)
102+
.map { index ->
103+
val ds3Object = Ds3Object()
104+
ds3Object.name = baseFileName + index
105+
ds3Object.size = objectSize.toLong()
106+
ds3Object
107+
}
108+
.toList()
109+
}
110+
111+
private fun deleteBuckets(bucketNames: List<String>) {
112+
bucketNames.forEach({ bucketName -> DeleteBucket.deleteBucket(Ds3ClientHelpers.wrap(ds3Client), bucketName) })
113+
}
114+
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/AbstractTransferStrategy.java

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515

1616
package com.spectralogic.ds3client.helpers.strategy.transferstrategy;
1717

18-
import com.google.common.base.Predicate;
1918
import com.google.common.collect.FluentIterable;
2019
import com.google.common.collect.Iterables;
21-
import com.google.common.util.concurrent.ListeningExecutorService;
2220
import com.spectralogic.ds3client.Ds3Client;
2321
import com.spectralogic.ds3client.commands.spectrads3.GetBulkJobSpectraS3Request;
2422
import com.spectralogic.ds3client.commands.spectrads3.PutBulkJobSpectraS3Request;
@@ -32,17 +30,15 @@
3230

3331
import java.io.IOException;
3432
import java.util.NoSuchElementException;
35-
import java.util.concurrent.Callable;
3633
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.ExecutorService;
3735
import java.util.concurrent.atomic.AtomicInteger;
3836
import java.util.concurrent.atomic.AtomicReference;
3937

4038
import com.spectralogic.ds3client.networking.FailedRequestException;
4139
import org.slf4j.Logger;
4240
import org.slf4j.LoggerFactory;
4341

44-
import javax.annotation.Nullable;
45-
4642
/**
4743
* An implementation of {@link TransferStrategy} that provides a default implementation {@code transfer}
4844
* implementation.
@@ -52,7 +48,7 @@ abstract class AbstractTransferStrategy implements TransferStrategy {
5248

5349
private final BlobStrategy blobStrategy;
5450
private final JobState jobState;
55-
private final ListeningExecutorService executorService;
51+
private final ExecutorService executorService;
5652
private final EventDispatcher eventDispatcher;
5753
private final MasterObjectList masterObjectList;
5854
private final FailureEvent.FailureActivity failureActivity;
@@ -77,7 +73,7 @@ abstract class AbstractTransferStrategy implements TransferStrategy {
7773
*/
7874
public AbstractTransferStrategy(final BlobStrategy blobStrategy,
7975
final JobState jobState,
80-
final ListeningExecutorService executorService,
76+
final ExecutorService executorService,
8177
final EventDispatcher eventDispatcher,
8278
final MasterObjectList masterObjectList,
8379
final FailureEvent.FailureActivity failureActivity)
@@ -109,9 +105,21 @@ public AbstractTransferStrategy withTransferMethod(final TransferMethod transfer
109105
public void transfer() throws IOException {
110106
cachedException.set(null);
111107

108+
try {
109+
transferAllJobBlobs();
110+
} finally {
111+
close();
112+
}
113+
114+
if (cachedException.get() != null) {
115+
throw cachedException.get();
116+
}
117+
}
118+
119+
private void transferAllJobBlobs() throws IOException {
112120
final AtomicInteger numBlobsRemaining = new AtomicInteger(jobState.numBlobsInJob());
113121

114-
while ( ! Thread.currentThread().isInterrupted() && numBlobsRemaining.get() > 0) {
122+
while (!Thread.currentThread().isInterrupted() && numBlobsRemaining.get() > 0) {
115123
try {
116124
final Iterable<JobPart> jobParts = jobPartsNotAlreadyTransferred();
117125

@@ -129,25 +137,17 @@ public void transfer() throws IOException {
129137
throw e;
130138
} catch (final InterruptedException | NoSuchElementException e) {
131139
Thread.currentThread().interrupt();
132-
} catch (final Throwable t) {
140+
} catch (final Throwable t) {
133141
emitFailureAndSetCachedException(t);
134142
}
135143
}
136-
137-
if (cachedException.get() != null) {
138-
throw cachedException.get();
139-
}
140144
}
141145

146+
142147
private Iterable<JobPart> jobPartsNotAlreadyTransferred() throws IOException, InterruptedException {
143148
return FluentIterable
144149
.from(blobStrategy.getWork())
145-
.filter(new Predicate<JobPart>() {
146-
@Override
147-
public boolean apply(@Nullable final JobPart jobPart) {
148-
return jobState.contains(jobPart.getBlob());
149-
}
150-
});
150+
.filter(jobPart -> jobState.contains(jobPart.getBlob()));
151151
}
152152

153153
private void emitFailureAndSetCachedException(final Throwable t) {
@@ -169,23 +169,19 @@ private void transferJobParts(final Iterable<JobPart> jobParts,
169169
final CountDownLatch countDownLatch,
170170
final AtomicInteger numBlobsTransferred) throws IOException {
171171
for (final JobPart jobPart : jobParts) {
172-
executorService.submit(new Callable<Void>() {
173-
@Override
174-
public Void call() throws Exception {
175-
try {
176-
transferMethod.transferJobPart(jobPart);
177-
} catch (final RuntimeException e) {
178-
emitFailureAndSetCachedException(e);
179-
throw e;
180-
} catch (final Exception e) {
181-
emitFailureAndSetCachedException(e);
182-
throw new RuntimeException(e);
183-
} finally {
184-
jobState.blobTransferredOrFailed(jobPart.getBlob());
185-
numBlobsTransferred.decrementAndGet();
186-
countDownLatch.countDown();
187-
return null;
188-
}
172+
executorService.execute(() -> {
173+
try {
174+
transferMethod.transferJobPart(jobPart);
175+
} catch (final RuntimeException e) {
176+
emitFailureAndSetCachedException(e);
177+
throw e;
178+
} catch (final Exception e) {
179+
emitFailureAndSetCachedException(e);
180+
throw new RuntimeException(e);
181+
} finally {
182+
jobState.blobTransferredOrFailed(jobPart.getBlob());
183+
numBlobsTransferred.decrementAndGet();
184+
countDownLatch.countDown();
189185
}
190186
});
191187
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/MultiThreadedTransferStrategy.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
package com.spectralogic.ds3client.helpers.strategy.transferstrategy;
1717

18-
import com.google.common.util.concurrent.MoreExecutors;
1918
import com.spectralogic.ds3client.helpers.JobState;
2019
import com.spectralogic.ds3client.helpers.events.FailureEvent;
2120
import com.spectralogic.ds3client.helpers.strategy.blobstrategy.BlobStrategy;
@@ -36,7 +35,7 @@ public MultiThreadedTransferStrategy(final BlobStrategy blobStrategy,
3635
{
3736
super(blobStrategy,
3837
jobState,
39-
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numConcurrentTransferThreads)),
38+
Executors.newFixedThreadPool(numConcurrentTransferThreads),
4039
eventDispatcher,
4140
masterObjectList,
4241
failureActivity);

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/strategy/transferstrategy/SingleThreadedTransferStrategy.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
package com.spectralogic.ds3client.helpers.strategy.transferstrategy;
1717

18-
import com.google.common.util.concurrent.MoreExecutors;
1918
import com.spectralogic.ds3client.helpers.JobState;
2019
import com.spectralogic.ds3client.helpers.events.FailureEvent;
2120
import com.spectralogic.ds3client.helpers.strategy.blobstrategy.BlobStrategy;
@@ -35,7 +34,7 @@ public SingleThreadedTransferStrategy(final BlobStrategy blobStrategy,
3534
{
3635
super(blobStrategy,
3736
jobState,
38-
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()),
37+
Executors.newSingleThreadExecutor(),
3938
eventDispatcher,
4039
masterObjectList,
4140
failureActivity);

0 commit comments

Comments
 (0)