Skip to content

Commit 9545aec

Browse files
committed
Add _data_stream field to Search Hits response
1 parent 311a722 commit 9545aec

File tree

5 files changed

+179
-9
lines changed

5 files changed

+179
-9
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@
7474
import java.util.function.Function;
7575
import java.util.function.LongSupplier;
7676
import java.util.function.Predicate;
77+
import java.util.regex.Matcher;
78+
import java.util.regex.Pattern;
7779
import java.util.stream.Collectors;
7880

7981
import static org.elasticsearch.cluster.metadata.ComposableIndexTemplate.EMPTY_MAPPINGS;
@@ -97,6 +99,9 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
9799
public static final String FAILURE_STORE_PREFIX = ".fs-";
98100
public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
99101
public static final String TIMESTAMP_FIELD_NAME = "@timestamp";
102+
public static final Pattern DS_BACKING_PATTERN = Pattern.compile(
103+
"^(.*?" + BACKING_INDEX_PREFIX + ")(.+)-(\\d{4}.\\d{2}.\\d{2})(-[\\d]+)?$"
104+
);
100105

101106
// Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
102107
public static final Comparator<LeafReader> TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> {
@@ -1365,6 +1370,24 @@ public static String getDefaultFailureStoreName(String dataStreamName, long gene
13651370
return getDefaultIndexName(FAILURE_STORE_PREFIX, dataStreamName, generation, epochMillis);
13661371
}
13671372

1373+
/**
1374+
* Extracts the data stream name from an index name, if that name matches the data stream backing index name format.
1375+
*
1376+
* @param indexName name of the index
1377+
* @return name of the data stream if the index name matches the backing index name format, or {@code null} otherwise
1378+
*/
1379+
public static String getDataStreamNameFromIndex(String indexName) {
1380+
if (indexName == null) {
1381+
return null;
1382+
}
1383+
1384+
Matcher matcher = DataStream.DS_BACKING_PATTERN.matcher(indexName);
1385+
if (matcher.matches()) {
1386+
return matcher.group(2);
1387+
}
1388+
return null;
1389+
}
1390+
13681391
/**
13691392
* Generates the name of the index that conforms to the default naming convention for indices
13701393
* on data streams given the specified prefix, data stream name, generation, and time.

server/src/main/java/org/elasticsearch/search/SearchHit.java

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.lucene.search.Explanation;
1313
import org.elasticsearch.ElasticsearchParseException;
1414
import org.elasticsearch.TransportVersions;
15+
import org.elasticsearch.cluster.metadata.DataStream;
1516
import org.elasticsearch.common.Strings;
1617
import org.elasticsearch.common.bytes.BytesArray;
1718
import org.elasticsearch.common.bytes.BytesReference;
@@ -98,11 +99,13 @@ public final class SearchHit implements Writeable, ToXContentObject, RefCounted
9899
@Nullable
99100
private SearchShardTarget shard;
100101

101-
// These two fields normally get set when setting the shard target, so they hold the same values as the target thus don't get
102+
// These three fields normally get set when setting the shard target, so they hold the same values as the target thus don't get
102103
// serialized over the wire. When parsing hits back from xcontent though, in most of the cases (whenever explanation is disabled)
103104
// we can't rebuild the shard target object so we need to set these manually for users retrieval.
104105
private transient String index;
105106
private transient String clusterAlias;
107+
@Nullable
108+
private transient String dataStream;
106109

107110
// For asserting that the method #getSourceAsMap is called just once on the lifetime of this object
108111
private boolean sourceAsMapCalled = false;
@@ -169,6 +172,54 @@ public SearchHit(
169172
Map<String, DocumentField> documentFields,
170173
Map<String, DocumentField> metaFields,
171174
@Nullable RefCounted refCounted
175+
) {
176+
this(
177+
docId,
178+
score,
179+
rank,
180+
id,
181+
nestedIdentity,
182+
version,
183+
seqNo,
184+
primaryTerm,
185+
source,
186+
highlightFields,
187+
sortValues,
188+
matchedQueries,
189+
explanation,
190+
shard,
191+
index,
192+
clusterAlias,
193+
innerHits,
194+
documentFields,
195+
metaFields,
196+
refCounted,
197+
null
198+
);
199+
}
200+
201+
public SearchHit(
202+
int docId,
203+
float score,
204+
int rank,
205+
Text id,
206+
NestedIdentity nestedIdentity,
207+
long version,
208+
long seqNo,
209+
long primaryTerm,
210+
BytesReference source,
211+
Map<String, HighlightField> highlightFields,
212+
SearchSortValues sortValues,
213+
Map<String, Float> matchedQueries,
214+
Explanation explanation,
215+
SearchShardTarget shard,
216+
String index,
217+
String clusterAlias,
218+
Map<String, SearchHits> innerHits,
219+
Map<String, DocumentField> documentFields,
220+
Map<String, DocumentField> metaFields,
221+
@Nullable RefCounted refCounted,
222+
@Nullable String dataStream
172223
) {
173224
this.docId = docId;
174225
this.score = score;
@@ -190,6 +241,7 @@ public SearchHit(
190241
this.documentFields = documentFields;
191242
this.metaFields = metaFields;
192243
this.refCounted = refCounted == null ? LeakTracker.wrap(new SimpleRefCounted()) : refCounted;
244+
this.dataStream = dataStream;
193245
}
194246

195247
public static SearchHit readFrom(StreamInput in, boolean pooled) throws IOException {
@@ -252,6 +304,8 @@ public static SearchHit readFrom(StreamInput in, boolean pooled) throws IOExcept
252304
clusterAlias = shardTarget.getClusterAlias();
253305
}
254306

307+
final String dataStream = DataStream.getDataStreamNameFromIndex(index);
308+
255309
boolean isPooled = pooled && source != null;
256310
final Map<String, SearchHits> innerHits;
257311
int size = in.readVInt();
@@ -286,7 +340,8 @@ public static SearchHit readFrom(StreamInput in, boolean pooled) throws IOExcept
286340
innerHits,
287341
documentFields,
288342
metaFields,
289-
isPooled ? null : ALWAYS_REFERENCED
343+
isPooled ? null : ALWAYS_REFERENCED,
344+
dataStream
290345
);
291346
}
292347

@@ -420,6 +475,13 @@ public String getIndex() {
420475
return this.index;
421476
}
422477

478+
/**
479+
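* The name of the data stream derived from the hit's index name, or {@code null} if the index name does not match the backing index format.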
480+
*/
481+
public String getDataStream() {
482+
return this.dataStream;
483+
}
484+
423485
/**
424486
* The id of the document.
425487
*/
@@ -662,6 +724,7 @@ public void shard(SearchShardTarget target) {
662724
if (target != null) {
663725
this.index = target.getIndex();
664726
this.clusterAlias = target.getClusterAlias();
727+
this.dataStream = DataStream.getDataStreamNameFromIndex(target.getIndex());
665728
}
666729
}
667730

@@ -786,7 +849,8 @@ public SearchHit asUnpooled() {
786849
: innerHits.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().asUnpooled())),
787850
cloneIfHashMap(documentFields),
788851
cloneIfHashMap(metaFields),
789-
ALWAYS_REFERENCED
852+
ALWAYS_REFERENCED,
853+
dataStream
790854
);
791855
}
792856

@@ -818,6 +882,7 @@ public static class Fields {
818882
static final String INNER_HITS = "inner_hits";
819883
static final String _SHARD = "_shard";
820884
static final String _NODE = "_node";
885+
static final String DATA_STREAM = "_data_stream";
821886
}
822887

823888
// Following are the keys for storing the metadata fields and regular fields in the aggregation map.
@@ -846,6 +911,9 @@ public XContentBuilder toInnerXContent(XContentBuilder builder, Params params) t
846911
if (index != null) {
847912
builder.field(Fields._INDEX, RemoteClusterAware.buildRemoteIndexName(clusterAlias, index));
848913
}
914+
if (dataStream != null) {
915+
builder.field(Fields.DATA_STREAM, dataStream);
916+
}
849917
if (id != null) {
850918
builder.field(Fields._ID, id);
851919
}

server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2715,6 +2715,36 @@ public void testGetEffectiveIndexTemplateDataStreamMappingsOnly() throws IOExcep
27152715
assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(expectedEffectiveTemplate));
27162716
}
27172717

2718+
public void testGetDataStreamNameFromValidBackingIndex() {
2719+
// Test a valid backing index name with correct format: .ds-<data-stream>-<yyyy.MM.dd>-<generation>
2720+
String indexName = ".ds-my-service-logs-2024.02.05-000001";
2721+
String dataStreamName = DataStream.getDataStreamNameFromIndex(indexName);
2722+
2723+
assertEquals("my-service-logs", dataStreamName);
2724+
}
2725+
2726+
public void testGetDataStreamNameFromInvalidBackingIndex() {
2727+
// Test cases that should not be recognized as valid backing indices
2728+
String[] invalidNames = {
2729+
"not-a-backing-index", // No .ds- prefix
2730+
".ds-", // Missing data stream name
2731+
".ds-logs", // Missing date and generation
2732+
".ds-logs-2024.02.05", // Missing generation
2733+
};
2734+
2735+
for (String invalidName : invalidNames) {
2736+
assertNull(
2737+
"Should return null for invalid backing index name: " + invalidName,
2738+
DataStream.getDataStreamNameFromIndex(invalidName)
2739+
);
2740+
}
2741+
}
2742+
2743+
public void testGetDataStreamNameFromNullIndex() {
2744+
// should return null given null index name
2745+
assertNull(DataStream.getDataStreamNameFromIndex(null));
2746+
}
2747+
27182748
private static CompressedXContent randomMappings() {
27192749
try {
27202750
return new CompressedXContent("{\"_doc\": {\"properties\":{\"" + randomAlphaOfLength(5) + "\":{\"type\":\"keyword\"}}}}");

server/src/test/java/org/elasticsearch/search/SearchHitTests.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.lucene.search.Explanation;
1313
import org.apache.lucene.search.TotalHits;
1414
import org.elasticsearch.TransportVersion;
15+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1516
import org.elasticsearch.common.Strings;
1617
import org.elasticsearch.common.bytes.BytesArray;
1718
import org.elasticsearch.common.bytes.BytesReference;
@@ -476,4 +477,57 @@ static Explanation createExplanation(int depth) {
476477
}
477478
return Explanation.match(value, description, details);
478479
}
480+
481+
public void testShardTargetSetsDataStreamName() {
482+
// Create a SearchHit
483+
SearchHit hit = new SearchHit(1);
484+
485+
// Create a backing index name following the pattern: .ds-<data-stream>-<yyyy.MM.dd>-<generation>
486+
String backingIndexName = ".ds-my-service-logs-2024.02.05-000001";
487+
String nodeId = "node1";
488+
int shardId = 0;
489+
String clusterAlias = null;
490+
491+
// Create SearchShardTarget with the backing index name
492+
SearchShardTarget target = new SearchShardTarget(
493+
nodeId,
494+
new ShardId(new Index(backingIndexName, IndexMetadata.INDEX_UUID_NA_VALUE), shardId),
495+
clusterAlias
496+
);
497+
498+
// Set the shard target
499+
hit.shard(target);
500+
501+
// Verify the data-stream field is set correctly
502+
assertEquals("my-service-logs", hit.getDataStream());
503+
assertEquals(backingIndexName, hit.getIndex());
504+
assertNull(hit.getClusterAlias());
505+
}
506+
507+
public void testShardTargetWithNonDataStreamIndex() {
508+
// Create a SearchHit
509+
SearchHit hit = new SearchHit(1);
510+
511+
// Create a regular index name (not a backing index)
512+
String regularIndexName = "regular-index";
513+
String nodeId = "node1";
514+
int shardId = 0;
515+
String clusterAlias = null;
516+
517+
// Create SearchShardTarget with a non-data-stream-backed index name
518+
SearchShardTarget target = new SearchShardTarget(
519+
nodeId,
520+
new ShardId(new Index(regularIndexName, IndexMetadata.INDEX_UUID_NA_VALUE), shardId),
521+
clusterAlias
522+
);
523+
524+
// Set the shard target
525+
hit.shard(target);
526+
527+
// Verify the data-stream field is null for non-backing indices
528+
assertNull(hit.getDataStream());
529+
assertEquals(regularIndexName, hit.getIndex());
530+
assertNull(hit.getClusterAlias());
531+
}
532+
479533
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import java.util.function.LongSupplier;
6565
import java.util.function.Supplier;
6666
import java.util.regex.Matcher;
67-
import java.util.regex.Pattern;
6867
import java.util.stream.Collectors;
6968

7069
import static org.elasticsearch.core.Strings.format;
@@ -90,10 +89,6 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
9089
private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class);
9190
private static final int MAX_AUTO_FOLLOW_ERRORS = 256;
9291

93-
private static final Pattern DS_BACKING_PATTERN = Pattern.compile(
94-
"^(.*?" + DataStream.BACKING_INDEX_PREFIX + ")(.+)-(\\d{4}.\\d{2}.\\d{2})(-[\\d]+)?$"
95-
);
96-
9792
private final Client client;
9893
private final ClusterService clusterService;
9994
private final CcrLicenseChecker ccrLicenseChecker;
@@ -852,7 +847,7 @@ static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String l
852847
// follow a parseable pattern. Instead it would be better to rename it as though
853848
// the data stream name was the leader index name, ending up with
854849
// ".ds-logs-foo-bar_copy-2022-02-02-000001" as the final index name.
855-
Matcher m = DS_BACKING_PATTERN.matcher(leaderIndexName);
850+
Matcher m = DataStream.DS_BACKING_PATTERN.matcher(leaderIndexName);
856851
if (m.find()) {
857852
return m.group(1) + // Prefix including ".ds-"
858853
followPattern.replace(AUTO_FOLLOW_PATTERN_REPLACEMENT, m.group(2)) + // Data stream name changed

0 commit comments

Comments
 (0)