Skip to content

Commit c6712a9

Browse files
committed
[V2 Streaming] Add generic Kernel Row <-> Spark Row zero-copy wrappers
1 parent 925c206 commit c6712a9

File tree

4 files changed

+814
-0
lines changed

4 files changed

+814
-0
lines changed
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.spark.internal.v2.utils;
17+
18+
import io.delta.kernel.data.ArrayValue;
19+
import io.delta.kernel.data.ColumnVector;
20+
import io.delta.kernel.data.MapValue;
21+
import io.delta.kernel.internal.data.StructRow;
22+
import io.delta.kernel.types.*;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import org.apache.spark.sql.GenericRowWithSchema;
28+
import org.apache.spark.sql.Row;
29+
30+
/**
31+
* Zero-copy wrapper that presents a Kernel {@link io.delta.kernel.data.Row} as a Spark {@link Row}.
32+
* Primitive field access delegates directly to the Kernel Row with no data copy. Complex types
33+
* (Map, Array, Struct) are lazily converted on access.
34+
*/
35+
public class KernelRowToSparkRow implements Row {
36+
37+
private final io.delta.kernel.data.Row kernelRow;
38+
private final StructType kernelSchema;
39+
private final org.apache.spark.sql.types.StructType sparkSchema;
40+
41+
public KernelRowToSparkRow(io.delta.kernel.data.Row kernelRow) {
42+
this(kernelRow, SchemaUtils.convertKernelSchemaToSparkSchema(kernelRow.getSchema()));
43+
}
44+
45+
/**
46+
* Constructor that accepts a pre-computed Spark schema to avoid redundant schema conversion when
47+
* wrapping many rows that share the same schema.
48+
*/
49+
public KernelRowToSparkRow(
50+
io.delta.kernel.data.Row kernelRow, org.apache.spark.sql.types.StructType sparkSchema) {
51+
this.kernelRow = kernelRow;
52+
this.kernelSchema = kernelRow.getSchema();
53+
this.sparkSchema = sparkSchema;
54+
}
55+
56+
@Override
57+
public int length() {
58+
return kernelSchema.length();
59+
}
60+
61+
@Override
62+
public org.apache.spark.sql.types.StructType schema() {
63+
return sparkSchema;
64+
}
65+
66+
@Override
67+
public Object get(int i) {
68+
if (kernelRow.isNullAt(i)) {
69+
return null;
70+
}
71+
return extractSparkValue(kernelRow, i, kernelSchema.at(i).getDataType());
72+
}
73+
74+
@Override
75+
public Row copy() {
76+
Object[] values = new Object[length()];
77+
for (int i = 0; i < values.length; i++) {
78+
values[i] = get(i);
79+
}
80+
return new GenericRowWithSchema(values, sparkSchema);
81+
}
82+
83+
// ---- type dispatch: Kernel Row field -> Spark-compatible Object ----
84+
85+
private static Object extractSparkValue(io.delta.kernel.data.Row row, int ordinal, DataType dt) {
86+
if (dt instanceof BooleanType) {
87+
return row.getBoolean(ordinal);
88+
} else if (dt instanceof ByteType) {
89+
return row.getByte(ordinal);
90+
} else if (dt instanceof ShortType) {
91+
return row.getShort(ordinal);
92+
} else if (dt instanceof IntegerType || dt instanceof DateType) {
93+
return row.getInt(ordinal);
94+
} else if (dt instanceof LongType
95+
|| dt instanceof TimestampType
96+
|| dt instanceof TimestampNTZType) {
97+
return row.getLong(ordinal);
98+
} else if (dt instanceof FloatType) {
99+
return row.getFloat(ordinal);
100+
} else if (dt instanceof DoubleType) {
101+
return row.getDouble(ordinal);
102+
} else if (dt instanceof StringType) {
103+
return row.getString(ordinal);
104+
} else if (dt instanceof DecimalType) {
105+
return row.getDecimal(ordinal);
106+
} else if (dt instanceof BinaryType) {
107+
return row.getBinary(ordinal);
108+
} else if (dt instanceof StructType) {
109+
return new KernelRowToSparkRow(
110+
row.getStruct(ordinal), SchemaUtils.convertKernelSchemaToSparkSchema((StructType) dt));
111+
} else if (dt instanceof MapType) {
112+
return mapValueToScalaMap(row.getMap(ordinal), (MapType) dt);
113+
} else if (dt instanceof ArrayType) {
114+
return arrayValueToScalaSeq(row.getArray(ordinal), (ArrayType) dt);
115+
}
116+
throw new UnsupportedOperationException("Unsupported Kernel DataType: " + dt);
117+
}
118+
119+
// ---- Kernel ColumnVector element -> Spark-compatible Object ----
120+
121+
static Object vectorValueToSpark(ColumnVector cv, int rowId, DataType dt) {
122+
if (cv.isNullAt(rowId)) {
123+
return null;
124+
}
125+
if (dt instanceof BooleanType) {
126+
return cv.getBoolean(rowId);
127+
} else if (dt instanceof ByteType) {
128+
return cv.getByte(rowId);
129+
} else if (dt instanceof ShortType) {
130+
return cv.getShort(rowId);
131+
} else if (dt instanceof IntegerType || dt instanceof DateType) {
132+
return cv.getInt(rowId);
133+
} else if (dt instanceof LongType
134+
|| dt instanceof TimestampType
135+
|| dt instanceof TimestampNTZType) {
136+
return cv.getLong(rowId);
137+
} else if (dt instanceof FloatType) {
138+
return cv.getFloat(rowId);
139+
} else if (dt instanceof DoubleType) {
140+
return cv.getDouble(rowId);
141+
} else if (dt instanceof StringType) {
142+
return cv.getString(rowId);
143+
} else if (dt instanceof DecimalType) {
144+
return cv.getDecimal(rowId);
145+
} else if (dt instanceof BinaryType) {
146+
return cv.getBinary(rowId);
147+
} else if (dt instanceof StructType) {
148+
return new KernelRowToSparkRow(
149+
StructRow.fromStructVector(cv, rowId),
150+
SchemaUtils.convertKernelSchemaToSparkSchema((StructType) dt));
151+
} else if (dt instanceof MapType) {
152+
return mapValueToScalaMap(cv.getMap(rowId), (MapType) dt);
153+
} else if (dt instanceof ArrayType) {
154+
return arrayValueToScalaSeq(cv.getArray(rowId), (ArrayType) dt);
155+
}
156+
throw new UnsupportedOperationException("Unsupported Kernel DataType: " + dt);
157+
}
158+
159+
// ---- MapValue -> scala.collection.Map ----
160+
161+
static scala.collection.Map<Object, Object> mapValueToScalaMap(MapValue mv, MapType mt) {
162+
ColumnVector keys = mv.getKeys();
163+
ColumnVector values = mv.getValues();
164+
Map<Object, Object> javaMap = new HashMap<>();
165+
for (int i = 0; i < mv.getSize(); i++) {
166+
Object key = vectorValueToSpark(keys, i, mt.getKeyType());
167+
Object value = vectorValueToSpark(values, i, mt.getValueType());
168+
javaMap.put(key, value);
169+
}
170+
return scala.jdk.javaapi.CollectionConverters.asScala(javaMap);
171+
}
172+
173+
// ---- ArrayValue -> scala.collection.Seq ----
174+
175+
static scala.collection.Seq<Object> arrayValueToScalaSeq(ArrayValue av, ArrayType at) {
176+
ColumnVector elements = av.getElements();
177+
List<Object> javaList = new ArrayList<>();
178+
for (int i = 0; i < av.getSize(); i++) {
179+
javaList.add(vectorValueToSpark(elements, i, at.getElementType()));
180+
}
181+
return scala.jdk.javaapi.CollectionConverters.asScala(javaList).toList();
182+
}
183+
}
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.spark.internal.v2.utils;
17+
18+
import io.delta.kernel.data.ArrayValue;
19+
import io.delta.kernel.data.ColumnVector;
20+
import io.delta.kernel.data.MapValue;
21+
import io.delta.kernel.data.Row;
22+
import io.delta.kernel.internal.util.VectorUtils;
23+
import io.delta.kernel.types.*;
24+
import java.math.BigDecimal;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.Map;
28+
29+
/**
30+
* Zero-copy wrapper that presents a Spark {@link org.apache.spark.sql.Row} as a Kernel {@link Row}.
31+
* Primitive field access delegates directly to the Spark Row with no data copy. Complex types (Map,
32+
* Array, Struct) are lazily converted on access.
33+
*/
34+
public class SparkRowToKernelRow implements Row {
35+
36+
private final org.apache.spark.sql.Row sparkRow;
37+
private final StructType kernelSchema;
38+
39+
public SparkRowToKernelRow(org.apache.spark.sql.Row sparkRow, StructType kernelSchema) {
40+
this.sparkRow = sparkRow;
41+
this.kernelSchema = kernelSchema;
42+
}
43+
44+
@Override
45+
public StructType getSchema() {
46+
return kernelSchema;
47+
}
48+
49+
@Override
50+
public boolean isNullAt(int ordinal) {
51+
return sparkRow.isNullAt(ordinal);
52+
}
53+
54+
@Override
55+
public boolean getBoolean(int ordinal) {
56+
return sparkRow.getBoolean(ordinal);
57+
}
58+
59+
@Override
60+
public byte getByte(int ordinal) {
61+
return sparkRow.getByte(ordinal);
62+
}
63+
64+
@Override
65+
public short getShort(int ordinal) {
66+
return sparkRow.getShort(ordinal);
67+
}
68+
69+
@Override
70+
public int getInt(int ordinal) {
71+
return sparkRow.getInt(ordinal);
72+
}
73+
74+
@Override
75+
public long getLong(int ordinal) {
76+
return sparkRow.getLong(ordinal);
77+
}
78+
79+
@Override
80+
public float getFloat(int ordinal) {
81+
return sparkRow.getFloat(ordinal);
82+
}
83+
84+
@Override
85+
public double getDouble(int ordinal) {
86+
return sparkRow.getDouble(ordinal);
87+
}
88+
89+
@Override
90+
public String getString(int ordinal) {
91+
return sparkRow.getString(ordinal);
92+
}
93+
94+
@Override
95+
public BigDecimal getDecimal(int ordinal) {
96+
return sparkRow.getDecimal(ordinal);
97+
}
98+
99+
@Override
100+
public byte[] getBinary(int ordinal) {
101+
return (byte[]) sparkRow.get(ordinal);
102+
}
103+
104+
@Override
105+
public Row getStruct(int ordinal) {
106+
org.apache.spark.sql.Row nested = sparkRow.getStruct(ordinal);
107+
StructType nestedSchema = (StructType) kernelSchema.at(ordinal).getDataType();
108+
return new SparkRowToKernelRow(nested, nestedSchema);
109+
}
110+
111+
@Override
112+
public MapValue getMap(int ordinal) {
113+
Map<?, ?> javaMap = sparkRow.getJavaMap(ordinal);
114+
MapType mt = (MapType) kernelSchema.at(ordinal).getDataType();
115+
return javaMapToKernelMapValue(javaMap, mt);
116+
}
117+
118+
@Override
119+
public ArrayValue getArray(int ordinal) {
120+
List<?> javaList = sparkRow.getList(ordinal);
121+
ArrayType at = (ArrayType) kernelSchema.at(ordinal).getDataType();
122+
return javaListToKernelArrayValue(javaList, at);
123+
}
124+
125+
// ---- java.util.Map -> Kernel MapValue ----
126+
127+
static MapValue javaMapToKernelMapValue(Map<?, ?> javaMap, MapType mt) {
128+
List<Object> keys = new ArrayList<>(javaMap.size());
129+
List<Object> values = new ArrayList<>(javaMap.size());
130+
for (Map.Entry<?, ?> entry : javaMap.entrySet()) {
131+
keys.add(sparkValueToKernel(entry.getKey(), mt.getKeyType()));
132+
values.add(sparkValueToKernel(entry.getValue(), mt.getValueType()));
133+
}
134+
ColumnVector keyVector = VectorUtils.buildColumnVector(keys, mt.getKeyType());
135+
ColumnVector valueVector = VectorUtils.buildColumnVector(values, mt.getValueType());
136+
return new MapValue() {
137+
@Override
138+
public int getSize() {
139+
return keys.size();
140+
}
141+
142+
@Override
143+
public ColumnVector getKeys() {
144+
return keyVector;
145+
}
146+
147+
@Override
148+
public ColumnVector getValues() {
149+
return valueVector;
150+
}
151+
};
152+
}
153+
154+
// ---- java.util.List -> Kernel ArrayValue ----
155+
156+
static ArrayValue javaListToKernelArrayValue(List<?> javaList, ArrayType at) {
157+
List<Object> kernelValues = new ArrayList<>(javaList.size());
158+
for (Object element : javaList) {
159+
kernelValues.add(sparkValueToKernel(element, at.getElementType()));
160+
}
161+
return VectorUtils.buildArrayValue(kernelValues, at.getElementType());
162+
}
163+
164+
// ---- Spark value -> Kernel-compatible value ----
165+
166+
@SuppressWarnings("unchecked")
167+
static Object sparkValueToKernel(Object sparkValue, DataType dt) {
168+
if (sparkValue == null) {
169+
return null;
170+
}
171+
if (dt instanceof BooleanType
172+
|| dt instanceof ByteType
173+
|| dt instanceof ShortType
174+
|| dt instanceof IntegerType
175+
|| dt instanceof DateType
176+
|| dt instanceof LongType
177+
|| dt instanceof TimestampType
178+
|| dt instanceof TimestampNTZType
179+
|| dt instanceof FloatType
180+
|| dt instanceof DoubleType
181+
|| dt instanceof StringType
182+
|| dt instanceof BinaryType
183+
|| dt instanceof DecimalType) {
184+
return sparkValue;
185+
}
186+
if (dt instanceof StructType) {
187+
return new SparkRowToKernelRow((org.apache.spark.sql.Row) sparkValue, (StructType) dt);
188+
}
189+
if (dt instanceof MapType) {
190+
Map<?, ?> javaMap;
191+
if (sparkValue instanceof scala.collection.Map) {
192+
javaMap =
193+
scala.jdk.javaapi.CollectionConverters.asJava((scala.collection.Map<?, ?>) sparkValue);
194+
} else {
195+
javaMap = (Map<?, ?>) sparkValue;
196+
}
197+
return javaMapToKernelMapValue(javaMap, (MapType) dt);
198+
}
199+
if (dt instanceof ArrayType) {
200+
List<?> javaList;
201+
if (sparkValue instanceof scala.collection.Seq) {
202+
javaList =
203+
scala.jdk.javaapi.CollectionConverters.asJava((scala.collection.Seq<?>) sparkValue);
204+
} else {
205+
javaList = (List<?>) sparkValue;
206+
}
207+
return javaListToKernelArrayValue(javaList, (ArrayType) dt);
208+
}
209+
throw new UnsupportedOperationException("Unsupported Kernel DataType: " + dt);
210+
}
211+
}

0 commit comments

Comments
 (0)