/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.esql.action;

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.lucene.DataPartitioning;
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.ShardContext;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.DriverRunner;
import org.elasticsearch.compute.operator.PageConsumerOperator;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.plugin.TransportEsqlQueryAction;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import static org.elasticsearch.test.ListMatcher.matchesList;
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.hamcrest.Matchers.empty;

public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
    /**
     * Quick and dirty test for looking up data from a lookup index.
     */
    public void testLookupIndex() throws IOException {
        // TODO this should *fail* if the target index isn't a lookup type index - it doesn't now.
        int docCount = between(10, 1000);
        List<String> expected = new ArrayList<>(docCount);
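        // Create a single-shard "source" index and a "lookup" index that uses the lookup index mode.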
        client().admin()
            .indices()
            .prepareCreate("source")
            .setSettings(Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1))
            .setMapping("data", "type=keyword")
            .get();
        client().admin()
            .indices()
            .prepareCreate("lookup")
            .setSettings(
                Settings.builder()
                    .put(IndexSettings.MODE.getKey(), "lookup")
                    // TODO lookup index mode doesn't seem to force a single shard. That'll break the lookup command.
                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
            )
            .setMapping("data", "type=keyword", "l", "type=long")
            .get();
        client().admin().cluster().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForGreenStatus().get();

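        // Index docCount source docs cycling through four keys, plus one lookup doc per key.
        // A source doc with data "aa" should join to l=0, "bb" to l=1, and so on.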
        String[] data = new String[] { "aa", "bb", "cc", "dd" };
        List<IndexRequestBuilder> docs = new ArrayList<>();
        for (int i = 0; i < docCount; i++) {
            docs.add(client().prepareIndex("source").setSource(Map.of("data", data[i % data.length])));
            expected.add(data[i % data.length] + ":" + (i % data.length));
        }
        for (int i = 0; i < data.length; i++) {
            docs.add(client().prepareIndex("lookup").setSource(Map.of("data", data[i], "l", i)));
        }
        Collections.sort(expected);
        indexRandom(true, true, docs);

        /*
         * Find the data node hosting the only shard of the source index.
         */
        SearchService searchService = null;
        String nodeWithShard = null;
        ShardId shardId = null;
        node: for (String node : internalCluster().getNodeNames()) {
            searchService = internalCluster().getInstance(SearchService.class, node);
            for (IndexService idx : searchService.getIndicesService()) {
                if (idx.index().getName().equals("source")) {
                    nodeWithShard = node;
                    shardId = new ShardId(idx.index(), 0);
                    break node;
                }
            }
        }
        if (nodeWithShard == null) {
            throw new IllegalStateException("couldn't find any copy of source index");
        }

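        // Pages are consumed on a driver thread, so collect results in a thread-safe list.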
        List<String> results = new CopyOnWriteArrayList<>();
        /*
         * Run a Driver that reads the source index, loads the "data" field,
         * looks up matching rows from the lookup index, and collects the output.
         */
        try (
            SearchContext searchContext = searchService.createSearchContext(
                new ShardSearchRequest(shardId, System.currentTimeMillis(), AliasFilter.EMPTY, null),
                SearchService.NO_TIMEOUT
            )
        ) {
            ShardContext esqlContext = new EsPhysicalOperationProviders.DefaultShardContext(
                0,
                searchContext.getSearchExecutionContext(),
                AliasFilter.EMPTY
            );
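            // The source operator emits pages of doc IDs for every document in the shard.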
            LuceneSourceOperator.Factory source = new LuceneSourceOperator.Factory(
                List.of(esqlContext),
                ctx -> new MatchAllDocsQuery(),
                DataPartitioning.SEGMENT,
                1,
                10000,
                DocIdSetIterator.NO_MORE_DOCS
            );
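            // The reader operator appends the "data" keyword values for each emitted doc ID.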
            ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory(
                List.of(
                    new ValuesSourceReaderOperator.FieldInfo(
                        "data",
                        ElementType.BYTES_REF,
                        shard -> searchContext.getSearchExecutionContext().getFieldType("data").blockLoader(null)
                    )
                ),
                List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> {
                    throw new IllegalStateException("can't load source here");
                })),
                0
            );
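            // Dummy parent task for the lookup requests to run under.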
            CancellableTask parentTask = new EsqlQueryTask(
                1,
                "test",
                "test",
                "test",
                null,
                Map.of(),
                Map.of(),
                new AsyncExecutionId("test", TaskId.EMPTY_TASK_ID),
                TEST_REQUEST_TIMEOUT
            );
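            // The lookup operator joins on "data" against the "lookup" index and appends its "l" field.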
            LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory(
                "test",
                parentTask,
                QueryPragmas.ENRICH_MAX_WORKERS.get(Settings.EMPTY),
                1,
                internalCluster().getInstance(TransportEsqlQueryAction.class, nodeWithShard).getLookupFromIndexService(),
                DataType.KEYWORD,
                "lookup",
                "data",
                List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG)))
            );
            DriverContext driverContext = driverContext();
            try (
                var driver = new Driver(
                    driverContext,
                    source.get(driverContext),
                    List.of(reader.get(driverContext), lookup.get(driverContext)),
                    new PageConsumerOperator(page -> {
                        try {
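                            // Block 0 holds the doc IDs; block 1 is "data" and block 2 is the looked-up "l".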
                            BytesRefVector dataBlock = page.<BytesRefBlock>getBlock(1).asVector();
                            LongVector loadedBlock = page.<LongBlock>getBlock(2).asVector();
                            for (int p = 0; p < page.getPositionCount(); p++) {
                                results.add(dataBlock.getBytesRef(p, new BytesRef()).utf8ToString() + ":" + loadedBlock.getLong(p));
                            }
                        } finally {
                            page.releaseBlocks();
                        }
                    }),
                    () -> {}
                )
            ) {
                PlainActionFuture<Void> future = new PlainActionFuture<>();
                ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeWithShard);
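                // Run the driver on the ESQL worker thread pool of the node hosting the shard.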
                var driverRunner = new DriverRunner(threadPool.getThreadContext()) {
                    @Override
                    protected void start(Driver driver, ActionListener<Void> driverListener) {
                        Driver.start(
                            threadPool.getThreadContext(),
                            threadPool.executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME),
                            driver,
                            between(1, 10000),
                            driverListener
                        );
                    }
                };
                driverRunner.runToCompletion(List.of(driver), future);
                future.actionGet(TimeValue.timeValueSeconds(30));
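                // Pages can arrive in any order, so compare against the sorted expectation.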
                assertMap(results.stream().sorted().toList(), matchesList(expected));
            }
            assertDriverContext(driverContext);
        }
    }

    /**
     * Creates a {@link BigArrays} that tracks releases but doesn't throw circuit breaking exceptions.
     */
    private BigArrays bigArrays() {
        return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
    }

    /**
     * A {@link DriverContext} that won't throw {@link CircuitBreakingException}.
     */
    protected final DriverContext driverContext() {
        var breaker = new MockBigArrays.LimitedBreaker("esql-test-breaker", ByteSizeValue.ofGb(1));
        return new DriverContext(bigArrays(), BlockFactory.getInstance(breaker, bigArrays()));
    }

    public static void assertDriverContext(DriverContext driverContext) {
        assertTrue(driverContext.isFinished());
        assertThat(driverContext.getSnapshot().releasables(), empty());
    }
}