@@ -566,6 +566,7 @@ private Response query(String query, String filterPath) throws IOException {
566566 .setRequestConfig (RequestConfig .custom ().setSocketTimeout (Math .toIntExact (TimeValue .timeValueMinutes (6 ).millis ())).build ())
567567 .setWarningsHandler (WarningsHandler .PERMISSIVE )
568568 );
569+ logger .info ("Running query:" + query );
569570 return runQuery (() -> client ().performRequest (request ));
570571 }
571572
@@ -735,28 +736,48 @@ private Map<String, Object> fetchMvLongs() throws IOException {
735736 public void testLookupExplosion () throws IOException {
736737 int sensorDataCount = 400 ;
737738 int lookupEntries = 10000 ;
738- Map <?, ?> map = lookupExplosion (sensorDataCount , lookupEntries , 1 , lookupEntries );
739+ Map <?, ?> map = lookupExplosion (sensorDataCount , lookupEntries , 1 , lookupEntries , false );
739740 assertMap (map , matchesMap ().extraOk ().entry ("values" , List .of (List .of (sensorDataCount * lookupEntries ))));
740741 }
741742
742743 public void testLookupExplosionManyFields () throws IOException {
743744 int sensorDataCount = 400 ;
744745 int lookupEntries = 1000 ;
745746 int joinFieldsCount = 990 ;
746- Map <?, ?> map = lookupExplosion (sensorDataCount , lookupEntries , joinFieldsCount , lookupEntries );
747+ Map <?, ?> map = lookupExplosion (sensorDataCount , lookupEntries , joinFieldsCount , lookupEntries , false );
748+ assertMap (map , matchesMap ().extraOk ().entry ("values" , List .of (List .of (sensorDataCount * lookupEntries ))));
749+ }
750+
751+ public void testLookupExplosionExpression () throws IOException {
752+ int sensorDataCount = 400 ;
753+ int lookupEntries = 10000 ;
754+ Map <?, ?> map = lookupExplosion (sensorDataCount , lookupEntries , 1 , lookupEntries , true );
755+ assertMap (map , matchesMap ().extraOk ().entry ("values" , List .of (List .of (sensorDataCount * lookupEntries ))));
756+ }
757+
758+ public void testLookupExplosionManyFieldsExpression () throws IOException {
759+ int sensorDataCount = 400 ;
760+ int lookupEntries = 1000 ;
761+ int joinFieldsCount = 399 ;// only join on 399 columns due to max expression size of 400
762+ Map <?, ?> map = lookupExplosion (sensorDataCount , lookupEntries , joinFieldsCount , lookupEntries , true );
747763 assertMap (map , matchesMap ().extraOk ().entry ("values" , List .of (List .of (sensorDataCount * lookupEntries ))));
748764 }
749765
750766 public void testLookupExplosionManyMatchesManyFields () throws IOException {
751767 // 1500, 10000 is enough locally, but some CI machines need more.
752768 int lookupEntries = 10000 ;
753- assertCircuitBreaks (attempt -> lookupExplosion (attempt * 1500 , lookupEntries , 30 , lookupEntries ));
769+ assertCircuitBreaks (attempt -> lookupExplosion (attempt * 1500 , lookupEntries , 30 , lookupEntries , false ));
754770 }
755771
756772 public void testLookupExplosionManyMatches () throws IOException {
757773 // 1500, 10000 is enough locally, but some CI machines need more.
758774 int lookupEntries = 10000 ;
759- assertCircuitBreaks (attempt -> lookupExplosion (attempt * 1500 , lookupEntries , 1 , lookupEntries ));
775+ assertCircuitBreaks (attempt -> lookupExplosion (attempt * 1500 , lookupEntries , 1 , lookupEntries , false ));
776+ }
777+
778+ public void testLookupExplosionManyMatchesExpression () throws IOException {
779+ int lookupEntries = 10000 ;
780+ assertCircuitBreaks (attempt -> lookupExplosion (attempt * 1500 , lookupEntries , 1 , lookupEntries , true ));
760781 }
761782
762783 public void testLookupExplosionManyMatchesFiltered () throws IOException {
@@ -768,9 +789,21 @@ public void testLookupExplosionManyMatchesFiltered() throws IOException {
768789 int reductionFactor = 1000 ; // reduce the number of matches by this factor
769790 // lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value
770791 assertTrue (0 == lookupEntries % reductionFactor );
771- Map <?, ?> map = lookupExplosion (sensorDataCount , lookupEntries , 1 , lookupEntries / reductionFactor );
792+ Map <?, ?> map = lookupExplosion (sensorDataCount , lookupEntries , 1 , lookupEntries / reductionFactor , false );
772793 assertMap (map , matchesMap ().extraOk ().entry ("values" , List .of (List .of (sensorDataCount * lookupEntries / reductionFactor ))));
794+ }
773795
796+ public void testLookupExplosionManyMatchesFilteredExpression () throws IOException {
797+ // This test will only work with the expanding join optimization
798+ // that pushes the filter to the right side of the lookup.
799+ // Without the optimization, it will fail with circuit_breaking_exception
800+ int sensorDataCount = 10000 ;
801+ int lookupEntries = 10000 ;
802+ int reductionFactor = 1000 ; // reduce the number of matches by this factor
803+ // lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value
804+ assertTrue (0 == lookupEntries % reductionFactor );
805+ Map <?, ?> map = lookupExplosion (sensorDataCount , lookupEntries , 1 , lookupEntries / reductionFactor , true );
806+ assertMap (map , matchesMap ().extraOk ().entry ("values" , List .of (List .of (sensorDataCount * lookupEntries / reductionFactor ))));
774807 }
775808
776809 public void testLookupExplosionNoFetch () throws IOException {
@@ -797,17 +830,33 @@ public void testLookupExplosionBigStringManyMatches() throws IOException {
797830 assertCircuitBreaks (attempt -> lookupExplosionBigString (attempt * 500 , 1 ));
798831 }
799832
800- private Map <String , Object > lookupExplosion (int sensorDataCount , int lookupEntries , int joinFieldsCount , int lookupEntriesToKeep )
801- throws IOException {
833+ private Map <String , Object > lookupExplosion (
834+ int sensorDataCount ,
835+ int lookupEntries ,
836+ int joinFieldsCount ,
837+ int lookupEntriesToKeep ,
838+ boolean expressionBasedJoin
839+ ) throws IOException {
802840 try {
803- lookupExplosionData (sensorDataCount , lookupEntries , joinFieldsCount );
841+ lookupExplosionData (sensorDataCount , lookupEntries , joinFieldsCount , expressionBasedJoin );
804842 StringBuilder query = startQuery ();
805843 query .append ("FROM sensor_data | LOOKUP JOIN sensor_lookup ON " );
806- for (int i = 0 ; i < joinFieldsCount ; i ++) {
807- if (i != 0 ) {
808- query .append ("," );
844+ if (expressionBasedJoin ) {
845+ for (int i = 0 ; i < joinFieldsCount ; i ++) {
846+ if (i != 0 ) {
847+ query .append (" AND " );
848+ }
849+ query .append ("id_left" ).append (i );
850+ query .append ("==" );
851+ query .append ("id_right" ).append (i );
852+ }
853+ } else {
854+ for (int i = 0 ; i < joinFieldsCount ; i ++) {
855+ if (i != 0 ) {
856+ query .append ("," );
857+ }
858+ query .append ("id" ).append (i );
809859 }
810- query .append ("id" ).append (i );
811860 }
812861 if (lookupEntries != lookupEntriesToKeep ) {
813862 // add a filter to reduce the number of matches
@@ -826,7 +875,7 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri
826875
827876 private Map <String , Object > lookupExplosionNoFetch (int sensorDataCount , int lookupEntries ) throws IOException {
828877 try {
829- lookupExplosionData (sensorDataCount , lookupEntries , 1 );
878+ lookupExplosionData (sensorDataCount , lookupEntries , 1 , false );
830879 StringBuilder query = startQuery ();
831880 query .append ("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(*)\" }" );
832881 return responseAsMap (query (query .toString (), null ));
@@ -836,14 +885,15 @@ private Map<String, Object> lookupExplosionNoFetch(int sensorDataCount, int look
836885 }
837886 }
838887
839- private void lookupExplosionData (int sensorDataCount , int lookupEntries , int joinFieldCount ) throws IOException {
840- initSensorData (sensorDataCount , 1 , joinFieldCount );
841- initSensorLookup (lookupEntries , 1 , i -> "73.9857 40.7484" , joinFieldCount );
888+ private void lookupExplosionData (int sensorDataCount , int lookupEntries , int joinFieldCount , boolean expressionBasedJoin )
889+ throws IOException {
890+ initSensorData (sensorDataCount , 1 , joinFieldCount , expressionBasedJoin );
891+ initSensorLookup (lookupEntries , 1 , i -> "73.9857 40.7484" , joinFieldCount , expressionBasedJoin );
842892 }
843893
844894 private Map <String , Object > lookupExplosionBigString (int sensorDataCount , int lookupEntries ) throws IOException {
845895 try {
846- initSensorData (sensorDataCount , 1 , 1 );
896+ initSensorData (sensorDataCount , 1 , 1 , false );
847897 initSensorLookupString (lookupEntries , 1 , i -> {
848898 int target = Math .toIntExact (ByteSizeValue .ofMb (1 ).getBytes ());
849899 StringBuilder str = new StringBuilder (Math .toIntExact (ByteSizeValue .ofMb (2 ).getBytes ()));
@@ -876,7 +926,7 @@ public void testEnrichExplosionManyMatches() throws IOException {
876926
877927 private Map <String , Object > enrichExplosion (int sensorDataCount , int lookupEntries ) throws IOException {
878928 try {
879- initSensorData (sensorDataCount , 1 , 1 );
929+ initSensorData (sensorDataCount , 1 , 1 , false );
880930 initSensorEnrich (lookupEntries , 1 , i -> "73.9857 40.7484" );
881931 try {
882932 StringBuilder query = startQuery ();
@@ -1050,7 +1100,7 @@ private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOEx
10501100 initIndex ("mv_longs" , bulk .toString ());
10511101 }
10521102
1053- private void initSensorData (int docCount , int sensorCount , int joinFieldCount ) throws IOException {
1103+ private void initSensorData (int docCount , int sensorCount , int joinFieldCount , boolean expressionBasedJoin ) throws IOException {
10541104 logger .info ("loading sensor data" );
10551105 // We cannot go over 1000 fields, due to failed on parsing mappings on index creation
10561106 // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
@@ -1061,8 +1111,9 @@ private void initSensorData(int docCount, int sensorCount, int joinFieldCount) t
10611111 "properties": {
10621112 "@timestamp": { "type": "date" },
10631113 """ );
1114+ String suffix = expressionBasedJoin ? "_left" : "" ;
10641115 for (int i = 0 ; i < joinFieldCount ; i ++) {
1065- createIndexBuilder .append ("\" id" ).append (i ).append ("\" : { \" type\" : \" long\" }," );
1116+ createIndexBuilder .append ("\" id" ).append (suffix ). append ( i ).append ("\" : { \" type\" : \" long\" }," );
10661117 }
10671118 createIndexBuilder .append ("""
10681119 "value": { "type": "double" }
@@ -1083,7 +1134,7 @@ private void initSensorData(int docCount, int sensorCount, int joinFieldCount) t
10831134 {"create":{}}
10841135 {"timestamp":"%s",""" , DateFieldMapper .DEFAULT_DATE_TIME_FORMATTER .formatMillis (i * 10L + firstDate )));
10851136 for (int j = 0 ; j < joinFieldCount ; j ++) {
1086- data .append (String .format (Locale .ROOT , "\" id%d\" :%d, " , j , i % sensorCount ));
1137+ data .append (String .format (Locale .ROOT , "\" id%s% d\" :%d, " , suffix , j , i % sensorCount ));
10871138 }
10881139 data .append (String .format (Locale .ROOT , "\" value\" : %f}\n " , i * 1.1 ));
10891140 if (i % docsPerBulk == docsPerBulk - 1 ) {
@@ -1094,8 +1145,13 @@ private void initSensorData(int docCount, int sensorCount, int joinFieldCount) t
10941145 initIndex ("sensor_data" , data .toString ());
10951146 }
10961147
1097- private void initSensorLookup (int lookupEntries , int sensorCount , IntFunction <String > location , int joinFieldsCount )
1098- throws IOException {
1148+ private void initSensorLookup (
1149+ int lookupEntries ,
1150+ int sensorCount ,
1151+ IntFunction <String > location ,
1152+ int joinFieldsCount ,
1153+ boolean expressionBasedJoin
1154+ ) throws IOException {
10991155 logger .info ("loading sensor lookup" );
11001156 // cannot go over 1000 fields, due to failed on parsing mappings on index creation
11011157 // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
@@ -1105,8 +1161,9 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
11051161 {
11061162 "properties": {
11071163 """ );
1164+ String suffix = expressionBasedJoin ? "_right" : "" ;
11081165 for (int i = 0 ; i < joinFieldsCount ; i ++) {
1109- createIndexBuilder .append ("\" id" ).append (i ).append ("\" : { \" type\" : \" long\" }," );
1166+ createIndexBuilder .append ("\" id" ).append (suffix ). append ( i ).append ("\" : { \" type\" : \" long\" }," );
11101167 }
11111168 createIndexBuilder .append ("""
11121169 "location": { "type": "geo_point" },
@@ -1127,7 +1184,7 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
11271184 {"create":{}}
11281185 {""" ));
11291186 for (int j = 0 ; j < joinFieldsCount ; j ++) {
1130- data .append (String .format (Locale .ROOT , "\" id%d\" :%d, " , j , sensor ));
1187+ data .append (String .format (Locale .ROOT , "\" id%s% d\" :%d, " , suffix , j , sensor ));
11311188 }
11321189 data .append (String .format (Locale .ROOT , """
11331190 "location": "POINT(%s)", "filter_key": %d}\n """ , location .apply (sensor ), i ));
@@ -1165,7 +1222,7 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct
11651222 }
11661223
11671224 private void initSensorEnrich (int lookupEntries , int sensorCount , IntFunction <String > location ) throws IOException {
1168- initSensorLookup (lookupEntries , sensorCount , location , 1 );
1225+ initSensorLookup (lookupEntries , sensorCount , location , 1 , false );
11691226 logger .info ("loading sensor enrich" );
11701227
11711228 Request create = new Request ("PUT" , "/_enrich/policy/sensor" );
0 commit comments