Skip to content

Commit 0e2b6da

Browse files
committed
In LateralJoinIter, provide parent rows to the secondary child rowIter.
1 parent 0147ad3 commit 0e2b6da

File tree

1 file changed

+49
-47
lines changed

1 file changed

+49
-47
lines changed

sql/rowexec/join_iters.go

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -734,16 +734,19 @@ type lateralJoinIterator struct {
734734
secondaryNode sql.Node
735735
cond sql.Expression
736736
b sql.NodeExecBuilder
737-
parentRow sql.Row
738-
primaryRow sql.Row
739-
secondaryRow sql.Row
740-
rowSize int
741-
scopeLen int
742-
jType plan.JoinType
743-
foundMatch bool
737+
// primaryRow contains the parent row concatenated with the current row from the primary child,
738+
// and is used to build the secondary child iter.
739+
primaryRow sql.Row
740+
// secondaryRow contains the current row from the secondary child.
741+
secondaryRow sql.Row
742+
rowSize int
743+
scopeLen int
744+
parentLen int
745+
jType plan.JoinType
746+
foundMatch bool
744747
}
745748

746-
func newLateralJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, row sql.Row) (sql.RowIter, error) {
749+
func newLateralJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, parentRow sql.Row) (sql.RowIter, error) {
747750
var left, right string
748751
if leftTable, ok := j.Left().(sql.Nameable); ok {
749752
left = leftTable.Name()
@@ -761,73 +764,72 @@ func newLateralJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNod
761764
attribute.String("right", right),
762765
))
763766

764-
l, err := b.Build(ctx, j.Left(), row)
767+
l, err := b.Build(ctx, j.Left(), parentRow)
765768
if err != nil {
766769
span.End()
767770
return nil, err
768771
}
769772

773+
parentLen := len(parentRow)
774+
775+
primaryRow := make(sql.Row, parentLen+len(j.Left().Schema()))
776+
copy(primaryRow, parentRow)
777+
770778
return sql.NewSpanIter(span, &lateralJoinIterator{
771-
parentRow: row,
779+
primaryRow: primaryRow,
780+
parentLen: len(parentRow),
772781
primary: l,
773782
secondaryNode: j.Right(),
774783
cond: j.Filter,
775784
jType: j.Op,
776-
rowSize: len(row) + len(j.Left().Schema()) + len(j.Right().Schema()),
785+
rowSize: len(parentRow) + len(j.Left().Schema()) + len(j.Right().Schema()),
777786
scopeLen: j.ScopeLen,
778787
b: b,
779788
}), nil
780789
}
781790

782791
func (i *lateralJoinIterator) loadPrimary(ctx *sql.Context) error {
783-
if i.primaryRow == nil {
784-
lRow, err := i.primary.Next(ctx)
785-
if err != nil {
786-
return err
787-
}
788-
i.primaryRow = lRow
789-
i.foundMatch = false
792+
lRow, err := i.primary.Next(ctx)
793+
if err != nil {
794+
return err
790795
}
796+
copy(i.primaryRow[i.parentLen:], lRow)
797+
i.foundMatch = false
791798
return nil
792799
}
793800

794801
func (i *lateralJoinIterator) buildSecondary(ctx *sql.Context) error {
795-
if i.secondary == nil {
796-
prepended, _, err := transform.Node(i.secondaryNode, plan.PrependRowInPlan(i.primaryRow, true))
797-
if err != nil {
798-
return err
799-
}
800-
iter, err := i.b.Build(ctx, prepended, i.primaryRow)
801-
if err != nil {
802-
return err
803-
}
804-
i.secondary = iter
802+
prepended, _, err := transform.Node(i.secondaryNode, plan.PrependRowInPlan(i.primaryRow, true))
803+
if err != nil {
804+
return err
805+
}
806+
iter, err := i.b.Build(ctx, prepended, i.primaryRow)
807+
if err != nil {
808+
return err
805809
}
810+
i.secondary = iter
806811
return nil
807812
}
808813

809814
func (i *lateralJoinIterator) loadSecondary(ctx *sql.Context) error {
810-
if i.secondaryRow == nil {
811-
sRow, err := i.secondary.Next(ctx)
812-
if err != nil {
813-
return err
814-
}
815-
i.secondaryRow = sRow[len(i.primaryRow):]
815+
sRow, err := i.secondary.Next(ctx)
816+
if err != nil {
817+
return err
816818
}
819+
i.secondaryRow = sRow[len(i.primaryRow):]
817820
return nil
818821
}
819822

820823
func (i *lateralJoinIterator) buildRow(primaryRow, secondaryRow sql.Row) sql.Row {
821824
row := make(sql.Row, i.rowSize)
822-
copy(row, i.parentRow)
823-
copy(row[len(i.parentRow):], primaryRow)
824-
copy(row[len(i.parentRow)+len(primaryRow):], secondaryRow)
825+
copy(row, primaryRow)
826+
copy(row[len(primaryRow):], secondaryRow)
825827
return row
826828
}
827829

828830
func (i *lateralJoinIterator) removeParentRow(r sql.Row) sql.Row {
829-
copy(r[i.scopeLen:], r[len(i.parentRow):])
830-
r = r[:len(r)-len(i.parentRow)+i.scopeLen]
831+
copy(r[i.scopeLen:], r[i.parentLen:])
832+
r = r[:len(r)-i.parentLen+i.scopeLen]
831833
return r
832834
}
833835

@@ -836,18 +838,20 @@ func (i *lateralJoinIterator) reset(ctx *sql.Context) (err error) {
836838
err = i.secondary.Close(ctx)
837839
i.secondary = nil
838840
}
839-
i.primaryRow = nil
840841
i.secondaryRow = nil
841842
return
842843
}
843844

844845
func (i *lateralJoinIterator) Next(ctx *sql.Context) (sql.Row, error) {
845846
for {
846-
if err := i.loadPrimary(ctx); err != nil {
847-
return nil, err
848-
}
849-
if err := i.buildSecondary(ctx); err != nil {
850-
return nil, err
847+
// secondary being nil means we've exhausted all secondary rows for the current primary.
848+
if i.secondary == nil {
849+
if err := i.loadPrimary(ctx); err != nil {
850+
return nil, err
851+
}
852+
if err := i.buildSecondary(ctx); err != nil {
853+
return nil, err
854+
}
851855
}
852856
if err := i.loadSecondary(ctx); err != nil {
853857
if errors.Is(err, io.EOF) {
@@ -865,9 +869,7 @@ func (i *lateralJoinIterator) Next(ctx *sql.Context) (sql.Row, error) {
865869
}
866870
return nil, err
867871
}
868-
869872
row := i.buildRow(i.primaryRow, i.secondaryRow)
870-
i.secondaryRow = nil
871873
if i.cond != nil {
872874
if res, err := sql.EvaluateCondition(ctx, i.cond, row); err != nil {
873875
return nil, err

0 commit comments

Comments
 (0)