Skip to content

Commit 44c1e1a

Browse files
Fokko Driesprong and srowen
authored and committed
[SPARK-25408] Move to more idiomatic Java 8
While working on another PR, I noticed that there is quite some legacy Java in there that can be beautified. For example the use of features from Java 8, such as: - Collection libraries - Try-with-resource blocks No code has been changed What are your thoughts on this? This makes code easier to read, and using try-with-resource makes it less likely to forget to close something. ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Closes apache#22399 from Fokko/SPARK-25408. Authored-by: Fokko Driesprong <[email protected]> Signed-off-by: Sean Owen <[email protected]>
1 parent 8113b9c commit 44c1e1a

File tree

19 files changed

+243
-303
lines changed

19 files changed

+243
-303
lines changed

common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,8 @@ public final byte[] serialize(Object o) throws Exception {
5454
return ((String) o).getBytes(UTF_8);
5555
} else {
5656
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
57-
GZIPOutputStream out = new GZIPOutputStream(bytes);
58-
try {
57+
try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
5958
mapper.writeValue(out, o);
60-
} finally {
61-
out.close();
6259
}
6360
return bytes.toByteArray();
6461
}
@@ -69,11 +66,8 @@ public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
6966
if (klass.equals(String.class)) {
7067
return (T) new String(data, UTF_8);
7168
} else {
72-
GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
73-
try {
69+
try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
7470
return mapper.readValue(in, klass);
75-
} finally {
76-
in.close();
7771
}
7872
}
7973
}

common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public void testSkip() throws Exception {
217217
public void testNegativeIndexValues() throws Exception {
218218
List<Integer> expected = Arrays.asList(-100, -50, 0, 50, 100);
219219

220-
expected.stream().forEach(i -> {
220+
expected.forEach(i -> {
221221
try {
222222
db.write(createCustomType1(i));
223223
} catch (Exception e) {

common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -143,37 +143,38 @@ public void releaseBuffers() {
143143
}
144144

145145
private FetchResult fetchChunks(List<Integer> chunkIndices) throws Exception {
146-
TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
147-
final Semaphore sem = new Semaphore(0);
148-
149146
final FetchResult res = new FetchResult();
150-
res.successChunks = Collections.synchronizedSet(new HashSet<Integer>());
151-
res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>());
152-
res.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>());
153147

154-
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
155-
@Override
156-
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
157-
buffer.retain();
158-
res.successChunks.add(chunkIndex);
159-
res.buffers.add(buffer);
160-
sem.release();
161-
}
148+
try (TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort())) {
149+
final Semaphore sem = new Semaphore(0);
162150

163-
@Override
164-
public void onFailure(int chunkIndex, Throwable e) {
165-
res.failedChunks.add(chunkIndex);
166-
sem.release();
167-
}
168-
};
151+
res.successChunks = Collections.synchronizedSet(new HashSet<Integer>());
152+
res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>());
153+
res.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>());
169154

170-
for (int chunkIndex : chunkIndices) {
171-
client.fetchChunk(STREAM_ID, chunkIndex, callback);
172-
}
173-
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
174-
fail("Timeout getting response from the server");
155+
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
156+
@Override
157+
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
158+
buffer.retain();
159+
res.successChunks.add(chunkIndex);
160+
res.buffers.add(buffer);
161+
sem.release();
162+
}
163+
164+
@Override
165+
public void onFailure(int chunkIndex, Throwable e) {
166+
res.failedChunks.add(chunkIndex);
167+
sem.release();
168+
}
169+
};
170+
171+
for (int chunkIndex : chunkIndices) {
172+
client.fetchChunk(STREAM_ID, chunkIndex, callback);
173+
}
174+
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
175+
fail("Timeout getting response from the server");
176+
}
175177
}
176-
client.close();
177178
return res;
178179
}
179180

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,8 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
3737
size = (int)indexFile.length();
3838
ByteBuffer buffer = ByteBuffer.allocate(size);
3939
offsets = buffer.asLongBuffer();
40-
DataInputStream dis = null;
41-
try {
42-
dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
40+
try (DataInputStream dis = new DataInputStream(Files.newInputStream(indexFile.toPath()))) {
4341
dis.readFully(buffer.array());
44-
} finally {
45-
if (dis != null) {
46-
dis.close();
47-
}
4842
}
4943
}
5044

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -98,19 +98,15 @@ public void testSortShuffleBlocks() throws IOException {
9898
resolver.registerExecutor("app0", "exec0",
9999
dataContext.createExecutorInfo(SORT_MANAGER));
100100

101-
InputStream block0Stream =
102-
resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
103-
String block0 = CharStreams.toString(
104-
new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
105-
block0Stream.close();
106-
assertEquals(sortBlock0, block0);
107-
108-
InputStream block1Stream =
109-
resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
110-
String block1 = CharStreams.toString(
111-
new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
112-
block1Stream.close();
113-
assertEquals(sortBlock1, block1);
101+
try (InputStream block0Stream = resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream()) {
102+
String block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
103+
assertEquals(sortBlock0, block0);
104+
}
105+
106+
try (InputStream block1Stream = resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream()) {
107+
String block1 = CharStreams.toString(new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
108+
assertEquals(sortBlock1, block1);
109+
}
114110
}
115111

116112
@Test

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -133,37 +133,38 @@ private FetchResult fetchBlocks(
133133

134134
final Semaphore requestsRemaining = new Semaphore(0);
135135

136-
ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000);
137-
client.init(APP_ID);
138-
client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
139-
new BlockFetchingListener() {
140-
@Override
141-
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
142-
synchronized (this) {
143-
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
144-
data.retain();
145-
res.successBlocks.add(blockId);
146-
res.buffers.add(data);
147-
requestsRemaining.release();
136+
try (ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) {
137+
client.init(APP_ID);
138+
client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
139+
new BlockFetchingListener() {
140+
@Override
141+
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
142+
synchronized (this) {
143+
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
144+
data.retain();
145+
res.successBlocks.add(blockId);
146+
res.buffers.add(data);
147+
requestsRemaining.release();
148+
}
148149
}
149150
}
150-
}
151-
152-
@Override
153-
public void onBlockFetchFailure(String blockId, Throwable exception) {
154-
synchronized (this) {
155-
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
156-
res.failedBlocks.add(blockId);
157-
requestsRemaining.release();
151+
152+
@Override
153+
public void onBlockFetchFailure(String blockId, Throwable exception) {
154+
synchronized (this) {
155+
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
156+
res.failedBlocks.add(blockId);
157+
requestsRemaining.release();
158+
}
158159
}
159160
}
160-
}
161-
}, null);
161+
}, null
162+
);
162163

163-
if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
164-
fail("Timeout getting response from the server");
164+
if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
165+
fail("Timeout getting response from the server");
166+
}
165167
}
166-
client.close();
167168
return res;
168169
}
169170

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,13 @@ private void validate(String appId, String secretKey, boolean encrypt)
9696
ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true")));
9797
}
9898

99-
ExternalShuffleClient client =
100-
new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000);
101-
client.init(appId);
102-
// Registration either succeeds or throws an exception.
103-
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
104-
new ExecutorShuffleInfo(new String[0], 0,
105-
"org.apache.spark.shuffle.sort.SortShuffleManager"));
106-
client.close();
99+
try (ExternalShuffleClient client =
100+
new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000)) {
101+
client.init(appId);
102+
// Registration either succeeds or throws an exception.
103+
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
104+
new ExecutorShuffleInfo(new String[0], 0, "org.apache.spark.shuffle.sort.SortShuffleManager"));
105+
}
107106
}
108107

109108
/** Provides a secret key holder which always returns the given secret key, for a single appId. */

common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,9 @@ public static CountMinSketch readFrom(InputStream in) throws IOException {
191191
* Reads in a {@link CountMinSketch} from a byte array.
192192
*/
193193
public static CountMinSketch readFrom(byte[] bytes) throws IOException {
194-
InputStream in = new ByteArrayInputStream(bytes);
195-
CountMinSketch cms = readFrom(in);
196-
in.close();
197-
return cms;
194+
try (InputStream in = new ByteArrayInputStream(bytes)) {
195+
return readFrom(in);
196+
}
198197
}
199198

200199
/**

common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -322,10 +322,10 @@ public void writeTo(OutputStream out) throws IOException {
322322

323323
@Override
324324
public byte[] toByteArray() throws IOException {
325-
ByteArrayOutputStream out = new ByteArrayOutputStream();
326-
writeTo(out);
327-
out.close();
328-
return out.toByteArray();
325+
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
326+
writeTo(out);
327+
return out.toByteArray();
328+
}
329329
}
330330

331331
public static CountMinSketchImpl readFrom(InputStream in) throws IOException {

core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java

Lines changed: 49 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -135,62 +135,58 @@ private void readAsync() throws IOException {
135135
} finally {
136136
stateChangeLock.unlock();
137137
}
138-
executorService.execute(new Runnable() {
139-
140-
@Override
141-
public void run() {
142-
stateChangeLock.lock();
143-
try {
144-
if (isClosed) {
145-
readInProgress = false;
146-
return;
147-
}
148-
// Flip this so that the close method will not close the underlying input stream when we
149-
// are reading.
150-
isReading = true;
151-
} finally {
152-
stateChangeLock.unlock();
138+
executorService.execute(() -> {
139+
stateChangeLock.lock();
140+
try {
141+
if (isClosed) {
142+
readInProgress = false;
143+
return;
153144
}
145+
// Flip this so that the close method will not close the underlying input stream when we
146+
// are reading.
147+
isReading = true;
148+
} finally {
149+
stateChangeLock.unlock();
150+
}
154151

155-
// Please note that it is safe to release the lock and read into the read ahead buffer
156-
// because either of following two conditions will hold - 1. The active buffer has
157-
// data available to read so the reader will not read from the read ahead buffer.
158-
// 2. This is the first time read is called or the active buffer is exhausted,
159-
// in that case the reader waits for this async read to complete.
160-
// So there is no race condition in both the situations.
161-
int read = 0;
162-
int off = 0, len = arr.length;
163-
Throwable exception = null;
164-
try {
165-
// try to fill the read ahead buffer.
166-
// if a reader is waiting, possibly return early.
167-
do {
168-
read = underlyingInputStream.read(arr, off, len);
169-
if (read <= 0) break;
170-
off += read;
171-
len -= read;
172-
} while (len > 0 && !isWaiting.get());
173-
} catch (Throwable ex) {
174-
exception = ex;
175-
if (ex instanceof Error) {
176-
// `readException` may not be reported to the user. Rethrow Error to make sure at least
177-
// The user can see Error in UncaughtExceptionHandler.
178-
throw (Error) ex;
179-
}
180-
} finally {
181-
stateChangeLock.lock();
182-
readAheadBuffer.limit(off);
183-
if (read < 0 || (exception instanceof EOFException)) {
184-
endOfStream = true;
185-
} else if (exception != null) {
186-
readAborted = true;
187-
readException = exception;
188-
}
189-
readInProgress = false;
190-
signalAsyncReadComplete();
191-
stateChangeLock.unlock();
192-
closeUnderlyingInputStreamIfNecessary();
152+
// Please note that it is safe to release the lock and read into the read ahead buffer
153+
// because either of following two conditions will hold - 1. The active buffer has
154+
// data available to read so the reader will not read from the read ahead buffer.
155+
// 2. This is the first time read is called or the active buffer is exhausted,
156+
// in that case the reader waits for this async read to complete.
157+
// So there is no race condition in both the situations.
158+
int read = 0;
159+
int off = 0, len = arr.length;
160+
Throwable exception = null;
161+
try {
162+
// try to fill the read ahead buffer.
163+
// if a reader is waiting, possibly return early.
164+
do {
165+
read = underlyingInputStream.read(arr, off, len);
166+
if (read <= 0) break;
167+
off += read;
168+
len -= read;
169+
} while (len > 0 && !isWaiting.get());
170+
} catch (Throwable ex) {
171+
exception = ex;
172+
if (ex instanceof Error) {
173+
// `readException` may not be reported to the user. Rethrow Error to make sure at least
174+
// The user can see Error in UncaughtExceptionHandler.
175+
throw (Error) ex;
193176
}
177+
} finally {
178+
stateChangeLock.lock();
179+
readAheadBuffer.limit(off);
180+
if (read < 0 || (exception instanceof EOFException)) {
181+
endOfStream = true;
182+
} else if (exception != null) {
183+
readAborted = true;
184+
readException = exception;
185+
}
186+
readInProgress = false;
187+
signalAsyncReadComplete();
188+
stateChangeLock.unlock();
189+
closeUnderlyingInputStreamIfNecessary();
194190
}
195191
});
196192
}

0 commit comments

Comments
 (0)