Skip to content

Commit b017356

Browse files
authored
Doc parsing error logging with throttling (#117828)
* Throttled doc parsing error logging * add test * move throttler to separate class * small changes * refactor unittest * fix test
1 parent 31508f0 commit b017356

File tree

4 files changed

+136
-4
lines changed

4 files changed

+136
-4
lines changed

server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.List;
2222

2323
public class DocumentMapper {
24+
static final NodeFeature INDEX_SORTING_ON_NESTED = new NodeFeature("mapper.index_sorting_on_nested");
25+
2426
private final String type;
2527
private final CompressedXContent mappingSource;
2628
private final MappingLookup mappingLookup;
@@ -29,8 +31,6 @@ public class DocumentMapper {
2931
private final IndexVersion indexVersion;
3032
private final Logger logger;
3133

32-
static final NodeFeature INDEX_SORTING_ON_NESTED = new NodeFeature("mapper.index_sorting_on_nested");
33-
3434
/**
3535
* Create a new {@link DocumentMapper} that holds empty mappings.
3636
* @param mapperService the mapper service that holds the needed components
@@ -72,9 +72,11 @@ public static DocumentMapper createEmpty(MapperService mapperService) {
7272
: "provided source [" + source + "] differs from mapping [" + mapping.toCompressedXContent() + "]";
7373
}
7474

75-
private void maybeLogDebug(Exception ex) {
75+
private void maybeLog(Exception ex) {
7676
if (logger.isDebugEnabled()) {
7777
logger.debug("Error while parsing document: " + ex.getMessage(), ex);
78+
} else if (IntervalThrottler.DOCUMENT_PARSING_FAILURE.accept()) {
79+
logger.error("Error while parsing document: " + ex.getMessage(), ex);
7880
}
7981
}
8082

@@ -125,7 +127,7 @@ public ParsedDocument parse(SourceToParse source) throws DocumentParsingExceptio
125127
try {
126128
return documentParser.parseDocument(source, mappingLookup);
127129
} catch (Exception e) {
128-
maybeLogDebug(e);
130+
maybeLog(e);
129131
throw e;
130132
}
131133
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.mapper;
11+
12+
import java.util.concurrent.atomic.AtomicBoolean;
13+
14+
/**
15+
* Throttles tracked operations based on a time interval, restricting them to 1 per N seconds.
16+
*/
17+
enum IntervalThrottler {
18+
DOCUMENT_PARSING_FAILURE(60);
19+
20+
static final int MILLISECONDS_IN_SECOND = 1000;
21+
22+
private final Acceptor acceptor;
23+
24+
IntervalThrottler(long intervalSeconds) {
25+
acceptor = new Acceptor(intervalSeconds * MILLISECONDS_IN_SECOND);
26+
}
27+
28+
/**
29+
* @return true if the operation gets accepted, false if throttled.
30+
*/
31+
boolean accept() {
32+
return acceptor.accept();
33+
}
34+
35+
// Defined separately for testing.
36+
static class Acceptor {
37+
private final long intervalMillis;
38+
private final AtomicBoolean lastAcceptedGuard = new AtomicBoolean(false);
39+
private volatile long lastAcceptedTimeMillis = 0;
40+
41+
Acceptor(long intervalMillis) {
42+
this.intervalMillis = intervalMillis;
43+
}
44+
45+
boolean accept() {
46+
final long now = System.currentTimeMillis();
47+
// Check without guarding first, to reduce contention.
48+
if (now - lastAcceptedTimeMillis > intervalMillis) {
49+
// Check if another concurrent operation succeeded.
50+
if (lastAcceptedGuard.compareAndSet(false, true)) {
51+
try {
52+
// Repeat check under guard protection, so that only one message gets written per interval.
53+
if (now - lastAcceptedTimeMillis > intervalMillis) {
54+
lastAcceptedTimeMillis = now;
55+
return true;
56+
}
57+
} finally {
58+
// Reset guard.
59+
lastAcceptedGuard.set(false);
60+
}
61+
}
62+
}
63+
return false;
64+
}
65+
}
66+
}

server/src/test/java/org/elasticsearch/index/mapper/DocumentMapperTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,18 @@
99

1010
package org.elasticsearch.index.mapper;
1111

12+
import org.apache.logging.log4j.Level;
13+
import org.apache.logging.log4j.LogManager;
14+
import org.apache.logging.log4j.Logger;
15+
import org.apache.logging.log4j.core.LogEvent;
1216
import org.apache.lucene.analysis.Analyzer;
1317
import org.apache.lucene.analysis.core.KeywordAnalyzer;
1418
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
1519
import org.apache.lucene.analysis.standard.StandardAnalyzer;
1620
import org.elasticsearch.common.Strings;
1721
import org.elasticsearch.common.compress.CompressedXContent;
22+
import org.elasticsearch.common.logging.Loggers;
23+
import org.elasticsearch.common.logging.MockAppender;
1824
import org.elasticsearch.common.settings.Settings;
1925
import org.elasticsearch.index.IndexSettings;
2026
import org.elasticsearch.index.IndexVersion;
@@ -493,4 +499,35 @@ public void testDeeplyNestedMapping() throws Exception {
493499
}
494500
}
495501
}
502+
503+
public void testParsingErrorLogging() throws Exception {
504+
MockAppender appender = new MockAppender("mock_appender");
505+
appender.start();
506+
Logger testLogger = LogManager.getLogger(DocumentMapper.class);
507+
Loggers.addAppender(testLogger, appender);
508+
Level originalLogLevel = testLogger.getLevel();
509+
Loggers.setLevel(testLogger, Level.ERROR);
510+
511+
try {
512+
DocumentMapper doc = createDocumentMapper(mapping(b -> b.startObject("value").field("type", "integer").endObject()));
513+
514+
DocumentParsingException e = expectThrows(
515+
DocumentParsingException.class,
516+
() -> doc.parse(source(b -> b.field("value", "foo")))
517+
);
518+
assertThat(e.getMessage(), containsString("failed to parse field [value] of type [integer] in document with id '1'"));
519+
LogEvent event = appender.getLastEventAndReset();
520+
if (event != null) {
521+
assertThat(event.getMessage().getFormattedMessage(), containsString(e.getMessage()));
522+
}
523+
524+
e = expectThrows(DocumentParsingException.class, () -> doc.parse(source(b -> b.field("value", "foo"))));
525+
assertThat(e.getMessage(), containsString("failed to parse field [value] of type [integer] in document with id '1'"));
526+
assertThat(appender.getLastEventAndReset(), nullValue());
527+
} finally {
528+
Loggers.setLevel(testLogger, originalLogLevel);
529+
Loggers.removeAppender(testLogger, appender);
530+
appender.stop();
531+
}
532+
}
496533
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.mapper;
11+
12+
import org.elasticsearch.test.ESTestCase;
13+
14+
public class IntervalThrottlerTests extends ESTestCase {
15+
16+
public void testThrottling() throws Exception {
17+
var throttler = new IntervalThrottler.Acceptor(10);
18+
assertTrue(throttler.accept());
19+
assertFalse(throttler.accept());
20+
assertFalse(throttler.accept());
21+
22+
Thread.sleep(20);
23+
assertTrue(throttler.accept());
24+
assertFalse(throttler.accept());
25+
assertFalse(throttler.accept());
26+
}
27+
}

0 commit comments

Comments
 (0)