Skip to content

Commit 0392de0

Browse files
Expose the add retention lease api as REST API in snapshot builds (#120299)
In this PR, we are exposing the "add retention lease" API as a REST API in snapshot builds as well. Once available we will use it in the Rally track to properly add a retention lease before calling the shard changes API. As a result, all required sequence numbers will be retained resulting in the shard changes API running correctly.
1 parent f9ccb89 commit 0392de0

File tree

3 files changed

+678
-0
lines changed

3 files changed

+678
-0
lines changed
Lines changed: 368 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,368 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.logsdb.seqno;
9+
10+
import org.apache.http.util.EntityUtils;
11+
import org.elasticsearch.client.Request;
12+
import org.elasticsearch.client.Response;
13+
import org.elasticsearch.client.ResponseException;
14+
import org.elasticsearch.cluster.metadata.IndexMetadata;
15+
import org.elasticsearch.common.UUIDs;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.xcontent.XContentHelper;
18+
import org.elasticsearch.rest.RestStatus;
19+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
20+
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
21+
import org.elasticsearch.test.rest.ESRestTestCase;
22+
import org.elasticsearch.xcontent.json.JsonXContent;
23+
import org.junit.ClassRule;
24+
25+
import java.io.IOException;
26+
import java.time.Instant;
27+
import java.time.ZoneOffset;
28+
import java.time.format.DateTimeFormatter;
29+
import java.util.List;
30+
import java.util.Locale;
31+
import java.util.Map;
32+
33+
public class RetentionLeaseRestIT extends ESRestTestCase {
34+
private static final String ADD_RETENTION_LEASE_ENDPOINT = "/%s/seq_no/add_retention_lease";
35+
private static final String BULK_INDEX_ENDPOINT = "/%s/_bulk";
36+
private static final String[] DOCUMENT_NAMES = { "alpha", "beta", "gamma", "delta" };
37+
38+
@ClassRule
39+
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
40+
.distribution(DistributionType.DEFAULT)
41+
.setting("xpack.security.enabled", "false")
42+
.setting("xpack.license.self_generated.type", "trial")
43+
.build();
44+
45+
@Override
46+
protected String getTestRestCluster() {
47+
return cluster.getHttpAddresses();
48+
}
49+
50+
public void testAddRetentionLeaseSuccessfully() throws IOException {
51+
final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT);
52+
createIndex(
53+
indexName,
54+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
55+
);
56+
assertTrue(indexExists(indexName));
57+
58+
assertOK(bulkIndex(indexName, randomIntBetween(10, 20)));
59+
60+
final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, indexName));
61+
final String retentionLeaseId = randomAlphaOfLength(6);
62+
final String retentionLeaseSource = randomAlphaOfLength(8);
63+
retentionLeaseRequest.addParameter("id", retentionLeaseId);
64+
retentionLeaseRequest.addParameter("source", retentionLeaseSource);
65+
66+
final Response response = client().performRequest(retentionLeaseRequest);
67+
assertOK(response);
68+
69+
assertRetentionLeaseResponseContent(response, indexName, indexName, retentionLeaseId, retentionLeaseSource);
70+
assertRetentionLeaseExists(indexName, retentionLeaseId, retentionLeaseSource);
71+
}
72+
73+
public void testAddRetentionLeaseWithoutIdAndSource() throws IOException {
74+
final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT);
75+
createIndex(
76+
indexName,
77+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
78+
);
79+
assertTrue(indexExists(indexName));
80+
81+
assertOK(bulkIndex(indexName, randomIntBetween(10, 20)));
82+
83+
final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, indexName));
84+
85+
final Response response = client().performRequest(retentionLeaseRequest);
86+
assertOK(response);
87+
88+
assertRetentionLeaseResponseContent(response, indexName, indexName, null, null);
89+
}
90+
91+
public void testAddRetentionLeaseToDataStream() throws IOException {
92+
final String templateName = randomAlphanumericOfLength(8).toLowerCase(Locale.ROOT);
93+
assertOK(createIndexTemplate(templateName, """
94+
{
95+
"index_patterns": [ "test-*-*" ],
96+
"data_stream": {},
97+
"priority": 100,
98+
"template": {
99+
"settings": {
100+
"number_of_shards": 1,
101+
"number_of_replicas": 0
102+
},
103+
"mappings": {
104+
"properties": {
105+
"@timestamp": {
106+
"type": "date"
107+
},
108+
"name": {
109+
"type": "keyword"
110+
}
111+
}
112+
}
113+
}
114+
}
115+
"""));
116+
117+
final String dataStreamName = "test-"
118+
+ randomAlphanumericOfLength(5).toLowerCase(Locale.ROOT)
119+
+ "-"
120+
+ randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
121+
assertOK(createDataStream(dataStreamName));
122+
assertOK(bulkIndex(dataStreamName, randomIntBetween(10, 20)));
123+
124+
final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, dataStreamName));
125+
final String retentionLeaseId = randomAlphaOfLength(6);
126+
final String retentionLeaseSource = randomAlphaOfLength(8);
127+
retentionLeaseRequest.addParameter("id", retentionLeaseId);
128+
retentionLeaseRequest.addParameter("source", retentionLeaseSource);
129+
130+
final Response response = client().performRequest(retentionLeaseRequest);
131+
assertOK(response);
132+
133+
final String dataStreamBackingIndex = getFirstBackingIndex(dataStreamName);
134+
assertRetentionLeaseResponseContent(response, dataStreamName, dataStreamBackingIndex, retentionLeaseId, retentionLeaseSource);
135+
assertRetentionLeaseExists(dataStreamBackingIndex, retentionLeaseId, retentionLeaseSource);
136+
}
137+
138+
public void testAddRetentionLeaseUsingAlias() throws IOException {
139+
final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT);
140+
createIndex(
141+
indexName,
142+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
143+
);
144+
assertTrue(indexExists(indexName));
145+
146+
final String aliasName = randomAlphanumericOfLength(8).toLowerCase(Locale.ROOT);
147+
final Request putAliasRequest = new Request("PUT", "/" + indexName + "/_alias/" + aliasName);
148+
assertOK(client().performRequest(putAliasRequest));
149+
150+
assertOK(bulkIndex(aliasName, randomIntBetween(10, 20)));
151+
152+
final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, aliasName));
153+
final String retentionLeaseId = randomAlphaOfLength(6);
154+
final String retentionLeaseSource = randomAlphaOfLength(8);
155+
retentionLeaseRequest.addParameter("id", retentionLeaseId);
156+
retentionLeaseRequest.addParameter("source", retentionLeaseSource);
157+
158+
final Response response = client().performRequest(retentionLeaseRequest);
159+
assertOK(response);
160+
161+
assertRetentionLeaseResponseContent(response, aliasName, indexName, retentionLeaseId, retentionLeaseSource);
162+
assertRetentionLeaseExists(indexName, retentionLeaseId, retentionLeaseSource);
163+
}
164+
165+
public void testAddRetentionLeaseMissingIndex() throws IOException {
166+
final String missingIndexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT);
167+
assertFalse(indexExists(missingIndexName));
168+
169+
final Request retentionLeaseRequest = new Request(
170+
"PUT",
171+
String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, missingIndexName)
172+
);
173+
final ResponseException exception = assertThrows(ResponseException.class, () -> client().performRequest(retentionLeaseRequest));
174+
assertResponseException(exception, RestStatus.BAD_REQUEST, "Error adding retention lease for [" + missingIndexName + "]");
175+
}
176+
177+
public void testAddRetentionLeaseInvalidParameters() throws IOException {
178+
final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT);
179+
createIndex(
180+
indexName,
181+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
182+
);
183+
assertTrue(indexExists(indexName));
184+
assertOK(bulkIndex(indexName, randomIntBetween(10, 20)));
185+
186+
final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, indexName));
187+
retentionLeaseRequest.addParameter("id", null);
188+
retentionLeaseRequest.addParameter("source", randomBoolean() ? UUIDs.randomBase64UUID() : "test-source");
189+
190+
final ResponseException exception = assertThrows(ResponseException.class, () -> client().performRequest(retentionLeaseRequest));
191+
assertResponseException(exception, RestStatus.BAD_REQUEST, "retention lease ID can not be empty");
192+
}
193+
194+
public void testAddMultipleRetentionLeasesForSameShard() throws IOException {
195+
final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT);
196+
createIndex(
197+
indexName,
198+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
199+
);
200+
assertTrue(indexExists(indexName));
201+
assertOK(bulkIndex(indexName, randomIntBetween(10, 20)));
202+
203+
int numberOfLeases = randomIntBetween(2, 5);
204+
for (int i = 0; i < numberOfLeases; i++) {
205+
final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, indexName));
206+
retentionLeaseRequest.addParameter("id", "lease-" + i);
207+
retentionLeaseRequest.addParameter("source", "test-source-" + i);
208+
209+
final Response response = client().performRequest(retentionLeaseRequest);
210+
assertOK(response);
211+
212+
assertRetentionLeaseResponseContent(response, indexName, indexName, "lease-" + i, "test-source-" + i);
213+
}
214+
215+
for (int i = 0; i < numberOfLeases; i++) {
216+
assertRetentionLeaseExists(indexName, "lease-" + i, "test-source-" + i);
217+
}
218+
}
219+
220+
private static Response bulkIndex(final String indexName, int numberOfDocuments) throws IOException {
221+
final StringBuilder sb = new StringBuilder();
222+
long timestamp = System.currentTimeMillis();
223+
224+
for (int i = 0; i < numberOfDocuments; i++) {
225+
sb.append(
226+
String.format(
227+
Locale.ROOT,
228+
"{ \"index\": {} }\n{ \"@timestamp\": \"%s\", \"name\": \"%s\" }\n",
229+
Instant.ofEpochMilli(timestamp).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME),
230+
randomFrom(DOCUMENT_NAMES)
231+
)
232+
);
233+
timestamp += 1000;
234+
}
235+
236+
final Request request = new Request("POST", String.format(Locale.ROOT, BULK_INDEX_ENDPOINT, indexName));
237+
request.setJsonEntity(sb.toString());
238+
request.addParameter("refresh", "true");
239+
return client().performRequest(request);
240+
}
241+
242+
private void assertResponseException(final ResponseException exception, final RestStatus expectedStatus, final String expectedMessage) {
243+
assertEquals(expectedStatus.getStatus(), exception.getResponse().getStatusLine().getStatusCode());
244+
assertTrue(exception.getMessage().contains(expectedMessage));
245+
}
246+
247+
private Map<String, Object> getRetentionLeases(final String indexName) throws IOException {
248+
final Request statsRequest = new Request("GET", "/" + indexName + "/_stats");
249+
statsRequest.addParameter("level", "shards");
250+
251+
final Response response = client().performRequest(statsRequest);
252+
assertOK(response);
253+
254+
final Map<String, Object> responseMap = XContentHelper.convertToMap(
255+
JsonXContent.jsonXContent,
256+
EntityUtils.toString(response.getEntity()),
257+
false
258+
);
259+
260+
@SuppressWarnings("unchecked")
261+
final Map<String, Object> indices = (Map<String, Object>) responseMap.get("indices");
262+
if (indices == null || indices.containsKey(indexName) == false) {
263+
throw new IllegalArgumentException("No shard stats found for: " + indexName);
264+
}
265+
266+
@SuppressWarnings("unchecked")
267+
final Map<String, Object> shards = (Map<String, Object>) ((Map<String, Object>) indices.get(indexName)).get("shards");
268+
269+
@SuppressWarnings("unchecked")
270+
final List<Map<String, Object>> shardList = (List<Map<String, Object>>) shards.get("0");
271+
272+
return getRetentionLeases(indexName, shardList);
273+
}
274+
275+
private static Map<String, Object> getRetentionLeases(final String indexName, final List<Map<String, Object>> shardList) {
276+
final Map<String, Object> shardStats = shardList.getFirst();
277+
278+
@SuppressWarnings("unchecked")
279+
final Map<String, Object> retentionLeases = (Map<String, Object>) shardStats.get("retention_leases");
280+
if (retentionLeases == null) {
281+
throw new IllegalArgumentException("No retention leases found for shard 0 of index: " + indexName);
282+
}
283+
return retentionLeases;
284+
}
285+
286+
private void assertRetentionLeaseExists(
287+
final String indexAbstractionName,
288+
final String expectedRetentionLeaseId,
289+
final String expectedRetentionLeaseSource
290+
) throws IOException {
291+
final Map<String, Object> retentionLeases = getRetentionLeases(indexAbstractionName);
292+
293+
@SuppressWarnings("unchecked")
294+
final List<Map<String, Object>> leases = (List<Map<String, Object>>) retentionLeases.get("leases");
295+
296+
boolean retentionLeaseExists = leases.stream().anyMatch(lease -> {
297+
final String id = (String) lease.get("id");
298+
final String source = (String) lease.get("source");
299+
return expectedRetentionLeaseId.equals(id) && expectedRetentionLeaseSource.equals(source);
300+
});
301+
302+
assertTrue(
303+
"Retention lease with ID [" + expectedRetentionLeaseId + "] and source [" + expectedRetentionLeaseSource + "] does not exist.",
304+
retentionLeaseExists
305+
);
306+
}
307+
308+
private Response createDataStream(final String dataStreamName) throws IOException {
309+
return client().performRequest(new Request("PUT", "/_data_stream/" + dataStreamName));
310+
}
311+
312+
private String getFirstBackingIndex(final String dataStreamName) throws IOException {
313+
final Response response = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
314+
315+
final Map<String, Object> responseMap = XContentHelper.convertToMap(
316+
JsonXContent.jsonXContent,
317+
EntityUtils.toString(response.getEntity()),
318+
false
319+
);
320+
321+
@SuppressWarnings("unchecked")
322+
final List<Map<String, Object>> dataStreams = (List<Map<String, Object>>) responseMap.get("data_streams");
323+
324+
if (dataStreams == null || dataStreams.isEmpty()) {
325+
throw new IllegalArgumentException("No data stream found for name: " + dataStreamName);
326+
}
327+
328+
@SuppressWarnings("unchecked")
329+
final List<Map<String, Object>> backingIndices = (List<Map<String, Object>>) dataStreams.get(0).get("indices");
330+
331+
if (backingIndices == null || backingIndices.isEmpty()) {
332+
throw new IllegalArgumentException("No backing indices found for data stream: " + dataStreamName);
333+
}
334+
335+
return (String) backingIndices.getFirst().get("index_name");
336+
}
337+
338+
private static Response createIndexTemplate(final String templateName, final String mappings) throws IOException {
339+
final Request request = new Request("PUT", "/_index_template/" + templateName);
340+
request.setJsonEntity(mappings);
341+
return client().performRequest(request);
342+
}
343+
344+
private void assertRetentionLeaseResponseContent(
345+
final Response response,
346+
final String expectedIndexAbstraction,
347+
final String expectedConcreteIndex,
348+
final String expectedLeaseId,
349+
final String expectedLeaseSource
350+
) throws IOException {
351+
final Map<String, Object> responseBody = XContentHelper.convertToMap(
352+
JsonXContent.jsonXContent,
353+
EntityUtils.toString(response.getEntity()),
354+
false
355+
);
356+
357+
assertEquals("Unexpected index abstraction in response", expectedIndexAbstraction, responseBody.get("index_abstraction"));
358+
assertEquals("Unexpected concrete index in response", expectedConcreteIndex, responseBody.get("index"));
359+
assertNotNull("Shard ID missing in response", responseBody.get("shard_id"));
360+
361+
if (expectedLeaseId != null) {
362+
assertEquals("Unexpected lease ID in response", expectedLeaseId, responseBody.get("id"));
363+
}
364+
if (expectedLeaseSource != null) {
365+
assertEquals("Unexpected lease source in response", expectedLeaseSource, responseBody.get("source"));
366+
}
367+
}
368+
}

0 commit comments

Comments
 (0)