Skip to content

Commit 2911363

Browse files
authored
Store dynamic mappings updates as simple CompressedXContent (elastic#142941)
In the past we needed to merge additional mappings into dynamic updates, to handle things like default mappings or multiple types. These are no longer an issue, and so we can change ParsedDocument to hold its dynamic mappings updates as plain CompressedXContent. This also removes one of the requirements for Mapping to implement ToXContent.
1 parent a3fcc33 commit 2911363

File tree

28 files changed

+222
-238
lines changed

28 files changed

+222
-238
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/index/mapper/DynamicMapperBenchmark.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111

1212
import org.elasticsearch.common.UUIDs;
1313
import org.elasticsearch.common.bytes.BytesArray;
14-
import org.elasticsearch.common.compress.CompressedXContent;
15-
import org.elasticsearch.common.xcontent.XContentHelper;
1614
import org.elasticsearch.index.mapper.DocumentMapper;
1715
import org.elasticsearch.index.mapper.LuceneDocument;
1816
import org.elasticsearch.index.mapper.MapperService;
@@ -185,21 +183,16 @@ public List<LuceneDocument> benchmarkDynamicallyCreatedFields() throws Exception
185183
MapperService mapperService = MapperServiceFactory.create("{}");
186184
for (int i = 0; i < 25; i++) {
187185
DocumentMapper documentMapper = mapperService.documentMapper();
188-
Mapping mapping = null;
189-
if (documentMapper == null) {
186+
boolean noMappings = documentMapper == null;
187+
if (noMappings) {
190188
documentMapper = DocumentMapper.createEmpty(mapperService);
191-
mapping = documentMapper.mapping();
192189
}
193190
ParsedDocument doc = documentMapper.parse(randomFrom(sources));
194-
if (mapping != null) {
195-
doc.addDynamicMappingsUpdate(mapping);
191+
if (noMappings) {
192+
doc.addDynamicMappingsUpdate(Mapping.emptyCompressed());
196193
}
197194
if (doc.dynamicMappingsUpdate() != null) {
198-
mapperService.merge(
199-
"_doc",
200-
new CompressedXContent(XContentHelper.toXContent(doc.dynamicMappingsUpdate(), XContentType.JSON, false)),
201-
MapperService.MergeReason.MAPPING_UPDATE
202-
);
195+
mapperService.merge("_doc", doc.dynamicMappingsUpdate(), MapperService.MergeReason.MAPPING_UPDATE);
203196
}
204197
}
205198
return mapperService.documentMapper().parse(randomFrom(sources)).docs();

server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
package org.elasticsearch.action.bulk;
1111

1212
import org.elasticsearch.action.ActionListener;
13-
import org.elasticsearch.index.mapper.Mapping;
13+
import org.elasticsearch.common.compress.CompressedXContent;
1414
import org.elasticsearch.index.shard.ShardId;
1515

1616
public interface MappingUpdatePerformer {
1717

1818
/**
1919
* Update the mappings on the master.
2020
*/
21-
void updateMappings(Mapping update, ShardId shardId, ActionListener<Void> listener);
21+
void updateMappings(CompressedXContent update, ShardId shardId, ActionListener<Void> listener);
2222

2323
}

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ private static boolean handleMappingUpdateRequired(
480480
try {
481481
CompressedXContent mergedSource = mapperService.merge(
482482
MapperService.SINGLE_MAPPING_NAME,
483-
new CompressedXContent(result.getRequiredMappingUpdate()),
483+
result.getRequiredMappingUpdate(),
484484
MapperService.MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT
485485
).mappingSource();
486486
final DocumentMapper existingDocumentMapper = mapperService.documentMapper();

server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.action.support.master.MasterNodeRequest;
1616
import org.elasticsearch.client.internal.Client;
1717
import org.elasticsearch.client.internal.IndicesAdminClient;
18+
import org.elasticsearch.common.compress.CompressedXContent;
1819
import org.elasticsearch.common.settings.ClusterSettings;
1920
import org.elasticsearch.common.settings.Setting;
2021
import org.elasticsearch.common.settings.Setting.Property;
@@ -23,7 +24,6 @@
2324
import org.elasticsearch.common.util.concurrent.RunOnce;
2425
import org.elasticsearch.core.TimeValue;
2526
import org.elasticsearch.index.Index;
26-
import org.elasticsearch.index.mapper.Mapping;
2727
import org.elasticsearch.injection.guice.Inject;
2828
import org.elasticsearch.xcontent.XContentType;
2929

@@ -79,7 +79,7 @@ public void setClient(Client client) {
7979
* {@code timeout} is the master node timeout ({@link MasterNodeRequest#masterNodeTimeout()}),
8080
* potentially waiting for a master node to be available.
8181
*/
82-
public void updateMappingOnMaster(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {
82+
public void updateMappingOnMaster(Index index, CompressedXContent mappingUpdate, ActionListener<Void> listener) {
8383
final RunOnce release = new RunOnce(semaphore::release);
8484
try {
8585
semaphore.acquire();
@@ -105,10 +105,10 @@ int blockedThreads() {
105105
}
106106

107107
// can be overridden by tests
108-
protected void sendUpdateMapping(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {
108+
protected void sendUpdateMapping(Index index, CompressedXContent mappingUpdate, ActionListener<Void> listener) {
109109
PutMappingRequest putMappingRequest = new PutMappingRequest();
110110
putMappingRequest.setConcreteIndex(index);
111-
putMappingRequest.source(mappingUpdate.toString(), XContentType.JSON);
111+
putMappingRequest.source(mappingUpdate.string(), XContentType.JSON);
112112
putMappingRequest.masterNodeTimeout(dynamicMappingUpdateTimeout);
113113
putMappingRequest.ackTimeout(TimeValue.ZERO);
114114
putMappingRequest.origin("bulk");

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
4949
import org.elasticsearch.cluster.service.ClusterApplierService;
5050
import org.elasticsearch.common.bytes.BytesReference;
51+
import org.elasticsearch.common.compress.CompressedXContent;
5152
import org.elasticsearch.common.logging.Loggers;
5253
import org.elasticsearch.common.lucene.Lucene;
5354
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@@ -72,7 +73,6 @@
7273
import org.elasticsearch.index.mapper.DocumentParser;
7374
import org.elasticsearch.index.mapper.LuceneDocument;
7475
import org.elasticsearch.index.mapper.Mapper;
75-
import org.elasticsearch.index.mapper.Mapping;
7676
import org.elasticsearch.index.mapper.MappingLookup;
7777
import org.elasticsearch.index.mapper.ParsedDocument;
7878
import org.elasticsearch.index.mapper.Uid;
@@ -722,7 +722,7 @@ public abstract static class Result {
722722
private final long seqNo;
723723
private final Exception failure;
724724
private final SetOnce<Boolean> freeze = new SetOnce<>();
725-
private final Mapping requiredMappingUpdate;
725+
private final CompressedXContent requiredMappingUpdate;
726726
private final String id;
727727
private Translog.Location translogLocation;
728728
private long took;
@@ -749,7 +749,7 @@ protected Result(Operation.TYPE operationType, long version, long term, long seq
749749
this.id = id;
750750
}
751751

752-
protected Result(Operation.TYPE operationType, Mapping requiredMappingUpdate, String id) {
752+
protected Result(Operation.TYPE operationType, CompressedXContent requiredMappingUpdate, String id) {
753753
this.operationType = operationType;
754754
this.version = Versions.NOT_FOUND;
755755
this.seqNo = UNASSIGNED_SEQ_NO;
@@ -785,9 +785,9 @@ public long getTerm() {
785785

786786
/**
787787
* If the operation was aborted due to missing mappings, this method will return the mappings
788-
* that are required to complete the operation.
788+
* that are required to complete the operation as serialized {@link CompressedXContent}.
789789
*/
790-
public Mapping getRequiredMappingUpdate() {
790+
public CompressedXContent getRequiredMappingUpdate() {
791791
return requiredMappingUpdate;
792792
}
793793

@@ -862,7 +862,7 @@ public IndexResult(Exception failure, long version, long term, long seqNo, Strin
862862
this.created = false;
863863
}
864864

865-
public IndexResult(Mapping requiredMappingUpdate, String id) {
865+
public IndexResult(CompressedXContent requiredMappingUpdate, String id) {
866866
super(Operation.TYPE.INDEX, requiredMappingUpdate, id);
867867
this.created = false;
868868
}

server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.lucene.util.BytesRef;
1616
import org.elasticsearch.cluster.routing.IndexRouting;
1717
import org.elasticsearch.common.Explicit;
18+
import org.elasticsearch.common.compress.CompressedXContent;
1819
import org.elasticsearch.common.regex.Regex;
1920
import org.elasticsearch.common.xcontent.XContentHelper;
2021
import org.elasticsearch.core.Nullable;
@@ -112,7 +113,7 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL
112113
}
113114
assert context.path.pathAsText("").isEmpty() : "found leftover path elements: " + context.path.pathAsText("");
114115

115-
Mapping dynamicUpdate = createDynamicUpdate(context);
116+
CompressedXContent dynamicUpdate = createDynamicUpdate(context);
116117

117118
return new ParsedDocument(
118119
context.version(),
@@ -283,7 +284,7 @@ private static DocumentParsingException wrapInDocumentParsingException(DocumentP
283284
return new DocumentParsingException(context.parser().getTokenLocation(), "failed to parse: " + e.getMessage(), e);
284285
}
285286

286-
static Mapping createDynamicUpdate(DocumentParserContext context) {
287+
static CompressedXContent createDynamicUpdate(DocumentParserContext context) {
287288
if (context.hasDynamicMappersOrRuntimeFields() == false) {
288289
return null;
289290
}
@@ -293,8 +294,7 @@ static Mapping createDynamicUpdate(DocumentParserContext context) {
293294
for (RuntimeField runtimeField : context.getDynamicRuntimeFields()) {
294295
rootBuilder.addRuntimeField(runtimeField);
295296
}
296-
RootObjectMapper root = rootBuilder.build(MapperBuilderContext.root(context.mappingLookup().isSourceSynthetic(), false));
297-
return context.mappingLookup().getMapping().mappingUpdate(root);
297+
return new MappingBuilder(rootBuilder, Map.of(), null).build(MapperService.MergeReason.MAPPING_UPDATE).toCompressedXContent();
298298
}
299299

300300
static void parseObjectOrNested(DocumentParserContext context) throws IOException {

server/src/main/java/org/elasticsearch/index/mapper/Mapping.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,19 @@ public final class Mapping implements ToXContentFragment {
3838
null
3939
);
4040

41+
/**
42+
* Holder class for lazy initialization of EMPTY_COMPRESSED. Creating a CompressedXContent calls
43+
* MessageDigests.sha256() which resets the thread-local SHA-256 digest, so we must not do this
44+
* during Mapping class loading (which can happen at unpredictable times via XContentHelper.parseToType).
45+
*/
46+
private static class EmptyCompressedHolder {
47+
static final CompressedXContent INSTANCE = EMPTY.toCompressedXContent();
48+
}
49+
50+
public static CompressedXContent emptyCompressed() {
51+
return EmptyCompressedHolder.INSTANCE;
52+
}
53+
4154
private final RootObjectMapper root;
4255
private final Map<String, Object> meta;
4356
private final MetadataFieldMapper[] metadataMappers;

server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.apache.lucene.util.BytesRef;
1818
import org.elasticsearch.common.bytes.BytesArray;
1919
import org.elasticsearch.common.bytes.BytesReference;
20+
import org.elasticsearch.common.compress.CompressedXContent;
2021
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
2122
import org.elasticsearch.xcontent.XContentType;
2223

@@ -41,7 +42,7 @@ public class ParsedDocument {
4142

4243
private BytesReference source;
4344
private XContentType xContentType;
44-
private Mapping dynamicMappingsUpdate;
45+
private CompressedXContent dynamicMappingsUpdate;
4546

4647
/**
4748
* Create a no-op tombstone document
@@ -146,7 +147,7 @@ public ParsedDocument(
146147
List<LuceneDocument> documents,
147148
BytesReference source,
148149
XContentType xContentType,
149-
Mapping dynamicMappingsUpdate,
150+
CompressedXContent dynamicMappingsUpdate,
150151
long normalizedSize
151152
) {
152153
this.version = version;
@@ -205,11 +206,11 @@ public void setSource(BytesReference source, XContentType xContentType) {
205206
* Return dynamic updates to mappings or {@code null} if there were no
206207
* updates to the mappings.
207208
*/
208-
public Mapping dynamicMappingsUpdate() {
209+
public CompressedXContent dynamicMappingsUpdate() {
209210
return dynamicMappingsUpdate;
210211
}
211212

212-
public void addDynamicMappingsUpdate(Mapping update) {
213+
public void addDynamicMappingsUpdate(CompressedXContent update) {
213214
if (dynamicMappingsUpdate == null) {
214215
dynamicMappingsUpdate = update;
215216
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.cluster.service.MasterService;
5151
import org.elasticsearch.common.CheckedBiConsumer;
5252
import org.elasticsearch.common.UUIDs;
53+
import org.elasticsearch.common.compress.CompressedXContent;
5354
import org.elasticsearch.common.io.stream.BytesStreamOutput;
5455
import org.elasticsearch.common.lucene.Lucene;
5556
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@@ -1037,7 +1038,7 @@ private Engine.IndexResult applyIndexOperation(
10371038
ifPrimaryTerm,
10381039
getRelativeTimeInNanos()
10391040
);
1040-
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
1041+
CompressedXContent update = operation.parsedDoc().dynamicMappingsUpdate();
10411042
if (update != null) {
10421043
return new Engine.IndexResult(update, operation.parsedDoc().id());
10431044
}
@@ -1093,18 +1094,17 @@ public static Engine.Index prepareIndex(
10931094
assert source.dynamicTemplateParams().isEmpty() || origin == Engine.Operation.Origin.PRIMARY
10941095
: "dynamic_template_params parameter can only be associated with primary operations";
10951096
DocumentMapper documentMapper = mapperService.documentMapper();
1096-
Mapping mapping = null;
1097-
if (documentMapper == null) {
1097+
boolean noMappings = documentMapper == null;
1098+
if (noMappings) {
10981099
documentMapper = DocumentMapper.createEmpty(mapperService);
1099-
mapping = documentMapper.mapping();
11001100
}
11011101
ParsedDocument doc = documentMapper.parse(source);
1102-
if (mapping != null) {
1102+
if (noMappings) {
11031103
// If we are indexing but there is no mapping we create one. This is to ensure that whenever at least a document is indexed
11041104
// some mappings do exist. It covers for the case of indexing an empty doc (`{}`).
11051105
// TODO this can be removed if we eagerly create mappings as soon as a new index is created, regardless of
11061106
// whether mappings were provided or not.
1107-
doc.addDynamicMappingsUpdate(mapping);
1107+
doc.addDynamicMappingsUpdate(Mapping.emptyCompressed());
11081108
}
11091109
return new Engine.Index(
11101110
Uid.encodeId(doc.id()),

server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
import org.elasticsearch.index.mapper.MapperService;
4343
import org.elasticsearch.index.mapper.Mapping;
4444
import org.elasticsearch.index.mapper.MappingLookup;
45-
import org.elasticsearch.index.mapper.MetadataFieldMapper;
46-
import org.elasticsearch.index.mapper.RootObjectMapper;
4745
import org.elasticsearch.index.shard.IndexShard;
4846
import org.elasticsearch.index.shard.IndexShardTestCase;
4947
import org.elasticsearch.index.shard.ShardId;
@@ -276,10 +274,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
276274
items[0] = new BulkItemRequest(0, writeRequest);
277275
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
278276

279-
Engine.IndexResult mappingUpdate = new Engine.IndexResult(
280-
new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()),
281-
"id"
282-
);
277+
Engine.IndexResult mappingUpdate = new Engine.IndexResult(Mapping.emptyCompressed(), "id");
283278
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
284279
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation, "id");
285280

@@ -899,10 +894,7 @@ public void testRetries() throws Exception {
899894

900895
Exception err = new VersionConflictEngineException(shardId, "id", "I'm conflicted <(;_;)>");
901896
Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0, "id");
902-
Engine.IndexResult mappingUpdate = new Engine.IndexResult(
903-
new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()),
904-
"id"
905-
);
897+
Engine.IndexResult mappingUpdate = new Engine.IndexResult(Mapping.emptyCompressed(), "id");
906898
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
907899
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation, "id");
908900

@@ -991,10 +983,7 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
991983
items[1] = new BulkItemRequest(1, writeRequest2);
992984
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
993985

994-
Engine.IndexResult mappingUpdate = new Engine.IndexResult(
995-
new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()),
996-
"id"
997-
);
986+
Engine.IndexResult mappingUpdate = new Engine.IndexResult(Mapping.emptyCompressed(), "id");
998987
Translog.Location resultLocation1 = new Translog.Location(42, 36, 36);
999988
Translog.Location resultLocation2 = new Translog.Location(42, 42, 42);
1000989
Engine.IndexResult success1 = new FakeIndexResult(1, 1, 10, true, resultLocation1, "id");
@@ -1119,10 +1108,7 @@ public void testPerformOnPrimaryReportsBulkStats() throws Exception {
11191108
}
11201109

11211110
public void testNoopMappingUpdateInfiniteLoopPrevention() throws Exception {
1122-
Engine.IndexResult mappingUpdate = new Engine.IndexResult(
1123-
new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()),
1124-
"id"
1125-
);
1111+
Engine.IndexResult mappingUpdate = new Engine.IndexResult(Mapping.emptyCompressed(), "id");
11261112

11271113
MapperService mapperService = mock(MapperService.class);
11281114
DocumentMapper documentMapper = mock(DocumentMapper.class);
@@ -1173,10 +1159,7 @@ public void testNoopMappingUpdateInfiniteLoopPrevention() throws Exception {
11731159
}
11741160

11751161
public void testNoopMappingUpdateSuccessOnRetry() throws Exception {
1176-
Engine.IndexResult mappingUpdate = new Engine.IndexResult(
1177-
new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()),
1178-
"id"
1179-
);
1162+
Engine.IndexResult mappingUpdate = new Engine.IndexResult(Mapping.emptyCompressed(), "id");
11801163
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
11811164
Engine.IndexResult successfulResult = new FakeIndexResult(1, 1, 10, true, resultLocation, "id");
11821165

@@ -1322,7 +1305,7 @@ public Translog.Location getTranslogLocation() {
13221305
/** Doesn't perform any mapping updates */
13231306
public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer {
13241307
@Override
1325-
public void updateMappings(Mapping update, ShardId shardId, ActionListener<Void> listener) {
1308+
public void updateMappings(CompressedXContent update, ShardId shardId, ActionListener<Void> listener) {
13261309
listener.onResponse(null);
13271310
}
13281311
}
@@ -1336,7 +1319,7 @@ private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer {
13361319
}
13371320

13381321
@Override
1339-
public void updateMappings(Mapping update, ShardId shardId, ActionListener<Void> listener) {
1322+
public void updateMappings(CompressedXContent update, ShardId shardId, ActionListener<Void> listener) {
13401323
listener.onFailure(e);
13411324
}
13421325
}

0 commit comments

Comments
 (0)