Commit b844e45

SOLR-17923: Add fullOuterJoin stream function (#3676)
1 parent 8679f66 commit b844e45

6 files changed: +443 -42 lines changed

solr/CHANGES.txt

Lines changed: 2 additions & 0 deletions
@@ -277,6 +277,8 @@ New Features
 ---------------------
 * SOLR-17915: shards.preference=replica.location now supports the "host" option for routing to replicas on the same host. (Houston Putman)

+* SOLR-17923: Add fullOuterJoin stream function (Andy Webb)
+
 Improvements
 ---------------------
 * SOLR-17860: DocBasedVersionConstraintsProcessorFactory now supports PULL replicas. (Houston Putman)

solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc

Lines changed: 154 additions & 42 deletions
@@ -786,73 +786,42 @@ fetch(addresses,

 The example above fetches addresses for users by matching the username in the tuple with the userId field in the addresses collection.

-== having
-
-The `having` expression wraps a stream and applies a boolean operation to each tuple.
-It emits only tuples for which the boolean operation returns *true*.
-
-=== having Parameters
-
-* `StreamExpression`: (Mandatory) The stream source for the having function.
-* `booleanEvaluator`: (Mandatory) The following boolean operations are supported: `eq` (equals), `gt` (greater than), `lt` (less than), `gteq` (greater than or equal to), `lteq` (less than or equal to), `and`, `or`, `eor` (exclusive or), and `not`.
-Boolean evaluators can be nested with other evaluators to form complex boolean logic.
-
-The comparison evaluators compare the value in a specific field with a value, whether a string, number, or boolean.
-For example: `eq(field1, 10)`, returns `true` if `field1` is equal to 10.
-
-=== having Syntax
-
-[source,text]
-----
-having(rollup(over=a_s,
-              sum(a_i),
-              search(collection1,
-                     q="*:*",
-                     qt="/export",
-                     fl="id,a_s,a_i,a_f",
-                     sort="a_s asc")),
-       and(gt(sum(a_i), 100), lt(sum(a_i), 110)))
-
-----
-
-In this example, the `having` expression iterates the aggregated tuples from the `rollup` expression and emits all tuples where the field `sum(a_i)` is greater than 100 and less than 110.
-
-== leftOuterJoin
+== fullOuterJoin

-The `leftOuterJoin` function wraps two streams, Left and Right, and emits tuples from Left.
-If there is a tuple in Right equal (as defined by `on`) then the values in that tuple will be included in the emitted tuple.
-An equal tuple in Right *need not* exist for the Left tuple to be emitted.
-This supports one-to-one, one-to-many, many-to-one, and many-to-many left outer join scenarios.
-The tuples are emitted in the order in which they appear in the Left stream.
+The `fullOuterJoin` function wraps two streams, Left and Right, and emits tuples from both.
+If there is a tuple in Right equal to that in the Left (as defined by `on`) then the values in that tuple will be included in the emitted tuple.
+An equal tuple in one stream *need not* exist for a tuple in the other to be emitted.
+This supports one-to-one, one-to-many, many-to-one, and many-to-many full outer join scenarios.
+The tuples are emitted in the order in which they appear in the streams.
 Both streams must be sorted by the fields being used to determine equality (using the `on` parameter).
 If both tuples contain a field of the same name then the value from the Right stream will be used in the emitted tuple.

 You can wrap the incoming streams with a `select` function to be specific about which field values are included in the emitted tuple.

-=== leftOuterJoin Parameters
+=== fullOuterJoin Parameters

 * `StreamExpression for StreamLeft`
 * `StreamExpression for StreamRight`
 * `on`: Fields to be used for checking equality of tuples between Left and Right.
 Can be of the format `on="fieldName"`, `on="fieldNameInLeft=fieldNameInRight"`, or `on="fieldName, otherFieldName=rightOtherFieldName"`.

-=== leftOuterJoin Syntax
+=== fullOuterJoin Syntax

 [source,text]
 ----
-leftOuterJoin(
+fullOuterJoin(
   search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
   search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
   on="personId"
 )

-leftOuterJoin(
+fullOuterJoin(
   search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
   search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
   on="personId=ownerId"
 )

-leftOuterJoin(
+fullOuterJoin(
   search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
   select(
     search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
@@ -863,6 +832,103 @@ leftOuterJoin(
 )
 ----
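
Note on the field-collision rule documented above: the following is a minimal Java sketch, assuming tuples behave like plain maps. When both sides carry a field of the same name the Right value replaces the Left one, and renaming the Right field first (as the `select(..., name as petName)` example does) keeps both values. `MergePrecedenceSketch`, `merge`, and `rename` are hypothetical names used only for illustration; they are not part of the Solr API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MergePrecedenceSketch {

  // Copies the left fields, then overlays the right fields, so a field name
  // present on both sides ends up holding the right-hand value.
  static Map<String, Object> merge(Map<String, Object> left, Map<String, Object> right) {
    Map<String, Object> joined = new LinkedHashMap<>(left);
    joined.putAll(right);
    return joined;
  }

  // Hypothetical stand-in for select(..., name as petName):
  // returns a copy of the tuple with one field renamed.
  static Map<String, Object> rename(Map<String, Object> tuple, String from, String to) {
    Map<String, Object> renamed = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : tuple.entrySet()) {
      renamed.put(e.getKey().equals(from) ? to : e.getKey(), e.getValue());
    }
    return renamed;
  }

  public static void main(String[] args) {
    Map<String, Object> person = new LinkedHashMap<>();
    person.put("personId", 1);
    person.put("name", "Ada");

    Map<String, Object> pet = new LinkedHashMap<>();
    pet.put("ownerId", 1);
    pet.put("name", "Tom");

    // Without a rename the person's name is overwritten by the pet's name.
    System.out.println(merge(person, pet));
    // prints {personId=1, name=Tom, ownerId=1}

    // Renaming the right-hand field first preserves both values.
    System.out.println(merge(person, rename(pet, "name", "petName")));
    // prints {personId=1, name=Ada, ownerId=1, petName=Tom}
  }
}
```

The sketch covers only matched tuples; for an unmatched tuple there is nothing to overlay, so it passes through with its own fields.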

+=== Reciprocal Rank Fusion (RRF) using fullOuterJoin
+
+The `fullOuterJoin` function can be used to construct an RRF algorithm for merging together result sets, as illustrated below.
+
+(In a real-world example the two `sort/list/tuple` blocks would likely use `search` instead.)
+
+[source,text]
+----
+top(
+  n=10,
+  sort(
+    select(
+      fullOuterJoin(
+        sort(
+          select(
+            sort(
+              list(
+                tuple(id=1, title="L 1", left="a", score=4.5),
+                tuple(id=2, title="L 2", left="b", score=3.5),
+                tuple(id=3, title="L 3", left="c", score=2.5),
+                tuple(id=4, title="L 4", left="d", score=2.5),
+                tuple(id=5, title="L 5", left="e", score=2.5),
+                tuple(id=6, title="L 6", left="f", score=2.5),
+              ),
+              by="score desc"
+            ),
+            *,
+            score as scoreL,
+            add(recNum(),1) as rankL,
+            div(1,add(rankL,60)) as rrL
+          ),
+          by="id asc"
+        ),
+        sort(
+          select(
+            sort(
+              list(
+                tuple(id=3, title="R 3", right="g", score=0.9),
+                tuple(id=2, title="R 2", right="h", score=0.8),
+                tuple(id=4, title="R 4", right="i", score=0.7),
+                tuple(id=7, title="R 7", right="j", score=0.6),
+                tuple(id=8, title="R 8", right="k", score=0.5),
+                tuple(id=9, title="R 9", right="l", score=0.5),
+              ),
+              by="score desc"
+            ),
+            *,
+            score as scoreR,
+            add(recNum(),1) as rankR,
+            div(1,add(rankR,60)) as rrR
+          ),
+          by="id asc"
+        ),
+        on="id"
+      ),
+      *,
+      replace(rrL,null,withValue=0),
+      replace(rrR,null,withValue=0),
+      add(rrL,rrR) as rrf,
+    ),
+    by="rrf desc"
+  ),
+  sort="rrf desc"
+)
+----
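
Note on the arithmetic in the RRF expression above: the sketch below restates it in plain Java, assuming each input is a list of document ids already ordered by descending score. Each document contributes `1 / (rank + 60)` per list, where `rank` starts at 1 (mirroring `add(recNum(),1)`), and a document missing from one list contributes 0 for that side (mirroring the `replace(..., null, withValue=0)` calls). `RrfSketch`, `fuse`, and `topN` are hypothetical names, and the input ids are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RrfSketch {

  // Smoothing constant taken from the expression above: div(1, add(rank, 60)).
  static final int K = 60;

  // Sums the reciprocal-rank contributions of both ranked lists per document id.
  static Map<String, Double> fuse(List<String> left, List<String> right) {
    Map<String, Double> rrf = new LinkedHashMap<>();
    accumulate(rrf, left);
    accumulate(rrf, right);
    return rrf;
  }

  // Adds 1 / (rank + K) for every id in the list; ranks start at 1.
  // Ids absent from a list simply receive no contribution from it.
  private static void accumulate(Map<String, Double> rrf, List<String> ranked) {
    for (int i = 0; i < ranked.size(); i++) {
      int rank = i + 1;
      rrf.merge(ranked.get(i), 1.0 / (rank + K), Double::sum);
    }
  }

  // Returns up to n ids ordered by descending fused score.
  static List<String> topN(Map<String, Double> rrf, int n) {
    List<Map.Entry<String, Double>> entries = new ArrayList<>(rrf.entrySet());
    entries.sort(Map.Entry.<String, Double>comparingByValue().reversed());
    List<String> ids = new ArrayList<>();
    for (Map.Entry<String, Double> e : entries.subList(0, Math.min(n, entries.size()))) {
      ids.add(e.getKey());
    }
    return ids;
  }

  public static void main(String[] args) {
    List<String> left = List.of("1", "2", "3");
    List<String> right = List.of("3", "2", "4");
    System.out.println(topN(fuse(left, right), 10));
    // prints [3, 2, 1, 4]
  }
}
```

Documents "2" and "3" appear in both lists and so outrank "1" and "4", which each appear in only one; this is the effect the full outer join makes possible, since an inner join would drop "1" and "4" entirely.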
+
+== having
+
+The `having` expression wraps a stream and applies a boolean operation to each tuple.
+It emits only tuples for which the boolean operation returns *true*.
+
+=== having Parameters
+
+* `StreamExpression`: (Mandatory) The stream source for the having function.
+* `booleanEvaluator`: (Mandatory) The following boolean operations are supported: `eq` (equals), `gt` (greater than), `lt` (less than), `gteq` (greater than or equal to), `lteq` (less than or equal to), `and`, `or`, `eor` (exclusive or), and `not`.
+Boolean evaluators can be nested with other evaluators to form complex boolean logic.
+
+The comparison evaluators compare the value in a specific field with a value, whether a string, number, or boolean.
+For example: `eq(field1, 10)`, returns `true` if `field1` is equal to 10.
+
+=== having Syntax
+
+[source,text]
+----
+having(rollup(over=a_s,
+              sum(a_i),
+              search(collection1,
+                     q="*:*",
+                     qt="/export",
+                     fl="id,a_s,a_i,a_f",
+                     sort="a_s asc")),
+       and(gt(sum(a_i), 100), lt(sum(a_i), 110)))
+
+----
+
+In this example, the `having` expression iterates the aggregated tuples from the `rollup` expression and emits all tuples where the field `sum(a_i)` is greater than 100 and less than 110.
+
 == hashJoin

 The `hashJoin` function wraps two streams, Left and Right, and for every tuple in Left which exists in Right will emit a tuple containing the fields of both tuples.
@@ -986,6 +1052,52 @@ intersect(
 )
 ----

+== leftOuterJoin
+
+The `leftOuterJoin` function wraps two streams, Left and Right, and emits tuples from Left.
+If there is a tuple in Right equal (as defined by `on`) then the values in that tuple will be included in the emitted tuple.
+An equal tuple in Right *need not* exist for the Left tuple to be emitted.
+This supports one-to-one, one-to-many, many-to-one, and many-to-many left outer join scenarios.
+The tuples are emitted in the order in which they appear in the Left stream.
+Both streams must be sorted by the fields being used to determine equality (using the `on` parameter).
+If both tuples contain a field of the same name then the value from the Right stream will be used in the emitted tuple.
+
+You can wrap the incoming streams with a `select` function to be specific about which field values are included in the emitted tuple.
+
+=== leftOuterJoin Parameters
+
+* `StreamExpression for StreamLeft`
+* `StreamExpression for StreamRight`
+* `on`: Fields to be used for checking equality of tuples between Left and Right.
+Can be of the format `on="fieldName"`, `on="fieldNameInLeft=fieldNameInRight"`, or `on="fieldName, otherFieldName=rightOtherFieldName"`.
+
+=== leftOuterJoin Syntax
+
+[source,text]
+----
+leftOuterJoin(
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
+  search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
+  on="personId"
+)
+
+leftOuterJoin(
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
+  search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
+  on="personId=ownerId"
+)
+
+leftOuterJoin(
+  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
+  select(
+    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
+    ownerId,
+    name as petName
+  ),
+  on="personId=ownerId"
+)
+----
+
 [#list_expression]
 == list


solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/Lang.java

Lines changed: 2 additions & 0 deletions
@@ -272,6 +272,7 @@
 import org.apache.solr.client.solrj.io.stream.FacetStream;
 import org.apache.solr.client.solrj.io.stream.FeaturesSelectionStream;
 import org.apache.solr.client.solrj.io.stream.FetchStream;
+import org.apache.solr.client.solrj.io.stream.FullOuterJoinStream;
 import org.apache.solr.client.solrj.io.stream.GetStream;
 import org.apache.solr.client.solrj.io.stream.HashJoinStream;
 import org.apache.solr.client.solrj.io.stream.HashRollupStream;
@@ -355,6 +356,7 @@ public static void register(StreamFactory streamFactory) {
         .withFunctionName("stats", StatsStream.class)
         .withFunctionName("innerJoin", InnerJoinStream.class)
         .withFunctionName("leftOuterJoin", LeftOuterJoinStream.class)
+        .withFunctionName("fullOuterJoin", FullOuterJoinStream.class)
         .withFunctionName("hashJoin", HashJoinStream.class)
         .withFunctionName("outerHashJoin", OuterHashJoinStream.class)
         .withFunctionName("intersect", IntersectStream.class)
Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Joins leftStream with rightStream based on an Equalitor. Both streams must be sorted by the
+ * fields being joined on. Resulting stream is sorted by the equalitor.
+ *
+ * @since 9.10.0
+ */
+public class FullOuterJoinStream extends BiJoinStream {
+
+  @SuppressWarnings("JdkObsolete")
+  private final LinkedList<Tuple> joinedTuples = new LinkedList<>();
+
+  @SuppressWarnings("JdkObsolete")
+  private final LinkedList<Tuple> leftTupleGroup = new LinkedList<>();
+
+  @SuppressWarnings("JdkObsolete")
+  private final LinkedList<Tuple> rightTupleGroup = new LinkedList<>();
+
+  public FullOuterJoinStream(TupleStream leftStream, TupleStream rightStream, StreamEqualitor eq)
+      throws IOException {
+    super(leftStream, rightStream, eq);
+  }
+
+  public FullOuterJoinStream(StreamExpression expression, StreamFactory factory)
+      throws IOException {
+    super(expression, factory);
+  }
+
+  @Override
+  public Tuple read() throws IOException {
+    // if we've already figured out the next joined tuple then just return it
+    if (joinedTuples.size() > 0) {
+      return joinedTuples.removeFirst();
+    }
+
+    // keep going until we find something to return or both streams are empty
+    while (true) {
+
+      // load next set of equal tuples from leftStream into leftTupleGroup
+      if (0 == leftTupleGroup.size()) {
+        loadEqualTupleGroup(leftStream, leftTupleGroup, leftStreamComparator);
+      }
+
+      // same for right
+      if (0 == rightTupleGroup.size()) {
+        loadEqualTupleGroup(rightStream, rightTupleGroup, rightStreamComparator);
+      }
+
+      Boolean leftFinished = (0 == leftTupleGroup.size() || leftTupleGroup.get(0).EOF);
+      Boolean rightFinished = (0 == rightTupleGroup.size() || rightTupleGroup.get(0).EOF);
+
+      // If both streams are at EOF, we're done
+      if (leftFinished && rightFinished) {
+        return Tuple.EOF();
+      }
+
+      // If the left stream is at the EOF, we just return the next element from the right stream
+      if (leftFinished) {
+        return rightTupleGroup.removeFirst();
+      }
+
+      // If the right stream is at the EOF, we just return the next element from the left stream
+      if (rightFinished) {
+        return leftTupleGroup.removeFirst();
+      }
+
+      // At this point we know both left and right groups have at least 1 member
+      if (eq.test(leftTupleGroup.get(0), rightTupleGroup.get(0))) {
+        // The groups are equal. Join em together and build the joinedTuples
+        for (Tuple left : leftTupleGroup) {
+          for (Tuple right : rightTupleGroup) {
+            Tuple clone = left.clone();
+            clone.merge(right);
+            joinedTuples.add(clone);
+          }
+        }
+
+        // Cause each to advance next time we need to look
+        leftTupleGroup.clear();
+        rightTupleGroup.clear();
+
+        return joinedTuples.removeFirst();
+      } else {
+        int c = iterationComparator.compare(leftTupleGroup.get(0), rightTupleGroup.get(0));
+        if (c < 0) {
+          // If there's no match, we still advance the left stream while returning every element.
+          // Because it's an outer join we still return the left tuple if no match on right.
+          return leftTupleGroup.removeFirst();
+        } else {
+          // return right item as it didn't match left and this is a full outer join
+          return rightTupleGroup.removeFirst();
+        }
+      }
+    }
+  }
+
+  @Override
+  public StreamComparator getStreamSort() {
+    return iterationComparator;
+  }
+}
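
Note on the control flow of `read()` above: the following is a simplified, non-streaming Java sketch of the same sorted-merge idea, assuming both inputs are in-memory lists sorted ascending by an integer key. Groups of equal keys are cross-joined when they match; otherwise the side with the smaller key is emitted unmatched, and whatever remains on either side after the other is exhausted is emitted as-is. `FullOuterMergeJoinSketch`, `Row`, `join`, and `groupEnd` are hypothetical names for illustration and do not appear in the commit.

```java
import java.util.ArrayList;
import java.util.List;

class FullOuterMergeJoinSketch {

  // Hypothetical minimal row: an integer join key plus a label.
  static final class Row {
    final int key;
    final String label;

    Row(int key, String label) {
      this.key = key;
      this.label = label;
    }
  }

  // Full outer join of two lists that are both sorted ascending by key.
  // Each result is "leftLabel+rightLabel", with "null" standing in for a missing side.
  static List<String> join(List<Row> left, List<Row> right) {
    List<String> out = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < left.size() || j < right.size()) {
      if (i == left.size()) {
        // Left exhausted: emit the remaining right rows unmatched.
        out.add("null+" + right.get(j++).label);
      } else if (j == right.size()) {
        // Right exhausted: emit the remaining left rows unmatched.
        out.add(left.get(i++).label + "+null");
      } else if (left.get(i).key < right.get(j).key) {
        // Left key is smaller, so it has no partner on the right.
        out.add(left.get(i++).label + "+null");
      } else if (left.get(i).key > right.get(j).key) {
        // Right key is smaller, so it has no partner on the left.
        out.add("null+" + right.get(j++).label);
      } else {
        // Keys match: cross-join the two groups of equal keys.
        int iEnd = groupEnd(left, i);
        int jEnd = groupEnd(right, j);
        for (int a = i; a < iEnd; a++) {
          for (int b = j; b < jEnd; b++) {
            out.add(left.get(a).label + "+" + right.get(b).label);
          }
        }
        i = iEnd;
        j = jEnd;
      }
    }
    return out;
  }

  // Index just past the run of rows sharing the key found at 'start'.
  static int groupEnd(List<Row> rows, int start) {
    int end = start;
    while (end < rows.size() && rows.get(end).key == rows.get(start).key) {
      end++;
    }
    return end;
  }

  public static void main(String[] args) {
    List<Row> left = List.of(new Row(1, "a"), new Row(2, "b"), new Row(2, "c"), new Row(4, "d"));
    List<Row> right = List.of(new Row(2, "x"), new Row(3, "y"), new Row(4, "z"), new Row(4, "w"));
    System.out.println(join(left, right));
    // prints [a+null, b+x, c+x, null+y, d+z, d+w]
  }
}
```

Unlike the committed class, this sketch materializes the whole result instead of returning one tuple per call, and it compares keys directly rather than through separate equalitor and comparator objects.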
