Skip to content

Commit a99ee7d

Browse files
authored
Merge branch 'main' into online-prewarming-interface
2 parents 62bea30 + 6012590 commit a99ee7d

File tree

6 files changed

+194
-27
lines changed

6 files changed

+194
-27
lines changed

docs/changelog/126637.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126637
2+
summary: Improve resiliency of `UpdateTimeSeriesRangeService`
3+
area: TSDB
4+
type: bug
5+
issues: []

docs/release-notes/breaking-changes.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ Aggregations:
2020
Allocation:
2121
* Increase minimum threshold in shard balancer [#115831](https://github.com/elastic/elasticsearch/pull/115831)
2222
* Remove `cluster.routing.allocation.disk.watermark.enable_for_single_data_node` setting [#114207](https://github.com/elastic/elasticsearch/pull/114207)
23-
* Remove cluster state from `/_cluster/reroute` response [#114231](https://github.com/elastic/elasticsearch/pull/114231) (issue: {es-issue}88978[#88978])
23+
* Remove cluster state from `/_cluster/reroute` response [#114231](https://github.com/elastic/elasticsearch/pull/114231) (issue: https://github.com/elastic/elasticsearch/issues/88978[#88978])
2424

2525
Analysis:
2626
* Snowball stemmers have been upgraded [#114146](https://github.com/elastic/elasticsearch/pull/114146)
2727
* The 'german2' stemmer is now an alias for the 'german' snowball stemmer [#113614](https://github.com/elastic/elasticsearch/pull/113614)
28-
* The 'persian' analyzer has stemmer by default [#113482](https://github.com/elastic/elasticsearch/pull/113482) (issue: {es-issue}113050[#113050])
28+
* The 'persian' analyzer has stemmer by default [#113482](https://github.com/elastic/elasticsearch/pull/113482) (issue: https://github.com/elastic/elasticsearch/issues/113050[#113050])
2929
* The Korean dictionary for Nori has been updated [#114124](https://github.com/elastic/elasticsearch/pull/114124)
3030

3131
Authentication:
@@ -56,32 +56,32 @@ Indices APIs:
5656
Infra/Core:
5757
* Change Elasticsearch timeouts to 429 response instead of 5xx [#116026](https://github.com/elastic/elasticsearch/pull/116026)
5858
* Limit `ByteSizeUnit` to 2 decimals [#120142](https://github.com/elastic/elasticsearch/pull/120142)
59-
* Remove `client.type` setting [#118192](https://github.com/elastic/elasticsearch/pull/118192) (issue: {es-issue}104574[#104574])
59+
* Remove `client.type` setting [#118192](https://github.com/elastic/elasticsearch/pull/118192) (issue: https://github.com/elastic/elasticsearch/issues/104574[#104574])
6060
* Remove any references to org.elasticsearch.core.RestApiVersion#V_7 [#118103](https://github.com/elastic/elasticsearch/pull/118103)
6161

6262
Infra/Logging:
63-
* Change `deprecation.elasticsearch` keyword to `elasticsearch.deprecation` [#117933](https://github.com/elastic/elasticsearch/pull/117933) (issue: {es-issue}83251[#83251])
64-
* Rename deprecation index template [#125606](https://github.com/elastic/elasticsearch/pull/125606) (issue: {es-issue}125445[#125445])
63+
* Change `deprecation.elasticsearch` keyword to `elasticsearch.deprecation` [#117933](https://github.com/elastic/elasticsearch/pull/117933) (issue: https://github.com/elastic/elasticsearch/issues/83251[#83251])
64+
* Rename deprecation index template [#125606](https://github.com/elastic/elasticsearch/pull/125606) (issue: https://github.com/elastic/elasticsearch/issues/125445[#125445])
6565

6666
Infra/Metrics:
6767
* Deprecated tracing.apm.* settings got removed. [#119926](https://github.com/elastic/elasticsearch/pull/119926)
6868

6969
Infra/REST API:
70-
* Output a consistent format when generating error json [#90529](https://github.com/elastic/elasticsearch/pull/90529) (issue: {es-issue}89387[#89387])
70+
* Output a consistent format when generating error json [#90529](https://github.com/elastic/elasticsearch/pull/90529) (issue: https://github.com/elastic/elasticsearch/issues/89387[#89387])
7171

7272
Ingest Node:
7373
* Remove `ecs` option on `user_agent` processor [#116077](https://github.com/elastic/elasticsearch/pull/116077)
7474
* Remove ignored fallback option on GeoIP processor [#116112](https://github.com/elastic/elasticsearch/pull/116112)
7575

7676
Logs:
77-
* Conditionally enable logsdb by default for data streams matching with logs-*-* pattern. [#121049](https://github.com/elastic/elasticsearch/pull/121049) (issue: {es-issue}106489[#106489])
77+
* Conditionally enable logsdb by default for data streams matching with logs-*-* pattern. [#121049](https://github.com/elastic/elasticsearch/pull/121049) (issue: https://github.com/elastic/elasticsearch/issues/106489[#106489])
7878

7979
Machine Learning:
8080
* Disable machine learning on macOS x86_64 [#104125](https://github.com/elastic/elasticsearch/pull/104125)
8181

8282
Mapping:
8383
* Remove support for type, fields, `copy_to` and boost in metadata field definition [#118825](https://github.com/elastic/elasticsearch/pull/118825)
84-
* Turn `_source` meta fieldmapper's mode attribute into a no-op [#119072](https://github.com/elastic/elasticsearch/pull/119072) (issue: {es-issue}118596[#118596])
84+
* Turn `_source` meta fieldmapper's mode attribute into a no-op [#119072](https://github.com/elastic/elasticsearch/pull/119072) (issue: https://github.com/elastic/elasticsearch/issues/118596[#118596])
8585

8686
Search:
8787
* Adjust `random_score` default field to `_seq_no` field [#118671](https://github.com/elastic/elasticsearch/pull/118671)

modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,14 @@ private ProjectMetadata.Builder updateTimeSeriesTemporalRange(ProjectMetadata pr
127127

128128
// getWriteIndex() selects the latest added index:
129129
Index head = dataStream.getWriteIndex();
130-
IndexMetadata im = project.getIndexSafe(head);
131-
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
132-
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
133-
Instant newEnd = DataStream.getCanonicalTimestampBound(
134-
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
135-
);
136-
if (newEnd.isAfter(currentEnd)) {
137-
try {
130+
try {
131+
IndexMetadata im = project.getIndexSafe(head);
132+
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
133+
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
134+
Instant newEnd = DataStream.getCanonicalTimestampBound(
135+
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
136+
);
137+
if (newEnd.isAfter(currentEnd)) {
138138
Settings settings = Settings.builder()
139139
.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DEFAULT_DATE_TIME_FORMATTER.format(newEnd))
140140
.build();
@@ -151,17 +151,17 @@ private ProjectMetadata.Builder updateTimeSeriesTemporalRange(ProjectMetadata pr
151151
mBuilder.updateSettings(settings, head.getName());
152152
// Verify that all temporal ranges of each backing index is still valid:
153153
dataStream.validate(mBuilder::get);
154-
} catch (Exception e) {
155-
LOGGER.error(
156-
() -> format(
157-
"unable to update [%s] for data stream [%s] and backing index [%s]",
158-
IndexSettings.TIME_SERIES_END_TIME.getKey(),
159-
dataStream.getName(),
160-
head.getName()
161-
),
162-
e
163-
);
164154
}
155+
} catch (Exception e) {
156+
LOGGER.error(
157+
() -> format(
158+
"unable to update [%s] for data stream [%s] and backing index [%s]",
159+
IndexSettings.TIME_SERIES_END_TIME.getKey(),
160+
dataStream.getName(),
161+
head.getName()
162+
),
163+
e
164+
);
165165
}
166166
}
167167
return mBuilder;

modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,19 @@
88
*/
99
package org.elasticsearch.datastreams;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.core.LogEvent;
14+
import org.apache.logging.log4j.core.appender.AbstractAppender;
15+
import org.apache.logging.log4j.core.filter.RegexFilter;
16+
import org.apache.logging.log4j.message.Message;
1117
import org.elasticsearch.cluster.ClusterState;
1218
import org.elasticsearch.cluster.metadata.DataStream;
1319
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
1420
import org.elasticsearch.cluster.metadata.ProjectId;
1521
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1622
import org.elasticsearch.cluster.service.ClusterService;
23+
import org.elasticsearch.common.logging.Loggers;
1724
import org.elasticsearch.common.settings.ClusterSettings;
1825
import org.elasticsearch.common.settings.Settings;
1926
import org.elasticsearch.core.TimeValue;
@@ -23,15 +30,22 @@
2330
import org.elasticsearch.threadpool.TestThreadPool;
2431
import org.elasticsearch.threadpool.ThreadPool;
2532
import org.junit.After;
33+
import org.junit.AfterClass;
2634
import org.junit.Before;
35+
import org.junit.BeforeClass;
2736

2837
import java.time.Duration;
2938
import java.time.Instant;
3039
import java.time.temporal.ChronoUnit;
3140
import java.time.temporal.TemporalAmount;
41+
import java.util.ArrayList;
42+
import java.util.HashMap;
3243
import java.util.List;
44+
import java.util.Map;
3345
import java.util.Set;
3446

47+
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
48+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createIndexMetadata;
3549
import static org.hamcrest.Matchers.containsString;
3650
import static org.hamcrest.Matchers.equalTo;
3751
import static org.hamcrest.Matchers.not;
@@ -42,6 +56,22 @@
4256

4357
public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
4458

59+
static MockAppender appender;
60+
static Logger testLogger1 = LogManager.getLogger(UpdateTimeSeriesRangeService.class);
61+
62+
@BeforeClass
63+
public static void classInit() throws IllegalAccessException {
64+
appender = new MockAppender("mock_appender");
65+
appender.start();
66+
Loggers.addAppender(testLogger1, appender);
67+
}
68+
69+
@AfterClass
70+
public static void classCleanup() {
71+
Loggers.removeAppender(testLogger1, appender);
72+
appender.stop();
73+
}
74+
4575
private ThreadPool threadPool;
4676
private UpdateTimeSeriesRangeService instance;
4777

@@ -199,6 +229,70 @@ public void testUpdateTimeSeriesTemporalRangeMultipleDataStream() {
199229
assertThat(getEndTime(project, dataStreamName3, 0), equalTo(start));
200230
}
201231

232+
public void testUpdateTimeSeriesTemporalOneBadDataStream() {
233+
String dataStreamName1 = "logs-app1";
234+
String dataStreamName2 = "logs-app2-broken";
235+
String dataStreamName3 = "logs-app3";
236+
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
237+
238+
Instant start = now.minus(90, ChronoUnit.MINUTES);
239+
Instant end = start.plus(30, ChronoUnit.MINUTES);
240+
final var projectId = randomProjectIdOrDefault();
241+
ProjectMetadata.Builder mbBuilder = ProjectMetadata.builder(projectId);
242+
for (String dataStreamName : List.of(dataStreamName1, dataStreamName2, dataStreamName3)) {
243+
DataStreamTestHelper.getClusterStateWithDataStream(mbBuilder, dataStreamName, List.of(new Tuple<>(start, end)));
244+
}
245+
246+
Settings settings = Settings.builder().put("index.mode", "logsdb").build();
247+
var im = createIndexMetadata(getDefaultBackingIndexName(dataStreamName2, 2, start.toEpochMilli()), true, settings, 0);
248+
mbBuilder.put(im, true);
249+
var ds2 = mbBuilder.dataStreamMetadata().dataStreams().get(dataStreamName2);
250+
var ds2Indices = new ArrayList<>(ds2.getIndices());
251+
ds2Indices.add(im.getIndex());
252+
var copy = new HashMap<>(mbBuilder.dataStreamMetadata().dataStreams());
253+
copy.put(
254+
dataStreamName2,
255+
new DataStream(
256+
ds2.getName(),
257+
ds2Indices,
258+
2,
259+
ds2.getMetadata(),
260+
ds2.isHidden(),
261+
ds2.isReplicated(),
262+
ds2.isSystem(),
263+
ds2.isAllowCustomRouting(),
264+
ds2.getIndexMode(),
265+
ds2.getDataLifecycle(),
266+
ds2.getDataStreamOptions(),
267+
ds2.getFailureIndices(),
268+
ds2.rolloverOnWrite(),
269+
ds2.getAutoShardingEvent()
270+
)
271+
);
272+
mbBuilder.dataStreams(copy, Map.of());
273+
274+
now = now.minus(45, ChronoUnit.MINUTES);
275+
ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(mbBuilder).build();
276+
ClusterState result = instance.updateTimeSeriesTemporalRange(before, now);
277+
assertThat(result, not(sameInstance(before)));
278+
final var project = result.getMetadata().getProject(projectId);
279+
final var expectedEndTime = now.plus(35, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS);
280+
assertThat(getEndTime(project, dataStreamName1, 0), equalTo(expectedEndTime));
281+
assertThat(getEndTime(project, dataStreamName2, 0), equalTo(end)); // failed to update end_time, because broken data stream
282+
assertThat(getEndTime(project, dataStreamName3, 0), equalTo(expectedEndTime));
283+
284+
String message = appender.getLastEventAndReset().getMessage().getFormattedMessage();
285+
assertThat(
286+
message,
287+
equalTo(
288+
"unable to update [index.time_series.end_time] for data stream [logs-app2-broken] and "
289+
+ "backing index ["
290+
+ im.getIndex().getName()
291+
+ "]"
292+
)
293+
);
294+
}
295+
202296
public void testUpdateTimeSeriesTemporalRange_multipleProjects() {
203297
String dataStreamName = "logs-app1";
204298
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
@@ -253,4 +347,27 @@ static Instant getStartTime(ProjectMetadata project, String dataStreamName, int
253347
return IndexSettings.TIME_SERIES_START_TIME.get(indexSettings);
254348
}
255349

350+
static class MockAppender extends AbstractAppender {
351+
public LogEvent lastEvent;
352+
353+
MockAppender(final String name) throws IllegalAccessException {
354+
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false);
355+
}
356+
357+
@Override
358+
public void append(LogEvent event) {
359+
lastEvent = event.toImmutable();
360+
}
361+
362+
Message lastMessage() {
363+
return lastEvent.getMessage();
364+
}
365+
366+
public LogEvent getLastEventAndReset() {
367+
LogEvent toReturn = lastEvent;
368+
lastEvent = null;
369+
return toReturn;
370+
}
371+
}
372+
256373
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
{
2+
"inference.inference": {
3+
"documentation": {
4+
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/post-inference-api.html",
5+
"description": "Perform inference"
6+
},
7+
"stability": "stable",
8+
"visibility": "public",
9+
"headers": {
10+
"accept": ["application/json"],
11+
"content_type": ["application/json"]
12+
},
13+
"url": {
14+
"paths": [
15+
{
16+
"path": "/_inference/{inference_id}",
17+
"methods": ["POST"],
18+
"parts": {
19+
"inference_id": {
20+
"type": "string",
21+
"description": "The inference Id"
22+
}
23+
}
24+
},
25+
{
26+
"path": "/_inference/{task_type}/{inference_id}",
27+
"methods": ["POST"],
28+
"parts": {
29+
"task_type": {
30+
"type": "string",
31+
"description": "The task type"
32+
},
33+
"inference_id": {
34+
"type": "string",
35+
"description": "The inference Id"
36+
}
37+
}
38+
}
39+
]
40+
},
41+
"body": {
42+
"description": "The inference payload"
43+
}
44+
}
45+
}

test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ public static void getClusterStateWithDataStream(
617617
builder.put(dataStreamBuilder.build());
618618
}
619619

620-
private static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
620+
public static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
621621
Settings.Builder b = Settings.builder()
622622
.put(settings)
623623
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())

0 commit comments

Comments
 (0)