Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
} // else default value is true

if (in.getTransportVersion().supports(INDEX_REQUEST_INCLUDE_TSID)) {
tsid = in.readBytesRefOrNullIfEmpty();
tsid = in.readOptionalBytesRef();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public BytesRef readBytesRef() throws IOException {
return readBytesRef(length);
}

public @Nullable BytesRef readBytesRefOrNullIfEmpty() throws IOException {
public @Nullable BytesRef readOptionalBytesRef() throws IOException {
int length = readArraySize();
if (length == 0) {
return null;
Expand Down
22 changes: 16 additions & 6 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.index.mapper.RoutingFields;
import org.elasticsearch.index.mapper.RoutingPathFields;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;
Expand Down Expand Up @@ -112,7 +113,7 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
}

@Override
public RoutingFields buildRoutingFields(IndexSettings settings) {
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
return RoutingFields.Noop.INSTANCE;
}

Expand Down Expand Up @@ -216,10 +217,19 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
}

@Override
public RoutingFields buildRoutingFields(IndexSettings settings) {
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
IndexRouting indexRouting = settings.getIndexRouting();
if (indexRouting instanceof IndexRouting.ExtractFromSource.ForRoutingPath forRoutingPath) {
return new RoutingPathFields(forRoutingPath.builder());
if (source.tsid() != null) {
// If we already have a tsid, do not extract it from the source again.
// This happens during translog operation replay where we get the tsid value from the translog.
// We don't want to re-create the tsid to avoid that it's different from the original one.
// That could happen, for example, if a new dimension field was added to a non-dynamic mapping
// after the translog operation was created.
return RoutingFields.Noop.INSTANCE;
} else {
return new RoutingPathFields(forRoutingPath.builder());
}
} else if (indexRouting instanceof IndexRouting.ExtractFromSource.ForIndexDimensions) {
return RoutingFields.Noop.INSTANCE;
} else {
Expand Down Expand Up @@ -303,7 +313,7 @@ public MetadataFieldMapper timeSeriesRoutingHashFieldMapper() {
}

@Override
public RoutingFields buildRoutingFields(IndexSettings settings) {
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
return RoutingFields.Noop.INSTANCE;
}

Expand Down Expand Up @@ -384,7 +394,7 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
}

@Override
public RoutingFields buildRoutingFields(IndexSettings settings) {
public RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source) {
return RoutingFields.Noop.INSTANCE;
}

Expand Down Expand Up @@ -540,7 +550,7 @@ public String getName() {
/**
* How {@code time_series_dimension} fields are handled by indices in this mode.
*/
public abstract RoutingFields buildRoutingFields(IndexSettings settings);
public abstract RoutingFields buildRoutingFields(IndexSettings settings, SourceToParse source);

/**
* @return Whether timestamps should be validated for being withing the time range of an index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1905,6 +1905,11 @@ public String id() {
return this.doc.id();
}

@Nullable
public BytesRef tsid() {
return this.doc.tsid();
}

@Override
public TYPE operationType() {
return TYPE.INDEX;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
}
// TODO: pass the latest timestamp from engine.
final long autoGeneratedIdTimestamp = -1;
op = new Translog.Index(id, seqNo, primaryTerm, version, source, fields.routing(), autoGeneratedIdTimestamp);
// TODO get the tsid?
op = new Translog.Index(id, seqNo, primaryTerm, version, source, fields.routing(), autoGeneratedIdTimestamp, null);
}
}
assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ private Translog.Operation createOperation(
docRecord.version(),
source.internalSourceRef(),
fieldLoader.routing(),
-1 // autogenerated timestamp
-1, // autogenerated timestamp
null
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL
context.sourceToParse().source(),
context.sourceToParse().getXContentType(),
dynamicUpdate,
meteringParserDecorator.meteredDocumentSize()
meteringParserDecorator.meteredDocumentSize(),
context.getTsid()
) {
@Override
public String documentDescription() {
Expand Down Expand Up @@ -1072,7 +1073,7 @@ private static class RootDocumentParserContext extends DocumentParserContext {
private final long maxAllowedNumNestedDocs;
private long numNestedDocs;
private boolean docsReversed = false;
private final BytesRef tsid;
private BytesRef tsid;

RootDocumentParserContext(
MappingLookup mappingLookup,
Expand All @@ -1092,8 +1093,9 @@ private static class RootDocumentParserContext extends DocumentParserContext {
if (tsid == null
&& indexSettings.getMode() == IndexMode.TIME_SERIES
&& indexSettings.getIndexRouting() instanceof IndexRouting.ExtractFromSource.ForIndexDimensions forIndexDimensions) {
// the tsid is normally set on the coordinating node during shard routing and passed to the data node via the index request
// but when applying a translog operation, shard routing is not happening, and we have to create the tsid from source
// The tsid is normally set on the coordinating node during shard routing and passed to the data node via the index request.
// When applying a translog operation from an older version that didn't include the tsid in the translog, yet,
// we have to re-create the tsid from source.
tsid = forIndexDimensions.buildTsid(source.getXContentType(), source.source());
}
this.tsid = tsid;
Expand Down Expand Up @@ -1158,6 +1160,9 @@ protected void addDoc(LuceneDocument doc) {

@Override
public BytesRef getTsid() {
if (tsid == null && getRoutingFields() instanceof RoutingPathFields routingPathFields) {
tsid = routingPathFields.buildHash().toBytesRef();
}
return this.tsid;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ protected DocumentParserContext(
null,
null,
SeqNoFieldMapper.SequenceIDFields.emptySeqID(mappingParserContext.getIndexSettings().seqNoIndexOptions()),
RoutingFields.fromIndexSettings(mappingParserContext.getIndexSettings()),
RoutingFields.fromIndexSettings(mappingParserContext.getIndexSettings(), source),
parent,
dynamic,
new HashSet<>(),
Expand Down Expand Up @@ -871,6 +871,10 @@ public final MapperBuilderContext createDynamicMapperBuilderContext() {

protected abstract void addDoc(LuceneDocument doc);

/**
* Gets or creates the time series id for this document, or null if the index is not a time series index.
* This method must only be called after document parsing has completed, so that all dimension fields have been extracted.
*/
@Nullable
public abstract BytesRef getTsid();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.mapper.MapperService.MergeReason;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
import org.elasticsearch.xcontent.XContentType;
Expand All @@ -36,6 +37,8 @@ public class ParsedDocument {
private final List<LuceneDocument> documents;

private final long normalizedSize;
@Nullable
private final BytesRef tsid;

private BytesReference source;
private XContentType xContentType;
Expand Down Expand Up @@ -63,7 +66,8 @@ public static ParsedDocument noopTombstone(SeqNoFieldMapper.SeqNoIndexOptions se
new BytesArray("{}"),
XContentType.JSON,
null,
XContentMeteringParserDecorator.UNKNOWN_SIZE
XContentMeteringParserDecorator.UNKNOWN_SIZE,
null
);
}

Expand All @@ -88,7 +92,8 @@ public static ParsedDocument deleteTombstone(SeqNoFieldMapper.SeqNoIndexOptions
new BytesArray("{}"),
XContentType.JSON,
null,
XContentMeteringParserDecorator.UNKNOWN_SIZE
XContentMeteringParserDecorator.UNKNOWN_SIZE,
null
);
}

Expand All @@ -101,7 +106,8 @@ public ParsedDocument(
BytesReference source,
XContentType xContentType,
Mapping dynamicMappingsUpdate,
long normalizedSize
long normalizedSize,
@Nullable BytesRef tsid
) {
this.version = version;
this.seqID = seqID;
Expand All @@ -112,6 +118,7 @@ public ParsedDocument(
this.dynamicMappingsUpdate = dynamicMappingsUpdate;
this.xContentType = xContentType;
this.normalizedSize = normalizedSize;
this.tsid = tsid;
}

public String id() {
Expand Down Expand Up @@ -150,6 +157,10 @@ public XContentType getXContentType() {
return this.xContentType;
}

public @Nullable BytesRef tsid() {
return tsid;
}

public void setSource(BytesReference source, XContentType xContentType) {
this.source = source;
this.xContentType = xContentType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public interface RoutingFields {
/**
* Collect routing fields from index settings
*/
static RoutingFields fromIndexSettings(IndexSettings indexSettings) {
return indexSettings.getMode().buildRoutingFields(indexSettings);
static RoutingFields fromIndexSettings(IndexSettings indexSettings, SourceToParse source) {
return indexSettings.getMode().buildRoutingFields(indexSettings, source);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,24 @@ public void postParse(DocumentParserContext context) throws IOException {

final BytesRef timeSeriesId;
final RoutingPathFields routingPathFields;
if (context.getRoutingFields() instanceof RoutingPathFields routingPathFieldsFromContext) {
routingPathFields = routingPathFieldsFromContext;
} else {
routingPathFields = null;
}
if (getIndexVersionCreated(context).before(IndexVersions.TIME_SERIES_ID_HASHING)) {
routingPathFields = (RoutingPathFields) context.getRoutingFields();
assert routingPathFields != null : "routing path fields are required for legacy indices";
long limit = context.indexSettings().getValue(MapperService.INDEX_MAPPING_DIMENSION_FIELDS_LIMIT_SETTING);
int size = routingPathFields.routingValues().size();
if (size > limit) {
throw new MapperException("Too many dimension fields [" + size + "], max [" + limit + "] dimension fields allowed");
}
timeSeriesId = buildLegacyTsid(routingPathFields).toBytesRef();
} else if (context.getRoutingFields() instanceof RoutingPathFields routingPathFieldsFromContext) {
routingPathFields = routingPathFieldsFromContext;
timeSeriesId = routingPathFields.buildHash().toBytesRef();
} else {
routingPathFields = null;
assert context.getTsid() != null;
timeSeriesId = context.getTsid();
if (timeSeriesId == null) {
throw new IllegalStateException("tsid is missing from the context" + context.documentDescription());
}
}

if (this.useDocValuesSkipper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2130,7 +2130,14 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o
index.getAutoGeneratedIdTimestamp(),
true,
origin,
new SourceToParse(index.id(), index.source(), XContentHelper.xContentType(index.source()), index.routing())
new SourceToParse(
index.id(),
index.source(),
XContentHelper.xContentType(index.source()),
index.routing(),
Map.of(),
index.tsid()
)
);
}
case DELETE -> {
Expand Down
Loading