Skip to content

Commit 59d5263

Browse files
BryanCutler authored and
HyukjinKwon committed
[SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0
## What changes were proposed in this pull request? Upgrade Spark to Arrow 0.8.0 for Java and Python. Also includes an upgrade of Netty to 4.1.17 to resolve dependency requirements. The highlights that pertain to Spark for the update from Arrow version 0.4.1 to 0.8.0 include: * Java refactoring for a simpler API * Java reduced heap usage and streamlined hot code paths * Type support for DecimalType, ArrayType * Improved type casting support in Python * Simplified type checking in Python ## How was this patch tested? Existing tests Author: Bryan Cutler <[email protected]> Author: Shixiong Zhu <[email protected]> Closes #19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.
1 parent cb9fc8d commit 59d5263

File tree

26 files changed

+515
-869
lines changed

26 files changed

+515
-869
lines changed

common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030
import io.netty.buffer.ByteBuf;
3131
import io.netty.buffer.Unpooled;
3232
import io.netty.channel.*;
33-
import io.netty.util.AbstractReferenceCounted;
3433
import org.apache.commons.crypto.stream.CryptoInputStream;
3534
import org.apache.commons.crypto.stream.CryptoOutputStream;
3635

36+
import org.apache.spark.network.util.AbstractFileRegion;
3737
import org.apache.spark.network.util.ByteArrayReadableChannel;
3838
import org.apache.spark.network.util.ByteArrayWritableChannel;
3939

@@ -161,7 +161,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
161161
}
162162
}
163163

164-
private static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
164+
private static class EncryptedMessage extends AbstractFileRegion {
165165
private final boolean isByteBuf;
166166
private final ByteBuf buf;
167167
private final FileRegion region;
@@ -199,10 +199,45 @@ public long position() {
199199
}
200200

201201
@Override
202-
public long transfered() {
202+
public long transferred() {
203203
return transferred;
204204
}
205205

206+
@Override
207+
public EncryptedMessage touch(Object o) {
208+
super.touch(o);
209+
if (region != null) {
210+
region.touch(o);
211+
}
212+
if (buf != null) {
213+
buf.touch(o);
214+
}
215+
return this;
216+
}
217+
218+
@Override
219+
public EncryptedMessage retain(int increment) {
220+
super.retain(increment);
221+
if (region != null) {
222+
region.retain(increment);
223+
}
224+
if (buf != null) {
225+
buf.retain(increment);
226+
}
227+
return this;
228+
}
229+
230+
@Override
231+
public boolean release(int decrement) {
232+
if (region != null) {
233+
region.release(decrement);
234+
}
235+
if (buf != null) {
236+
buf.release(decrement);
237+
}
238+
return super.release(decrement);
239+
}
240+
206241
@Override
207242
public long transferTo(WritableByteChannel target, long position) throws IOException {
208243
Preconditions.checkArgument(position == transfered(), "Invalid position.");

common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,17 @@
2525
import com.google.common.base.Preconditions;
2626
import io.netty.buffer.ByteBuf;
2727
import io.netty.channel.FileRegion;
28-
import io.netty.util.AbstractReferenceCounted;
2928
import io.netty.util.ReferenceCountUtil;
3029

3130
import org.apache.spark.network.buffer.ManagedBuffer;
31+
import org.apache.spark.network.util.AbstractFileRegion;
3232

3333
/**
3434
* A wrapper message that holds two separate pieces (a header and a body).
3535
*
3636
* The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
3737
*/
38-
class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
38+
class MessageWithHeader extends AbstractFileRegion {
3939

4040
@Nullable private final ManagedBuffer managedBuffer;
4141
private final ByteBuf header;
@@ -91,7 +91,7 @@ public long position() {
9191
}
9292

9393
@Override
94-
public long transfered() {
94+
public long transferred() {
9595
return totalBytesTransferred;
9696
}
9797

@@ -160,4 +160,37 @@ private int writeNioBuffer(
160160

161161
return ret;
162162
}
163+
164+
@Override
165+
public MessageWithHeader touch(Object o) {
166+
super.touch(o);
167+
header.touch(o);
168+
ReferenceCountUtil.touch(body, o);
169+
return this;
170+
}
171+
172+
@Override
173+
public MessageWithHeader retain(int increment) {
174+
super.retain(increment);
175+
header.retain(increment);
176+
ReferenceCountUtil.retain(body, increment);
177+
if (managedBuffer != null) {
178+
for (int i = 0; i < increment; i++) {
179+
managedBuffer.retain();
180+
}
181+
}
182+
return this;
183+
}
184+
185+
@Override
186+
public boolean release(int decrement) {
187+
header.release(decrement);
188+
ReferenceCountUtil.release(body, decrement);
189+
if (managedBuffer != null) {
190+
for (int i = 0; i < decrement; i++) {
191+
managedBuffer.release();
192+
}
193+
}
194+
return super.release(decrement);
195+
}
163196
}

common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import io.netty.channel.ChannelPromise;
3333
import io.netty.channel.FileRegion;
3434
import io.netty.handler.codec.MessageToMessageDecoder;
35-
import io.netty.util.AbstractReferenceCounted;
3635

36+
import org.apache.spark.network.util.AbstractFileRegion;
3737
import org.apache.spark.network.util.ByteArrayWritableChannel;
3838
import org.apache.spark.network.util.NettyUtils;
3939

@@ -129,7 +129,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
129129
}
130130

131131
@VisibleForTesting
132-
static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
132+
static class EncryptedMessage extends AbstractFileRegion {
133133

134134
private final SaslEncryptionBackend backend;
135135
private final boolean isByteBuf;
@@ -183,10 +183,45 @@ public long position() {
183183
* Returns an approximation of the amount of data transferred. See {@link #count()}.
184184
*/
185185
@Override
186-
public long transfered() {
186+
public long transferred() {
187187
return transferred;
188188
}
189189

190+
@Override
191+
public EncryptedMessage touch(Object o) {
192+
super.touch(o);
193+
if (buf != null) {
194+
buf.touch(o);
195+
}
196+
if (region != null) {
197+
region.touch(o);
198+
}
199+
return this;
200+
}
201+
202+
@Override
203+
public EncryptedMessage retain(int increment) {
204+
super.retain(increment);
205+
if (buf != null) {
206+
buf.retain(increment);
207+
}
208+
if (region != null) {
209+
region.retain(increment);
210+
}
211+
return this;
212+
}
213+
214+
@Override
215+
public boolean release(int decrement) {
216+
if (region != null) {
217+
region.release(decrement);
218+
}
219+
if (buf != null) {
220+
buf.release(decrement);
221+
}
222+
return super.release(decrement);
223+
}
224+
190225
/**
191226
* Transfers data from the original message to the channel, encrypting it in the process.
192227
*
common/network-common/src/main/java/org/apache/spark/network/util/AbstractFileRegion.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.util;
19+
20+
import io.netty.channel.FileRegion;
21+
import io.netty.util.AbstractReferenceCounted;
22+
23+
public abstract class AbstractFileRegion extends AbstractReferenceCounted implements FileRegion {
24+
25+
@Override
26+
@SuppressWarnings("deprecation")
27+
public final long transfered() {
28+
return transferred();
29+
}
30+
31+
@Override
32+
public AbstractFileRegion retain() {
33+
super.retain();
34+
return this;
35+
}
36+
37+
@Override
38+
public AbstractFileRegion retain(int increment) {
39+
super.retain(increment);
40+
return this;
41+
}
42+
43+
@Override
44+
public AbstractFileRegion touch() {
45+
super.touch();
46+
return this;
47+
}
48+
49+
@Override
50+
public AbstractFileRegion touch(Object o) {
51+
return this;
52+
}
53+
}

common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ private void testServerToClient(Message msg) {
5656
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);
5757

5858
while (!serverChannel.outboundMessages().isEmpty()) {
59-
clientChannel.writeInbound(serverChannel.readOutbound());
59+
clientChannel.writeOneInbound(serverChannel.readOutbound());
6060
}
6161

6262
assertEquals(1, clientChannel.inboundMessages().size());
@@ -72,7 +72,7 @@ private void testClientToServer(Message msg) {
7272
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);
7373

7474
while (!clientChannel.outboundMessages().isEmpty()) {
75-
serverChannel.writeInbound(clientChannel.readOutbound());
75+
serverChannel.writeOneInbound(clientChannel.readOutbound());
7676
}
7777

7878
assertEquals(1, serverChannel.inboundMessages().size());

common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@
2323

2424
import io.netty.buffer.ByteBuf;
2525
import io.netty.buffer.Unpooled;
26-
import io.netty.channel.FileRegion;
27-
import io.netty.util.AbstractReferenceCounted;
26+
import org.apache.spark.network.util.AbstractFileRegion;
2827
import org.junit.Test;
2928
import org.mockito.Mockito;
3029

@@ -108,7 +107,7 @@ private ByteBuf doWrite(MessageWithHeader msg, int minExpectedWrites) throws Exc
108107
return Unpooled.wrappedBuffer(channel.getData());
109108
}
110109

111-
private static class TestFileRegion extends AbstractReferenceCounted implements FileRegion {
110+
private static class TestFileRegion extends AbstractFileRegion {
112111

113112
private final int writeCount;
114113
private final int writesPerCall;
@@ -130,7 +129,7 @@ public long position() {
130129
}
131130

132131
@Override
133-
public long transfered() {
132+
public long transferred() {
134133
return 8 * written;
135134
}
136135

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,11 @@ import java.util.concurrent.ConcurrentHashMap
2626
import scala.collection.mutable.ListBuffer
2727

2828
import com.google.common.io.Closeables
29-
import io.netty.channel.{DefaultFileRegion, FileRegion}
30-
import io.netty.util.AbstractReferenceCounted
29+
import io.netty.channel.DefaultFileRegion
3130

3231
import org.apache.spark.{SecurityManager, SparkConf}
3332
import org.apache.spark.internal.Logging
34-
import org.apache.spark.network.util.JavaUtils
33+
import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
3534
import org.apache.spark.security.CryptoStreamUtils
3635
import org.apache.spark.util.Utils
3736
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -266,7 +265,7 @@ private class EncryptedBlockData(
266265
}
267266

268267
private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: Long)
269-
extends AbstractReferenceCounted with FileRegion {
268+
extends AbstractFileRegion {
270269

271270
private var _transferred = 0L
272271

@@ -277,7 +276,7 @@ private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize:
277276

278277
override def position(): Long = 0
279278

280-
override def transfered(): Long = _transferred
279+
override def transferred(): Long = _transferred
281280

282281
override def transferTo(target: WritableByteChannel, pos: Long): Long = {
283282
assert(pos == transfered(), "Invalid position.")

dev/deps/spark-deps-hadoop-2.6

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
1414
api-asn1-api-1.0.0-M20.jar
1515
api-util-1.0.0-M20.jar
1616
arpack_combined_all-0.1.jar
17-
arrow-format-0.4.0.jar
18-
arrow-memory-0.4.0.jar
19-
arrow-vector-0.4.0.jar
17+
arrow-format-0.8.0.jar
18+
arrow-memory-0.8.0.jar
19+
arrow-vector-0.8.0.jar
2020
avro-1.7.7.jar
2121
avro-ipc-1.7.7.jar
2222
avro-mapred-1.7.7-hadoop2.jar
@@ -82,7 +82,7 @@ hadoop-yarn-server-web-proxy-2.6.5.jar
8282
hk2-api-2.4.0-b34.jar
8383
hk2-locator-2.4.0-b34.jar
8484
hk2-utils-2.4.0-b34.jar
85-
hppc-0.7.1.jar
85+
hppc-0.7.2.jar
8686
htrace-core-3.0.4.jar
8787
httpclient-4.5.2.jar
8888
httpcore-4.4.4.jar
@@ -144,7 +144,7 @@ metrics-json-3.1.5.jar
144144
metrics-jvm-3.1.5.jar
145145
minlog-1.3.0.jar
146146
netty-3.9.9.Final.jar
147-
netty-all-4.0.47.Final.jar
147+
netty-all-4.1.17.Final.jar
148148
objenesis-2.1.jar
149149
opencsv-2.3.jar
150150
orc-core-1.4.1-nohive.jar

dev/deps/spark-deps-hadoop-2.7

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
1414
api-asn1-api-1.0.0-M20.jar
1515
api-util-1.0.0-M20.jar
1616
arpack_combined_all-0.1.jar
17-
arrow-format-0.4.0.jar
18-
arrow-memory-0.4.0.jar
19-
arrow-vector-0.4.0.jar
17+
arrow-format-0.8.0.jar
18+
arrow-memory-0.8.0.jar
19+
arrow-vector-0.8.0.jar
2020
avro-1.7.7.jar
2121
avro-ipc-1.7.7.jar
2222
avro-mapred-1.7.7-hadoop2.jar
@@ -82,7 +82,7 @@ hadoop-yarn-server-web-proxy-2.7.3.jar
8282
hk2-api-2.4.0-b34.jar
8383
hk2-locator-2.4.0-b34.jar
8484
hk2-utils-2.4.0-b34.jar
85-
hppc-0.7.1.jar
85+
hppc-0.7.2.jar
8686
htrace-core-3.1.0-incubating.jar
8787
httpclient-4.5.2.jar
8888
httpcore-4.4.4.jar
@@ -145,7 +145,7 @@ metrics-json-3.1.5.jar
145145
metrics-jvm-3.1.5.jar
146146
minlog-1.3.0.jar
147147
netty-3.9.9.Final.jar
148-
netty-all-4.0.47.Final.jar
148+
netty-all-4.1.17.Final.jar
149149
objenesis-2.1.jar
150150
opencsv-2.3.jar
151151
orc-core-1.4.1-nohive.jar

0 commit comments

Comments
 (0)