Skip to content

Commit bb9ee76

Browse files
authored
[8.x] Record whether data streams for logs-*-* exist for logsdb enablement in 9.x (elastic#121025)
Backporting elastic#120708 to 8.x branch. Add LogsPatternUsageService that records whether there are data streams matching with logs-*-* pattern. This is recorded via the new logsdb.prior_logs_usage cluster setting. Upon upgrade to 9.x this can be used to determine whether logsdb should be enabled by default if cluster.logsdb.enabled hasn't been set. The recommended upgrade path to 9.x is always to go to 8.latest. This component will run in clusters with version greater than 8.18.0 but not on 9.0 and newer.
1 parent 442fc1f commit bb9ee76

File tree

6 files changed

+539
-3
lines changed

6 files changed

+539
-3
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class ClusterUpdateSettingsResponse extends AcknowledgedResponse {
3636
persistentSettings = Settings.readSettingsFromStream(in);
3737
}
3838

39-
ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) {
39+
public ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) {
4040
super(acknowledged);
4141
this.persistentSettings = persistentSettings;
4242
this.transientSettings = transientSettings;

x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99

1010
import org.elasticsearch.action.ActionRequest;
1111
import org.elasticsearch.action.ActionResponse;
12+
import org.elasticsearch.cluster.metadata.Metadata;
1213
import org.elasticsearch.cluster.node.DiscoveryNode;
14+
import org.elasticsearch.common.component.LifecycleListener;
1315
import org.elasticsearch.common.settings.Setting;
1416
import org.elasticsearch.common.settings.Settings;
1517
import org.elasticsearch.index.IndexSettingProvider;
@@ -25,7 +27,10 @@
2527
import java.util.ArrayList;
2628
import java.util.Collection;
2729
import java.util.List;
30+
import java.util.function.Supplier;
2831

32+
import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.LOGSDB_PRIOR_LOGS_USAGE;
33+
import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.USAGE_CHECK_MAX_PERIOD;
2934
import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseService.FALLBACK_SETTING;
3035

3136
public class LogsDBPlugin extends Plugin implements ActionPlugin {
@@ -57,6 +62,19 @@ public Collection<?> createComponents(PluginServices services) {
5762
CLUSTER_LOGSDB_ENABLED,
5863
logsdbIndexModeSettingsProvider::updateClusterIndexModeLogsdbEnabled
5964
);
65+
66+
var clusterService = services.clusterService();
67+
Supplier<Metadata> metadataSupplier = () -> clusterService.state().metadata();
68+
var historicLogsUsageService = new LogsPatternUsageService(services.client(), settings, services.threadPool(), metadataSupplier);
69+
clusterService.addLocalNodeMasterListener(historicLogsUsageService);
70+
clusterService.addLifecycleListener(new LifecycleListener() {
71+
72+
@Override
73+
public void beforeStop() {
74+
historicLogsUsageService.offMaster();
75+
}
76+
});
77+
6078
// Nothing to share here:
6179
return super.createComponents(services);
6280
}
@@ -76,7 +94,7 @@ public Collection<IndexSettingProvider> getAdditionalIndexSettingProviders(Index
7694

7795
@Override
7896
public List<Setting<?>> getSettings() {
79-
return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED);
97+
return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, USAGE_CHECK_MAX_PERIOD, LOGSDB_PRIOR_LOGS_USAGE);
8098
}
8199

82100
@Override
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.logsdb;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
12+
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
13+
import org.elasticsearch.client.internal.Client;
14+
import org.elasticsearch.cluster.LocalNodeMasterListener;
15+
import org.elasticsearch.cluster.metadata.Metadata;
16+
import org.elasticsearch.common.regex.Regex;
17+
import org.elasticsearch.common.settings.Setting;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
20+
import org.elasticsearch.core.TimeValue;
21+
import org.elasticsearch.logging.LogManager;
22+
import org.elasticsearch.logging.Logger;
23+
import org.elasticsearch.threadpool.Scheduler;
24+
import org.elasticsearch.threadpool.ThreadPool;
25+
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.function.Supplier;
28+
29+
import static org.elasticsearch.xpack.logsdb.LogsdbIndexModeSettingsProvider.LOGS_PATTERN;
30+
31+
/**
32+
* A component that checks in the background whether there are data streams that match <code>log-*-*</code> pattern and if so records this
33+
* as persistent setting in cluster state. If <code>logs-*-*</code> data stream usage has been found then this component will no longer
34+
* run in the background.
35+
* <p>
36+
* After {@link #onMaster()} is invoked, the first check is scheduled to run after 1 minute. If no <code>logs-*-*</code> data streams are
37+
* found, then the next check runs after 2 minutes. The schedule time will double if no data streams with <code>logs-*-*</code> pattern
38+
* are found up until the maximum configured period in the {@link #USAGE_CHECK_MAX_PERIOD} setting (defaults to 24 hours).
39+
* <p>
40+
* If during a check one or more <code>logs-*-*</code> data streams are found, then the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting gets set
41+
* as persistent cluster setting and this component will not schedule new checks. The mentioned setting is visible in persistent settings
42+
* of cluster state and a signal that upon upgrading to 9.x logsdb will not be enabled by default for data streams matching the
43+
* <code>logs-*-*</code> pattern. It isn't recommended to manually set the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting.
44+
*/
45+
final class LogsPatternUsageService implements LocalNodeMasterListener {
46+
47+
private static final Logger LOGGER = LogManager.getLogger(LogsPatternUsageService.class);
48+
private static final TimeValue USAGE_CHECK_MINIMUM = TimeValue.timeValueSeconds(30);
49+
static final Setting<TimeValue> USAGE_CHECK_MAX_PERIOD = Setting.timeSetting(
50+
"logsdb.usage_check.max_period",
51+
new TimeValue(24, TimeUnit.HOURS),
52+
Setting.Property.NodeScope
53+
);
54+
static final Setting<Boolean> LOGSDB_PRIOR_LOGS_USAGE = Setting.boolSetting(
55+
"logsdb.prior_logs_usage",
56+
false,
57+
Setting.Property.Dynamic,
58+
Setting.Property.NodeScope
59+
);
60+
61+
private final Client client;
62+
private final Settings nodeSettings;
63+
private final ThreadPool threadPool;
64+
private final Supplier<Metadata> metadataSupplier;
65+
66+
// Initializing to 30s, so first time will run with a delay of 60s:
67+
volatile TimeValue nextWaitTime = USAGE_CHECK_MINIMUM;
68+
volatile boolean isMaster;
69+
volatile boolean hasPriorLogsUsage;
70+
volatile Scheduler.Cancellable cancellable;
71+
72+
LogsPatternUsageService(Client client, Settings nodeSettings, ThreadPool threadPool, Supplier<Metadata> metadataSupplier) {
73+
this.client = client;
74+
this.nodeSettings = nodeSettings;
75+
this.threadPool = threadPool;
76+
this.metadataSupplier = metadataSupplier;
77+
}
78+
79+
@Override
80+
public void onMaster() {
81+
if (cancellable == null || cancellable.isCancelled()) {
82+
isMaster = true;
83+
nextWaitTime = USAGE_CHECK_MINIMUM;
84+
scheduleNext();
85+
}
86+
}
87+
88+
@Override
89+
public void offMaster() {
90+
isMaster = false;
91+
if (cancellable != null && cancellable.isCancelled() == false) {
92+
cancellable.cancel();
93+
cancellable = null;
94+
}
95+
}
96+
97+
void scheduleNext() {
98+
TimeValue maxWaitTime = USAGE_CHECK_MAX_PERIOD.get(nodeSettings);
99+
nextWaitTime = TimeValue.timeValueMillis(Math.min(nextWaitTime.millis() * 2, maxWaitTime.millis()));
100+
scheduleNext(nextWaitTime);
101+
}
102+
103+
void scheduleNext(TimeValue waitTime) {
104+
if (isMaster && hasPriorLogsUsage == false) {
105+
try {
106+
cancellable = threadPool.schedule(this::check, waitTime, threadPool.generic());
107+
} catch (EsRejectedExecutionException e) {
108+
if (e.isExecutorShutdown()) {
109+
LOGGER.debug("Failed to check; Shutting down", e);
110+
} else {
111+
throw e;
112+
}
113+
}
114+
} else {
115+
LOGGER.debug("Skipping check, because [{}]/[{}]", isMaster, hasPriorLogsUsage);
116+
}
117+
}
118+
119+
void check() {
120+
LOGGER.debug("Starting logs-*-* usage check");
121+
if (isMaster) {
122+
var metadata = metadataSupplier.get();
123+
if (LOGSDB_PRIOR_LOGS_USAGE.exists(metadata.persistentSettings())) {
124+
LOGGER.debug("Using persistent logs-*-* usage check");
125+
hasPriorLogsUsage = true;
126+
return;
127+
}
128+
129+
if (hasLogsUsage(metadata)) {
130+
updateSetting();
131+
} else {
132+
LOGGER.debug("No usage found; Skipping check");
133+
scheduleNext();
134+
}
135+
} else {
136+
LOGGER.debug("No longer master; Skipping check");
137+
}
138+
}
139+
140+
static boolean hasLogsUsage(Metadata metadata) {
141+
for (var dataStream : metadata.dataStreams().values()) {
142+
if (Regex.simpleMatch(LOGS_PATTERN, dataStream.getName())) {
143+
return true;
144+
}
145+
}
146+
return false;
147+
}
148+
149+
void updateSetting() {
150+
var settingsToUpdate = Settings.builder().put(LOGSDB_PRIOR_LOGS_USAGE.getKey(), true).build();
151+
var request = new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE);
152+
request.persistentSettings(settingsToUpdate);
153+
client.execute(ClusterUpdateSettingsAction.INSTANCE, request, ActionListener.wrap(resp -> {
154+
if (resp.isAcknowledged() && LOGSDB_PRIOR_LOGS_USAGE.exists(resp.getPersistentSettings())) {
155+
hasPriorLogsUsage = true;
156+
cancellable = null;
157+
} else {
158+
LOGGER.debug(() -> "unexpected response [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]");
159+
scheduleNext(TimeValue.ONE_MINUTE);
160+
}
161+
}, e -> {
162+
LOGGER.debug(() -> "Failed to update [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]", e);
163+
scheduleNext(TimeValue.ONE_MINUTE);
164+
}));
165+
}
166+
}

x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646

4747
final class LogsdbIndexModeSettingsProvider implements IndexSettingProvider {
4848
private static final Logger LOGGER = LogManager.getLogger(LogsdbIndexModeSettingsProvider.class);
49-
private static final String LOGS_PATTERN = "logs-*-*";
49+
static final String LOGS_PATTERN = "logs-*-*";
5050
private static final Set<String> MAPPING_INCLUDES = Set.of("_doc._source.*", "_doc.properties.host**", "_doc.subobjects");
5151

5252
private final SyntheticSourceLicenseService syntheticSourceLicenseService;
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.logsdb;
9+
10+
import org.elasticsearch.action.DocWriteResponse;
11+
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction;
12+
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
13+
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
14+
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
15+
import org.elasticsearch.action.index.IndexRequest;
16+
import org.elasticsearch.action.support.IndicesOptions;
17+
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
18+
import org.elasticsearch.cluster.metadata.Template;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.core.TimeValue;
21+
import org.elasticsearch.datastreams.DataStreamsPlugin;
22+
import org.elasticsearch.plugins.Plugin;
23+
import org.elasticsearch.test.ESSingleNodeTestCase;
24+
import org.elasticsearch.threadpool.ThreadPool;
25+
import org.elasticsearch.threadpool.ThreadPoolStats;
26+
27+
import java.util.Collection;
28+
import java.util.List;
29+
30+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
31+
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
33+
import static org.hamcrest.Matchers.nullValue;
34+
35+
public class LogsPatternUsageServiceIntegrationTests extends ESSingleNodeTestCase {
36+
37+
@Override
38+
protected Collection<Class<? extends Plugin>> getPlugins() {
39+
return List.of(LogsDBPlugin.class, DataStreamsPlugin.class);
40+
}
41+
42+
@Override
43+
protected Settings nodeSettings() {
44+
return Settings.builder().put("logsdb.usage_check.max_period", "1s").build();
45+
}
46+
47+
@Override
48+
protected boolean resetNodeAfterTest() {
49+
return true;
50+
}
51+
52+
public void testLogsPatternUsage() throws Exception {
53+
var template = ComposableIndexTemplate.builder()
54+
.indexPatterns(List.of("logs-*-*"))
55+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
56+
.build();
57+
assertAcked(
58+
client().execute(
59+
TransportPutComposableIndexTemplateAction.TYPE,
60+
new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template)
61+
).actionGet()
62+
);
63+
64+
IndexRequest indexRequest = new IndexRequest("my-index").create(true).source("field", "value");
65+
var indexResponse = client().index(indexRequest).actionGet();
66+
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));
67+
68+
{
69+
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
70+
.actionGet();
71+
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue());
72+
}
73+
74+
indexRequest = new IndexRequest("logs-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00");
75+
indexResponse = client().index(indexRequest).actionGet();
76+
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));
77+
78+
assertBusy(() -> {
79+
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
80+
.actionGet();
81+
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), equalTo("true"));
82+
});
83+
}
84+
85+
public void testLogsPatternUsageNoLogsStarDashStarUsage() throws Exception {
86+
var template = ComposableIndexTemplate.builder()
87+
.indexPatterns(List.of("log-*-*"))
88+
.template(new Template(Settings.builder().put("index.number_of_replicas", 0).build(), null, null))
89+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
90+
.build();
91+
assertAcked(
92+
client().execute(
93+
TransportPutComposableIndexTemplateAction.TYPE,
94+
new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template)
95+
).actionGet()
96+
);
97+
98+
var indexRequest = new IndexRequest("log-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00");
99+
var indexResponse = client().index(indexRequest).actionGet();
100+
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));
101+
102+
ensureGreen("log-myapp-prod");
103+
// Check that LogsPatternUsageService checked three times by checking generic threadpool stats.
104+
// (the LogsPatternUsageService's check is scheduled via the generic threadpool)
105+
var threadPool = getInstanceFromNode(ThreadPool.class);
106+
var beforeStat = getGenericThreadpoolStat(threadPool);
107+
assertBusy(() -> {
108+
var stat = getGenericThreadpoolStat(threadPool);
109+
assertThat(stat.completed(), greaterThanOrEqualTo(beforeStat.completed() + 3));
110+
});
111+
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
112+
.actionGet();
113+
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue());
114+
}
115+
116+
private static ThreadPoolStats.Stats getGenericThreadpoolStat(ThreadPool threadPool) {
117+
var result = threadPool.stats().stats().stream().filter(stats -> stats.name().equals(ThreadPool.Names.GENERIC)).toList();
118+
assertThat(result.size(), equalTo(1));
119+
return result.get(0);
120+
}
121+
122+
@Override
123+
public void tearDown() throws Exception {
124+
// Need to clean up the data stream and logsdb.prior_logs_usage setting because ESSingleNodeTestCase tests aren't allowed to leave
125+
// persistent cluster settings around.
126+
127+
var deleteDataStreamsRequest = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*");
128+
deleteDataStreamsRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
129+
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, deleteDataStreamsRequest));
130+
131+
var settings = Settings.builder().put("logsdb.prior_logs_usage", (String) null).build();
132+
client().admin()
133+
.cluster()
134+
.updateSettings(new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE).persistentSettings(settings))
135+
.actionGet();
136+
137+
super.tearDown();
138+
}
139+
}

0 commit comments

Comments
 (0)