Skip to content

Commit 5ae20cf

Browse files
committed
Revert "[SPARK-25408] Move to mode ideomatic Java8"
This reverts commit 44c1e1a.
1 parent 44c1e1a commit 5ae20cf

File tree

19 files changed

+303
-243
lines changed

19 files changed

+303
-243
lines changed

common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,11 @@ public final byte[] serialize(Object o) throws Exception {
5454
return ((String) o).getBytes(UTF_8);
5555
} else {
5656
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
57-
try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
57+
GZIPOutputStream out = new GZIPOutputStream(bytes);
58+
try {
5859
mapper.writeValue(out, o);
60+
} finally {
61+
out.close();
5962
}
6063
return bytes.toByteArray();
6164
}
@@ -66,8 +69,11 @@ public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
6669
if (klass.equals(String.class)) {
6770
return (T) new String(data, UTF_8);
6871
} else {
69-
try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
72+
GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
73+
try {
7074
return mapper.readValue(in, klass);
75+
} finally {
76+
in.close();
7177
}
7278
}
7379
}

common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public void testSkip() throws Exception {
217217
public void testNegativeIndexValues() throws Exception {
218218
List<Integer> expected = Arrays.asList(-100, -50, 0, 50, 100);
219219

220-
expected.forEach(i -> {
220+
expected.stream().forEach(i -> {
221221
try {
222222
db.write(createCustomType1(i));
223223
} catch (Exception e) {

common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -143,38 +143,37 @@ public void releaseBuffers() {
143143
}
144144

145145
private FetchResult fetchChunks(List<Integer> chunkIndices) throws Exception {
146-
final FetchResult res = new FetchResult();
147-
148-
try (TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort())) {
149-
final Semaphore sem = new Semaphore(0);
146+
TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
147+
final Semaphore sem = new Semaphore(0);
150148

151-
res.successChunks = Collections.synchronizedSet(new HashSet<Integer>());
152-
res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>());
153-
res.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>());
154-
155-
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
156-
@Override
157-
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
158-
buffer.retain();
159-
res.successChunks.add(chunkIndex);
160-
res.buffers.add(buffer);
161-
sem.release();
162-
}
163-
164-
@Override
165-
public void onFailure(int chunkIndex, Throwable e) {
166-
res.failedChunks.add(chunkIndex);
167-
sem.release();
168-
}
169-
};
149+
final FetchResult res = new FetchResult();
150+
res.successChunks = Collections.synchronizedSet(new HashSet<Integer>());
151+
res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>());
152+
res.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>());
170153

171-
for (int chunkIndex : chunkIndices) {
172-
client.fetchChunk(STREAM_ID, chunkIndex, callback);
154+
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
155+
@Override
156+
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
157+
buffer.retain();
158+
res.successChunks.add(chunkIndex);
159+
res.buffers.add(buffer);
160+
sem.release();
173161
}
174-
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
175-
fail("Timeout getting response from the server");
162+
163+
@Override
164+
public void onFailure(int chunkIndex, Throwable e) {
165+
res.failedChunks.add(chunkIndex);
166+
sem.release();
176167
}
168+
};
169+
170+
for (int chunkIndex : chunkIndices) {
171+
client.fetchChunk(STREAM_ID, chunkIndex, callback);
172+
}
173+
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
174+
fail("Timeout getting response from the server");
177175
}
176+
client.close();
178177
return res;
179178
}
180179

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,14 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
3737
size = (int)indexFile.length();
3838
ByteBuffer buffer = ByteBuffer.allocate(size);
3939
offsets = buffer.asLongBuffer();
40-
try (DataInputStream dis = new DataInputStream(Files.newInputStream(indexFile.toPath()))) {
40+
DataInputStream dis = null;
41+
try {
42+
dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
4143
dis.readFully(buffer.array());
44+
} finally {
45+
if (dis != null) {
46+
dis.close();
47+
}
4248
}
4349
}
4450

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,19 @@ public void testSortShuffleBlocks() throws IOException {
9898
resolver.registerExecutor("app0", "exec0",
9999
dataContext.createExecutorInfo(SORT_MANAGER));
100100

101-
try (InputStream block0Stream = resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream()) {
102-
String block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
103-
assertEquals(sortBlock0, block0);
104-
}
105-
106-
try (InputStream block1Stream = resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream()) {
107-
String block1 = CharStreams.toString(new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
108-
assertEquals(sortBlock1, block1);
109-
}
101+
InputStream block0Stream =
102+
resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
103+
String block0 = CharStreams.toString(
104+
new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
105+
block0Stream.close();
106+
assertEquals(sortBlock0, block0);
107+
108+
InputStream block1Stream =
109+
resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
110+
String block1 = CharStreams.toString(
111+
new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
112+
block1Stream.close();
113+
assertEquals(sortBlock1, block1);
110114
}
111115

112116
@Test

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -133,38 +133,37 @@ private FetchResult fetchBlocks(
133133

134134
final Semaphore requestsRemaining = new Semaphore(0);
135135

136-
try (ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) {
137-
client.init(APP_ID);
138-
client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
139-
new BlockFetchingListener() {
140-
@Override
141-
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
142-
synchronized (this) {
143-
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
144-
data.retain();
145-
res.successBlocks.add(blockId);
146-
res.buffers.add(data);
147-
requestsRemaining.release();
148-
}
136+
ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000);
137+
client.init(APP_ID);
138+
client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
139+
new BlockFetchingListener() {
140+
@Override
141+
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
142+
synchronized (this) {
143+
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
144+
data.retain();
145+
res.successBlocks.add(blockId);
146+
res.buffers.add(data);
147+
requestsRemaining.release();
149148
}
150149
}
151-
152-
@Override
153-
public void onBlockFetchFailure(String blockId, Throwable exception) {
154-
synchronized (this) {
155-
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
156-
res.failedBlocks.add(blockId);
157-
requestsRemaining.release();
158-
}
150+
}
151+
152+
@Override
153+
public void onBlockFetchFailure(String blockId, Throwable exception) {
154+
synchronized (this) {
155+
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
156+
res.failedBlocks.add(blockId);
157+
requestsRemaining.release();
159158
}
160159
}
161-
}, null
162-
);
160+
}
161+
}, null);
163162

164-
if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
165-
fail("Timeout getting response from the server");
166-
}
163+
if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
164+
fail("Timeout getting response from the server");
167165
}
166+
client.close();
168167
return res;
169168
}
170169

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,14 @@ private void validate(String appId, String secretKey, boolean encrypt)
9696
ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true")));
9797
}
9898

99-
try (ExternalShuffleClient client =
100-
new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000)) {
101-
client.init(appId);
102-
// Registration either succeeds or throws an exception.
103-
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
104-
new ExecutorShuffleInfo(new String[0], 0, "org.apache.spark.shuffle.sort.SortShuffleManager"));
105-
}
99+
ExternalShuffleClient client =
100+
new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000);
101+
client.init(appId);
102+
// Registration either succeeds or throws an exception.
103+
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
104+
new ExecutorShuffleInfo(new String[0], 0,
105+
"org.apache.spark.shuffle.sort.SortShuffleManager"));
106+
client.close();
106107
}
107108

108109
/** Provides a secret key holder which always returns the given secret key, for a single appId. */

common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,10 @@ public static CountMinSketch readFrom(InputStream in) throws IOException {
191191
* Reads in a {@link CountMinSketch} from a byte array.
192192
*/
193193
public static CountMinSketch readFrom(byte[] bytes) throws IOException {
194-
try (InputStream in = new ByteArrayInputStream(bytes)) {
195-
return readFrom(in);
196-
}
194+
InputStream in = new ByteArrayInputStream(bytes);
195+
CountMinSketch cms = readFrom(in);
196+
in.close();
197+
return cms;
197198
}
198199

199200
/**

common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -322,10 +322,10 @@ public void writeTo(OutputStream out) throws IOException {
322322

323323
@Override
324324
public byte[] toByteArray() throws IOException {
325-
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
326-
writeTo(out);
327-
return out.toByteArray();
328-
}
325+
ByteArrayOutputStream out = new ByteArrayOutputStream();
326+
writeTo(out);
327+
out.close();
328+
return out.toByteArray();
329329
}
330330

331331
public static CountMinSketchImpl readFrom(InputStream in) throws IOException {

core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java

Lines changed: 53 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -135,58 +135,62 @@ private void readAsync() throws IOException {
135135
} finally {
136136
stateChangeLock.unlock();
137137
}
138-
executorService.execute(() -> {
139-
stateChangeLock.lock();
140-
try {
141-
if (isClosed) {
142-
readInProgress = false;
143-
return;
144-
}
145-
// Flip this so that the close method will not close the underlying input stream when we
146-
// are reading.
147-
isReading = true;
148-
} finally {
149-
stateChangeLock.unlock();
150-
}
138+
executorService.execute(new Runnable() {
151139

152-
// Please note that it is safe to release the lock and read into the read ahead buffer
153-
// because either of following two conditions will hold - 1. The active buffer has
154-
// data available to read so the reader will not read from the read ahead buffer.
155-
// 2. This is the first time read is called or the active buffer is exhausted,
156-
// in that case the reader waits for this async read to complete.
157-
// So there is no race condition in both the situations.
158-
int read = 0;
159-
int off = 0, len = arr.length;
160-
Throwable exception = null;
161-
try {
162-
// try to fill the read ahead buffer.
163-
// if a reader is waiting, possibly return early.
164-
do {
165-
read = underlyingInputStream.read(arr, off, len);
166-
if (read <= 0) break;
167-
off += read;
168-
len -= read;
169-
} while (len > 0 && !isWaiting.get());
170-
} catch (Throwable ex) {
171-
exception = ex;
172-
if (ex instanceof Error) {
173-
// `readException` may not be reported to the user. Rethrow Error to make sure at least
174-
// The user can see Error in UncaughtExceptionHandler.
175-
throw (Error) ex;
176-
}
177-
} finally {
140+
@Override
141+
public void run() {
178142
stateChangeLock.lock();
179-
readAheadBuffer.limit(off);
180-
if (read < 0 || (exception instanceof EOFException)) {
181-
endOfStream = true;
182-
} else if (exception != null) {
183-
readAborted = true;
184-
readException = exception;
143+
try {
144+
if (isClosed) {
145+
readInProgress = false;
146+
return;
147+
}
148+
// Flip this so that the close method will not close the underlying input stream when we
149+
// are reading.
150+
isReading = true;
151+
} finally {
152+
stateChangeLock.unlock();
153+
}
154+
155+
// Please note that it is safe to release the lock and read into the read ahead buffer
156+
// because either of following two conditions will hold - 1. The active buffer has
157+
// data available to read so the reader will not read from the read ahead buffer.
158+
// 2. This is the first time read is called or the active buffer is exhausted,
159+
// in that case the reader waits for this async read to complete.
160+
// So there is no race condition in both the situations.
161+
int read = 0;
162+
int off = 0, len = arr.length;
163+
Throwable exception = null;
164+
try {
165+
// try to fill the read ahead buffer.
166+
// if a reader is waiting, possibly return early.
167+
do {
168+
read = underlyingInputStream.read(arr, off, len);
169+
if (read <= 0) break;
170+
off += read;
171+
len -= read;
172+
} while (len > 0 && !isWaiting.get());
173+
} catch (Throwable ex) {
174+
exception = ex;
175+
if (ex instanceof Error) {
176+
// `readException` may not be reported to the user. Rethrow Error to make sure at least
177+
// The user can see Error in UncaughtExceptionHandler.
178+
throw (Error) ex;
179+
}
180+
} finally {
181+
stateChangeLock.lock();
182+
readAheadBuffer.limit(off);
183+
if (read < 0 || (exception instanceof EOFException)) {
184+
endOfStream = true;
185+
} else if (exception != null) {
186+
readAborted = true;
187+
readException = exception;
188+
}
189+
readInProgress = false;
190+
signalAsyncReadComplete();
191+
stateChangeLock.unlock();
192+
closeUnderlyingInputStreamIfNecessary();
185193
}
186-
readInProgress = false;
187-
signalAsyncReadComplete();
188-
stateChangeLock.unlock();
189-
closeUnderlyingInputStreamIfNecessary();
190194
}
191195
});
192196
}

0 commit comments

Comments
 (0)