diff --git a/docs/changelog/132476.yaml b/docs/changelog/132476.yaml
new file mode 100644
index 0000000000000..d4aa1eb4e05b2
--- /dev/null
+++ b/docs/changelog/132476.yaml
@@ -0,0 +1,5 @@
+pr: 132476
+summary: Add `_data_stream` metadata to `SearchHit` for `_search` response
+area: Data streams
+type: feature
+issues: []
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
index c318159c9ca4c..1fcc3acc51de8 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
@@ -74,6 +74,8 @@
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.metadata.ComposableIndexTemplate.EMPTY_MAPPINGS;
@@ -97,6 +99,9 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     public static final String FAILURE_STORE_PREFIX = ".fs-";
     public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
     public static final String TIMESTAMP_FIELD_NAME = "@timestamp";
+    public static final Pattern DS_BACKING_PATTERN = Pattern.compile(
+        "^(.*?" + BACKING_INDEX_PREFIX + ")(.+)-(\\d{4}.\\d{2}.\\d{2})(-[\\d]+)$"
+    );
 
     // Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
     public static final Comparator<LeafReader> TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> {
@@ -1365,6 +1370,24 @@ public static String getDefaultFailureStoreName(String dataStreamName, long gene
         return getDefaultIndexName(FAILURE_STORE_PREFIX, dataStreamName, generation, epochMillis);
     }
 
+    /**
+     * Parses the data stream name out of an index name, if the index name matches the data stream backing index naming format.
+     *
+     * @param indexName name of the index
+     * @return the name of the data stream if the index is a backing index, otherwise {@code null}
+     */
+    public static String getDataStreamNameFromIndex(String indexName) {
+        if (indexName == null) {
+            return null;
+        }
+
+        Matcher matcher = DataStream.DS_BACKING_PATTERN.matcher(indexName);
+        if (matcher.matches()) {
+            return matcher.group(2);
+        }
+        return null;
+    }
+
     /**
      * Generates the name of the index that conforms to the default naming convention for indices
      * on data streams given the specified prefix, data stream name, generation, and time.
diff --git a/server/src/main/java/org/elasticsearch/search/SearchHit.java b/server/src/main/java/org/elasticsearch/search/SearchHit.java
index a9c8e01fa32ac..2712b1a237860 100644
--- a/server/src/main/java/org/elasticsearch/search/SearchHit.java
+++ b/server/src/main/java/org/elasticsearch/search/SearchHit.java
@@ -12,6 +12,7 @@
 import org.apache.lucene.search.Explanation;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.TransportVersions;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -98,11 +99,13 @@ public final class SearchHit implements Writeable, ToXContentObject, RefCounted
     @Nullable
     private SearchShardTarget shard;
 
-    // These two fields normally get set when setting the shard target, so they hold the same values as the target thus don't get
+    // These three fields normally get set when setting the shard target, so they hold the same values as the target thus don't get
     // serialized over the wire. When parsing hits back from xcontent though, in most of the cases (whenever explanation is disabled)
     // we can't rebuild the shard target object so we need to set these manually for users retrieval.
     private transient String index;
     private transient String clusterAlias;
+    @Nullable
+    private transient String dataStream;
 
     // For asserting that the method #getSourceAsMap is called just once on the lifetime of this object
     private boolean sourceAsMapCalled = false;
@@ -169,6 +172,54 @@ public SearchHit(
         Map<String, DocumentField> documentFields,
         Map<String, DocumentField> metaFields,
         @Nullable RefCounted refCounted
+    ) {
+        this(
+            docId,
+            score,
+            rank,
+            id,
+            nestedIdentity,
+            version,
+            seqNo,
+            primaryTerm,
+            source,
+            highlightFields,
+            sortValues,
+            matchedQueries,
+            explanation,
+            shard,
+            index,
+            clusterAlias,
+            innerHits,
+            documentFields,
+            metaFields,
+            refCounted,
+            null
+        );
+    }
+
+    public SearchHit(
+        int docId,
+        float score,
+        int rank,
+        Text id,
+        NestedIdentity nestedIdentity,
+        long version,
+        long seqNo,
+        long primaryTerm,
+        BytesReference source,
+        Map<String, HighlightField> highlightFields,
+        SearchSortValues sortValues,
+        Map<String, Float> matchedQueries,
+        Explanation explanation,
+        SearchShardTarget shard,
+        String index,
+        String clusterAlias,
+        Map<String, SearchHits> innerHits,
+        Map<String, DocumentField> documentFields,
+        Map<String, DocumentField> metaFields,
+        @Nullable RefCounted refCounted,
+        @Nullable String dataStream
     ) {
         this.docId = docId;
         this.score = score;
@@ -190,6 +241,7 @@ public SearchHit(
         this.documentFields = documentFields;
         this.metaFields = metaFields;
         this.refCounted = refCounted == null ? LeakTracker.wrap(new SimpleRefCounted()) : refCounted;
+        this.dataStream = dataStream;
     }
 
     public static SearchHit readFrom(StreamInput in, boolean pooled) throws IOException {
@@ -252,6 +304,8 @@ public static SearchHit readFrom(StreamInput in, boolean pooled) throws IOExcept
             clusterAlias = shardTarget.getClusterAlias();
         }
 
+        final String dataStream = DataStream.getDataStreamNameFromIndex(index);
+
         boolean isPooled = pooled && source != null;
         final Map<String, SearchHits> innerHits;
         int size = in.readVInt();
@@ -286,7 +340,8 @@ public static SearchHit readFrom(StreamInput in, boolean pooled) throws IOExcept
             innerHits,
             documentFields,
             metaFields,
-            isPooled ? null : ALWAYS_REFERENCED
+            isPooled ? null : ALWAYS_REFERENCED,
+            dataStream
         );
     }
 
@@ -420,6 +475,13 @@ public String getIndex() {
         return this.index;
     }
 
+    /**
+     * The data stream that the hit belongs to, or {@code null} if the hit's index is not a data stream backing index.
+     */
+    public String getDataStream() {
+        return this.dataStream;
+    }
+
     /**
      * The id of the document.
     */
@@ -662,6 +724,7 @@ public void shard(SearchShardTarget target) {
         if (target != null) {
             this.index = target.getIndex();
             this.clusterAlias = target.getClusterAlias();
+            this.dataStream = DataStream.getDataStreamNameFromIndex(target.getIndex());
         }
     }
 
@@ -786,7 +849,8 @@ public SearchHit asUnpooled() {
                 : innerHits.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().asUnpooled())),
             cloneIfHashMap(documentFields),
             cloneIfHashMap(metaFields),
-            ALWAYS_REFERENCED
+            ALWAYS_REFERENCED,
+            dataStream
         );
     }
 
@@ -818,6 +882,7 @@ public static class Fields {
         static final String INNER_HITS = "inner_hits";
         static final String _SHARD = "_shard";
         static final String _NODE = "_node";
+        static final String DATA_STREAM = "_data_stream";
     }
 
     // Following are the keys for storing the metadata fields and regular fields in the aggregation map.
@@ -846,6 +911,9 @@ public XContentBuilder toInnerXContent(XContentBuilder builder, Params params) t
         if (index != null) {
             builder.field(Fields._INDEX, RemoteClusterAware.buildRemoteIndexName(clusterAlias, index));
         }
+        if (dataStream != null) {
+            builder.field(Fields.DATA_STREAM, dataStream);
+        }
         if (id != null) {
             builder.field(Fields._ID, id);
         }
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
index 63ebe1a855211..4e7353d2c58df 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
@@ -2715,6 +2715,43 @@ public void testGetEffectiveIndexTemplateDataStreamMappingsOnly() throws IOExcep
         assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(expectedEffectiveTemplate));
     }
 
+    public void testGetDataStreamNameFromValidBackingIndices() {
+        // Test a valid backing index name with the correct format: .ds-<data-stream-name>-<yyyy.MM.dd>-<generation>
+        String indexName = ".ds-my-service-logs-2024.02.05-000001";
+        String dataStreamName = DataStream.getDataStreamNameFromIndex(indexName);
+
+        assertEquals("my-service-logs", dataStreamName);
+
+        // Test valid backing index with extra '-' dash in the name
+        indexName = ".ds-my-service-logs-two-2024.02.05-000001";
+        dataStreamName = DataStream.getDataStreamNameFromIndex(indexName);
+
+        assertEquals("my-service-logs-two", dataStreamName);
+
+    }
+
+    public void testGetDataStreamNameFromInvalidBackingIndex() {
+        // Test cases that should not be recognized as valid backing indices
+        String[] invalidNames = {
+            "not-a-backing-index", // No .ds- prefix
+            ".ds-", // Missing data stream name
+            ".ds-my-service-logs", // Missing date and generation
+            ".ds-my-service-logs-2024.02.05", // Missing generation
+        };
+
+        for (String invalidName : invalidNames) {
+            assertNull(
+                "Should return null for invalid backing index name: " + invalidName,
+                DataStream.getDataStreamNameFromIndex(invalidName)
+            );
+        }
+    }
+
+    public void testGetDataStreamNameFromNullIndex() {
+        // should return null given a null index name
+        assertNull(DataStream.getDataStreamNameFromIndex(null));
+    }
+
     private static CompressedXContent randomMappings() {
         try {
             return new CompressedXContent("{\"_doc\": {\"properties\":{\"" + randomAlphaOfLength(5) + "\":{\"type\":\"keyword\"}}}}");
diff --git a/server/src/test/java/org/elasticsearch/search/SearchHitTests.java b/server/src/test/java/org/elasticsearch/search/SearchHitTests.java
index 25a71d04b321d..9495dc0b55342 100644
--- a/server/src/test/java/org/elasticsearch/search/SearchHitTests.java
+++ b/server/src/test/java/org/elasticsearch/search/SearchHitTests.java
@@ -12,6 +12,7 @@
 import org.apache.lucene.search.Explanation;
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.TransportVersion;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -476,4 +477,65 @@ static Explanation createExplanation(int depth) {
         }
         return Explanation.match(value, description, details);
     }
+
+    public void testShardTargetSetsDataStreamName() {
+        // Create a SearchHit
+        SearchHit hit = new SearchHit(1);
+
+        try {
+            // Create a backing index name following the pattern: .ds-<data-stream-name>-<yyyy.MM.dd>-<generation>
+            String backingIndexName = ".ds-my-service-logs-2024.02.05-000001";
+            String nodeId = "node1";
+            int shardId = 0;
+            String clusterAlias = null;
+
+            // Create SearchShardTarget with the backing index name
+            SearchShardTarget target = new SearchShardTarget(
+                nodeId,
+                new ShardId(new Index(backingIndexName, IndexMetadata.INDEX_UUID_NA_VALUE), shardId),
+                clusterAlias
+            );
+
+            // Set the shard target
+            hit.shard(target);
+
+            // Verify the data-stream field is set correctly
+            assertEquals("my-service-logs", hit.getDataStream());
+            assertEquals(backingIndexName, hit.getIndex());
+            assertNull(hit.getClusterAlias());
+        } finally {
+            hit.decRef();
+        }
+    }
+
+    public void testShardTargetWithNonDataStreamIndex() {
+        // Create a SearchHit
+        SearchHit hit = new SearchHit(1);
+
+        try {
+            // Create a regular index name (not a backing index)
+            String regularIndexName = "regular-index";
+            String nodeId = "node1";
+            int shardId = 0;
+            String clusterAlias = null;
+
+            // Create SearchShardTarget with a non-data-stream-backed index name
+            SearchShardTarget target = new SearchShardTarget(
+                nodeId,
+                new ShardId(new Index(regularIndexName, IndexMetadata.INDEX_UUID_NA_VALUE), shardId),
+                clusterAlias
+            );
+
+            // Set the shard target
+            hit.shard(target);
+
+            // Verify the data-stream field is null for non-backing indices
+            assertNull(hit.getDataStream());
+            assertEquals(regularIndexName, hit.getIndex());
+            assertNull(hit.getClusterAlias());
+        } finally {
+            hit.decRef();
+        }
+    }
+
 }
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java
index 28abebea7fedb..2c8dcfbb3835e 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java
@@ -64,7 +64,6 @@
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.core.Strings.format;
@@ -90,10 +89,6 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
     private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class);
     private static final int MAX_AUTO_FOLLOW_ERRORS = 256;
 
-    private static final Pattern DS_BACKING_PATTERN = Pattern.compile(
-        "^(.*?" + DataStream.BACKING_INDEX_PREFIX + ")(.+)-(\\d{4}.\\d{2}.\\d{2})(-[\\d]+)?$"
-    );
-
     private final Client client;
     private final ClusterService clusterService;
     private final CcrLicenseChecker ccrLicenseChecker;
@@ -852,7 +847,7 @@ static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String l
             // follow a parseable pattern. Instead it would be better to rename it as though
             // the data stream name was the leader index name, ending up with
             // ".ds-logs-foo-bar_copy-2022-02-02-000001" as the final index name.
-            Matcher m = DS_BACKING_PATTERN.matcher(leaderIndexName);
+            Matcher m = DataStream.DS_BACKING_PATTERN.matcher(leaderIndexName);
             if (m.find()) {
                 return m.group(1) + // Prefix including ".ds-"
                     followPattern.replace(AUTO_FOLLOW_PATTERN_REPLACEMENT, m.group(2)) + // Data stream name changed