Skip to content

Commit 75849c9

Browse files
committed
add ability to change the underlying executor service in OutputStream's
Signed-off-by: Andrea Cioni <[email protected]>
1 parent 765bbdf commit 75849c9

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3MultipartOutputStream.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class S3MultipartOutputStream extends OutputStream {
4444

4545
private final PipedOutputStream pipedOutputStream;
4646

47-
private final ExecutorService singleThreadExecutor;
47+
private ExecutorService singleThreadExecutor;
4848

4949
private volatile boolean uploading;
5050

@@ -57,7 +57,6 @@ public S3MultipartOutputStream(S3Client s3Client, String bucketName, String key)
5757
public S3MultipartOutputStream(S3Uploader s3Uploader) throws IOException {
5858
this.pipedInputStream = new PipedInputStream();
5959
this.pipedOutputStream = new PipedOutputStream(this.pipedInputStream);
60-
this.singleThreadExecutor = Executors.newSingleThreadExecutor();
6160
this.uploading = false;
6261
this.multipartUpload = s3Uploader;
6362
}
@@ -73,6 +72,10 @@ public void write(int b) throws IOException {
7372
}
7473

7574
private void startUpload() {
75+
if(this.singleThreadExecutor == null) {
76+
this.singleThreadExecutor = Executors.newSingleThreadExecutor();
77+
}
78+
7679
this.singleThreadExecutor.execute(() -> {
7780
try {
7881
this.multipartUpload.upload(this.pipedInputStream);
@@ -114,4 +117,7 @@ public void close() throws IOException {
114117
super.close();
115118
}
116119

120+
public void setSingleThreadExecutor(ExecutorService singleThreadExecutor) {
121+
this.singleThreadExecutor = singleThreadExecutor;
122+
}
117123
}

spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3OutputStream.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class S3OutputStream extends OutputStream {
5454

5555
private final PipedOutputStream pipedOutputStream;
5656

57-
private final ExecutorService singleThreadExecutor;
57+
private ExecutorService singleThreadExecutor;
5858

5959
private volatile boolean uploading;
6060

@@ -66,7 +66,6 @@ public S3OutputStream(S3Client s3, String bucketName, String key) throws IOExcep
6666
this.key = key;
6767
this.pipedInputStream = new PipedInputStream();
6868
this.pipedOutputStream = new PipedOutputStream(this.pipedInputStream);
69-
this.singleThreadExecutor = Executors.newSingleThreadExecutor();
7069
this.uploading = false;
7170
}
7271

@@ -80,6 +79,10 @@ public void write(int b) throws IOException {
8079
}
8180

8281
private void runUploadThread() {
82+
if(this.singleThreadExecutor == null) {
83+
this.singleThreadExecutor = Executors.newSingleThreadExecutor();
84+
}
85+
8386
this.singleThreadExecutor.execute(() -> {
8487
try {
8588
RequestBody body = RequestBody

0 commit comments

Comments
 (0)