Skip to content

Commit 174d29a

Browse files
committed
Initial revision
1 parent 65ff85a commit 174d29a

File tree

1 file changed

+58
-12
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions

1 file changed

+58
-12
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -381,28 +381,74 @@ abstract class ExplodeBase extends UnaryExpression with CollectionGenerator with
381381
if (inputArray == null) {
382382
Nil
383383
} else {
384-
val rows = new Array[InternalRow](inputArray.numElements())
385-
inputArray.foreach(et, (i, e) => {
386-
rows(i) = if (position) InternalRow(i, e) else InternalRow(e)
387-
})
388-
rows
384+
new ArrayExplodeIterator(inputArray, et, position)
389385
}
390386
case MapType(kt, vt, _) =>
391387
val inputMap = child.eval(input).asInstanceOf[MapData]
392388
if (inputMap == null) {
393389
Nil
394390
} else {
395-
val rows = new Array[InternalRow](inputMap.numElements())
396-
var i = 0
397-
inputMap.foreach(kt, vt, (k, v) => {
398-
rows(i) = if (position) InternalRow(i, k, v) else InternalRow(k, v)
399-
i += 1
400-
})
401-
rows
391+
new MapExplodeIterator(inputMap, kt, vt, position)
402392
}
403393
}
404394
}
405395

396+
private class ArrayExplodeIterator(
397+
array: ArrayData,
398+
elementType: DataType,
399+
includePosition: Boolean)
400+
extends IterableOnce[InternalRow] {
401+
402+
override def iterator: Iterator[InternalRow] = new Iterator[InternalRow] {
403+
private var currentIndex = 0
404+
private val numElements = array.numElements()
405+
406+
override def hasNext: Boolean = currentIndex < numElements
407+
408+
override def next(): InternalRow = {
409+
if (!hasNext) throw new NoSuchElementException("No more elements")
410+
val element = array.get(currentIndex, elementType)
411+
val row = if (includePosition) {
412+
InternalRow(currentIndex, element)
413+
} else {
414+
InternalRow(element)
415+
}
416+
currentIndex += 1
417+
row
418+
}
419+
}
420+
}
421+
422+
private class MapExplodeIterator(
423+
map: MapData,
424+
keyType: DataType,
425+
valueType: DataType,
426+
includePosition: Boolean)
427+
extends IterableOnce[InternalRow] {
428+
429+
override def iterator: Iterator[InternalRow] = new Iterator[InternalRow] {
430+
private var currentIndex = 0
431+
private val numElements = map.numElements()
432+
private val keyArray = map.keyArray()
433+
private val valueArray = map.valueArray()
434+
435+
override def hasNext: Boolean = currentIndex < numElements
436+
437+
override def next(): InternalRow = {
438+
if (!hasNext) throw new NoSuchElementException("No more elements")
439+
val key = keyArray.get(currentIndex, keyType)
440+
val value = valueArray.get(currentIndex, valueType)
441+
val row = if (includePosition) {
442+
InternalRow(currentIndex, key, value)
443+
} else {
444+
InternalRow(key, value)
445+
}
446+
currentIndex += 1
447+
row
448+
}
449+
}
450+
}
451+
406452
override def collectionType: DataType = child.dataType
407453

408454
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {

0 commit comments

Comments
 (0)