Skip to content

Commit 735e635

Browse files
authored
Parse bulk lines in individual steps (#114086) (#116210)
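Currently, our incremental bulk parsing framework parses a request only once both its action line and its document line are available. In addition, each time more data is received it searches again for line delimiters in data it has already scanned. This commit parses the action line and the document line as individual steps and keeps the parser state between parse attempts, so that neither an already-parsed action line nor the delimiter search position is lost.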
1 parent 0703294 commit 735e635

File tree

5 files changed

+311
-130
lines changed

5 files changed

+311
-130
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java

Lines changed: 186 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,13 @@ public BulkRequestParser(boolean deprecateOrErrorOnType, RestApiVersion restApiV
8686
.withRestApiVersion(restApiVersion);
8787
}
8888

89-
private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) {
89+
private static int findNextMarker(byte marker, int from, BytesReference data, boolean lastData) {
9090
final int res = data.indexOf(marker, from);
9191
if (res != -1) {
9292
assert res >= 0;
9393
return res;
9494
}
95-
if (from != data.length() && isIncremental == false) {
95+
if (from != data.length() && lastData) {
9696
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
9797
}
9898
return res;
@@ -137,13 +137,7 @@ public void parse(
137137
Consumer<UpdateRequest> updateRequestConsumer,
138138
Consumer<DeleteRequest> deleteRequestConsumer
139139
) throws IOException {
140-
// Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
141-
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
142-
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
143-
final Map<String, String> stringDeduplicator = new HashMap<>();
144-
145-
incrementalParse(
146-
data,
140+
IncrementalParser incrementalParser = new IncrementalParser(
147141
defaultIndex,
148142
defaultRouting,
149143
defaultFetchSourceContext,
@@ -155,53 +149,164 @@ public void parse(
155149
xContentType,
156150
indexRequestConsumer,
157151
updateRequestConsumer,
158-
deleteRequestConsumer,
159-
false,
160-
stringDeduplicator
152+
deleteRequestConsumer
161153
);
154+
155+
incrementalParser.parse(data, true);
162156
}
163157

164-
public int incrementalParse(
165-
BytesReference data,
166-
String defaultIndex,
167-
String defaultRouting,
168-
FetchSourceContext defaultFetchSourceContext,
169-
String defaultPipeline,
170-
Boolean defaultRequireAlias,
171-
Boolean defaultRequireDataStream,
172-
Boolean defaultListExecutedPipelines,
158+
public IncrementalParser incrementalParser(
159+
@Nullable String defaultIndex,
160+
@Nullable String defaultRouting,
161+
@Nullable FetchSourceContext defaultFetchSourceContext,
162+
@Nullable String defaultPipeline,
163+
@Nullable Boolean defaultRequireAlias,
164+
@Nullable Boolean defaultRequireDataStream,
165+
@Nullable Boolean defaultListExecutedPipelines,
173166
boolean allowExplicitIndex,
174167
XContentType xContentType,
175168
BiConsumer<IndexRequest, String> indexRequestConsumer,
176169
Consumer<UpdateRequest> updateRequestConsumer,
177-
Consumer<DeleteRequest> deleteRequestConsumer,
178-
boolean isIncremental,
179-
Map<String, String> stringDeduplicator
180-
) throws IOException {
181-
XContent xContent = xContentType.xContent();
182-
byte marker = xContent.bulkSeparator();
170+
Consumer<DeleteRequest> deleteRequestConsumer
171+
) {
172+
return new IncrementalParser(
173+
defaultIndex,
174+
defaultRouting,
175+
defaultFetchSourceContext,
176+
defaultPipeline,
177+
defaultRequireAlias,
178+
defaultRequireDataStream,
179+
defaultListExecutedPipelines,
180+
allowExplicitIndex,
181+
xContentType,
182+
indexRequestConsumer,
183+
updateRequestConsumer,
184+
deleteRequestConsumer
185+
);
186+
}
187+
188+
public class IncrementalParser {
189+
190+
// Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
191+
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
192+
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
193+
private final Map<String, String> stringDeduplicator = new HashMap<>();
194+
195+
private final String defaultIndex;
196+
private final String defaultRouting;
197+
private final FetchSourceContext defaultFetchSourceContext;
198+
private final String defaultPipeline;
199+
private final Boolean defaultRequireAlias;
200+
private final Boolean defaultRequireDataStream;
201+
private final Boolean defaultListExecutedPipelines;
202+
private final boolean allowExplicitIndex;
203+
204+
private final XContentType xContentType;
205+
private final byte marker;
206+
private final BiConsumer<IndexRequest, String> indexRequestConsumer;
207+
private final Consumer<UpdateRequest> updateRequestConsumer;
208+
private final Consumer<DeleteRequest> deleteRequestConsumer;
209+
210+
private Exception failure = null;
211+
private int incrementalFromOffset = 0;
212+
private int line = 0;
183213
boolean typesDeprecationLogged = false;
184214

185-
int line = 0;
186-
int from = 0;
187-
int consumed = 0;
215+
private DocWriteRequest<?> currentRequest = null;
216+
private String currentType = null;
217+
private String currentPipeline = null;
218+
private boolean currentListExecutedPipelines = false;
219+
private FetchSourceContext currentFetchSourceContext = null;
188220

189-
while (true) {
190-
int nextMarker = findNextMarker(marker, from, data, isIncremental);
191-
if (nextMarker == -1) {
192-
break;
221+
private IncrementalParser(
222+
@Nullable String defaultIndex,
223+
@Nullable String defaultRouting,
224+
@Nullable FetchSourceContext defaultFetchSourceContext,
225+
@Nullable String defaultPipeline,
226+
@Nullable Boolean defaultRequireAlias,
227+
@Nullable Boolean defaultRequireDataStream,
228+
@Nullable Boolean defaultListExecutedPipelines,
229+
boolean allowExplicitIndex,
230+
XContentType xContentType,
231+
BiConsumer<IndexRequest, String> indexRequestConsumer,
232+
Consumer<UpdateRequest> updateRequestConsumer,
233+
Consumer<DeleteRequest> deleteRequestConsumer
234+
) {
235+
this.defaultIndex = defaultIndex;
236+
this.defaultRouting = defaultRouting;
237+
this.defaultFetchSourceContext = defaultFetchSourceContext;
238+
this.defaultPipeline = defaultPipeline;
239+
this.defaultRequireAlias = defaultRequireAlias;
240+
this.defaultRequireDataStream = defaultRequireDataStream;
241+
this.defaultListExecutedPipelines = defaultListExecutedPipelines;
242+
this.allowExplicitIndex = allowExplicitIndex;
243+
this.xContentType = xContentType;
244+
this.marker = xContentType.xContent().bulkSeparator();
245+
this.indexRequestConsumer = indexRequestConsumer;
246+
this.updateRequestConsumer = updateRequestConsumer;
247+
this.deleteRequestConsumer = deleteRequestConsumer;
248+
}
249+
250+
public int parse(BytesReference data, boolean lastData) throws IOException {
251+
if (failure != null) {
252+
assert false : failure.getMessage();
253+
throw new IllegalStateException("Parser has already encountered exception", failure);
254+
}
255+
try {
256+
return tryParse(data, lastData);
257+
} catch (Exception e) {
258+
failure = e;
259+
throw e;
193260
}
194-
line++;
261+
}
262+
263+
private int tryParse(BytesReference data, boolean lastData) throws IOException {
264+
int from = 0;
265+
int consumed = 0;
266+
267+
while (true) {
268+
int nextMarker = findNextMarker(marker, incrementalFromOffset, data, lastData);
269+
if (nextMarker == -1) {
270+
incrementalFromOffset = data.length() - consumed;
271+
break;
272+
}
273+
incrementalFromOffset = nextMarker + 1;
274+
line++;
275+
276+
if (currentRequest == null) {
277+
if (parseActionLine(data, from, nextMarker)) {
278+
if (currentRequest instanceof DeleteRequest deleteRequest) {
279+
deleteRequestConsumer.accept(deleteRequest);
280+
currentRequest = null;
281+
}
282+
}
283+
} else {
284+
parseAndConsumeDocumentLine(data, from, nextMarker);
285+
currentRequest = null;
286+
}
195287

196-
// now parse the action
197-
try (XContentParser parser = createParser(xContent, data, from, nextMarker)) {
198-
// move pointers
199288
from = nextMarker + 1;
289+
consumed = from;
290+
}
291+
292+
return lastData ? from : consumed;
293+
}
294+
295+
private boolean parseActionLine(BytesReference data, int from, int to) throws IOException {
296+
assert currentRequest == null;
297+
298+
// Reset the fields which are accessed during document line parsing
299+
currentType = null;
300+
currentPipeline = defaultPipeline;
301+
currentListExecutedPipelines = defaultListExecutedPipelines != null && defaultListExecutedPipelines;
302+
currentFetchSourceContext = defaultFetchSourceContext;
303+
304+
try (XContentParser parser = createParser(xContentType.xContent(), data, from, to)) {
200305

201306
// Move to START_OBJECT
202307
XContentParser.Token token = parser.nextToken();
203308
if (token == null) {
204-
continue;
309+
return false;
205310
}
206311
if (token != XContentParser.Token.START_OBJECT) {
207312
throw new IllegalArgumentException(
@@ -239,20 +344,16 @@ public int incrementalParse(
239344
}
240345

241346
String index = defaultIndex;
242-
String type = null;
243347
String id = null;
244348
String routing = defaultRouting;
245-
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
246349
String opType = null;
247350
long version = Versions.MATCH_ANY;
248351
VersionType versionType = VersionType.INTERNAL;
249352
long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
250353
long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
251354
int retryOnConflict = 0;
252-
String pipeline = defaultPipeline;
253355
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
254356
boolean requireDataStream = defaultRequireDataStream != null && defaultRequireDataStream;
255-
boolean listExecutedPipelines = defaultListExecutedPipelines != null && defaultListExecutedPipelines;
256357
Map<String, String> dynamicTemplates = Map.of();
257358

258359
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
@@ -283,7 +384,7 @@ public int incrementalParse(
283384
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
284385
);
285386
}
286-
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
387+
currentType = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
287388
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
288389
id = parser.text();
289390
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
@@ -301,15 +402,15 @@ public int incrementalParse(
301402
} else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
302403
retryOnConflict = parser.intValue();
303404
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
304-
pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
405+
currentPipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
305406
} else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
306-
fetchSourceContext = FetchSourceContext.fromXContent(parser);
407+
currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
307408
} else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
308409
requireAlias = parser.booleanValue();
309410
} else if (REQUIRE_DATA_STREAM.match(currentFieldName, parser.getDeprecationHandler())) {
310411
requireDataStream = parser.booleanValue();
311412
} else if (LIST_EXECUTED_PIPELINES.match(currentFieldName, parser.getDeprecationHandler())) {
312-
listExecutedPipelines = parser.booleanValue();
413+
currentListExecutedPipelines = parser.booleanValue();
313414
} else {
314415
throw new IllegalArgumentException(
315416
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
@@ -330,7 +431,7 @@ public int incrementalParse(
330431
dynamicTemplates = parser.mapStrings();
331432
} else if (token == XContentParser.Token.START_OBJECT
332433
&& SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
333-
fetchSourceContext = FetchSourceContext.fromXContent(parser);
434+
currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
334435
} else if (token != XContentParser.Token.VALUE_NULL) {
335436
throw new IllegalArgumentException(
336437
"Malformed action/metadata line ["
@@ -364,43 +465,33 @@ public int incrementalParse(
364465
"Delete request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName()
365466
);
366467
}
367-
deleteRequestConsumer.accept(
368-
new DeleteRequest(index).id(id)
369-
.routing(routing)
370-
.version(version)
371-
.versionType(versionType)
372-
.setIfSeqNo(ifSeqNo)
373-
.setIfPrimaryTerm(ifPrimaryTerm)
374-
);
375-
consumed = from;
468+
currentRequest = new DeleteRequest(index).id(id)
469+
.routing(routing)
470+
.version(version)
471+
.versionType(versionType)
472+
.setIfSeqNo(ifSeqNo)
473+
.setIfPrimaryTerm(ifPrimaryTerm);
376474
} else {
377-
nextMarker = findNextMarker(marker, from, data, isIncremental);
378-
if (nextMarker == -1) {
379-
break;
380-
}
381-
line++;
382-
383475
// we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
384476
// of index request.
385477
if ("index".equals(action) || "create".equals(action)) {
386478
var indexRequest = new IndexRequest(index).id(id)
387479
.routing(routing)
388480
.version(version)
389481
.versionType(versionType)
390-
.setPipeline(pipeline)
482+
.setPipeline(currentPipeline)
391483
.setIfSeqNo(ifSeqNo)
392484
.setIfPrimaryTerm(ifPrimaryTerm)
393-
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
394485
.setDynamicTemplates(dynamicTemplates)
395486
.setRequireAlias(requireAlias)
396487
.setRequireDataStream(requireDataStream)
397-
.setListExecutedPipelines(listExecutedPipelines);
488+
.setListExecutedPipelines(currentListExecutedPipelines);
398489
if ("create".equals(action)) {
399490
indexRequest = indexRequest.create(true);
400491
} else if (opType != null) {
401492
indexRequest = indexRequest.create("create".equals(opType));
402493
}
403-
indexRequestConsumer.accept(indexRequest, type);
494+
currentRequest = indexRequest;
404495
} else if ("update".equals(action)) {
405496
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
406497
throw new IllegalArgumentException(
@@ -427,31 +518,38 @@ public int incrementalParse(
427518
.setIfPrimaryTerm(ifPrimaryTerm)
428519
.setRequireAlias(requireAlias)
429520
.routing(routing);
430-
try (
431-
XContentParser sliceParser = createParser(
432-
xContent,
433-
sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType)
434-
)
435-
) {
436-
updateRequest.fromXContent(sliceParser);
437-
}
438-
if (fetchSourceContext != null) {
439-
updateRequest.fetchSource(fetchSourceContext);
440-
}
441-
IndexRequest upsertRequest = updateRequest.upsertRequest();
442-
if (upsertRequest != null) {
443-
upsertRequest.setPipeline(pipeline).setListExecutedPipelines(listExecutedPipelines);
444-
}
445-
446-
updateRequestConsumer.accept(updateRequest);
521+
currentRequest = updateRequest;
447522
}
448-
// move pointers
449-
from = nextMarker + 1;
450-
consumed = from;
451523
}
452524
}
525+
return true;
453526
}
454-
return isIncremental ? consumed : from;
527+
528+
private void parseAndConsumeDocumentLine(BytesReference data, int from, int to) throws IOException {
529+
assert currentRequest != null && currentRequest instanceof DeleteRequest == false;
530+
if (currentRequest instanceof IndexRequest indexRequest) {
531+
indexRequest.source(sliceTrimmingCarriageReturn(data, from, to, xContentType), xContentType);
532+
indexRequestConsumer.accept(indexRequest, currentType);
533+
} else if (currentRequest instanceof UpdateRequest updateRequest) {
534+
try (
535+
XContentParser sliceParser = createParser(
536+
xContentType.xContent(),
537+
sliceTrimmingCarriageReturn(data, from, to, xContentType)
538+
)
539+
) {
540+
updateRequest.fromXContent(sliceParser);
541+
}
542+
if (currentFetchSourceContext != null) {
543+
updateRequest.fetchSource(currentFetchSourceContext);
544+
}
545+
IndexRequest upsertRequest = updateRequest.upsertRequest();
546+
if (upsertRequest != null) {
547+
upsertRequest.setPipeline(currentPipeline).setListExecutedPipelines(currentListExecutedPipelines);
548+
}
549+
updateRequestConsumer.accept(updateRequest);
550+
}
551+
}
552+
455553
}
456554

457555
@UpdateForV9

0 commit comments

Comments
 (0)