Commit 6a5ff53

Merge branch 'main' into cef_processor

2 parents: e7242d6 + 39e594f
16 files changed, +346 -120 lines

.buildkite/scripts/dra-workflow.sh (1 addition, 0 deletions)

```diff
@@ -70,6 +70,7 @@ echo --- Building release artifacts
   $VERSION_QUALIFIER_ARG \
   buildReleaseArtifacts \
   exportCompressedDockerImages \
+  exportDockerContexts \
   :distribution:generateDependenciesReport
 
 PATH="$PATH:${JAVA_HOME}/bin" # Required by the following script
```

distribution/docker/build.gradle (22 additions, 10 deletions)

```diff
@@ -132,7 +132,7 @@ dependencies {
   fips "org.bouncycastle:bctls-fips:1.0.19"
 }
 
-ext.expansions = { Architecture architecture, DockerBase base ->
+ext.expansions = { Architecture architecture, DockerBase base, String publicationContext = '' ->
   def (major, minor) = VersionProperties.elasticsearch.split("\\.")
 
   // We tag our Docker images with various pieces of information, including a timestamp
@@ -152,6 +152,7 @@ ext.expansions = { Architecture architecture, DockerBase base ->
     'license'            : base == DockerBase.IRON_BANK ? 'Elastic License 2.0' : 'Elastic-License-2.0',
     'package_manager'    : base.packageManager,
     'docker_base'        : base.name().toLowerCase(),
+    'docker_context'     : publicationContext,
     'version'            : VersionProperties.elasticsearch,
     'major_minor_version': "${major}.${minor}",
     'retry'              : ShellRetry
@@ -179,9 +180,9 @@ private static String taskName(String prefix, Architecture architecture, DockerB
   suffix
 }
 
-ext.dockerBuildContext = { Architecture architecture, DockerBase base ->
+ext.dockerBuildContext = { Architecture architecture, DockerBase base, String publicationContext = '' ->
   copySpec {
-    final Map<String, String> varExpansions = expansions(architecture, base)
+    final Map<String, String> varExpansions = expansions(architecture, base, publicationContext)
     final Path projectDir = project.projectDir.toPath()
 
     if (base == DockerBase.IRON_BANK) {
@@ -291,17 +292,22 @@ tasks.named("composeUp").configure {
   dependsOn tasks.named("preProcessFixture")
 }
 
-void addBuildDockerContextTask(Architecture architecture, DockerBase base) {
+
+def exportDockerImages = tasks.register("exportDockerImages")
+def exportCompressedDockerImages = tasks.register("exportCompressedDockerImages")
+def exportDockerContexts = tasks.register("exportDockerContexts")
+
+void addBuildDockerContextTask(Architecture architecture, DockerBase base, String taskSuffix = 'DockerContext', String classifier = "docker-build-context") {
   String configDirectory = base == DockerBase.IRON_BANK ? 'scripts' : 'config'
   String arch = architecture == Architecture.AARCH64 ? '-aarch64' : ''
 
   final TaskProvider<Tar> buildDockerContextTask =
-    tasks.register(taskName('build', architecture, base, 'DockerContext'), Tar) {
+    tasks.register(taskName('build', architecture, base, taskSuffix), Tar) {
       archiveExtension = 'tar.gz'
       compression = Compression.GZIP
-      archiveClassifier = "docker-build-context${arch}"
+      archiveClassifier = "${classifier}${arch}"
       archiveBaseName = "elasticsearch${base.suffix}"
-      with dockerBuildContext(architecture, base)
+      with dockerBuildContext(architecture, base, classifier)
 
       into(configDirectory) {
         from(configurations.log4jConfig) {
@@ -344,6 +350,10 @@ void addBuildDockerContextTask(Architecture architecture, DockerBase base) {
     onlyIf("$architecture supported") { serviceProvider.get().isArchitectureSupported(architecture) }
   }
 
+  exportDockerContexts.configure {
+    dependsOn buildDockerContextTask
+  }
+
   if (base == DockerBase.IRON_BANK) {
     tasks.named("assemble").configure {
       dependsOn(buildDockerContextTask)
@@ -578,12 +588,14 @@ for (final Architecture architecture : Architecture.values()) {
       addTransformDockerContextTask(architecture, base)
       addBuildDockerImageTask(architecture, base)
     }
+    if (base == DockerBase.DEFAULT) {
+      // Add an additional Docker Hub-specific context, which we use solely for publishing to Docker Hub.
+      // At the moment it differs only in not adding the labels that we need for openshift certification.
+      addBuildDockerContextTask(architecture, base, 'DockerHubContext', "docker-hub-build-context")
+    }
   }
 }
 
-def exportDockerImages = tasks.register("exportDockerImages")
-def exportCompressedDockerImages = tasks.register("exportCompressedDockerImages")
-
/*
 * The export subprojects write out the generated Docker images to disk, so
 * that they can be easily reloaded, for example into a VM for distribution testing
```
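The two new optional parameters are backwards compatible: existing calls to `addBuildDockerContextTask(architecture, base)` keep producing the `docker-build-context` artifacts, while the extra `DockerHubContext` registration emits a parallel `docker-hub-build-context` artifact and passes its classifier through `dockerBuildContext` into the `docker_context` template expansion. A rough Java rendering of the naming rule (illustration only; the class and method names are made up):

```java
// Sketch: mirrors archiveClassifier = "${classifier}${arch}" from the diff,
// where arch is "-aarch64" on AARCH64 and "" otherwise.
public class DockerContextNamingSketch {

    static String archiveClassifier(String classifier, boolean aarch64) {
        return classifier + (aarch64 ? "-aarch64" : "");
    }

    public static void main(String[] args) {
        // Existing artifact, unchanged because the new parameters default
        // to 'DockerContext' / "docker-build-context":
        System.out.println(archiveClassifier("docker-build-context", false));
        // New Docker Hub-only artifact registered for DockerBase.DEFAULT:
        System.out.println(archiveClassifier("docker-hub-build-context", true));
    }
}
```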

distribution/docker/src/docker/Dockerfile.default (2 additions, 0 deletions)

```diff
@@ -139,13 +139,15 @@ LABEL org.label-schema.build-date="${build_date}" \\
   org.opencontainers.image.vendor="Elastic" \\
   org.opencontainers.image.version="${version}"
 
+<% if (docker_context != 'docker-hub-build-context') { %>
 LABEL name="Elasticsearch" \\
   maintainer="[email protected]" \\
   vendor="Elastic" \\
   version="${version}" \\
   release="1" \\
   summary="Elasticsearch" \\
   description="You know, for search."
+<% } %>
 
 RUN mkdir /licenses && ln LICENSE.txt /licenses/LICENSE
```
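The `docker_context` binding comes from the `expansions` map in build.gradle above; only the Docker Hub build context sets it to `docker-hub-build-context`, so all other contexts keep the `name`/`maintainer`/`vendor` labels needed for Red Hat/OpenShift certification. A toy Java equivalent of the gate (illustrative only; the real check runs during Groovy template expansion):

```java
// Toy equivalent of <% if (docker_context != 'docker-hub-build-context') { %> ... <% } %>.
public class LabelGateSketch {

    static boolean includeCertificationLabels(String dockerContext) {
        return !"docker-hub-build-context".equals(dockerContext);
    }

    public static void main(String[] args) {
        System.out.println(includeCertificationLabels(""));                         // true: default context keeps the labels
        System.out.println(includeCertificationLabels("docker-build-context"));     // true
        System.out.println(includeCertificationLabels("docker-hub-build-context")); // false: Docker Hub context omits them
    }
}
```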

docs/changelog/126637.yaml (5 additions, 0 deletions)

```diff
@@ -0,0 +1,5 @@
+pr: 126637
+summary: Improve resiliency of `UpdateTimeSeriesRangeService`
+area: TSDB
+type: bug
+issues: []
```

docs/release-notes/breaking-changes.md (8 additions, 8 deletions)

```diff
@@ -20,12 +20,12 @@ Aggregations:
 Allocation:
 * Increase minimum threshold in shard balancer [#115831](https://github.com/elastic/elasticsearch/pull/115831)
 * Remove `cluster.routing.allocation.disk.watermark.enable_for_single_data_node` setting [#114207](https://github.com/elastic/elasticsearch/pull/114207)
-* Remove cluster state from `/_cluster/reroute` response [#114231](https://github.com/elastic/elasticsearch/pull/114231) (issue: {es-issue}88978[#88978])
+* Remove cluster state from `/_cluster/reroute` response [#114231](https://github.com/elastic/elasticsearch/pull/114231) (issue: https://github.com/elastic/elasticsearch/issues/88978[#88978])
 
 Analysis:
 * Snowball stemmers have been upgraded [#114146](https://github.com/elastic/elasticsearch/pull/114146)
 * The 'german2' stemmer is now an alias for the 'german' snowball stemmer [#113614](https://github.com/elastic/elasticsearch/pull/113614)
-* The 'persian' analyzer has stemmer by default [#113482](https://github.com/elastic/elasticsearch/pull/113482) (issue: {es-issue}113050[#113050])
+* The 'persian' analyzer has stemmer by default [#113482](https://github.com/elastic/elasticsearch/pull/113482) (issue: https://github.com/elastic/elasticsearch/issues/113050[#113050])
 * The Korean dictionary for Nori has been updated [#114124](https://github.com/elastic/elasticsearch/pull/114124)
 
 Authentication:
@@ -56,32 +56,32 @@ Indices APIs:
 Infra/Core:
 * Change Elasticsearch timeouts to 429 response instead of 5xx [#116026](https://github.com/elastic/elasticsearch/pull/116026)
 * Limit `ByteSizeUnit` to 2 decimals [#120142](https://github.com/elastic/elasticsearch/pull/120142)
-* Remove `client.type` setting [#118192](https://github.com/elastic/elasticsearch/pull/118192) (issue: {es-issue}104574[#104574])
+* Remove `client.type` setting [#118192](https://github.com/elastic/elasticsearch/pull/118192) (issue: https://github.com/elastic/elasticsearch/issues/104574[#104574])
 * Remove any references to org.elasticsearch.core.RestApiVersion#V_7 [#118103](https://github.com/elastic/elasticsearch/pull/118103)
 
 Infra/Logging:
-* Change `deprecation.elasticsearch` keyword to `elasticsearch.deprecation` [#117933](https://github.com/elastic/elasticsearch/pull/117933) (issue: {es-issue}83251[#83251])
-* Rename deprecation index template [#125606](https://github.com/elastic/elasticsearch/pull/125606) (issue: {es-issue}125445[#125445])
+* Change `deprecation.elasticsearch` keyword to `elasticsearch.deprecation` [#117933](https://github.com/elastic/elasticsearch/pull/117933) (issue: https://github.com/elastic/elasticsearch/issues/83251[#83251])
+* Rename deprecation index template [#125606](https://github.com/elastic/elasticsearch/pull/125606) (issue: https://github.com/elastic/elasticsearch/issues/125445[#125445])
 
 Infra/Metrics:
 * Deprecated tracing.apm.* settings got removed. [#119926](https://github.com/elastic/elasticsearch/pull/119926)
 
 Infra/REST API:
-* Output a consistent format when generating error json [#90529](https://github.com/elastic/elasticsearch/pull/90529) (issue: {es-issue}89387[#89387])
+* Output a consistent format when generating error json [#90529](https://github.com/elastic/elasticsearch/pull/90529) (issue: https://github.com/elastic/elasticsearch/issues/89387[#89387])
 
 Ingest Node:
 * Remove `ecs` option on `user_agent` processor [#116077](https://github.com/elastic/elasticsearch/pull/116077)
 * Remove ignored fallback option on GeoIP processor [#116112](https://github.com/elastic/elasticsearch/pull/116112)
 
 Logs:
-* Conditionally enable logsdb by default for data streams matching with logs-*-* pattern. [#121049](https://github.com/elastic/elasticsearch/pull/121049) (issue: {es-issue}106489[#106489])
+* Conditionally enable logsdb by default for data streams matching with logs-*-* pattern. [#121049](https://github.com/elastic/elasticsearch/pull/121049) (issue: https://github.com/elastic/elasticsearch/issues/106489[#106489])
 
 Machine Learning:
 * Disable machine learning on macOS x86_64 [#104125](https://github.com/elastic/elasticsearch/pull/104125)
 
 Mapping:
 * Remove support for type, fields, `copy_to` and boost in metadata field definition [#118825](https://github.com/elastic/elasticsearch/pull/118825)
-* Turn `_source` meta fieldmapper's mode attribute into a no-op [#119072](https://github.com/elastic/elasticsearch/pull/119072) (issue: {es-issue}118596[#118596])
+* Turn `_source` meta fieldmapper's mode attribute into a no-op [#119072](https://github.com/elastic/elasticsearch/pull/119072) (issue: https://github.com/elastic/elasticsearch/issues/118596[#118596])
 
 Search:
 * Adjust `random_score` default field to `_seq_no` field [#118671](https://github.com/elastic/elasticsearch/pull/118671)
```

modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java (18 additions, 18 deletions)

```diff
@@ -127,14 +127,14 @@ private ProjectMetadata.Builder updateTimeSeriesTemporalRange(ProjectMetadata pr
 
             // getWriteIndex() selects the latest added index:
             Index head = dataStream.getWriteIndex();
-            IndexMetadata im = project.getIndexSafe(head);
-            Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
-            TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
-            Instant newEnd = DataStream.getCanonicalTimestampBound(
-                now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
-            );
-            if (newEnd.isAfter(currentEnd)) {
-                try {
+            try {
+                IndexMetadata im = project.getIndexSafe(head);
+                Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
+                TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
+                Instant newEnd = DataStream.getCanonicalTimestampBound(
+                    now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
+                );
+                if (newEnd.isAfter(currentEnd)) {
                     Settings settings = Settings.builder()
                         .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DEFAULT_DATE_TIME_FORMATTER.format(newEnd))
                         .build();
@@ -151,17 +151,17 @@ private ProjectMetadata.Builder updateTimeSeriesTemporalRange(ProjectMetadata pr
                     mBuilder.updateSettings(settings, head.getName());
                     // Verify that all temporal ranges of each backing index is still valid:
                     dataStream.validate(mBuilder::get);
-                } catch (Exception e) {
-                    LOGGER.error(
-                        () -> format(
-                            "unable to update [%s] for data stream [%s] and backing index [%s]",
-                            IndexSettings.TIME_SERIES_END_TIME.getKey(),
-                            dataStream.getName(),
-                            head.getName()
-                        ),
-                        e
-                    );
                 }
+            } catch (Exception e) {
+                LOGGER.error(
+                    () -> format(
+                        "unable to update [%s] for data stream [%s] and backing index [%s]",
+                        IndexSettings.TIME_SERIES_END_TIME.getKey(),
+                        dataStream.getName(),
+                        head.getName()
+                    ),
+                    e
+                );
             }
         }
         return mBuilder;
```
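The fix widens the try block so that it also covers `project.getIndexSafe(head)` and the end-time computation: a single malformed data stream, such as one whose registered write index cannot be resolved, is now logged and skipped instead of failing the whole periodic update. A standalone sketch of this per-stream resiliency pattern (simplified, with made-up names; not the service code itself):

```java
import java.util.ArrayList;
import java.util.List;

// Per-item resiliency: each data stream is updated inside its own try/catch,
// so one failure is logged and the loop continues with the remaining streams.
public class PerStreamResilienceSketch {

    static List<String> updateEndTimes(List<String> dataStreams) {
        List<String> updated = new ArrayList<>();
        for (String dataStream : dataStreams) {
            try {
                // Stand-in for: resolve write index, read current end_time,
                // compute the new bound, update settings, validate ranges.
                if (dataStream.endsWith("-broken")) {
                    throw new IllegalStateException("write index not found in project metadata");
                }
                updated.add(dataStream);
            } catch (Exception e) {
                // Mirrors the service's LOGGER.error(...): record and move on.
                System.err.printf("unable to update end_time for data stream [%s]: %s%n", dataStream, e.getMessage());
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        // Same scenario as the new test below: only the broken stream is skipped.
        System.out.println(updateEndTimes(List.of("logs-app1", "logs-app2-broken", "logs-app3")));
    }
}
```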

modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java (117 additions, 0 deletions)

```diff
@@ -8,12 +8,19 @@
  */
 package org.elasticsearch.datastreams;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.filter.RegexFilter;
+import org.apache.logging.log4j.message.Message;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
 import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
@@ -23,15 +30,22 @@
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 
 import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.time.temporal.TemporalAmount;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createIndexMetadata;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
@@ -42,6 +56,22 @@
 
 public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
 
+    static MockAppender appender;
+    static Logger testLogger1 = LogManager.getLogger(UpdateTimeSeriesRangeService.class);
+
+    @BeforeClass
+    public static void classInit() throws IllegalAccessException {
+        appender = new MockAppender("mock_appender");
+        appender.start();
+        Loggers.addAppender(testLogger1, appender);
+    }
+
+    @AfterClass
+    public static void classCleanup() {
+        Loggers.removeAppender(testLogger1, appender);
+        appender.stop();
+    }
+
     private ThreadPool threadPool;
     private UpdateTimeSeriesRangeService instance;
 
@@ -199,6 +229,70 @@ public void testUpdateTimeSeriesTemporalRangeMultipleDataStream() {
         assertThat(getEndTime(project, dataStreamName3, 0), equalTo(start));
     }
 
+    public void testUpdateTimeSeriesTemporalOneBadDataStream() {
+        String dataStreamName1 = "logs-app1";
+        String dataStreamName2 = "logs-app2-broken";
+        String dataStreamName3 = "logs-app3";
+        Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+
+        Instant start = now.minus(90, ChronoUnit.MINUTES);
+        Instant end = start.plus(30, ChronoUnit.MINUTES);
+        final var projectId = randomProjectIdOrDefault();
+        ProjectMetadata.Builder mbBuilder = ProjectMetadata.builder(projectId);
+        for (String dataStreamName : List.of(dataStreamName1, dataStreamName2, dataStreamName3)) {
+            DataStreamTestHelper.getClusterStateWithDataStream(mbBuilder, dataStreamName, List.of(new Tuple<>(start, end)));
+        }
+
+        Settings settings = Settings.builder().put("index.mode", "logsdb").build();
+        var im = createIndexMetadata(getDefaultBackingIndexName(dataStreamName2, 2, start.toEpochMilli()), true, settings, 0);
+        mbBuilder.put(im, true);
+        var ds2 = mbBuilder.dataStreamMetadata().dataStreams().get(dataStreamName2);
+        var ds2Indices = new ArrayList<>(ds2.getIndices());
+        ds2Indices.add(im.getIndex());
+        var copy = new HashMap<>(mbBuilder.dataStreamMetadata().dataStreams());
+        copy.put(
+            dataStreamName2,
+            new DataStream(
+                ds2.getName(),
+                ds2Indices,
+                2,
+                ds2.getMetadata(),
+                ds2.isHidden(),
+                ds2.isReplicated(),
+                ds2.isSystem(),
+                ds2.isAllowCustomRouting(),
+                ds2.getIndexMode(),
+                ds2.getDataLifecycle(),
+                ds2.getDataStreamOptions(),
+                ds2.getFailureIndices(),
+                ds2.rolloverOnWrite(),
+                ds2.getAutoShardingEvent()
+            )
+        );
+        mbBuilder.dataStreams(copy, Map.of());
+
+        now = now.minus(45, ChronoUnit.MINUTES);
+        ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(mbBuilder).build();
+        ClusterState result = instance.updateTimeSeriesTemporalRange(before, now);
+        assertThat(result, not(sameInstance(before)));
+        final var project = result.getMetadata().getProject(projectId);
+        final var expectedEndTime = now.plus(35, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS);
+        assertThat(getEndTime(project, dataStreamName1, 0), equalTo(expectedEndTime));
+        assertThat(getEndTime(project, dataStreamName2, 0), equalTo(end)); // end_time not updated, because this data stream is broken
+        assertThat(getEndTime(project, dataStreamName3, 0), equalTo(expectedEndTime));
+
+        String message = appender.getLastEventAndReset().getMessage().getFormattedMessage();
+        assertThat(
+            message,
+            equalTo(
+                "unable to update [index.time_series.end_time] for data stream [logs-app2-broken] and "
+                    + "backing index ["
+                    + im.getIndex().getName()
+                    + "]"
+            )
+        );
+    }
+
 public void testUpdateTimeSeriesTemporalRange_multipleProjects() {
     String dataStreamName = "logs-app1";
     Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
@@ -253,4 +347,27 @@ static Instant getStartTime(ProjectMetadata project, String dataStreamName, int
         return IndexSettings.TIME_SERIES_START_TIME.get(indexSettings);
     }
 
+    static class MockAppender extends AbstractAppender {
+        public LogEvent lastEvent;
+
+        MockAppender(final String name) throws IllegalAccessException {
+            super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false);
+        }
+
+        @Override
+        public void append(LogEvent event) {
+            lastEvent = event.toImmutable();
+        }
+
+        Message lastMessage() {
+            return lastEvent.getMessage();
+        }
+
+        public LogEvent getLastEventAndReset() {
+            LogEvent toReturn = lastEvent;
+            lastEvent = null;
+            return toReturn;
+        }
+    }
+
 }
```
