5 changes: 5 additions & 0 deletions docs/changelog/132476.yaml
@@ -0,0 +1,5 @@
pr: 132476
summary: Add `_data_stream` metadata to `SearchHit` for `_search` response
area: Data streams
type: feature
issues: []
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
@@ -74,6 +74,8 @@
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.ComposableIndexTemplate.EMPTY_MAPPINGS;
@@ -97,6 +99,9 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
public static final String FAILURE_STORE_PREFIX = ".fs-";
public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
public static final String TIMESTAMP_FIELD_NAME = "@timestamp";
public static final Pattern DS_BACKING_PATTERN = Pattern.compile(
"^(.*?" + BACKING_INDEX_PREFIX + ")(.+)-(\\d{4}.\\d{2}.\\d{2})(-[\\d]+)$"
);

// Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
public static final Comparator<LeafReader> TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> {
@@ -1365,6 +1370,24 @@ public static String getDefaultFailureStoreName(String dataStreamName, long gene
return getDefaultIndexName(FAILURE_STORE_PREFIX, dataStreamName, generation, epochMillis);
}

/**
 * Extracts the data stream name from an index name if it follows the data stream backing index naming format.
 *
 * @param indexName name of the index
 * @return the name of the data stream if the index is a backing index, or null otherwise
 */
public static String getDataStreamNameFromIndex(String indexName) {
if (indexName == null) {
return null;
}

Matcher matcher = DataStream.DS_BACKING_PATTERN.matcher(indexName);
if (matcher.matches()) {
return matcher.group(2);
}
return null;
}

/**
* Generates the name of the index that conforms to the default naming convention for indices
* on data streams given the specified prefix, data stream name, generation, and time.
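For reference, a rough standalone sketch of how the capture groups of the new DS_BACKING_PATTERN break a backing index name apart (this demo class is not part of the PR; BACKING_INDEX_PREFIX is inlined here as ".ds-"):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BackingIndexNameDemo {
    // Same regex as DataStream.DS_BACKING_PATTERN, with the prefix constant inlined
    private static final Pattern DS_BACKING_PATTERN = Pattern.compile(
        "^(.*?" + ".ds-" + ")(.+)-(\\d{4}.\\d{2}.\\d{2})(-[\\d]+)$"
    );

    public static void main(String[] args) {
        Matcher matcher = DS_BACKING_PATTERN.matcher(".ds-my-service-logs-2024.02.05-000001");
        if (matcher.matches()) {
            System.out.println(matcher.group(1)); // ".ds-"            (everything up to and including the backing index prefix)
            System.out.println(matcher.group(2)); // "my-service-logs" (what getDataStreamNameFromIndex returns)
            System.out.println(matcher.group(3)); // "2024.02.05"      (rollover date)
            System.out.println(matcher.group(4)); // "-000001"         (generation suffix)
        }
    }
}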
74 changes: 71 additions & 3 deletions server/src/main/java/org/elasticsearch/search/SearchHit.java
@@ -12,6 +12,7 @@
import org.apache.lucene.search.Explanation;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@@ -98,11 +99,13 @@ public final class SearchHit implements Writeable, ToXContentObject, RefCounted
@Nullable
private SearchShardTarget shard;

// These two fields normally get set when setting the shard target, so they hold the same values as the target thus don't get
// These three fields normally get set when setting the shard target, so they hold the same values as the target thus don't get
// serialized over the wire. When parsing hits back from xcontent, though, in most cases (whenever explanation is disabled)
// we can't rebuild the shard target object, so we need to set these manually for retrieval by users.
private transient String index;
private transient String clusterAlias;
@Nullable
private transient String dataStream;

// For asserting that the method #getSourceAsMap is called just once on the lifetime of this object
private boolean sourceAsMapCalled = false;
@@ -169,6 +172,54 @@ public SearchHit(
Map<String, DocumentField> documentFields,
Map<String, DocumentField> metaFields,
@Nullable RefCounted refCounted
) {
this(
docId,
score,
rank,
id,
nestedIdentity,
version,
seqNo,
primaryTerm,
source,
highlightFields,
sortValues,
matchedQueries,
explanation,
shard,
index,
clusterAlias,
innerHits,
documentFields,
metaFields,
refCounted,
null
);
}

public SearchHit(
int docId,
float score,
int rank,
Text id,
NestedIdentity nestedIdentity,
long version,
long seqNo,
long primaryTerm,
BytesReference source,
Map<String, HighlightField> highlightFields,
SearchSortValues sortValues,
Map<String, Float> matchedQueries,
Explanation explanation,
SearchShardTarget shard,
String index,
String clusterAlias,
Map<String, SearchHits> innerHits,
Map<String, DocumentField> documentFields,
Map<String, DocumentField> metaFields,
@Nullable RefCounted refCounted,
@Nullable String dataStream
) {
this.docId = docId;
this.score = score;
Expand All @@ -190,6 +241,7 @@ public SearchHit(
this.documentFields = documentFields;
this.metaFields = metaFields;
this.refCounted = refCounted == null ? LeakTracker.wrap(new SimpleRefCounted()) : refCounted;
this.dataStream = dataStream;
}

public static SearchHit readFrom(StreamInput in, boolean pooled) throws IOException {
@@ -252,6 +304,8 @@ public static SearchHit readFrom(StreamInput in, boolean pooled) throws IOExcept
clusterAlias = shardTarget.getClusterAlias();
}

final String dataStream = DataStream.getDataStreamNameFromIndex(index);

boolean isPooled = pooled && source != null;
final Map<String, SearchHits> innerHits;
int size = in.readVInt();
@@ -286,7 +340,8 @@ public static SearchHit readFrom(StreamInput in, boolean pooled) throws IOExcept
innerHits,
documentFields,
metaFields,
isPooled ? null : ALWAYS_REFERENCED
isPooled ? null : ALWAYS_REFERENCED,
dataStream
);
}

@@ -420,6 +475,13 @@ public String getIndex() {
return this.index;
}

/**
 * The name of the data stream the hit belongs to, or null if the hit's index is not a data stream backing index.
 */
public String getDataStream() {
return this.dataStream;
}

/**
* The id of the document.
*/
@@ -662,6 +724,7 @@ public void shard(SearchShardTarget target) {
if (target != null) {
this.index = target.getIndex();
this.clusterAlias = target.getClusterAlias();
this.dataStream = DataStream.getDataStreamNameFromIndex(target.getIndex());
}
}

@@ -786,7 +849,8 @@ public SearchHit asUnpooled() {
: innerHits.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().asUnpooled())),
cloneIfHashMap(documentFields),
cloneIfHashMap(metaFields),
ALWAYS_REFERENCED
ALWAYS_REFERENCED,
dataStream
);
}

@@ -818,6 +882,7 @@ public static class Fields {
static final String INNER_HITS = "inner_hits";
static final String _SHARD = "_shard";
static final String _NODE = "_node";
static final String DATA_STREAM = "_data_stream";
}

// Following are the keys for storing the metadata fields and regular fields in the aggregation map.
@@ -846,6 +911,9 @@ public XContentBuilder toInnerXContent(XContentBuilder builder, Params params) t
if (index != null) {
builder.field(Fields._INDEX, RemoteClusterAware.buildRemoteIndexName(clusterAlias, index));
}
if (dataStream != null) {
builder.field(Fields.DATA_STREAM, dataStream);
}
if (id != null) {
builder.field(Fields._ID, id);
}
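As a hypothetical illustration (hand-written, not output captured from this PR's tests), this is the approximate shape of a single hit in a _search response once toInnerXContent renders the new field; all values are invented:

public class DataStreamHitExample {
    // Illustrative _search hit: "_data_stream" is emitted next to "_index" whenever the
    // index name matches the ".ds-<data-stream>-<yyyy.MM.dd>-<generation>" convention.
    static final String EXAMPLE_HIT = """
        {
          "_index": ".ds-my-service-logs-2024.02.05-000001",
          "_data_stream": "my-service-logs",
          "_id": "1",
          "_score": 1.0,
          "_source": {
            "@timestamp": "2024-02-05T12:00:00.000Z",
            "message": "hello"
          }
        }
        """;
}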
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
@@ -2715,6 +2715,43 @@ public void testGetEffectiveIndexTemplateDataStreamMappingsOnly() throws IOExcep
assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(expectedEffectiveTemplate));
}

public void testGetDataStreamNameFromValidBackingIndices() {
// Test a valid backing index name with correct format: .ds-<data-stream>-<yyyy.MM.dd>-<generation>
String indexName = ".ds-my-service-logs-2024.02.05-000001";
String dataStreamName = DataStream.getDataStreamNameFromIndex(indexName);

assertEquals("my-service-logs", dataStreamName);

// Test a valid backing index whose data stream name contains additional '-' dashes
indexName = ".ds-my-service-logs-two-2024.02.05-000001";
dataStreamName = DataStream.getDataStreamNameFromIndex(indexName);

assertEquals("my-service-logs-two", dataStreamName);

}

public void testGetDataStreamNameFromInvalidBackingIndex() {
// Test cases that should not be recognized as valid backing indices
String[] invalidNames = {
"not-a-backing-index", // No .ds- prefix
".ds-", // Missing data stream name
".ds-my-service-logs", // Missing date and generation
".ds-my-service-logs-2024.02.05", // Missing generation
};

for (String invalidName : invalidNames) {
assertNull(
"Should return null for invalid backing index name: " + invalidName,
DataStream.getDataStreamNameFromIndex(invalidName)
);
}
}

public void testGetDataStreamNameFromNullIndex() {
// should return null given null index name
assertNull(DataStream.getDataStreamNameFromIndex(null));
}

private static CompressedXContent randomMappings() {
try {
return new CompressedXContent("{\"_doc\": {\"properties\":{\"" + randomAlphaOfLength(5) + "\":{\"type\":\"keyword\"}}}}");
62 changes: 62 additions & 0 deletions server/src/test/java/org/elasticsearch/search/SearchHitTests.java
@@ -12,6 +12,7 @@
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@@ -476,4 +477,65 @@ static Explanation createExplanation(int depth) {
}
return Explanation.match(value, description, details);
}

public void testShardTargetSetsDataStreamName() {
// Create a SearchHit
SearchHit hit = new SearchHit(1);

try {
// Create a backing index name following the pattern: .ds-<data-stream>-<yyyy.MM.dd>-<generation>
String backingIndexName = ".ds-my-service-logs-2024.02.05-000001";
String nodeId = "node1";
int shardId = 0;
String clusterAlias = null;

// Create SearchShardTarget with the backing index name
SearchShardTarget target = new SearchShardTarget(
nodeId,
new ShardId(new Index(backingIndexName, IndexMetadata.INDEX_UUID_NA_VALUE), shardId),
clusterAlias
);

// Set the shard target
hit.shard(target);

// Verify the data-stream field is set correctly
assertEquals("my-service-logs", hit.getDataStream());
assertEquals(backingIndexName, hit.getIndex());
assertNull(hit.getClusterAlias());
} finally {
hit.decRef();
}
}

public void testShardTargetWithNonDataStreamIndex() {
// Create a SearchHit
SearchHit hit = new SearchHit(1);

try {
// Create a regular index name (not a backing index)
String regularIndexName = "regular-index";
String nodeId = "node1";
int shardId = 0;
String clusterAlias = null;

// Create SearchShardTarget with an index name that is not a data stream backing index
SearchShardTarget target = new SearchShardTarget(
nodeId,
new ShardId(new Index(regularIndexName, IndexMetadata.INDEX_UUID_NA_VALUE), shardId),
clusterAlias
);

// Set the shard target
hit.shard(target);

// Verify the data-stream field is null for non-backing indices
assertNull(hit.getDataStream());
assertEquals(regularIndexName, hit.getIndex());
assertNull(hit.getClusterAlias());
} finally {
hit.decRef();
}
}

}
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java
@@ -64,7 +64,6 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.elasticsearch.core.Strings.format;
@@ -90,10 +89,6 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class);
private static final int MAX_AUTO_FOLLOW_ERRORS = 256;

private static final Pattern DS_BACKING_PATTERN = Pattern.compile(
"^(.*?" + DataStream.BACKING_INDEX_PREFIX + ")(.+)-(\\d{4}.\\d{2}.\\d{2})(-[\\d]+)?$"
);

private final Client client;
private final ClusterService clusterService;
private final CcrLicenseChecker ccrLicenseChecker;
@@ -852,7 +847,7 @@ static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String l
// follow a parseable pattern. Instead it would be better to rename it as though
// the data stream name was the leader index name, ending up with
// ".ds-logs-foo-bar_copy-2022-02-02-000001" as the final index name.
Matcher m = DS_BACKING_PATTERN.matcher(leaderIndexName);
Matcher m = DataStream.DS_BACKING_PATTERN.matcher(leaderIndexName);
if (m.find()) {
return m.group(1) + // Prefix including ".ds-"
followPattern.replace(AUTO_FOLLOW_PATTERN_REPLACEMENT, m.group(2)) + // Data stream name changed
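To show how the now-shared pattern supports the follower index renaming described in the comment above, here is a rough standalone sketch; it is not the actual AutoFollowCoordinator code (whose remainder is elided above), and the {{leader_index_name}} placeholder name is an assumption:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FollowerBackingIndexRenameSketch {
    // Standalone copy of the shared pattern, with BACKING_INDEX_PREFIX inlined as ".ds-"
    private static final Pattern DS_BACKING_PATTERN = Pattern.compile(
        "^(.*?" + ".ds-" + ")(.+)-(\\d{4}.\\d{2}.\\d{2})(-[\\d]+)$"
    );

    // Hypothetical placeholder; the real replacement constant in AutoFollowCoordinator may differ.
    private static final String LEADER_PLACEHOLDER = "{{leader_index_name}}";

    static String renameBackingIndex(String leaderIndexName, String followPattern) {
        Matcher m = DS_BACKING_PATTERN.matcher(leaderIndexName);
        if (m.find() == false) {
            // Not a backing index: apply the follow pattern to the whole leader index name.
            return followPattern.replace(LEADER_PLACEHOLDER, leaderIndexName);
        }
        // Keep the ".ds-" prefix, rename only the data stream portion, and keep the date and generation suffix.
        return m.group(1) + followPattern.replace(LEADER_PLACEHOLDER, m.group(2)) + "-" + m.group(3) + m.group(4);
    }

    public static void main(String[] args) {
        // Prints ".ds-logs-foo-bar_copy-2022.02.02-000001"
        System.out.println(renameBackingIndex(".ds-logs-foo-bar-2022.02.02-000001", "{{leader_index_name}}_copy"));
    }
}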