Skip to content

Commit 1c5c3fe

Browse files
Merge branch 'main' into esql/explain
2 parents ff436ec + 04231e9 commit 1c5c3fe

File tree

19 files changed

+1806
-1737
lines changed

19 files changed

+1806
-1737
lines changed

docs/changelog/129606.yaml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
pr: 129606
2+
summary: Release FORK in tech preview
3+
area: ES|QL
4+
type: feature
5+
issues: []
6+
highlight:
7+
title: Release FORK in tech preview
8+
body: |-
9+
Fork is a foundational building block that allows multiple branches of execution.
10+
Conceptually, fork is:
11+
- a bifurcation of the stream, with all data going to each fork branch, followed by
12+
- a merge of the branches, enhanced with a discriminator column called FORK:
13+
14+
Example:
15+
16+
[source,yaml]
17+
----------------------------
18+
FROM test
19+
| FORK
20+
( WHERE content:"fox" )
21+
( WHERE content:"dog" )
22+
| SORT _fork
23+
----------------------------
24+
25+
The FORK command add a discriminator column called `_fork`:
26+
27+
[source,yaml]
28+
----------------------------
29+
| id | content | _fork |
30+
|-----|-----------|-------|
31+
| 3 | brown fox | fork1 |
32+
| 4 | white dog | fork2 |
33+
----------------------------
34+
35+
notable: true

server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ NeighborQueue scorePostingLists(FieldInfo fieldInfo, KnnCollector knnCollector,
122122
PostingVisitor getPostingVisitor(FieldInfo fieldInfo, IndexInput indexInput, float[] target, IntPredicate needsScoring)
123123
throws IOException {
124124
FieldEntry entry = fields.get(fieldInfo.number);
125-
return new MemorySegmentPostingsVisitor(target, indexInput, entry, fieldInfo, needsScoring);
125+
return new MemorySegmentPostingsVisitor(target, indexInput.clone(), entry, fieldInfo, needsScoring);
126126
}
127127

128128
// TODO can we do this in off-heap blocks?

server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.io.IOException;
3535
import java.util.List;
3636
import java.util.Locale;
37+
import java.util.concurrent.atomic.AtomicBoolean;
3738

3839
import static java.lang.String.format;
3940
import static org.elasticsearch.index.codec.vectors.IVFVectorsFormat.MAX_VECTORS_PER_CLUSTER;
@@ -128,4 +129,49 @@ public void testSimpleOffHeapSize() throws IOException {
128129
}
129130
}
130131
}
132+
133+
// this is a modified version of lucene's TestSearchWithThreads test case
134+
public void testWithThreads() throws Exception {
135+
final int numThreads = random().nextInt(2, 5);
136+
final int numSearches = atLeast(100);
137+
final int numDocs = atLeast(1000);
138+
final int dimensions = random().nextInt(12, 500);
139+
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {
140+
for (int docCount = 0; docCount < numDocs; docCount++) {
141+
final Document doc = new Document();
142+
doc.add(new KnnFloatVectorField("f", randomVector(dimensions), VectorSimilarityFunction.EUCLIDEAN));
143+
w.addDocument(doc);
144+
}
145+
w.forceMerge(1);
146+
try (IndexReader reader = DirectoryReader.open(w)) {
147+
final AtomicBoolean failed = new AtomicBoolean();
148+
Thread[] threads = new Thread[numThreads];
149+
for (int threadID = 0; threadID < numThreads; threadID++) {
150+
threads[threadID] = new Thread(() -> {
151+
try {
152+
long totSearch = 0;
153+
for (; totSearch < numSearches && failed.get() == false; totSearch++) {
154+
float[] vector = randomVector(dimensions);
155+
LeafReader leafReader = getOnlyLeafReader(reader);
156+
leafReader.searchNearestVectors("f", vector, 10, leafReader.getLiveDocs(), Integer.MAX_VALUE);
157+
}
158+
assertTrue(totSearch > 0);
159+
} catch (Exception exc) {
160+
failed.set(true);
161+
throw new RuntimeException(exc);
162+
}
163+
});
164+
threads[threadID].setDaemon(true);
165+
}
166+
167+
for (Thread t : threads) {
168+
t.start();
169+
}
170+
171+
for (Thread t : threads) {
172+
t.join();
173+
}
174+
}
175+
}
176+
}
131177
}

x-pack/plugin/esql/qa/testFixtures/src/main/resources/rerank.csv-spec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ book_no:keyword | title:text | author
9696

9797

9898
reranker after RRF
99-
required_capability: fork
99+
required_capability: fork_v9
100100
required_capability: rrf
101101
required_capability: match_operator_colon
102102
required_capability: rerank

x-pack/plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//
44

55
simpleRrf
6-
required_capability: fork
6+
required_capability: fork_v9
77
required_capability: rrf
88
required_capability: match_operator_colon
99

@@ -22,7 +22,7 @@ _score:double | _fork:keyword | emp_no:integer
2222
;
2323

2424
rrfWithMatchAndScore
25-
required_capability: fork
25+
required_capability: fork_v9
2626
required_capability: rrf
2727
required_capability: match_operator_colon
2828

@@ -44,7 +44,7 @@ _score:double | _fork:keyword | _id:keyword
4444
;
4545

4646
rrfWithDisjunctionAndPostFilter
47-
required_capability: fork
47+
required_capability: fork_v9
4848
required_capability: rrf
4949
required_capability: match_operator_colon
5050

@@ -66,7 +66,7 @@ _score:double | _fork:keyword | _id:keyword
6666
;
6767

6868
rrfWithStats
69-
required_capability: fork
69+
required_capability: fork_v9
7070
required_capability: rrf
7171
required_capability: match_operator_colon
7272

@@ -86,7 +86,7 @@ count_fork:long | _fork:keyword
8686
;
8787

8888
rrfWithMultipleForkBranches
89-
required_capability: fork
89+
required_capability: fork_v9
9090
required_capability: rrf
9191
required_capability: match_operator_colon
9292

@@ -112,6 +112,7 @@ _score:double | author:keyword | title:keyword | _fork
112112
;
113113

114114
rrfWithSemanticSearch
115+
required_capability: fork_v9
115116
required_capability: rrf
116117
required_capability: semantic_text_field_caps
117118
required_capability: metadata_score

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class ForkIT extends AbstractEsqlIntegTestCase {
3232

3333
@Before
3434
public void setupIndex() {
35-
assumeTrue("requires FORK capability", EsqlCapabilities.Cap.FORK.isEnabled());
35+
assumeTrue("requires FORK capability", EsqlCapabilities.Cap.FORK_V9.isEnabled());
3636
createAndPopulateIndices();
3737
}
3838

x-pack/plugin/esql/src/main/antlr/EsqlBaseLexer.tokens

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/src/main/antlr/EsqlBaseParser.g4

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ processingCommand
5858
| changePointCommand
5959
| completionCommand
6060
| sampleCommand
61+
| forkCommand
6162
// in development
6263
| {this.isDevVersion()}? inlinestatsCommand
6364
| {this.isDevVersion()}? lookupCommand
6465
| {this.isDevVersion()}? insistCommand
65-
| {this.isDevVersion()}? forkCommand
6666
| {this.isDevVersion()}? rerankCommand
6767
| {this.isDevVersion()}? rrfCommand
6868
;
@@ -282,7 +282,7 @@ insistCommand
282282
;
283283

284284
forkCommand
285-
: DEV_FORK forkSubQueries
285+
: FORK forkSubQueries
286286
;
287287

288288
forkSubQueries

x-pack/plugin/esql/src/main/antlr/EsqlBaseParser.tokens

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/src/main/antlr/lexer/Fork.g4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ lexer grammar Fork;
99
//
1010
// Fork
1111
//
12-
DEV_FORK : {this.isDevVersion()}? 'fork' -> pushMode(FORK_MODE);
12+
FORK : 'fork' -> pushMode(FORK_MODE);
1313

1414
mode FORK_MODE;
1515
// commands needs to break out of their mode and the default mode when they encounter RP

0 commit comments

Comments
 (0)