20
20
import org .elasticsearch .common .util .BigArrays ;
21
21
import org .elasticsearch .common .util .MockBigArrays ;
22
22
import org .elasticsearch .common .util .MockPageCacheRecycler ;
23
+ import org .elasticsearch .compute .data .Block ;
23
24
import org .elasticsearch .compute .data .BlockFactory ;
24
- import org .elasticsearch .compute .data .BytesRefBlock ;
25
- import org .elasticsearch .compute .data .BytesRefVector ;
26
- import org .elasticsearch .compute .data .ElementType ;
27
25
import org .elasticsearch .compute .data .LongBlock ;
28
26
import org .elasticsearch .compute .data .LongVector ;
29
27
import org .elasticsearch .compute .lucene .DataPartitioning ;
34
32
import org .elasticsearch .compute .operator .DriverContext ;
35
33
import org .elasticsearch .compute .operator .DriverRunner ;
36
34
import org .elasticsearch .compute .operator .PageConsumerOperator ;
35
+ import org .elasticsearch .compute .test .BlockTestUtils ;
37
36
import org .elasticsearch .core .TimeValue ;
38
37
import org .elasticsearch .index .IndexService ;
39
38
import org .elasticsearch .index .IndexSettings ;
53
52
import org .elasticsearch .xpack .esql .core .type .DataType ;
54
53
import org .elasticsearch .xpack .esql .enrich .LookupFromIndexOperator ;
55
54
import org .elasticsearch .xpack .esql .planner .EsPhysicalOperationProviders ;
55
+ import org .elasticsearch .xpack .esql .planner .PlannerUtils ;
56
56
import org .elasticsearch .xpack .esql .plugin .EsqlPlugin ;
57
57
import org .elasticsearch .xpack .esql .plugin .QueryPragmas ;
58
58
import org .elasticsearch .xpack .esql .plugin .TransportEsqlQueryAction ;
68
68
import static org .elasticsearch .test .ListMatcher .matchesList ;
69
69
import static org .elasticsearch .test .MapMatcher .assertMap ;
70
70
import static org .hamcrest .Matchers .empty ;
71
+ import static org .hamcrest .Matchers .hasSize ;
71
72
72
73
public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
73
- // TODO should we remove this now that this is integrated into ESQL proper?
74
- /**
75
- * Quick and dirty test for looking up data from a lookup index.
76
- */
77
- public void testLookupIndex () throws IOException {
78
- runLookup (new UsingSingleLookupTable (new String [] { "aa" , "bb" , "cc" , "dd" }));
74
+ public void testKeywordKey () throws IOException {
75
+ runLookup ( DataType . KEYWORD , new UsingSingleLookupTable ( new String [] { "aa" , "bb" , "cc" , "dd" }));
76
+ }
77
+
78
+ public void testLongKey () throws IOException {
79
+ runLookup (DataType . LONG , new UsingSingleLookupTable (new Long [] { 12L , 33L , 1L }));
79
80
}
80
81
81
82
/**
82
- * Tests when multiple results match.
83
+ * LOOKUP multiple results match.
83
84
*/
84
85
public void testLookupIndexMultiResults () throws IOException {
85
- runLookup (new UsingSingleLookupTable (new String [] { "aa" , "bb" , "bb" , "dd" }));
86
+ runLookup (DataType . KEYWORD , new UsingSingleLookupTable (new String [] { "aa" , "bb" , "bb" , "dd" }));
86
87
}
87
88
88
89
interface PopulateIndices {
89
90
void populate (int docCount , List <String > expected ) throws IOException ;
90
91
}
91
92
92
93
class UsingSingleLookupTable implements PopulateIndices {
93
- private final Map <String , List <Integer >> matches = new HashMap <>();
94
- private final String [] lookupData ;
94
+ private final Map <Object , List <Integer >> matches = new HashMap <>();
95
+ private final Object [] lookupData ;
95
96
96
- UsingSingleLookupTable (String [] lookupData ) {
97
+ UsingSingleLookupTable (Object [] lookupData ) {
97
98
this .lookupData = lookupData ;
98
99
for (int i = 0 ; i < lookupData .length ; i ++) {
99
100
matches .computeIfAbsent (lookupData [i ], k -> new ArrayList <>()).add (i );
@@ -104,26 +105,26 @@ class UsingSingleLookupTable implements PopulateIndices {
104
105
public void populate (int docCount , List <String > expected ) {
105
106
List <IndexRequestBuilder > docs = new ArrayList <>();
106
107
for (int i = 0 ; i < docCount ; i ++) {
107
- String data = lookupData [i % lookupData .length ];
108
- docs .add (client ().prepareIndex ("source" ).setSource (Map .of ("data " , data )));
109
- for (Integer match : matches .get (data )) {
110
- expected .add (data + ":" + match );
108
+ Object key = lookupData [i % lookupData .length ];
109
+ docs .add (client ().prepareIndex ("source" ).setSource (Map .of ("key " , key )));
110
+ for (Integer match : matches .get (key )) {
111
+ expected .add (key + ":" + match );
111
112
}
112
113
}
113
114
for (int i = 0 ; i < lookupData .length ; i ++) {
114
- docs .add (client ().prepareIndex ("lookup" ).setSource (Map .of ("data " , lookupData [i ], "l" , i )));
115
+ docs .add (client ().prepareIndex ("lookup" ).setSource (Map .of ("key " , lookupData [i ], "l" , i )));
115
116
}
116
117
Collections .sort (expected );
117
118
indexRandom (true , true , docs );
118
119
}
119
120
}
120
121
121
- private void runLookup (PopulateIndices populateIndices ) throws IOException {
122
+ private void runLookup (DataType keyType , PopulateIndices populateIndices ) throws IOException {
122
123
client ().admin ()
123
124
.indices ()
124
125
.prepareCreate ("source" )
125
126
.setSettings (Settings .builder ().put (IndexMetadata .INDEX_NUMBER_OF_SHARDS_SETTING .getKey (), 1 ))
126
- .setMapping ("data " , "type=keyword" )
127
+ .setMapping ("key " , "type=" + keyType . esType () )
127
128
.get ();
128
129
client ().admin ()
129
130
.indices ()
@@ -134,7 +135,7 @@ private void runLookup(PopulateIndices populateIndices) throws IOException {
134
135
// TODO lookup index mode doesn't seem to force a single shard. That'll break the lookup command.
135
136
.put (IndexMetadata .INDEX_NUMBER_OF_SHARDS_SETTING .getKey (), 1 )
136
137
)
137
- .setMapping ("data " , "type=keyword" , "l" , "type=long" )
138
+ .setMapping ("key " , "type=" + keyType . esType () , "l" , "type=long" )
138
139
.get ();
139
140
client ().admin ().cluster ().prepareHealth (TEST_REQUEST_TIMEOUT ).setWaitForGreenStatus ().get ();
140
141
@@ -189,9 +190,9 @@ private void runLookup(PopulateIndices populateIndices) throws IOException {
189
190
ValuesSourceReaderOperator .Factory reader = new ValuesSourceReaderOperator .Factory (
190
191
List .of (
191
192
new ValuesSourceReaderOperator .FieldInfo (
192
- "data " ,
193
- ElementType . BYTES_REF ,
194
- shard -> searchContext .getSearchExecutionContext ().getFieldType ("data " ).blockLoader (null )
193
+ "key " ,
194
+ PlannerUtils . toElementType ( keyType ) ,
195
+ shard -> searchContext .getSearchExecutionContext ().getFieldType ("key " ).blockLoader (null )
195
196
)
196
197
),
197
198
List .of (new ValuesSourceReaderOperator .ShardContext (searchContext .getSearchExecutionContext ().getIndexReader (), () -> {
@@ -217,9 +218,9 @@ private void runLookup(PopulateIndices populateIndices) throws IOException {
217
218
QueryPragmas .ENRICH_MAX_WORKERS .get (Settings .EMPTY ),
218
219
1 ,
219
220
ctx -> internalCluster ().getInstance (TransportEsqlQueryAction .class , finalNodeWithShard ).getLookupFromIndexService (),
220
- DataType . KEYWORD ,
221
+ keyType ,
221
222
"lookup" ,
222
- "data " ,
223
+ "key " ,
223
224
List .of (new Alias (Source .EMPTY , "l" , new ReferenceAttribute (Source .EMPTY , "l" , DataType .LONG ))),
224
225
Source .EMPTY
225
226
);
@@ -231,10 +232,16 @@ private void runLookup(PopulateIndices populateIndices) throws IOException {
231
232
List .of (reader .get (driverContext ), lookup .get (driverContext )),
232
233
new PageConsumerOperator (page -> {
233
234
try {
234
- BytesRefVector dataBlock = page .< BytesRefBlock > getBlock (1 ). asVector ( );
235
+ Block keyBlock = page .getBlock (1 );
235
236
LongVector loadedBlock = page .<LongBlock >getBlock (2 ).asVector ();
236
237
for (int p = 0 ; p < page .getPositionCount (); p ++) {
237
- results .add (dataBlock .getBytesRef (p , new BytesRef ()).utf8ToString () + ":" + loadedBlock .getLong (p ));
238
+ List <Object > key = BlockTestUtils .valuesAtPositions (keyBlock , p , p + 1 ).get (0 );
239
+ assertThat (key , hasSize (1 ));
240
+ Object keyValue = key .get (0 );
241
+ if (keyValue instanceof BytesRef b ) {
242
+ keyValue = b .utf8ToString ();
243
+ }
244
+ results .add (keyValue + ":" + loadedBlock .getLong (p ));
238
245
}
239
246
} finally {
240
247
page .releaseBlocks ();
0 commit comments