2020import org .elasticsearch .common .util .BigArrays ;
2121import org .elasticsearch .common .util .MockBigArrays ;
2222import org .elasticsearch .common .util .MockPageCacheRecycler ;
23+ import org .elasticsearch .compute .data .Block ;
2324import org .elasticsearch .compute .data .BlockFactory ;
24- import org .elasticsearch .compute .data .BytesRefBlock ;
25- import org .elasticsearch .compute .data .BytesRefVector ;
26- import org .elasticsearch .compute .data .ElementType ;
2725import org .elasticsearch .compute .data .LongBlock ;
2826import org .elasticsearch .compute .data .LongVector ;
2927import org .elasticsearch .compute .lucene .DataPartitioning ;
3432import org .elasticsearch .compute .operator .DriverContext ;
3533import org .elasticsearch .compute .operator .DriverRunner ;
3634import org .elasticsearch .compute .operator .PageConsumerOperator ;
35+ import org .elasticsearch .compute .test .BlockTestUtils ;
3736import org .elasticsearch .core .TimeValue ;
3837import org .elasticsearch .index .IndexService ;
3938import org .elasticsearch .index .IndexSettings ;
5352import org .elasticsearch .xpack .esql .core .type .DataType ;
5453import org .elasticsearch .xpack .esql .enrich .LookupFromIndexOperator ;
5554import org .elasticsearch .xpack .esql .planner .EsPhysicalOperationProviders ;
55+ import org .elasticsearch .xpack .esql .planner .PlannerUtils ;
5656import org .elasticsearch .xpack .esql .plugin .EsqlPlugin ;
5757import org .elasticsearch .xpack .esql .plugin .QueryPragmas ;
5858import org .elasticsearch .xpack .esql .plugin .TransportEsqlQueryAction ;
6868import static org .elasticsearch .test .ListMatcher .matchesList ;
6969import static org .elasticsearch .test .MapMatcher .assertMap ;
7070import static org .hamcrest .Matchers .empty ;
71+ import static org .hamcrest .Matchers .hasSize ;
7172
7273public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
73- // TODO should we remove this now that this is integrated into ESQL proper?
74- /**
75- * Quick and dirty test for looking up data from a lookup index.
76- */
77- public void testLookupIndex () throws IOException {
78- runLookup (new UsingSingleLookupTable (new String [] { "aa" , "bb" , "cc" , "dd" }));
74+ public void testKeywordKey () throws IOException {
75+ runLookup ( DataType . KEYWORD , new UsingSingleLookupTable ( new String [] { "aa" , "bb" , "cc" , "dd" }));
76+ }
77+
78+ public void testLongKey () throws IOException {
79+ runLookup (DataType . LONG , new UsingSingleLookupTable (new Long [] { 12L , 33L , 1L }));
7980 }
8081
8182 /**
82- * Tests when multiple results match.
83+ * LOOKUP multiple results match.
8384 */
8485 public void testLookupIndexMultiResults () throws IOException {
85- runLookup (new UsingSingleLookupTable (new String [] { "aa" , "bb" , "bb" , "dd" }));
86+ runLookup (DataType . KEYWORD , new UsingSingleLookupTable (new String [] { "aa" , "bb" , "bb" , "dd" }));
8687 }
8788
8889 interface PopulateIndices {
8990 void populate (int docCount , List <String > expected ) throws IOException ;
9091 }
9192
9293 class UsingSingleLookupTable implements PopulateIndices {
93- private final Map <String , List <Integer >> matches = new HashMap <>();
94- private final String [] lookupData ;
94+ private final Map <Object , List <Integer >> matches = new HashMap <>();
95+ private final Object [] lookupData ;
9596
96- UsingSingleLookupTable (String [] lookupData ) {
97+ UsingSingleLookupTable (Object [] lookupData ) {
9798 this .lookupData = lookupData ;
9899 for (int i = 0 ; i < lookupData .length ; i ++) {
99100 matches .computeIfAbsent (lookupData [i ], k -> new ArrayList <>()).add (i );
@@ -104,26 +105,26 @@ class UsingSingleLookupTable implements PopulateIndices {
104105 public void populate (int docCount , List <String > expected ) {
105106 List <IndexRequestBuilder > docs = new ArrayList <>();
106107 for (int i = 0 ; i < docCount ; i ++) {
107- String data = lookupData [i % lookupData .length ];
108- docs .add (client ().prepareIndex ("source" ).setSource (Map .of ("data " , data )));
109- for (Integer match : matches .get (data )) {
110- expected .add (data + ":" + match );
108+ Object key = lookupData [i % lookupData .length ];
109+ docs .add (client ().prepareIndex ("source" ).setSource (Map .of ("key " , key )));
110+ for (Integer match : matches .get (key )) {
111+ expected .add (key + ":" + match );
111112 }
112113 }
113114 for (int i = 0 ; i < lookupData .length ; i ++) {
114- docs .add (client ().prepareIndex ("lookup" ).setSource (Map .of ("data " , lookupData [i ], "l" , i )));
115+ docs .add (client ().prepareIndex ("lookup" ).setSource (Map .of ("key " , lookupData [i ], "l" , i )));
115116 }
116117 Collections .sort (expected );
117118 indexRandom (true , true , docs );
118119 }
119120 }
120121
121- private void runLookup (PopulateIndices populateIndices ) throws IOException {
122+ private void runLookup (DataType keyType , PopulateIndices populateIndices ) throws IOException {
122123 client ().admin ()
123124 .indices ()
124125 .prepareCreate ("source" )
125126 .setSettings (Settings .builder ().put (IndexMetadata .INDEX_NUMBER_OF_SHARDS_SETTING .getKey (), 1 ))
126- .setMapping ("data " , "type=keyword" )
127+ .setMapping ("key " , "type=" + keyType . esType () )
127128 .get ();
128129 client ().admin ()
129130 .indices ()
@@ -134,7 +135,7 @@ private void runLookup(PopulateIndices populateIndices) throws IOException {
134135 // TODO lookup index mode doesn't seem to force a single shard. That'll break the lookup command.
135136 .put (IndexMetadata .INDEX_NUMBER_OF_SHARDS_SETTING .getKey (), 1 )
136137 )
137- .setMapping ("data " , "type=keyword" , "l" , "type=long" )
138+ .setMapping ("key " , "type=" + keyType . esType () , "l" , "type=long" )
138139 .get ();
139140 client ().admin ().cluster ().prepareHealth (TEST_REQUEST_TIMEOUT ).setWaitForGreenStatus ().get ();
140141
@@ -189,9 +190,9 @@ private void runLookup(PopulateIndices populateIndices) throws IOException {
189190 ValuesSourceReaderOperator .Factory reader = new ValuesSourceReaderOperator .Factory (
190191 List .of (
191192 new ValuesSourceReaderOperator .FieldInfo (
192- "data " ,
193- ElementType . BYTES_REF ,
194- shard -> searchContext .getSearchExecutionContext ().getFieldType ("data " ).blockLoader (null )
193+ "key " ,
194+ PlannerUtils . toElementType ( keyType ) ,
195+ shard -> searchContext .getSearchExecutionContext ().getFieldType ("key " ).blockLoader (null )
195196 )
196197 ),
197198 List .of (new ValuesSourceReaderOperator .ShardContext (searchContext .getSearchExecutionContext ().getIndexReader (), () -> {
@@ -217,9 +218,9 @@ private void runLookup(PopulateIndices populateIndices) throws IOException {
217218 QueryPragmas .ENRICH_MAX_WORKERS .get (Settings .EMPTY ),
218219 1 ,
219220 ctx -> internalCluster ().getInstance (TransportEsqlQueryAction .class , finalNodeWithShard ).getLookupFromIndexService (),
220- DataType . KEYWORD ,
221+ keyType ,
221222 "lookup" ,
222- "data " ,
223+ "key " ,
223224 List .of (new Alias (Source .EMPTY , "l" , new ReferenceAttribute (Source .EMPTY , "l" , DataType .LONG ))),
224225 Source .EMPTY
225226 );
@@ -231,10 +232,16 @@ private void runLookup(PopulateIndices populateIndices) throws IOException {
231232 List .of (reader .get (driverContext ), lookup .get (driverContext )),
232233 new PageConsumerOperator (page -> {
233234 try {
234- BytesRefVector dataBlock = page .< BytesRefBlock > getBlock (1 ). asVector ( );
235+ Block keyBlock = page .getBlock (1 );
235236 LongVector loadedBlock = page .<LongBlock >getBlock (2 ).asVector ();
236237 for (int p = 0 ; p < page .getPositionCount (); p ++) {
237- results .add (dataBlock .getBytesRef (p , new BytesRef ()).utf8ToString () + ":" + loadedBlock .getLong (p ));
238+ List <Object > key = BlockTestUtils .valuesAtPositions (keyBlock , p , p + 1 ).get (0 );
239+ assertThat (key , hasSize (1 ));
240+ Object keyValue = key .get (0 );
241+ if (keyValue instanceof BytesRef b ) {
242+ keyValue = b .utf8ToString ();
243+ }
244+ results .add (keyValue + ":" + loadedBlock .getLong (p ));
238245 }
239246 } finally {
240247 page .releaseBlocks ();
0 commit comments