
Commit c379793

Aitozidanzhewuju authored and committed
[spark] Eliminate the De/serialization process when writing to the append bucket table (apache#5159)
1 parent 3e527f9 commit c379793

11 files changed: +792 −8 lines changed
SparkInternalRowWrapper.java (new file)

Lines changed: 360 additions & 0 deletions
@@ -0,0 +1,360 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.spark;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.spark.util.SparkRowUtils$;
import org.apache.paimon.spark.util.shim.TypeUtils$;
import org.apache.paimon.types.RowKind;

import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.paimon.shims.SparkShimLoader;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampNTZType;
import org.apache.spark.sql.types.TimestampType;

import java.math.BigDecimal;

/** Wrapper to fetch value from the spark internal row. */
public class SparkInternalRowWrapper implements InternalRow {

    private org.apache.spark.sql.catalyst.InternalRow internalRow;
    private final int length;
    private final int rowKindIdx;
    private final StructType structType;

    public SparkInternalRowWrapper(
            org.apache.spark.sql.catalyst.InternalRow internalRow,
            int rowKindIdx,
            StructType structType,
            int length) {
        this.internalRow = internalRow;
        this.rowKindIdx = rowKindIdx;
        this.length = length;
        this.structType = structType;
    }

    public SparkInternalRowWrapper(int rowKindIdx, StructType structType, int length) {
        this.rowKindIdx = rowKindIdx;
        this.length = length;
        this.structType = structType;
    }

    public SparkInternalRowWrapper replace(org.apache.spark.sql.catalyst.InternalRow internalRow) {
        this.internalRow = internalRow;
        return this;
    }

    @Override
    public int getFieldCount() {
        return length;
    }

    @Override
    public RowKind getRowKind() {
        return SparkRowUtils$.MODULE$.getRowKind(internalRow, rowKindIdx);
    }

    @Override
    public void setRowKind(RowKind kind) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isNullAt(int pos) {
        return internalRow.isNullAt(pos);
    }

    @Override
    public boolean getBoolean(int pos) {
        return internalRow.getBoolean(pos);
    }

    @Override
    public byte getByte(int pos) {
        return internalRow.getByte(pos);
    }

    @Override
    public short getShort(int pos) {
        return internalRow.getShort(pos);
    }

    @Override
    public int getInt(int pos) {
        return internalRow.getInt(pos);
    }

    @Override
    public long getLong(int pos) {
        return internalRow.getLong(pos);
    }

    @Override
    public float getFloat(int pos) {
        return internalRow.getFloat(pos);
    }

    @Override
    public double getDouble(int pos) {
        return internalRow.getDouble(pos);
    }

    @Override
    public BinaryString getString(int pos) {
        return BinaryString.fromBytes(internalRow.getUTF8String(pos).getBytes());
    }

    @Override
    public Decimal getDecimal(int pos, int precision, int scale) {
        org.apache.spark.sql.types.Decimal decimal = internalRow.getDecimal(pos, precision, scale);
        BigDecimal bigDecimal = decimal.toJavaBigDecimal();
        return Decimal.fromBigDecimal(bigDecimal, precision, scale);
    }

    @Override
    public Timestamp getTimestamp(int pos, int precision) {
        return convertToTimestamp(structType.fields()[pos].dataType(), internalRow.getLong(pos));
    }

    @Override
    public byte[] getBinary(int pos) {
        return internalRow.getBinary(pos);
    }

    @Override
    public Variant getVariant(int pos) {
        return SparkShimLoader.getSparkShim().toPaimonVariant(internalRow, pos);
    }

    @Override
    public InternalArray getArray(int pos) {
        return new SparkInternalArray(
                internalRow.getArray(pos),
                ((ArrayType) (structType.fields()[pos].dataType())).elementType());
    }

    @Override
    public InternalMap getMap(int pos) {
        MapType mapType = (MapType) structType.fields()[pos].dataType();
        return new SparkInternalMap(
                internalRow.getMap(pos), mapType.keyType(), mapType.valueType());
    }

    @Override
    public InternalRow getRow(int pos, int numFields) {
        return new SparkInternalRowWrapper(
                internalRow.getStruct(pos, numFields),
                -1,
                (StructType) structType.fields()[pos].dataType(),
                numFields);
    }

    private static Timestamp convertToTimestamp(DataType dataType, long micros) {
        if (dataType instanceof TimestampType) {
            if (TypeUtils$.MODULE$.treatPaimonTimestampTypeAsSparkTimestampType()) {
                return Timestamp.fromSQLTimestamp(DateTimeUtils.toJavaTimestamp(micros));
            } else {
                return Timestamp.fromMicros(micros);
            }
        } else if (dataType instanceof TimestampNTZType) {
            return Timestamp.fromMicros(micros);
        } else {
            throw new UnsupportedOperationException("Unsupported data type:" + dataType);
        }
    }

    /** Adapt to spark internal array. */
    public static class SparkInternalArray implements InternalArray {

        private final ArrayData arrayData;
        private final DataType elementType;

        public SparkInternalArray(ArrayData arrayData, DataType elementType) {
            this.arrayData = arrayData;
            this.elementType = elementType;
        }

        @Override
        public int size() {
            return arrayData.numElements();
        }

        @Override
        public boolean[] toBooleanArray() {
            return arrayData.toBooleanArray();
        }

        @Override
        public byte[] toByteArray() {
            return arrayData.toByteArray();
        }

        @Override
        public short[] toShortArray() {
            return arrayData.toShortArray();
        }

        @Override
        public int[] toIntArray() {
            return arrayData.toIntArray();
        }

        @Override
        public long[] toLongArray() {
            return arrayData.toLongArray();
        }

        @Override
        public float[] toFloatArray() {
            return arrayData.toFloatArray();
        }

        @Override
        public double[] toDoubleArray() {
            return arrayData.toDoubleArray();
        }

        @Override
        public boolean isNullAt(int pos) {
            return arrayData.isNullAt(pos);
        }

        @Override
        public boolean getBoolean(int pos) {
            return arrayData.getBoolean(pos);
        }

        @Override
        public byte getByte(int pos) {
            return arrayData.getByte(pos);
        }

        @Override
        public short getShort(int pos) {
            return arrayData.getShort(pos);
        }

        @Override
        public int getInt(int pos) {
            return arrayData.getInt(pos);
        }

        @Override
        public long getLong(int pos) {
            return arrayData.getLong(pos);
        }

        @Override
        public float getFloat(int pos) {
            return arrayData.getFloat(pos);
        }

        @Override
        public double getDouble(int pos) {
            return arrayData.getDouble(pos);
        }

        @Override
        public BinaryString getString(int pos) {
            return BinaryString.fromBytes(arrayData.getUTF8String(pos).getBytes());
        }

        @Override
        public Decimal getDecimal(int pos, int precision, int scale) {
            org.apache.spark.sql.types.Decimal decimal =
                    arrayData.getDecimal(pos, precision, scale);
            return Decimal.fromBigDecimal(decimal.toJavaBigDecimal(), precision, scale);
        }

        @Override
        public Timestamp getTimestamp(int pos, int precision) {
            return convertToTimestamp(elementType, arrayData.getLong(pos));
        }

        @Override
        public byte[] getBinary(int pos) {
            return arrayData.getBinary(pos);
        }

        @Override
        public Variant getVariant(int pos) {
            return SparkShimLoader.getSparkShim().toPaimonVariant(arrayData, pos);
        }

        @Override
        public InternalArray getArray(int pos) {
            return new SparkInternalArray(
                    arrayData.getArray(pos), ((ArrayType) elementType).elementType());
        }

        @Override
        public InternalMap getMap(int pos) {
            MapType mapType = (MapType) elementType;
            return new SparkInternalMap(
                    arrayData.getMap(pos), mapType.keyType(), mapType.valueType());
        }

        @Override
        public InternalRow getRow(int pos, int numFields) {
            return new SparkInternalRowWrapper(
                    arrayData.getStruct(pos, numFields), -1, (StructType) elementType, numFields);
        }
    }

    /** Adapt to spark internal map. */
    public static class SparkInternalMap implements InternalMap {

        private final MapData mapData;
        private final DataType keyType;
        private final DataType valueType;

        public SparkInternalMap(MapData mapData, DataType keyType, DataType valueType) {
            this.mapData = mapData;
            this.keyType = keyType;
            this.valueType = valueType;
        }

        @Override
        public int size() {
            return mapData.numElements();
        }

        @Override
        public InternalArray keyArray() {
            return new SparkInternalArray(mapData.keyArray(), keyType);
        }

        @Override
        public InternalArray valueArray() {
            return new SparkInternalArray(mapData.valueArray(), valueType);
        }
    }
}
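The wrapper above is what lets the Spark write path hand rows to Paimon without a per-field copy: every Paimon InternalRow accessor reads straight from the underlying Spark InternalRow. Below is a minimal usage sketch, not part of this commit; the schema, the values, and the rowKindIdx of -1 are illustrative assumptions.

// Usage sketch (illustrative only, not from the commit): wrap a Spark row and read it
// through Paimon's InternalRow interface without converting the record first.
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.spark.SparkInternalRowWrapper;

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

public class SparkInternalRowWrapperExample {
    public static void main(String[] args) {
        // Illustrative schema: (id INT, name STRING).
        StructType schema =
                new StructType()
                        .add("id", DataTypes.IntegerType)
                        .add("name", DataTypes.StringType);

        // A Spark internal row as it would arrive from the write path.
        GenericInternalRow sparkRow =
                new GenericInternalRow(new Object[] {1, UTF8String.fromString("paimon")});

        // Wrap once; replace(...) swaps in the next record so the wrapper can be reused.
        InternalRow paimonRow =
                new SparkInternalRowWrapper(-1, schema, schema.fields().length)
                        .replace(sparkRow);

        System.out.println(paimonRow.getInt(0));     // 1
        System.out.println(paimonRow.getString(1));  // paimon
    }
}

Because replace(...) returns the wrapper itself, one instance can be reused across all records of a task, which matches the commit's goal of avoiding per-record de/serialization when writing to append bucket tables.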

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.catalyst
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, EvalMode, Expression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.connector.read.Scan

0 commit comments
