Skip to content

Commit e72a312

Browse files
Merge branch 'main' of https://github.com/Jan-Kazlouski-elastic/elasticsearch into feature/hugging-face-chat-completion-integration
# Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java
2 parents 9044bee + 0f9c1ea commit e72a312

File tree

6 files changed

+63
-10
lines changed

6 files changed

+63
-10
lines changed

docs/changelog/127988.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127988
2+
summary: Add emit time to hash aggregation status
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.http.conn.DnsResolver;
3838
import org.apache.logging.log4j.LogManager;
3939
import org.apache.logging.log4j.Logger;
40+
import org.apache.lucene.store.AlreadyClosedException;
4041
import org.elasticsearch.ElasticsearchException;
4142
import org.elasticsearch.cluster.coordination.stateless.StoreHeartbeatService;
4243
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -176,6 +177,13 @@ public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) {
176177
if (existing != null && existing.tryIncRef()) {
177178
return existing;
178179
}
180+
181+
if (lifecycle.started() == false) {
182+
// doClose() calls releaseCachedClients() which is also synchronized (this) so if we're STARTED here then the client we
183+
// create will definitely not leak on close.
184+
throw new AlreadyClosedException("S3Service is in state [" + lifecycle + "]");
185+
}
186+
179187
final SdkHttpClient httpClient = buildHttpClient(clientSettings, getCustomDnsResolver());
180188
Releasable toRelease = httpClient::close;
181189
try {

muted-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,9 @@ tests:
474474
- class: org.elasticsearch.packaging.test.DockerTests
475475
method: test041AmazonCaCertsAreInTheKeystore
476476
issue: https://github.com/elastic/elasticsearch/issues/128007
477+
- class: org.elasticsearch.compute.aggregation.SampleDoubleAggregatorFunctionTests
478+
method: testSimpleWithCranky
479+
issue: https://github.com/elastic/elasticsearch/issues/128024
477480

478481
# Examples:
479482
#

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ static TransportVersion def(int id) {
174174
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO_BACKPORT_8_19 = def(8_841_0_27);
175175
public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT_8_19 = def(8_841_0_28);
176176
public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING_8_19 = def(8_841_0_29);
177-
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_CHAT_COMPLETION_ADDED_8_19 = def(8_842_0_29);
177+
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_CHAT_COMPLETION_ADDED_8_19 = def(8_841_0_30);
178178
public static final TransportVersion V_9_0_0 = def(9_000_0_09);
179179
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_1 = def(9_000_0_10);
180180
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_2 = def(9_000_0_11);
@@ -254,7 +254,8 @@ static TransportVersion def(int id) {
254254
public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT = def(9_074_0_00);
255255
public static final TransportVersion ESQL_FIELD_ATTRIBUTE_DROP_TYPE = def(9_075_0_00);
256256
public static final TransportVersion ESQL_TIME_SERIES_SOURCE_STATUS = def(9_076_0_00);
257-
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_CHAT_COMPLETION_ADDED = def(9_077_0_00);
257+
public static final TransportVersion ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME = def(9_077_0_00);
258+
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_CHAT_COMPLETION_ADDED = def(9_078_0_00);
258259

259260
/*
260261
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ public String describe() {
110110
*/
111111
private long rowsEmitted;
112112

113+
/**
114+
* Total nanos for emitting the output
115+
*/
116+
protected long emitNanos;
117+
113118
@SuppressWarnings("this-escape")
114119
public HashAggregationOperator(
115120
List<GroupingAggregator.Factory> aggregators,
@@ -223,6 +228,7 @@ public void finish() {
223228
finished = true;
224229
Block[] blocks = null;
225230
IntVector selected = null;
231+
long startInNanos = System.nanoTime();
226232
boolean success = false;
227233
try {
228234
selected = blockHash.nonEmpty();
@@ -247,6 +253,7 @@ public void finish() {
247253
if (success == false && blocks != null) {
248254
Releasables.closeExpectNoException(blocks);
249255
}
256+
emitNanos += System.nanoTime() - startInNanos;
250257
}
251258
}
252259

@@ -269,7 +276,7 @@ public void close() {
269276

270277
@Override
271278
public Operator.Status status() {
272-
return new Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted);
279+
return new Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted, emitNanos);
273280
}
274281

275282
protected static void checkState(boolean condition, String msg) {
@@ -320,20 +327,24 @@ public static class Status implements Operator.Status {
320327
*/
321328
private final long rowsEmitted;
322329

330+
private final long emitNanos;
331+
323332
/**
324333
* Build.
325334
* @param hashNanos Nanoseconds this operator has spent hashing grouping keys.
326335
* @param aggregationNanos Nanoseconds this operator has spent running the aggregations.
327336
* @param pagesProcessed Count of pages this operator has processed.
328337
* @param rowsReceived Count of rows this operator has received.
329338
* @param rowsEmitted Count of rows this operator has emitted.
339+
* @param emitNanos Nanoseconds this operator has spent emitting the output.
330340
*/
331-
public Status(long hashNanos, long aggregationNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
341+
public Status(long hashNanos, long aggregationNanos, int pagesProcessed, long rowsReceived, long rowsEmitted, long emitNanos) {
332342
this.hashNanos = hashNanos;
333343
this.aggregationNanos = aggregationNanos;
334344
this.pagesProcessed = pagesProcessed;
335345
this.rowsReceived = rowsReceived;
336346
this.rowsEmitted = rowsEmitted;
347+
this.emitNanos = emitNanos;
337348
}
338349

339350
protected Status(StreamInput in) throws IOException {
@@ -348,6 +359,11 @@ protected Status(StreamInput in) throws IOException {
348359
rowsReceived = 0;
349360
rowsEmitted = 0;
350361
}
362+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME)) {
363+
emitNanos = in.readVLong();
364+
} else {
365+
emitNanos = 0;
366+
}
351367
}
352368

353369
@Override
@@ -360,6 +376,9 @@ public void writeTo(StreamOutput out) throws IOException {
360376
out.writeVLong(rowsReceived);
361377
out.writeVLong(rowsEmitted);
362378
}
379+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME)) {
380+
out.writeVLong(emitNanos);
381+
}
363382
}
364383

365384
@Override
@@ -402,6 +421,13 @@ public long rowsEmitted() {
402421
return rowsEmitted;
403422
}
404423

424+
/**
425+
* Nanoseconds this operator has spent emitting the output.
426+
*/
427+
public long emitNanos() {
428+
return emitNanos;
429+
}
430+
405431
@Override
406432
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
407433
builder.startObject();
@@ -416,6 +442,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
416442
builder.field("pages_processed", pagesProcessed);
417443
builder.field("rows_received", rowsReceived);
418444
builder.field("rows_emitted", rowsEmitted);
445+
builder.field("emit_nanos", emitNanos);
446+
if (builder.humanReadable()) {
447+
builder.field("emit_time", TimeValue.timeValueNanos(emitNanos));
448+
}
419449
return builder.endObject();
420450

421451
}
@@ -429,12 +459,13 @@ public boolean equals(Object o) {
429459
&& aggregationNanos == status.aggregationNanos
430460
&& pagesProcessed == status.pagesProcessed
431461
&& rowsReceived == status.rowsReceived
432-
&& rowsEmitted == status.rowsEmitted;
462+
&& rowsEmitted == status.rowsEmitted
463+
&& emitNanos == status.emitNanos;
433464
}
434465

435466
@Override
436467
public int hashCode() {
437-
return Objects.hash(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted);
468+
return Objects.hash(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted, emitNanos);
438469
}
439470

440471
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorStatusTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
public class HashAggregationOperatorStatusTests extends AbstractWireSerializingTestCase<HashAggregationOperator.Status> {
1818
public static HashAggregationOperator.Status simple() {
19-
return new HashAggregationOperator.Status(500012, 200012, 123, 111, 222);
19+
return new HashAggregationOperator.Status(500012, 200012, 123, 111, 222, 180017);
2020
}
2121

2222
public static String simpleToJson() {
@@ -28,7 +28,9 @@ public static String simpleToJson() {
2828
"aggregation_time" : "200micros",
2929
"pages_processed" : 123,
3030
"rows_received" : 111,
31-
"rows_emitted" : 222
31+
"rows_emitted" : 222,
32+
"emit_nanos" : 180017,
33+
"emit_time" : "180micros"
3234
}""";
3335
}
3436

@@ -48,6 +50,7 @@ public HashAggregationOperator.Status createTestInstance() {
4850
randomNonNegativeLong(),
4951
randomNonNegativeInt(),
5052
randomNonNegativeLong(),
53+
randomNonNegativeLong(),
5154
randomNonNegativeLong()
5255
);
5356
}
@@ -59,14 +62,16 @@ protected HashAggregationOperator.Status mutateInstance(HashAggregationOperator.
5962
int pagesProcessed = instance.pagesProcessed();
6063
long rowsReceived = instance.rowsReceived();
6164
long rowsEmitted = instance.rowsEmitted();
62-
switch (between(0, 4)) {
65+
long emitNanos = instance.emitNanos();
66+
switch (between(0, 5)) {
6367
case 0 -> hashNanos = randomValueOtherThan(hashNanos, ESTestCase::randomNonNegativeLong);
6468
case 1 -> aggregationNanos = randomValueOtherThan(aggregationNanos, ESTestCase::randomNonNegativeLong);
6569
case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
6670
case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
6771
case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
72+
case 5 -> emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong);
6873
default -> throw new UnsupportedOperationException();
6974
}
70-
return new HashAggregationOperator.Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted);
75+
return new HashAggregationOperator.Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted, emitNanos);
7176
}
7277
}

0 commit comments

Comments
 (0)