Skip to content

Commit 2529984

Browse files
authored
Merge pull request apache-spark-on-k8s#355 from palantir/rk/from-upstream
Update from upstream
2 parents 2fe4371 + 7bcb662 commit 2529984

File tree

118 files changed

+2068
-535
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

118 files changed

+2068
-535
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ jobs:
167167
run-style-tests:
168168
# depends only on build-maven
169169
<<: *test-defaults
170-
resource_class: small
170+
resource_class: medium
171171
steps:
172172
- *checkout-code
173173
# Need maven dependency cache, otherwise checkstyle tests fail as such:

common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions;
1919

20-
import org.apache.spark.unsafe.Platform;
20+
import org.apache.spark.unsafe.memory.MemoryBlock;
2121

2222
/**
2323
* Simulates Hive's hashing function from Hive v1.2.1
@@ -38,12 +38,17 @@ public static int hashLong(long input) {
3838
return (int) ((input >>> 32) ^ input);
3939
}
4040

41-
public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
41+
public static int hashUnsafeBytesBlock(MemoryBlock mb) {
42+
long lengthInBytes = mb.size();
4243
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
4344
int result = 0;
44-
for (int i = 0; i < lengthInBytes; i++) {
45-
result = (result * 31) + (int) Platform.getByte(base, offset + i);
45+
for (long i = 0; i < lengthInBytes; i++) {
46+
result = (result * 31) + (int) mb.getByte(i);
4647
}
4748
return result;
4849
}
50+
51+
public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
52+
return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes));
53+
}
4954
}

common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ public static void setMemory(long address, byte value, long size) {
187187
}
188188

189189
public static void copyMemory(
190-
Object src, long srcOffset, Object dst, long dstOffset, long length) {
190+
Object src, long srcOffset, Object dst, long dstOffset, long length) {
191191
// Check if dstOffset is before or after srcOffset to determine if we should copy
192192
// forward or backwards. This is necessary in case src and dst overlap.
193193
if (dstOffset < srcOffset) {

common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.unsafe.array;
1919

2020
import org.apache.spark.unsafe.Platform;
21+
import org.apache.spark.unsafe.memory.MemoryBlock;
2122

2223
public class ByteArrayMethods {
2324

@@ -48,15 +49,25 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) {
4849
public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
4950

5051
private static final boolean unaligned = Platform.unaligned();
52+
/**
53+
* MemoryBlock equality check for MemoryBlocks.
54+
* @return true if the arrays are equal, false otherwise
55+
*/
56+
public static boolean arrayEqualsBlock(
57+
MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, long length) {
58+
return arrayEquals(leftBase.getBaseObject(), leftBase.getBaseOffset() + leftOffset,
59+
rightBase.getBaseObject(), rightBase.getBaseOffset() + rightOffset, length);
60+
}
61+
5162
/**
5263
* Optimized byte array equality check for byte arrays.
5364
* @return true if the arrays are equal, false otherwise
5465
*/
5566
public static boolean arrayEquals(
56-
Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
67+
Object leftBase, long leftOffset, Object rightBase, long rightOffset, long length) {
5768
int i = 0;
5869

59-
// check if stars align and we can get both offsets to be aligned
70+
// check if starts align and we can get both offsets to be aligned
6071
if ((leftOffset % 8) == (rightOffset % 8)) {
6172
while ((leftOffset + i) % 8 != 0 && i < length) {
6273
if (Platform.getByte(leftBase, leftOffset + i) !=

common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark.unsafe.array;
1919

20-
import org.apache.spark.unsafe.Platform;
2120
import org.apache.spark.unsafe.memory.MemoryBlock;
2221

2322
/**
@@ -33,16 +32,12 @@ public final class LongArray {
3332
private static final long WIDTH = 8;
3433

3534
private final MemoryBlock memory;
36-
private final Object baseObj;
37-
private final long baseOffset;
3835

3936
private final long length;
4037

4138
public LongArray(MemoryBlock memory) {
4239
assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
4340
this.memory = memory;
44-
this.baseObj = memory.getBaseObject();
45-
this.baseOffset = memory.getBaseOffset();
4641
this.length = memory.size() / WIDTH;
4742
}
4843

@@ -51,11 +46,11 @@ public MemoryBlock memoryBlock() {
5146
}
5247

5348
public Object getBaseObject() {
54-
return baseObj;
49+
return memory.getBaseObject();
5550
}
5651

5752
public long getBaseOffset() {
58-
return baseOffset;
53+
return memory.getBaseOffset();
5954
}
6055

6156
/**
@@ -69,8 +64,8 @@ public long size() {
6964
* Fill this all with 0L.
7065
*/
7166
public void zeroOut() {
72-
for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
73-
Platform.putLong(baseObj, off, 0);
67+
for (long off = 0; off < length * WIDTH; off += WIDTH) {
68+
memory.putLong(off, 0);
7469
}
7570
}
7671

@@ -80,7 +75,7 @@ public void zeroOut() {
8075
public void set(int index, long value) {
8176
assert index >= 0 : "index (" + index + ") should >= 0";
8277
assert index < length : "index (" + index + ") should < length (" + length + ")";
83-
Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
78+
memory.putLong(index * WIDTH, value);
8479
}
8580

8681
/**
@@ -89,6 +84,6 @@ public void set(int index, long value) {
8984
public long get(int index) {
9085
assert index >= 0 : "index (" + index + ") should >= 0";
9186
assert index < length : "index (" + index + ") should < length (" + length + ")";
92-
return Platform.getLong(baseObj, baseOffset + index * WIDTH);
87+
return memory.getLong(index * WIDTH);
9388
}
9489
}

common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.spark.unsafe.hash;
1919

20-
import org.apache.spark.unsafe.Platform;
20+
import com.google.common.primitives.Ints;
21+
22+
import org.apache.spark.unsafe.memory.MemoryBlock;
2123

2224
/**
2325
* 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction.
@@ -49,49 +51,66 @@ public static int hashInt(int input, int seed) {
4951
}
5052

5153
public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
52-
return hashUnsafeWords(base, offset, lengthInBytes, seed);
54+
return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
5355
}
5456

55-
public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
57+
public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
5658
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
59+
int lengthInBytes = Ints.checkedCast(base.size());
5760
assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
58-
int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
61+
int h1 = hashBytesByIntBlock(base, seed);
5962
return fmix(h1, lengthInBytes);
6063
}
6164

62-
public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
65+
public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
66+
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
67+
return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
68+
}
69+
70+
public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
6371
// This is not compatible with original and another implementations.
6472
// But remain it for backward compatibility for the components existing before 2.3.
73+
int lengthInBytes = Ints.checkedCast(base.size());
6574
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
6675
int lengthAligned = lengthInBytes - lengthInBytes % 4;
67-
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
76+
int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
6877
for (int i = lengthAligned; i < lengthInBytes; i++) {
69-
int halfWord = Platform.getByte(base, offset + i);
78+
int halfWord = base.getByte(i);
7079
int k1 = mixK1(halfWord);
7180
h1 = mixH1(h1, k1);
7281
}
7382
return fmix(h1, lengthInBytes);
7483
}
7584

85+
public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
86+
return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
87+
}
88+
7689
public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
90+
return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
91+
}
92+
93+
public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
7794
// This is compatible with original and another implementations.
7895
// Use this method for new components after Spark 2.3.
79-
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
96+
int lengthInBytes = Ints.checkedCast(base.size());
97+
assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
8098
int lengthAligned = lengthInBytes - lengthInBytes % 4;
81-
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
99+
int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
82100
int k1 = 0;
83101
for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
84-
k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
102+
k1 ^= (base.getByte(i) & 0xFF) << shift;
85103
}
86104
h1 ^= mixK1(k1);
87105
return fmix(h1, lengthInBytes);
88106
}
89107

90-
private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
108+
private static int hashBytesByIntBlock(MemoryBlock base, int seed) {
109+
long lengthInBytes = base.size();
91110
assert (lengthInBytes % 4 == 0);
92111
int h1 = seed;
93-
for (int i = 0; i < lengthInBytes; i += 4) {
94-
int halfWord = Platform.getInt(base, offset + i);
112+
for (long i = 0; i < lengthInBytes; i += 4) {
113+
int halfWord = base.getInt(i);
95114
int k1 = mixK1(halfWord);
96115
h1 = mixH1(h1, k1);
97116
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.unsafe.memory;
19+
20+
import com.google.common.primitives.Ints;
21+
22+
import org.apache.spark.unsafe.Platform;
23+
24+
/**
25+
* A consecutive block of memory with a byte array on Java heap.
26+
*/
27+
public final class ByteArrayMemoryBlock extends MemoryBlock {
28+
29+
private final byte[] array;
30+
31+
public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
32+
super(obj, offset, size);
33+
this.array = obj;
34+
assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
35+
"The sum of size " + size + " and offset " + offset + " should not be larger than " +
36+
"the size of the given memory space " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
37+
}
38+
39+
public ByteArrayMemoryBlock(long length) {
40+
this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length);
41+
}
42+
43+
@Override
44+
public MemoryBlock subBlock(long offset, long size) {
45+
checkSubBlockRange(offset, size);
46+
if (offset == 0 && size == this.size()) return this;
47+
return new ByteArrayMemoryBlock(array, this.offset + offset, size);
48+
}
49+
50+
public byte[] getByteArray() { return array; }
51+
52+
/**
53+
* Creates a memory block pointing to the memory used by the byte array.
54+
*/
55+
public static ByteArrayMemoryBlock fromArray(final byte[] array) {
56+
return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length);
57+
}
58+
59+
@Override
60+
public int getInt(long offset) {
61+
return Platform.getInt(array, this.offset + offset);
62+
}
63+
64+
@Override
65+
public void putInt(long offset, int value) {
66+
Platform.putInt(array, this.offset + offset, value);
67+
}
68+
69+
@Override
70+
public boolean getBoolean(long offset) {
71+
return Platform.getBoolean(array, this.offset + offset);
72+
}
73+
74+
@Override
75+
public void putBoolean(long offset, boolean value) {
76+
Platform.putBoolean(array, this.offset + offset, value);
77+
}
78+
79+
@Override
80+
public byte getByte(long offset) {
81+
return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
82+
}
83+
84+
@Override
85+
public void putByte(long offset, byte value) {
86+
array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = value;
87+
}
88+
89+
@Override
90+
public short getShort(long offset) {
91+
return Platform.getShort(array, this.offset + offset);
92+
}
93+
94+
@Override
95+
public void putShort(long offset, short value) {
96+
Platform.putShort(array, this.offset + offset, value);
97+
}
98+
99+
@Override
100+
public long getLong(long offset) {
101+
return Platform.getLong(array, this.offset + offset);
102+
}
103+
104+
@Override
105+
public void putLong(long offset, long value) {
106+
Platform.putLong(array, this.offset + offset, value);
107+
}
108+
109+
@Override
110+
public float getFloat(long offset) {
111+
return Platform.getFloat(array, this.offset + offset);
112+
}
113+
114+
@Override
115+
public void putFloat(long offset, float value) {
116+
Platform.putFloat(array, this.offset + offset, value);
117+
}
118+
119+
@Override
120+
public double getDouble(long offset) {
121+
return Platform.getDouble(array, this.offset + offset);
122+
}
123+
124+
@Override
125+
public void putDouble(long offset, double value) {
126+
Platform.putDouble(array, this.offset + offset, value);
127+
}
128+
}

0 commit comments

Comments
 (0)