Skip to content

Commit 9c6d560

Browse files
authored
Merge branch 'main' into fix/ftf_lj
2 parents 4f2e84d + 8cb5b89 commit 9c6d560

File tree

11 files changed

+276
-162
lines changed

11 files changed

+276
-162
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/vector/VectorScorerBenchmark.java renamed to benchmarks/src/main/java/org/elasticsearch/benchmark/vector/Int7uScorerBenchmark.java

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -55,24 +55,23 @@
5555
/**
5656
* Benchmark that compares various scalar quantized vector similarity function
5757
* implementations: scalar, Lucene's panama-ized, and Elasticsearch's native.
58-
* Run with ./gradlew -p benchmarks run --args 'VectorScorerBenchmark'
58+
* Run with ./gradlew -p benchmarks run --args 'Int7uScorerBenchmark'
5959
*/
60-
public class VectorScorerBenchmark {
60+
public class Int7uScorerBenchmark {
6161

6262
static {
6363
LogConfigurator.configureESLogging(); // native access requires logging to be initialized
6464
}
6565

6666
@Param({ "96", "768", "1024" })
67-
int dims;
68-
int size = 2; // there are only two vectors to compare
67+
public int dims;
68+
final int size = 2; // there are only two vectors to compare
6969

7070
Directory dir;
7171
IndexInput in;
7272
VectorScorerFactory factory;
7373

74-
byte[] vec1;
75-
byte[] vec2;
74+
byte[] vec1, vec2;
7675
float vec1Offset;
7776
float vec2Offset;
7877
float scoreCorrectionConstant;
@@ -139,39 +138,6 @@ public void setup() throws IOException {
139138
nativeDotScorerQuery = factory.getInt7SQVectorScorer(VectorSimilarityFunction.DOT_PRODUCT, values, queryVec).get();
140139
luceneSqrScorerQuery = luceneScorer(values, VectorSimilarityFunction.EUCLIDEAN, queryVec);
141140
nativeSqrScorerQuery = factory.getInt7SQVectorScorer(VectorSimilarityFunction.EUCLIDEAN, values, queryVec).get();
142-
143-
// sanity
144-
var f1 = dotProductLucene();
145-
var f2 = dotProductNative();
146-
var f3 = dotProductScalar();
147-
if (f1 != f2) {
148-
throw new AssertionError("lucene[" + f1 + "] != " + "native[" + f2 + "]");
149-
}
150-
if (f1 != f3) {
151-
throw new AssertionError("lucene[" + f1 + "] != " + "scalar[" + f3 + "]");
152-
}
153-
// square distance
154-
f1 = squareDistanceLucene();
155-
f2 = squareDistanceNative();
156-
f3 = squareDistanceScalar();
157-
if (f1 != f2) {
158-
throw new AssertionError("lucene[" + f1 + "] != " + "native[" + f2 + "]");
159-
}
160-
if (f1 != f3) {
161-
throw new AssertionError("lucene[" + f1 + "] != " + "scalar[" + f3 + "]");
162-
}
163-
164-
var q1 = dotProductLuceneQuery();
165-
var q2 = dotProductNativeQuery();
166-
if (q1 != q2) {
167-
throw new AssertionError("query: lucene[" + q1 + "] != " + "native[" + q2 + "]");
168-
}
169-
170-
var sqr1 = squareDistanceLuceneQuery();
171-
var sqr2 = squareDistanceNativeQuery();
172-
if (sqr1 != sqr2) {
173-
throw new AssertionError("query: lucene[" + q1 + "] != " + "native[" + q2 + "]");
174-
}
175141
}
176142

177143
@TearDown

benchmarks/src/main/java/org/elasticsearch/benchmark/vector/JDKVectorInt7uBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class JDKVectorInt7uBenchmark {
5252

5353
Arena arena;
5454

55-
@Param({ "1", "128", "207", "256", "300", "512", "702", "1024" })
55+
@Param({ "1", "128", "207", "256", "300", "512", "702", "1024", "1536", "2048" })
5656
public int size;
5757

5858
@Setup(Level.Iteration)
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.benchmark.vector;
11+
12+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
13+
14+
import org.apache.lucene.util.Constants;
15+
import org.elasticsearch.test.ESTestCase;
16+
import org.junit.BeforeClass;
17+
import org.openjdk.jmh.annotations.Param;
18+
19+
import java.util.Arrays;
20+
21+
public class Int7uScorerBenchmarkTests extends ESTestCase {
22+
23+
final double delta = 1e-3;
24+
final int dims;
25+
26+
public Int7uScorerBenchmarkTests(int dims) {
27+
this.dims = dims;
28+
}
29+
30+
@BeforeClass
31+
public static void skipWindows() {
32+
assumeFalse("doesn't work on windows yet", Constants.WINDOWS);
33+
}
34+
35+
public void testDotProduct() throws Exception {
36+
for (int i = 0; i < 100; i++) {
37+
var bench = new Int7uScorerBenchmark();
38+
bench.dims = dims;
39+
bench.setup();
40+
try {
41+
float expected = bench.dotProductScalar();
42+
assertEquals(expected, bench.dotProductLucene(), delta);
43+
assertEquals(expected, bench.dotProductNative(), delta);
44+
45+
expected = bench.dotProductLuceneQuery();
46+
assertEquals(expected, bench.dotProductNativeQuery(), delta);
47+
} finally {
48+
bench.teardown();
49+
}
50+
}
51+
}
52+
53+
public void testSquareDistance() throws Exception {
54+
for (int i = 0; i < 100; i++) {
55+
var bench = new Int7uScorerBenchmark();
56+
bench.dims = dims;
57+
bench.setup();
58+
try {
59+
float expected = bench.squareDistanceScalar();
60+
assertEquals(expected, bench.squareDistanceLucene(), delta);
61+
assertEquals(expected, bench.squareDistanceNative(), delta);
62+
63+
expected = bench.squareDistanceLuceneQuery();
64+
assertEquals(expected, bench.squareDistanceNativeQuery(), delta);
65+
} finally {
66+
bench.teardown();
67+
}
68+
}
69+
}
70+
71+
@ParametersFactory
72+
public static Iterable<Object[]> parametersFactory() {
73+
try {
74+
var params = Int7uScorerBenchmark.class.getField("dims").getAnnotationsByType(Param.class)[0].value();
75+
return () -> Arrays.stream(params).map(Integer::parseInt).map(i -> new Object[] { i }).iterator();
76+
} catch (NoSuchFieldException e) {
77+
throw new AssertionError(e);
78+
}
79+
}
80+
}

build-tools/src/main/java/org/elasticsearch/gradle/testclusters/MockApmServer.java

Lines changed: 90 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,30 @@
99

1010
package org.elasticsearch.gradle.testclusters;
1111

12+
import com.fasterxml.jackson.databind.ObjectMapper;
13+
import com.fasterxml.jackson.databind.node.ObjectNode;
1214
import com.sun.net.httpserver.HttpExchange;
1315
import com.sun.net.httpserver.HttpHandler;
1416
import com.sun.net.httpserver.HttpServer;
1517

18+
import org.apache.commons.io.IOUtils;
19+
import org.apache.commons.lang3.stream.Streams;
1620
import org.gradle.api.logging.Logger;
1721
import org.gradle.api.logging.Logging;
22+
import org.slf4j.LoggerFactory;
1823

24+
import java.io.BufferedReader;
1925
import java.io.ByteArrayOutputStream;
2026
import java.io.IOException;
2127
import java.io.InputStream;
28+
import java.io.InputStreamReader;
2229
import java.io.OutputStream;
2330
import java.net.InetSocketAddress;
31+
import java.util.Arrays;
32+
import java.util.regex.Pattern;
33+
import java.util.stream.Collectors;
34+
35+
import javax.annotation.concurrent.NotThreadSafe;
2436

2537
/**
2638
* This is a server which just accepts lines of JSON code and if the JSON
@@ -32,102 +44,127 @@
3244
* <p>
3345
* The HTTP server used is the JDK embedded com.sun.net.httpserver
3446
*/
47+
@NotThreadSafe
3548
public class MockApmServer {
3649
private static final Logger logger = Logging.getLogger(MockApmServer.class);
37-
private int port;
50+
private static final org.slf4j.Logger log = LoggerFactory.getLogger(MockApmServer.class);
51+
private final Pattern metricFilter;
52+
private final Pattern transactionFilter;
53+
private final Pattern transactionExcludesFilter;
3854

39-
public MockApmServer(int port) {
40-
this.port = port;
41-
}
55+
private HttpServer instance;
4256

43-
/**
44-
* Simple main that starts a mock APM server and prints the port it is
45-
* running on. This is not needed
46-
* for testing, it is just a convenient template for trying things out
47-
* if you want to play around.
48-
*/
49-
public static void main(String[] args) throws IOException, InterruptedException {
50-
MockApmServer server = new MockApmServer(9999);
51-
server.start();
57+
public MockApmServer(String metricFilter, String transactionFilter, String transactionExcludesFilter) {
58+
this.metricFilter = createWildcardPattern(metricFilter);
59+
this.transactionFilter = createWildcardPattern(transactionFilter);
60+
this.transactionExcludesFilter = createWildcardPattern(transactionExcludesFilter);
5261
}
5362

54-
private static volatile HttpServer instance;
63+
private Pattern createWildcardPattern(String filter) {
64+
if (filter == null || filter.isEmpty()) {
65+
return null;
66+
}
67+
var pattern = Arrays.stream(filter.split(",\\s*"))
68+
.map(Pattern::quote)
69+
.map(s -> s.replace("*", "\\E.*\\Q"))
70+
.collect(Collectors.joining(")|(", "(", ")"));
71+
return Pattern.compile(pattern);
72+
}
5573

5674
/**
5775
* Start the Mock APM server. It just returns an empty JSON structure for every incoming message.
5876
*
59-
* @return - the port the Mock APM server started on
6077
* @throws IOException if the underlying HTTP server cannot be created
6178
*/
62-
public synchronized int start() throws IOException {
79+
public void start() throws IOException {
6380
if (instance != null) {
64-
String hostname = instance.getAddress().getHostName();
65-
int port = instance.getAddress().getPort();
66-
logger.lifecycle("MockApmServer is already running. Reusing on address:port " + hostname + ":" + port);
67-
return port;
81+
throw new IllegalStateException("MockApmServer already started");
6882
}
69-
InetSocketAddress addr = new InetSocketAddress("0.0.0.0", port);
83+
InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 0);
7084
HttpServer server = HttpServer.create(addr, 10);
71-
server.createContext("/exit", new ExitHandler());
7285
server.createContext("/", new RootHandler());
73-
7486
server.start();
7587
instance = server;
7688
logger.lifecycle("MockApmServer started on port " + server.getAddress().getPort());
77-
return server.getAddress().getPort();
7889
}
7990

8091
public int getPort() {
81-
return port;
92+
if (instance == null) {
93+
throw new IllegalStateException("MockApmServer not started");
94+
}
95+
return instance.getAddress().getPort();
8296
}
8397

8498
/**
8599
* Stop the server gracefully if possible
86100
*/
87-
public synchronized void stop() {
88-
logger.lifecycle("stopping apm server");
89-
instance.stop(1);
90-
instance = null;
101+
public void stop() {
102+
if (instance != null) {
103+
logger.lifecycle("stopping apm server");
104+
instance.stop(1);
105+
instance = null;
106+
}
91107
}
92108

93109
class RootHandler implements HttpHandler {
94110
public void handle(HttpExchange t) {
95111
try {
96112
InputStream body = t.getRequestBody();
97-
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
98-
byte[] buffer = new byte[8 * 1024];
99-
int lengthRead;
100-
while ((lengthRead = body.read(buffer)) > 0) {
101-
bytes.write(buffer, 0, lengthRead);
113+
if (metricFilter == null && transactionFilter == null) {
114+
logRequestBody(body);
115+
} else {
116+
logFiltered(body);
102117
}
103-
logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString()));
104118

105119
String response = "{}";
106120
t.sendResponseHeaders(200, response.length());
107-
OutputStream os = t.getResponseBody();
108-
os.write(response.getBytes());
109-
os.close();
121+
try (OutputStream os = t.getResponseBody()) {
122+
os.write(response.getBytes());
123+
}
110124
} catch (Exception e) {
111125
e.printStackTrace();
112126
}
113127
}
114-
}
115128

116-
static class ExitHandler implements HttpHandler {
117-
private static final int STOP_TIME = 3;
129+
private void logRequestBody(InputStream body) throws IOException {
130+
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
131+
IOUtils.copy(body, bytes);
132+
logger.lifecycle(("MockApmServer reading JSON objects: " + bytes.toString()));
133+
}
118134

119-
public void handle(HttpExchange t) {
120-
try {
121-
InputStream body = t.getRequestBody();
122-
String response = "{}";
123-
t.sendResponseHeaders(200, response.length());
124-
OutputStream os = t.getResponseBody();
125-
os.write(response.getBytes());
126-
os.close();
127-
instance.stop(STOP_TIME);
128-
instance = null;
129-
} catch (Exception e) {
130-
e.printStackTrace();
135+
private void logFiltered(InputStream body) throws IOException {
136+
ObjectMapper mapper = new ObjectMapper();
137+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(body))) {
138+
String line;
139+
String tier = null;
140+
String node = null;
141+
142+
while ((line = reader.readLine()) != null) {
143+
var jsonNode = mapper.readTree(line);
144+
145+
if (jsonNode.has("metadata")) {
146+
node = jsonNode.path("metadata").path("service").path("node").path("configured_name").asText(null);
147+
tier = jsonNode.path("metadata").path("labels").path("node_tier").asText(null);
148+
} else if (transactionFilter != null && jsonNode.has("transaction")) {
149+
var transaction = jsonNode.get("transaction");
150+
var name = transaction.get("name").asText();
151+
if (transactionFilter.matcher(name).matches()
152+
&& (transactionExcludesFilter == null || transactionExcludesFilter.matcher(name).matches() == false)) {
153+
logger.lifecycle("Transaction [{}/{}]: {}", node, tier, transaction);
154+
}
155+
} else if (metricFilter != null && jsonNode.has("metricset")) {
156+
var metricset = jsonNode.get("metricset");
157+
var samples = (ObjectNode) metricset.get("samples");
158+
for (var name : Streams.of(samples.fieldNames()).toList()) {
159+
if (metricFilter.matcher(name).matches() == false) {
160+
samples.remove(name);
161+
}
162+
}
163+
if (samples.isEmpty() == false) {
164+
logger.lifecycle("Metricset [{}/{}]", node, tier, metricset);
165+
}
166+
}
167+
}
131168
}
132169
}
133170
}

0 commit comments

Comments
 (0)