Skip to content

Commit 6c4ce88

Browse files
Merge branch 'main' into ES-11934
2 parents 572e063 + 4ca96c1 commit 6c4ce88

File tree

113 files changed

+3510
-1868
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

113 files changed

+3510
-1868
lines changed

docs/changelog/129302.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 129302
2+
summary: Move HTTP content aggregation from Netty into `RestController`
3+
area: Network
4+
type: enhancement
5+
issues:
6+
- 120746

libs/simdvec/src/main/java/org/elasticsearch/simdvec/ESVectorUtil.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -237,20 +237,21 @@ public static void subtract(float[] v1, float[] v2, float[] result) {
237237
}
238238

239239
/**
240-
* calculates the spill-over score for a vector and a centroid, given its residual with
241-
* its actually nearest centroid
240+
* calculates the soar distance for a vector and a centroid
242241
* @param v1 the vector
243242
* @param centroid the centroid
244243
* @param originalResidual the residual with the actually nearest centroid
245-
* @return the spill-over score (soar)
244+
* @param soarLambda the lambda parameter
245+
* @param rnorm distance to the nearest centroid
246+
* @return the soar distance
246247
*/
247-
public static float soarResidual(float[] v1, float[] centroid, float[] originalResidual) {
248+
public static float soarDistance(float[] v1, float[] centroid, float[] originalResidual, float soarLambda, float rnorm) {
248249
if (v1.length != centroid.length) {
249250
throw new IllegalArgumentException("vector dimensions differ: " + v1.length + "!=" + centroid.length);
250251
}
251252
if (originalResidual.length != v1.length) {
252253
throw new IllegalArgumentException("vector dimensions differ: " + originalResidual.length + "!=" + v1.length);
253254
}
254-
return IMPL.soarResidual(v1, centroid, originalResidual);
255+
return IMPL.soarDistance(v1, centroid, originalResidual, soarLambda, rnorm);
255256
}
256257
}

libs/simdvec/src/main/java/org/elasticsearch/simdvec/internal/vectorization/DefaultESVectorUtilSupport.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.lucene.util.BitUtil;
1313
import org.apache.lucene.util.Constants;
14+
import org.apache.lucene.util.VectorUtil;
1415

1516
final class DefaultESVectorUtilSupport implements ESVectorUtilSupport {
1617

@@ -139,15 +140,16 @@ public void centerAndCalculateOSQStatsDp(float[] target, float[] centroid, float
139140
}
140141

141142
@Override
142-
public float soarResidual(float[] v1, float[] centroid, float[] originalResidual) {
143+
public float soarDistance(float[] v1, float[] centroid, float[] originalResidual, float soarLambda, float rnorm) {
143144
assert v1.length == centroid.length;
144145
assert v1.length == originalResidual.length;
146+
float dsq = VectorUtil.squareDistance(v1, centroid);
145147
float proj = 0;
146148
for (int i = 0; i < v1.length; i++) {
147149
float djk = v1[i] - centroid[i];
148150
proj = fma(djk, originalResidual[i], proj);
149151
}
150-
return proj;
152+
return dsq + soarLambda * proj * proj / rnorm;
151153
}
152154

153155
public static int ipByteBitImpl(byte[] q, byte[] d) {

libs/simdvec/src/main/java/org/elasticsearch/simdvec/internal/vectorization/ESVectorUtilSupport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,6 @@ public interface ESVectorUtilSupport {
3737

3838
void centerAndCalculateOSQStatsDp(float[] target, float[] centroid, float[] centered, float[] stats);
3939

40-
float soarResidual(float[] v1, float[] centroid, float[] originalResidual);
40+
float soarDistance(float[] v1, float[] centroid, float[] originalResidual, float soarLambda, float rnorm);
4141

4242
}

libs/simdvec/src/main21/java/org/elasticsearch/simdvec/internal/vectorization/PanamaESVectorUtilSupport.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,14 +368,17 @@ public float calculateOSQLoss(float[] target, float[] interval, float step, floa
368368
}
369369

370370
@Override
371-
public float soarResidual(float[] v1, float[] centroid, float[] originalResidual) {
371+
public float soarDistance(float[] v1, float[] centroid, float[] originalResidual, float soarLambda, float rnorm) {
372372
assert v1.length == centroid.length;
373373
assert v1.length == originalResidual.length;
374374
float proj = 0;
375+
float dsq = 0;
375376
int i = 0;
376377
if (v1.length > 2 * FLOAT_SPECIES.length()) {
377378
FloatVector projVec1 = FloatVector.zero(FLOAT_SPECIES);
378379
FloatVector projVec2 = FloatVector.zero(FLOAT_SPECIES);
380+
FloatVector acc1 = FloatVector.zero(FLOAT_SPECIES);
381+
FloatVector acc2 = FloatVector.zero(FLOAT_SPECIES);
379382
int unrolledLimit = FLOAT_SPECIES.loopBound(v1.length) - FLOAT_SPECIES.length();
380383
for (; i < unrolledLimit; i += 2 * FLOAT_SPECIES.length()) {
381384
// one
@@ -384,13 +387,15 @@ public float soarResidual(float[] v1, float[] centroid, float[] originalResidual
384387
FloatVector originalResidualVec0 = FloatVector.fromArray(FLOAT_SPECIES, originalResidual, i);
385388
FloatVector djkVec0 = v1Vec0.sub(centroidVec0);
386389
projVec1 = fma(djkVec0, originalResidualVec0, projVec1);
390+
acc1 = fma(djkVec0, djkVec0, acc1);
387391

388392
// two
389393
FloatVector v1Vec1 = FloatVector.fromArray(FLOAT_SPECIES, v1, i + FLOAT_SPECIES.length());
390394
FloatVector centroidVec1 = FloatVector.fromArray(FLOAT_SPECIES, centroid, i + FLOAT_SPECIES.length());
391395
FloatVector originalResidualVec1 = FloatVector.fromArray(FLOAT_SPECIES, originalResidual, i + FLOAT_SPECIES.length());
392396
FloatVector djkVec1 = v1Vec1.sub(centroidVec1);
393397
projVec2 = fma(djkVec1, originalResidualVec1, projVec2);
398+
acc2 = fma(djkVec1, djkVec1, acc2);
394399
}
395400
// vector tail
396401
for (; i < FLOAT_SPECIES.loopBound(v1.length); i += FLOAT_SPECIES.length()) {
@@ -399,15 +404,18 @@ public float soarResidual(float[] v1, float[] centroid, float[] originalResidual
399404
FloatVector originalResidualVec = FloatVector.fromArray(FLOAT_SPECIES, originalResidual, i);
400405
FloatVector djkVec = v1Vec.sub(centroidVec);
401406
projVec1 = fma(djkVec, originalResidualVec, projVec1);
407+
acc1 = fma(djkVec, djkVec, acc1);
402408
}
403409
proj += projVec1.add(projVec2).reduceLanes(ADD);
410+
dsq += acc1.add(acc2).reduceLanes(ADD);
404411
}
405412
// tail
406413
for (; i < v1.length; i++) {
407414
float djk = v1[i] - centroid[i];
408415
proj = fma(djk, originalResidual[i], proj);
416+
dsq = fma(djk, djk, dsq);
409417
}
410-
return proj;
418+
return dsq + soarLambda * proj * proj / rnorm;
411419
}
412420

413421
private static final VectorSpecies<Byte> BYTE_SPECIES_128 = ByteVector.SPECIES_128;

libs/simdvec/src/test/java/org/elasticsearch/simdvec/ESVectorUtilTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ public void testOsqGridPoints() {
268268
}
269269
}
270270

271-
public void testSoarOverspillScore() {
271+
public void testSoarDistance() {
272272
int size = random().nextInt(128, 512);
273273
float deltaEps = 1e-5f * size;
274274
var vector = new float[size];
@@ -279,8 +279,10 @@ public void testSoarOverspillScore() {
279279
centroid[i] = random().nextFloat();
280280
preResidual[i] = random().nextFloat();
281281
}
282-
var expected = defaultedProvider.getVectorUtilSupport().soarResidual(vector, centroid, preResidual);
283-
var result = defOrPanamaProvider.getVectorUtilSupport().soarResidual(vector, centroid, preResidual);
282+
float soarLambda = random().nextFloat();
283+
float rnorm = random().nextFloat();
284+
var expected = defaultedProvider.getVectorUtilSupport().soarDistance(vector, centroid, preResidual, soarLambda, rnorm);
285+
var result = defOrPanamaProvider.getVectorUtilSupport().soarDistance(vector, centroid, preResidual, soarLambda, rnorm);
284286
assertEquals(expected, result, deltaEps);
285287
}
286288

modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
1414

1515
import org.elasticsearch.test.cluster.ElasticsearchCluster;
16+
import org.elasticsearch.test.cluster.FeatureFlag;
1617
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
1718
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
1819
import org.junit.ClassRule;
@@ -28,10 +29,11 @@ public static Iterable<Object[]> parameters() throws Exception {
2829
}
2930

3031
@ClassRule
31-
public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").build();
32+
public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build();
3233

3334
@Override
3435
protected String getTestRestCluster() {
3536
return cluster.getHttpAddresses();
3637
}
38+
3739
}

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,11 @@ public String getName() {
920920
return ROUTE;
921921
}
922922

923+
@Override
924+
public boolean supportsContentStream() {
925+
return true;
926+
}
927+
923928
@Override
924929
public List<Route> routes() {
925930
return List.of(new Route(RestRequest.Method.POST, ROUTE));

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
4747
if (pendingRead == false) {
4848
long now = timer.absoluteTimeInMillis();
4949
if (now >= lastRead + interval) {
50+
// if you encounter this warning during test make sure you consume content of RestRequest if it's a stream
51+
// or use AggregatingDispatcher that will consume stream fully and produce RestRequest with full content.
5052
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
5153
}
5254
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

0 commit comments

Comments
 (0)