Skip to content

Commit 2f73866

Browse files
committed
Adding data loss detection mechamism
1 parent 99e74d0 commit 2f73866

File tree

6 files changed

+95
-16
lines changed

6 files changed

+95
-16
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/index/mapper/PatternedTextParserBenchmark.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.benchmark.index.mapper;
1111

1212
import org.elasticsearch.xpack.logsdb.patternedtext.charparser.api.Parser;
13+
import org.elasticsearch.xpack.logsdb.patternedtext.charparser.api.ParseException;
1314
import org.elasticsearch.xpack.logsdb.patternedtext.charparser.api.ParserFactory;
1415
import org.elasticsearch.xpack.logsdb.patternedtext.charparser.api.Argument;
1516
import org.elasticsearch.xpack.logsdb.patternedtext.charparser.api.IPv4Argument;
@@ -28,7 +29,6 @@
2829
import org.openjdk.jmh.annotations.Warmup;
2930
import org.openjdk.jmh.infra.Blackhole;
3031

31-
import java.text.ParseException;
3232
import java.text.SimpleDateFormat;
3333
import java.time.format.DateTimeFormatter;
3434
import java.util.ArrayList;
@@ -66,7 +66,7 @@ public void setup() {
6666
}
6767

6868
@Benchmark
69-
public void parseWithCharParser(Blackhole blackhole) {
69+
public void parseWithCharParser(Blackhole blackhole) throws ParseException {
7070
PatternedMessage result = parser.parse(testMessageNoComma);
7171
blackhole.consume(result);
7272
// long timestamp = TimestampFormat.parseTimestamp(dateTimeFormatter, "Oct 05 2023 02:48:00 PM");
@@ -140,7 +140,7 @@ public PatternedMessage parse(String rawMessage) {
140140
Date date = usedFormatter.parse(tsString);
141141
timestampArg = new Timestamp(date.getTime(), usedFormatter.toPattern());
142142
patternBuilder.append("%T");
143-
} catch (ParseException e) {
143+
} catch (java.text.ParseException e) {
144144
patternBuilder.append(tsString);
145145
}
146146
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.logsdb.patternedtext.charparser.api;
9+
10+
import java.util.Locale;
11+
12+
/**
13+
* Exception thrown when there is a potential data loss during parsing.
14+
* This indicates that the actual length of the resulting data (pattern and arguments) does not match the expected length,
15+
* based on the input message.
16+
*/
17+
public class DataLossParseException extends ParseException {
18+
19+
private final int expectedLength;
20+
private final int actualLength;
21+
22+
public DataLossParseException(String message, int expectedLength, int actualLength) {
23+
super(String.format(Locale.ROOT, "%s (expected length: %d, actual length: %d)", message, expectedLength, actualLength));
24+
this.expectedLength = expectedLength;
25+
this.actualLength = actualLength;
26+
}
27+
28+
public int getExpectedLength() {
29+
return expectedLength;
30+
}
31+
32+
public int getActualLength() {
33+
return actualLength;
34+
}
35+
}

x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/patternedtext/charparser/api/Parser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public interface Parser {
4343
* @param rawMessage the input text message to parse, must not be null
4444
* @return a {@link PatternedMessage} containing the extracted template, timestamp, and typed arguments
4545
* @throws IllegalArgumentException if rawMessage is null
46+
* @throws ParseException if a parsing error occurs
4647
*/
47-
PatternedMessage parse(String rawMessage);
48+
PatternedMessage parse(String rawMessage) throws ParseException;
4849
}

x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/patternedtext/charparser/api/UUIDArgument.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@
1919
*/
2020
public final class UUIDArgument extends ByteEncodedArgument {
2121

22-
public UUIDArgument(String s, int start, int end) {
22+
public UUIDArgument(String s, int start, int length) {
2323
super(16);
24-
int length = end - start;
2524
if (length == 36) {
2625
// UUID in standard format (e.g., "123e4567-e89b-12d3-a456-426614174000")
27-
UUID uuid = UUID.fromString(s.substring(start, end));
26+
UUID uuid = UUID.fromString(s.substring(start, start + length));
2827
ByteUtils.writeLongLE(uuid.getMostSignificantBits(), encodedBytes, 0);
2928
ByteUtils.writeLongLE(uuid.getLeastSignificantBits(), encodedBytes, 8);
3029
} else if (length == 32) {

x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/patternedtext/charparser/parser/CharParser.java

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.logsdb.patternedtext.charparser.parser;
99

10+
import org.elasticsearch.xpack.logsdb.patternedtext.charparser.api.DataLossParseException;
11+
import org.elasticsearch.xpack.logsdb.patternedtext.charparser.api.ParseException;
1012
import org.elasticsearch.xpack.logsdb.patternedtext.charparser.api.Parser;
1113
import org.elasticsearch.xpack.logsdb.patternedtext.charparser.common.EncodingType;
1214
import org.elasticsearch.xpack.logsdb.patternedtext.charparser.compiler.CompiledSchema;
@@ -78,6 +80,8 @@ public final class CharParser implements Parser {
7880
private final StringBuilder patternedMessage = new StringBuilder();
7981
private final List<Argument<?>> arguments = new ArrayList<>();
8082
private Timestamp timestamp = null;
83+
// a character counter that is used to verify data integrity
84+
int totalCharsAttributedToArgs;
8185

8286
// current subToken state
8387
private int currentSubTokenStartIndex;
@@ -97,10 +101,13 @@ public final class CharParser implements Parser {
97101
private final char[] bufferedSubTokenDelimiters;
98102

99103
// current multi-token state
104+
private int currentMultiTokenStartIndex;
100105
int currentMultiTokenBitmask;
101106
int currentTokenIndex;
102107
final TokenType[] bufferedTokens;
103108
final int[] bufferedTokenBitmasks;
109+
final int[] bufferedTokenStartIndexes;
110+
final int[] bufferedTokenLengths;
104111
private final char[] bufferedTokenDelimiters;
105112
private final int[] currentMultiTokenSubTokenValues;
106113
private int currentMultiTokenSubTokenIndex;
@@ -126,6 +133,8 @@ public CharParser(CompiledSchema compiledSchema) {
126133
bufferedSubTokenLengths = new int[compiledSchema.maxSubTokensPerToken + 1];
127134
bufferedTokens = new TokenType[compiledSchema.maxTokensPerMultiToken + 1];
128135
bufferedTokenBitmasks = new int[compiledSchema.maxTokensPerMultiToken + 1];
136+
bufferedTokenStartIndexes = new int[compiledSchema.maxTokensPerMultiToken + 1];
137+
bufferedTokenLengths = new int[compiledSchema.maxTokensPerMultiToken + 1];
129138
bufferedTokenDelimiters = new char[compiledSchema.maxTokensPerMultiToken + 1];
130139
}
131140

@@ -145,6 +154,7 @@ private void resetTokenState() {
145154
}
146155

147156
private void resetMultiTokenState() {
157+
currentMultiTokenStartIndex = -1;
148158
currentTokenIndex = -1;
149159
currentMultiTokenBitmask = allMultiTokenBitmask;
150160
currentMultiTokenSubTokenIndex = 0;
@@ -155,6 +165,7 @@ private void reset() {
155165
patternedMessage.setLength(0);
156166
arguments.clear();
157167
timestamp = null;
168+
totalCharsAttributedToArgs = 0;
158169
resetSubTokenState();
159170
resetTokenState();
160171
resetMultiTokenState();
@@ -215,7 +226,7 @@ private void reset() {
215226
* @param rawMessage the input message to parse
216227
* @return a {@link PatternedMessage} containing the pattern template, timestamp, and typed arguments
217228
*/
218-
public PatternedMessage parse(String rawMessage) {
229+
public PatternedMessage parse(String rawMessage) throws ParseException {
219230
if (rawMessage == null || rawMessage.isEmpty()) {
220231
return new PatternedMessage("", null, new Argument<?>[0]);
221232
}
@@ -344,6 +355,13 @@ public PatternedMessage parse(String rawMessage) {
344355

345356
// handle token finalization
346357
int formerMultiTokenBitmask = currentMultiTokenBitmask;
358+
int formerMultiTokenEndIndex;
359+
if (currentTokenIndex >= 0) {
360+
formerMultiTokenEndIndex = bufferedTokenStartIndexes[currentTokenIndex] + bufferedTokenLengths[currentTokenIndex];
361+
} else {
362+
formerMultiTokenEndIndex = indexWithinRawMessage;
363+
}
364+
347365
if (charType == TOKEN_DELIMITER_CHAR_CODE || charType == LINE_END_CODE) {
348366
int currentTokenLength = indexWithinRawMessage - currentTokenStartIndex;
349367
if (currentTokenLength == 0) {
@@ -361,8 +379,12 @@ public PatternedMessage parse(String rawMessage) {
361379
bufferedTokens[currentTokenIndex] = currentToken;
362380
bufferedTokenDelimiters[currentTokenIndex] = currentChar;
363381
bufferedTokenBitmasks[currentTokenIndex] = currentTokenBitmask;
382+
bufferedTokenStartIndexes[currentTokenIndex] = currentTokenStartIndex;
383+
bufferedTokenLengths[currentTokenIndex] = currentTokenLength;
364384

365-
if (currentTokenIndex == compiledSchema.maxTokensPerMultiToken) {
385+
if (currentTokenIndex == 0) {
386+
currentMultiTokenStartIndex = currentTokenStartIndex;
387+
} else if (currentTokenIndex == compiledSchema.maxTokensPerMultiToken) {
366388
// we already passed the maximum number of tokens for any known multi-token
367389
currentMultiTokenBitmask = 0;
368390
} else {
@@ -436,11 +458,17 @@ public PatternedMessage parse(String rawMessage) {
436458
if (multiTokenType.encodingType() == EncodingType.TIMESTAMP) {
437459
createAndStoreTimestamp(multiTokenType);
438460
} else {
439-
throw new IllegalStateException("Unknown multi-token type: " + multiTokenType.name());
461+
throw new ParseException("Unknown multi-token type: " + multiTokenType.name());
440462
}
463+
464+
int multiTokenLength = formerMultiTokenEndIndex - currentMultiTokenStartIndex;
465+
totalCharsAttributedToArgs += multiTokenLength - 2; // deducting 2 for the %T placeholder
466+
441467
// now fixing the buffers so that the last token becomes the only buffered token
442468
bufferedTokens[0] = bufferedTokens[currentTokenIndex];
443469
bufferedTokenBitmasks[0] = bufferedTokenBitmasks[currentTokenIndex];
470+
bufferedTokenStartIndexes[0] = bufferedTokenStartIndexes[currentTokenIndex];
471+
bufferedTokenLengths[0] = bufferedTokenLengths[currentTokenIndex];
444472
bufferedTokenDelimiters[0] = bufferedTokenDelimiters[currentTokenIndex];
445473
currentTokenIndex = 0;
446474
currentMultiTokenSubTokenIndex = 0;
@@ -461,16 +489,26 @@ public PatternedMessage parse(String rawMessage) {
461489
case INTEGER -> new IntegerArgument(bufferedSubTokenIntValues[0]);
462490
case HEX -> new HexadecimalArgument(
463491
rawMessage,
464-
bufferedSubTokenStartIndexes[0],
465-
bufferedSubTokenLengths[0]
492+
bufferedTokenStartIndexes[i],
493+
bufferedTokenLengths[i]
466494
);
467-
case IPV4 -> new IPv4Argument(bufferedSubTokenIntValues);
468-
case UUID -> new UUIDArgument(rawMessage, currentTokenStartIndex, indexWithinRawMessage);
495+
case IPV4 -> {
496+
if (currentTokenIndex == 0) {
497+
// IPv4 tokens can only be part of a single token, so we can safely create an IPv4 argument
498+
yield new IPv4Argument(bufferedSubTokenIntValues);
499+
} else {
500+
throw new ParseException(
501+
"IPV4 token cannot be part of a multi-token, but found at position " + i
502+
);
503+
}
504+
}
505+
case UUID -> new UUIDArgument(rawMessage, bufferedTokenStartIndexes[i], bufferedTokenLengths[i]);
469506
default -> null;
470507
};
471508
if (argument != null) {
472509
arguments.add(argument);
473510
patternedMessage.append(ARGUMENT_PLACEHOLDER_PREFIX).append(argument.type().getSymbol());
511+
totalCharsAttributedToArgs += bufferedTokenLengths[i] - 2; // 2 for symbols
474512
} else {
475513
// todo
476514
}
@@ -511,6 +549,7 @@ public PatternedMessage parse(String rawMessage) {
511549
if (argument != null) {
512550
arguments.add(argument);
513551
patternedMessage.append(ARGUMENT_PLACEHOLDER_PREFIX).append(argument.type().getSymbol());
552+
totalCharsAttributedToArgs += bufferedSubTokenLengths[i] - 2; // deducting 2 for the % prefix and symbol
514553
} else {
515554
patternedMessage.append(rawMessage, bufferedSubTokenStartIndexes[i], indexWithinRawMessage);
516555
}
@@ -534,6 +573,11 @@ public PatternedMessage parse(String rawMessage) {
534573
default:
535574
}
536575
}
576+
577+
int consumedCharsForPatternedMessage = patternedMessage.length() + totalCharsAttributedToArgs;
578+
if (consumedCharsForPatternedMessage != rawMessage.length()) {
579+
throw new DataLossParseException("Data loss detected during parsing", rawMessage.length(), consumedCharsForPatternedMessage);
580+
}
537581
return new PatternedMessage(patternedMessage.toString(), timestamp, arguments.toArray(new Argument<?>[0]));
538582
}
539583

x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/patternedtext/charparser/api/ParserTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public void setUp() throws Exception {
2222
parser = ParserFactory.createParser();
2323
}
2424

25-
public void testSimpleIpAndNumber() {
25+
public void testSimpleIpAndNumber() throws ParseException {
2626
PatternedMessage patternedMessage = parser.parse("Response from 127.0.0.1 took 2000 ms");
2727
assertEquals("Response from %4 took %I ms", patternedMessage.pattern());
2828
Argument<?>[] arguments = patternedMessage.arguments();
@@ -31,7 +31,7 @@ public void testSimpleIpAndNumber() {
3131
assertEquals("INTEGER", arguments[1].type().name());
3232
}
3333

34-
public void testTimestampAndIpAndNumber() {
34+
public void testTimestampAndIpAndNumber() throws ParseException {
3535
PatternedMessage patternedMessage = parser.parse("Oct 05 2023 02:48:07 PM INFO Response from 146.10.10.133 took 2000 ms");
3636
assertEquals("%T INFO Response from %4 took %I ms", patternedMessage.pattern());
3737
assertEquals(1696517287000L, patternedMessage.timestamp().getTimestamp());

0 commit comments

Comments
 (0)