Skip to content

Commit 8c36f08

Browse files
authored
Improve EarlyDeprecationindexingIT test reliability backport(#105696) #107540
this test intends to test the bulkProcessor2's request consumer (see DeprecationIndexingComponent#getBulkProcessor) scheduling requests before the startup is completed (flush is enabled). To verify this behaviour the flush has to happen before the templates are loaded. To test this reliably the flush interval in the test should be as small as possible (not hardcoded 5s as of now) This commit introduces a setting (not meant to be exposed/documented) to allow for the flush interval to be configured. It also adds additional trace logging to help with troubleshooting. relates #104716
1 parent 46b4a73 commit 8c36f08

File tree

4 files changed

+40
-6
lines changed

4 files changed

+40
-6
lines changed

x-pack/plugin/deprecation/qa/early-deprecation-rest/build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@ restResources {
2727

2828
testClusters.configureEach {
2929
testDistribution = 'DEFAULT'
30-
setting 'cluster.deprecation_indexing.enabled', 'true'
3130
setting 'xpack.security.enabled', 'false'
3231
setting 'xpack.license.self_generated.type', 'trial'
32+
setting 'cluster.deprecation_indexing.enabled', 'true'
33+
setting 'cluster.deprecation_indexing.flush_interval', '1ms'
34+
setting 'logger.org.elasticsearch.xpack.deprecation','TRACE'
35+
setting 'logger.org.elasticsearch.xpack.deprecation.logging','TRACE'
3336
}
3437

3538
// Test clusters run with security disabled

x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/Deprecation.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.function.Supplier;
3535

3636
import static org.elasticsearch.xpack.deprecation.DeprecationChecks.SKIP_DEPRECATIONS_SETTING;
37+
import static org.elasticsearch.xpack.deprecation.logging.DeprecationIndexingComponent.DEPRECATION_INDEXING_FLUSH_INTERVAL;
3738

3839
/**
3940
* The plugin class for the Deprecation API
@@ -110,6 +111,11 @@ public Collection<?> createComponents(PluginServices services) {
110111

111112
@Override
112113
public List<Setting<?>> getSettings() {
113-
return List.of(USE_X_OPAQUE_ID_IN_FILTERING, WRITE_DEPRECATION_LOGS_TO_INDEX, SKIP_DEPRECATIONS_SETTING);
114+
return List.of(
115+
USE_X_OPAQUE_ID_IN_FILTERING,
116+
WRITE_DEPRECATION_LOGS_TO_INDEX,
117+
SKIP_DEPRECATIONS_SETTING,
118+
DEPRECATION_INDEXING_FLUSH_INTERVAL
119+
);
114120
}
115121
}

x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingAppender.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.deprecation.logging;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.apache.logging.log4j.core.Appender;
1113
import org.apache.logging.log4j.core.Core;
1214
import org.apache.logging.log4j.core.Filter;
@@ -16,6 +18,7 @@
1618
import org.apache.logging.log4j.core.config.plugins.Plugin;
1719
import org.elasticsearch.action.DocWriteRequest;
1820
import org.elasticsearch.action.index.IndexRequest;
21+
import org.elasticsearch.core.Strings;
1922
import org.elasticsearch.xcontent.XContentType;
2023

2124
import java.util.Objects;
@@ -28,6 +31,7 @@
2831
*/
2932
@Plugin(name = "DeprecationIndexingAppender", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE)
3033
public class DeprecationIndexingAppender extends AbstractAppender {
34+
private static final Logger logger = LogManager.getLogger(DeprecationIndexingAppender.class);
3135
public static final String DEPRECATION_MESSAGES_DATA_STREAM = ".logs-deprecation.elasticsearch-default";
3236

3337
private final Consumer<IndexRequest> requestConsumer;
@@ -40,9 +44,10 @@ public class DeprecationIndexingAppender extends AbstractAppender {
4044

4145
/**
4246
* Creates a new appender.
43-
* @param name the appender's name
44-
* @param filter a filter to apply directly on the appender
45-
* @param layout the layout to use for formatting message. It must return a JSON string.
47+
*
48+
* @param name the appender's name
49+
* @param filter a filter to apply directly on the appender
50+
* @param layout the layout to use for formatting message. It must return a JSON string.
4651
* @param requestConsumer a callback to handle the actual indexing of the log message.
4752
*/
4853
public DeprecationIndexingAppender(String name, Filter filter, Layout<String> layout, Consumer<IndexRequest> requestConsumer) {
@@ -56,6 +61,13 @@ public DeprecationIndexingAppender(String name, Filter filter, Layout<String> la
5661
*/
5762
@Override
5863
public void append(LogEvent event) {
64+
logger.trace(
65+
() -> Strings.format(
66+
"Received deprecation log event. Appender is %s. message = %s",
67+
isEnabled ? "enabled" : "disabled",
68+
event.getMessage().getFormattedMessage()
69+
)
70+
);
5971
if (this.isEnabled == false) {
6072
return;
6173
}
@@ -71,6 +83,7 @@ public void append(LogEvent event) {
7183
/**
7284
* Sets whether this appender is enabled or disabled. When disabled, the appender will
7385
* not perform indexing operations.
86+
*
7487
* @param enabled the enabled status of the appender.
7588
*/
7689
public void setEnabled(boolean enabled) {

x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.logging.ECSJsonLayout;
2828
import org.elasticsearch.common.logging.Loggers;
2929
import org.elasticsearch.common.logging.RateLimitingFilter;
30+
import org.elasticsearch.common.settings.Setting;
3031
import org.elasticsearch.common.settings.Settings;
3132
import org.elasticsearch.core.TimeValue;
3233
import org.elasticsearch.xpack.core.ClientHelper;
@@ -46,6 +47,13 @@
4647
* It also starts and stops the appender
4748
*/
4849
public class DeprecationIndexingComponent extends AbstractLifecycleComponent implements ClusterStateListener {
50+
51+
public static final Setting<TimeValue> DEPRECATION_INDEXING_FLUSH_INTERVAL = Setting.timeSetting(
52+
"cluster.deprecation_indexing.flush_interval",
53+
TimeValue.timeValueSeconds(5),
54+
Setting.Property.NodeScope
55+
);
56+
4957
private static final Logger logger = LogManager.getLogger(DeprecationIndexingComponent.class);
5058

5159
private final DeprecationIndexingAppender appender;
@@ -190,6 +198,7 @@ public void enableDeprecationLogIndexing(boolean newEnabled) {
190198
* @return an initialised bulk processor
191199
*/
192200
private BulkProcessor2 getBulkProcessor(Client client, Settings settings) {
201+
TimeValue flushInterval = DEPRECATION_INDEXING_FLUSH_INTERVAL.get(settings);
193202
BulkProcessor2.Listener listener = new DeprecationBulkListener();
194203
return BulkProcessor2.builder((bulkRequest, actionListener) -> {
195204
/*
@@ -198,13 +207,16 @@ private BulkProcessor2 getBulkProcessor(Client client, Settings settings) {
198207
* in-flight-bytes limit has been exceeded. This means that we don't have to worry about bounding pendingRequestsBuffer.
199208
*/
200209
if (flushEnabled.get()) {
210+
logger.trace("Flush is enabled, sending a bulk request");
201211
client.bulk(bulkRequest, actionListener);
202212
flushBuffer(); // just in case something was missed after the first flush
203213
} else {
214+
logger.trace("Flush is disabled, scheduling a bulk request");
215+
204216
// this is an unbounded queue, so the entry will always be accepted
205217
pendingRequestsBuffer.offer(() -> client.bulk(bulkRequest, actionListener));
206218
}
207-
}, listener, client.threadPool()).setMaxNumberOfRetries(3).setFlushInterval(TimeValue.timeValueSeconds(5)).build();
219+
}, listener, client.threadPool()).setMaxNumberOfRetries(3).setFlushInterval(flushInterval).build();
208220
}
209221

210222
private static class DeprecationBulkListener implements BulkProcessor2.Listener {

0 commit comments

Comments
 (0)