Skip to content

Commit cbe8aa9

Browse files
authored
[ML] Add internal flag to flush api to control whether or not to refresh indices (#96803)
When datafeeds send flush requests, they don't require the indices to be refreshed (and in fact the refresh may be detrimentally expensive in the case of small bucket sizes). This change adds an (internal) flag to control whether or not a refresh is required when flushing.
1 parent 008a9fc commit cbe8aa9

File tree

18 files changed

+176
-25
lines changed

18 files changed

+176
-25
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,10 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId
135135
public static final TransportVersion V_8_500_009 = registerTransportVersion(8_500_009, "35091358-fd41-4106-a6e2-d2a1315494c1");
136136
public static final TransportVersion V_8_500_010 = registerTransportVersion(8_500_010, "9818C628-1EEC-439B-B943-468F61460675");
137137
public static final TransportVersion V_8_500_011 = registerTransportVersion(8_500_011, "2209F28D-B52E-4BC4-9889-E780F291C32E");
138+
public static final TransportVersion V_8_500_012 = registerTransportVersion(8_500_012, "BB6F4AF1-A860-4FD4-A138-8150FFBE0ABD");
138139

139140
private static class CurrentHolder {
140-
private static final TransportVersion CURRENT = findCurrent(V_8_500_011);
141+
private static final TransportVersion CURRENT = findCurrent(V_8_500_012);
141142

142143
// finds the pluggable current version, or uses the given fallback
143144
private static TransportVersion findCurrent(TransportVersion fallback) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.core.ml.action;
88

9+
import org.elasticsearch.TransportVersion;
910
import org.elasticsearch.action.ActionType;
1011
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
1112
import org.elasticsearch.common.io.stream.StreamInput;
@@ -62,6 +63,7 @@ public static Request parseRequest(String jobId, XContentParser parser) {
6263

6364
private boolean calcInterim = false;
6465
private boolean waitForNormalization = true;
66+
private boolean refreshRequired = true;
6567
private String start;
6668
private String end;
6769
private String advanceTime;
@@ -77,6 +79,9 @@ public Request(StreamInput in) throws IOException {
7779
advanceTime = in.readOptionalString();
7880
skipTime = in.readOptionalString();
7981
waitForNormalization = in.readBoolean();
82+
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
83+
refreshRequired = in.readBoolean();
84+
}
8085
}
8186

8287
@Override
@@ -88,6 +93,9 @@ public void writeTo(StreamOutput out) throws IOException {
8893
out.writeOptionalString(advanceTime);
8994
out.writeOptionalString(skipTime);
9095
out.writeBoolean(waitForNormalization);
96+
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
97+
out.writeBoolean(refreshRequired);
98+
}
9199
}
92100

93101
public Request(String jobId) {
@@ -138,18 +146,32 @@ public boolean isWaitForNormalization() {
138146
return waitForNormalization;
139147
}
140148

149+
public boolean isRefreshRequired() {
150+
return refreshRequired;
151+
}
152+
141153
/**
142-
* Used internally. Datafeeds do not need to wait renormalization to complete before continuing.
154+
* Used internally. Datafeeds do not need to wait for renormalization to complete before continuing.
143155
*
144156
* For large jobs, renormalization can take minutes, causing datafeeds to needlessly pause execution.
145157
*/
146158
public void setWaitForNormalization(boolean waitForNormalization) {
147159
this.waitForNormalization = waitForNormalization;
148160
}
149161

162+
/**
163+
* Used internally. For datafeeds, there is no need for the results to be searchable after the flush,
164+
* as the datafeed itself does not search them immediately.
165+
*
166+
* Particularly for short bucket spans these refreshes could be a significant cost.
167+
**/
168+
public void setRefreshRequired(boolean refreshRequired) {
169+
this.refreshRequired = refreshRequired;
170+
}
171+
150172
@Override
151173
public int hashCode() {
152-
return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization);
174+
return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization, refreshRequired);
153175
}
154176

155177
@Override
@@ -164,6 +186,7 @@ public boolean equals(Object obj) {
164186
return Objects.equals(jobId, other.jobId)
165187
&& calcInterim == other.calcInterim
166188
&& waitForNormalization == other.waitForNormalization
189+
&& refreshRequired == other.refreshRequired
167190
&& Objects.equals(start, other.start)
168191
&& Objects.equals(end, other.end)
169192
&& Objects.equals(advanceTime, other.advanceTime)

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.core.ml.job.process.autodetect.output;
88

9+
import org.elasticsearch.TransportVersion;
910
import org.elasticsearch.common.io.stream.StreamInput;
1011
import org.elasticsearch.common.io.stream.StreamOutput;
1112
import org.elasticsearch.common.io.stream.Writeable;
@@ -28,44 +29,57 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
2829
public static final ParseField TYPE = new ParseField("flush");
2930
public static final ParseField ID = new ParseField("id");
3031
public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end");
32+
public static final ParseField REFRESH_REQUIRED = new ParseField("refresh_required");
3133

3234
public static final ConstructingObjectParser<FlushAcknowledgement, Void> PARSER = new ConstructingObjectParser<>(
3335
TYPE.getPreferredName(),
34-
a -> new FlushAcknowledgement((String) a[0], (Long) a[1])
36+
a -> new FlushAcknowledgement((String) a[0], (Long) a[1], (Boolean) a[2])
3537
);
3638

3739
static {
3840
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
3941
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_FINALIZED_BUCKET_END);
42+
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REFRESH_REQUIRED);
4043
}
4144

4245
private final String id;
4346
private final Instant lastFinalizedBucketEnd;
47+
private final boolean refreshRequired;
4448

45-
public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs) {
49+
public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs, Boolean refreshRequired) {
4650
this.id = id;
4751
// The C++ passes 0 when last finalized bucket end is not available, so treat 0 as null
4852
this.lastFinalizedBucketEnd = (lastFinalizedBucketEndMs != null && lastFinalizedBucketEndMs > 0)
4953
? Instant.ofEpochMilli(lastFinalizedBucketEndMs)
5054
: null;
55+
this.refreshRequired = refreshRequired == null || refreshRequired;
5156
}
5257

53-
public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd) {
58+
public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd, Boolean refreshRequired) {
5459
this.id = id;
5560
// Round to millisecond accuracy to ensure round-tripping via XContent results in an equal object
5661
long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0;
5762
this.lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null;
63+
this.refreshRequired = refreshRequired == null || refreshRequired;
5864
}
5965

6066
public FlushAcknowledgement(StreamInput in) throws IOException {
6167
id = in.readString();
6268
lastFinalizedBucketEnd = in.readOptionalInstant();
69+
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
70+
refreshRequired = in.readBoolean();
71+
} else {
72+
refreshRequired = true;
73+
}
6374
}
6475

6576
@Override
6677
public void writeTo(StreamOutput out) throws IOException {
6778
out.writeString(id);
6879
out.writeOptionalInstant(lastFinalizedBucketEnd);
80+
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_012)) {
81+
out.writeBoolean(refreshRequired);
82+
}
6983
}
7084

7185
public String getId() {
@@ -76,6 +90,10 @@ public Instant getLastFinalizedBucketEnd() {
7690
return lastFinalizedBucketEnd;
7791
}
7892

93+
public boolean getRefreshRequired() {
94+
return refreshRequired;
95+
}
96+
7997
@Override
8098
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
8199
builder.startObject();
@@ -87,13 +105,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
87105
lastFinalizedBucketEnd.toEpochMilli()
88106
);
89107
}
108+
builder.field(REFRESH_REQUIRED.getPreferredName(), refreshRequired);
90109
builder.endObject();
91110
return builder;
92111
}
93112

94113
@Override
95114
public int hashCode() {
96-
return Objects.hash(id, lastFinalizedBucketEnd);
115+
return Objects.hash(id, lastFinalizedBucketEnd, refreshRequired);
97116
}
98117

99118
@Override
@@ -105,6 +124,8 @@ public boolean equals(Object obj) {
105124
return false;
106125
}
107126
FlushAcknowledgement other = (FlushAcknowledgement) obj;
108-
return Objects.equals(id, other.id) && Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd);
127+
return Objects.equals(id, other.id)
128+
&& Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd)
129+
&& refreshRequired == other.refreshRequired;
109130
}
110131
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ protected Request createTestInstance() {
1919
if (randomBoolean()) {
2020
request.setWaitForNormalization(randomBoolean());
2121
}
22+
if (randomBoolean()) {
23+
request.setRefreshRequired(randomBoolean());
24+
}
2225
if (randomBoolean()) {
2326
request.setCalcInterim(randomBoolean());
2427
}
@@ -49,6 +52,9 @@ protected Writeable.Reader<Request> instanceReader() {
4952

5053
@Override
5154
protected Request mutateInstanceForVersion(Request instance, TransportVersion version) {
55+
if (version.before(TransportVersion.V_8_500_012)) {
56+
instance.setRefreshRequired(true);
57+
}
5258
return instance;
5359
}
5460
}

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ private static Quantiles createQuantiles() {
575575
}
576576

577577
private static FlushAcknowledgement createFlushAcknowledgement() {
578-
return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant());
578+
return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant(), true);
579579
}
580580

581581
private static class ResultsBuilder {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ protected void taskOperation(
5151
FlushJobParams.Builder paramsBuilder = FlushJobParams.builder();
5252
paramsBuilder.calcInterim(request.getCalcInterim());
5353
paramsBuilder.waitForNormalization(request.isWaitForNormalization());
54+
paramsBuilder.refreshRequired(request.isRefreshRequired());
5455
if (request.getAdvanceTime() != null) {
5556
paramsBuilder.advanceTime(request.getAdvanceTime());
5657
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ Long runLookBack(long startTime, Long endTime) throws Exception {
175175

176176
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
177177
request.setCalcInterim(true);
178+
request.setRefreshRequired(false);
178179
run(lookbackStartTimeMs, lookbackEnd, request);
179180
if (shouldPersistAfterLookback(isLookbackOnly)) {
180181
sendPersistRequest();
@@ -205,6 +206,7 @@ private long skipToStartTime(long startTime) {
205206
// start time is after last checkpoint, thus we need to skip time
206207
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
207208
request.setSkipTime(String.valueOf(startTime));
209+
request.setRefreshRequired(false);
208210
FlushJobAction.Response flushResponse = flushJob(request);
209211
LOGGER.info("[{}] Skipped to time [{}]", jobId, flushResponse.getLastFinalizedBucketEnd().toEpochMilli());
210212
return flushResponse.getLastFinalizedBucketEnd().toEpochMilli();
@@ -218,6 +220,7 @@ long runRealtime() throws Exception {
218220
long end = toIntervalStartEpochMs(nowMinusQueryDelay);
219221
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
220222
request.setWaitForNormalization(false);
223+
request.setRefreshRequired(false);
221224
request.setCalcInterim(true);
222225
request.setAdvanceTime(String.valueOf(end));
223226
run(start, end, request);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void writeUpdateScheduledEventsMessage(List<ScheduledEvent> events, TimeV
101101
*/
102102
@Override
103103
public String flushJob(FlushJobParams params) {
104-
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, 0L);
104+
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, 0L, true);
105105
AutodetectResult result = new AutodetectResult(
106106
null,
107107
null,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -394,10 +394,12 @@ void processResult(AutodetectResult result) {
394394
try {
395395
bulkResultsPersister.executeRequest();
396396
bulkAnnotationsPersister.executeRequest();
397-
persister.commitWrites(
398-
jobId,
399-
EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS)
400-
);
397+
if (flushAcknowledgement.getRefreshRequired()) {
398+
persister.commitWrites(
399+
jobId,
400+
EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS)
401+
);
402+
}
401403
} catch (Exception e) {
402404
logger.error(
403405
"["

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ private static class FlushAcknowledgementHolder {
7070
private volatile Exception flushException;
7171

7272
private FlushAcknowledgementHolder(String flushId) {
73-
this.flushAcknowledgement = new FlushAcknowledgement(flushId, 0L);
73+
this.flushAcknowledgement = new FlushAcknowledgement(flushId, 0L, true);
7474
this.latch = new CountDownLatch(1);
7575
}
7676
}

0 commit comments

Comments
 (0)