Skip to content

Commit bd08934

Browse files
committed
Downgrade Netty from 4.1.x series to 4.0.x series
1 parent 00a5e85 commit bd08934

File tree

9 files changed

+12
-137
lines changed

9 files changed

+12
-137
lines changed

common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -203,11 +203,6 @@ public long transfered() {
203203
return transferred;
204204
}
205205

206-
@Override
207-
public long transferred() {
208-
return transferred;
209-
}
210-
211206
@Override
212207
public long transferTo(WritableByteChannel target, long position) throws IOException {
213208
Preconditions.checkArgument(position == transfered(), "Invalid position.");
@@ -237,7 +232,7 @@ private void encryptMore() throws IOException {
237232
int copied = byteRawChannel.write(buf.nioBuffer());
238233
buf.skipBytes(copied);
239234
} else {
240-
region.transferTo(byteRawChannel, region.transferred());
235+
region.transferTo(byteRawChannel, region.transfered());
241236
}
242237
cos.write(byteRawChannel.getData(), 0, byteRawChannel.length());
243238
cos.flush();
@@ -246,28 +241,6 @@ private void encryptMore() throws IOException {
246241
0, byteEncChannel.length());
247242
}
248243

249-
@Override
250-
public FileRegion retain() {
251-
super.retain();
252-
return this;
253-
}
254-
255-
@Override
256-
public FileRegion retain(int increment) {
257-
super.retain(increment);
258-
return this;
259-
}
260-
261-
@Override
262-
public FileRegion touch() {
263-
return this;
264-
}
265-
266-
@Override
267-
public FileRegion touch(Object o) {
268-
return this;
269-
}
270-
271244
@Override
272245
protected void deallocate() {
273246
byteRawChannel.reset();

common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,6 @@ public long transfered() {
9595
return totalBytesTransferred;
9696
}
9797

98-
@Override
99-
public long transferred() {
100-
return totalBytesTransferred;
101-
}
102-
10398
/**
10499
* This code is more complicated than you would think because we might require multiple
105100
* transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting.
@@ -132,28 +127,6 @@ public long transferTo(final WritableByteChannel target, final long position) th
132127
return writtenHeader + writtenBody;
133128
}
134129

135-
@Override
136-
public FileRegion touch(Object msg) {
137-
return this;
138-
}
139-
140-
@Override
141-
public FileRegion retain() {
142-
super.retain();
143-
return this;
144-
}
145-
146-
@Override
147-
public FileRegion retain(int increment) {
148-
super.retain(increment);
149-
return this;
150-
}
151-
152-
@Override
153-
public FileRegion touch() {
154-
return this;
155-
}
156-
157130
@Override
158131
protected void deallocate() {
159132
header.release();

common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,6 @@ public long transfered() {
187187
return transferred;
188188
}
189189

190-
@Override
191-
public long transferred() {
192-
return transferred;
193-
}
194-
195190
/**
196191
* Transfers data from the original message to the channel, encrypting it in the process.
197192
*
@@ -267,7 +262,7 @@ private void nextChunk() throws IOException {
267262
int copied = byteChannel.write(buf.nioBuffer());
268263
buf.skipBytes(copied);
269264
} else {
270-
region.transferTo(byteChannel, region.transferred());
265+
region.transferTo(byteChannel, region.transfered());
271266
}
272267

273268
byte[] encrypted = backend.wrap(byteChannel.getData(), 0, byteChannel.length());
@@ -277,28 +272,6 @@ private void nextChunk() throws IOException {
277272
this.unencryptedChunkSize = byteChannel.length();
278273
}
279274

280-
@Override
281-
public FileRegion touch(Object o) {
282-
return this;
283-
}
284-
285-
@Override
286-
public FileRegion retain() {
287-
super.retain();
288-
return this;
289-
}
290-
291-
@Override
292-
public FileRegion retain(int increment) {
293-
super.retain(increment);
294-
return this;
295-
}
296-
297-
@Override
298-
public FileRegion touch() {
299-
return this;
300-
}
301-
302275
@Override
303276
protected void deallocate() {
304277
if (currentHeader != null) {

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,11 @@ public void close() {
140140
channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
141141
channelFuture = null;
142142
}
143-
if (bootstrap != null && bootstrap.config() != null && bootstrap.config().group() != null) {
144-
bootstrap.config().group().shutdownGracefully();
143+
if (bootstrap != null && bootstrap.group() != null) {
144+
bootstrap.group().shutdownGracefully();
145145
}
146-
if (bootstrap != null && bootstrap.config() != null
147-
&& bootstrap.config().childGroup() != null) {
148-
bootstrap.config().childGroup().shutdownGracefully();
146+
if (bootstrap != null &&bootstrap.childGroup() != null) {
147+
bootstrap.childGroup().shutdownGracefully();
149148
}
150149
bootstrap = null;
151150
}

common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ private void testServerToClient(Message msg) {
5656
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);
5757

5858
while (!serverChannel.outboundMessages().isEmpty()) {
59-
clientChannel.writeOneInbound(serverChannel.readOutbound());
59+
clientChannel.writeInbound(serverChannel.readOutbound());
6060
}
6161

6262
assertEquals(1, clientChannel.inboundMessages().size());
@@ -72,7 +72,7 @@ private void testClientToServer(Message msg) {
7272
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);
7373

7474
while (!clientChannel.outboundMessages().isEmpty()) {
75-
serverChannel.writeOneInbound(clientChannel.readOutbound());
75+
serverChannel.writeInbound(clientChannel.readOutbound());
7676
}
7777

7878
assertEquals(1, serverChannel.inboundMessages().size());
@@ -116,8 +116,8 @@ public void encode(ChannelHandlerContext ctx, FileRegion in, List<Object> out)
116116
throws Exception {
117117

118118
ByteArrayWritableChannel channel = new ByteArrayWritableChannel(Ints.checkedCast(in.count()));
119-
while (in.transferred() < in.count()) {
120-
in.transferTo(channel, in.transferred());
119+
while (in.transfered() < in.count()) {
120+
in.transferTo(channel, in.transfered());
121121
}
122122
out.add(Unpooled.wrappedBuffer(channel.getData()));
123123
}

common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,6 @@ public long transfered() {
134134
return 8 * written;
135135
}
136136

137-
@Override
138-
public long transferred() {
139-
return 8 * written;
140-
}
141-
142137
@Override
143138
public long transferTo(WritableByteChannel target, long position) throws IOException {
144139
for (int i = 0; i < writesPerCall; i++) {
@@ -153,28 +148,6 @@ public long transferTo(WritableByteChannel target, long position) throws IOExcep
153148
return 8 * writesPerCall;
154149
}
155150

156-
@Override
157-
public FileRegion retain() {
158-
super.retain();
159-
return this;
160-
}
161-
162-
@Override
163-
public FileRegion retain(int increment) {
164-
super.retain(increment);
165-
return this;
166-
}
167-
168-
@Override
169-
public FileRegion touch(Object o) {
170-
return this;
171-
}
172-
173-
@Override
174-
public FileRegion touch() {
175-
return this;
176-
}
177-
178151
@Override
179152
protected void deallocate() {
180153
}

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -305,22 +305,6 @@ private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize:
305305
}
306306

307307
override def deallocate(): Unit = source.close()
308-
309-
override def transferred(): Long = _transferred
310-
311-
override def touch(o: scala.Any): FileRegion = this
312-
313-
override def retain(): FileRegion = {
314-
super.retain()
315-
this
316-
}
317-
318-
override def retain(increment: Int): FileRegion = {
319-
super.retain(increment)
320-
this
321-
}
322-
323-
override def touch(): FileRegion = this
324308
}
325309

326310
private class CountingWritableChannel(sink: WritableByteChannel) extends WritableByteChannel {

dev/deps/spark-deps-hadoop-palantir

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ mimepull-1.9.6.jar
186186
minlog-1.3.0.jar
187187
mx4j-3.0.2.jar
188188
netty-3.10.6.Final.jar
189-
netty-all-4.1.13.Final.jar
189+
netty-all-4.0.50.Final.jar
190190
nimbus-jose-jwt-3.9.jar
191191
objenesis-2.5.1.jar
192192
okhttp-2.7.5.jar

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -698,7 +698,7 @@
698698
<dependency>
699699
<groupId>io.netty</groupId>
700700
<artifactId>netty-all</artifactId>
701-
<version>4.1.13.Final</version>
701+
<version>4.0.50.Final</version>
702702
</dependency>
703703
<dependency>
704704
<groupId>io.netty</groupId>

0 commit comments

Comments
 (0)