Skip to content

Commit c8a153f

Browse files
committed
Fix unnecessary refreshes during update retry conflicts
When update operations with retry_on_conflict encounter version conflicts, each retry attempt was triggering the original refresh policy (IMMEDIATE, WAIT_UNTIL), causing unnecessary refresh operations that degrade performance. This change suppresses the refresh policy to NONE for retry attempts while preserving the original policy for the initial attempt. The refresh will still happen once when the update eventually succeeds or fails permanently. Fixes opensearch-project#15261 Signed-off-by: Atri Sharma <atri.jiit@gmail.com>
1 parent cff74ff commit c8a153f

File tree

6 files changed

+196
-4
lines changed

6 files changed

+196
-4
lines changed

server/src/internalClusterTest/java/org/opensearch/update/UpdateIT.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.action.delete.DeleteResponse;
4242
import org.opensearch.action.get.GetResponse;
4343
import org.opensearch.action.index.IndexResponse;
44+
import org.opensearch.action.support.WriteRequest;
4445
import org.opensearch.action.update.UpdateRequest;
4546
import org.opensearch.action.update.UpdateRequestBuilder;
4647
import org.opensearch.action.update.UpdateResponse;
@@ -898,6 +899,29 @@ private void waitForOutstandingRequests(TimeValue timeOut, Semaphore requestsOut
898899
}
899900
}
900901

902+
public void testUpdateRetryOnConflictDoesNotRefreshOnFailedAttempts() throws Exception {
903+
assertAcked(prepareCreate("test"));
904+
905+
client().prepareIndex("test").setId("1")
906+
.setSource(XContentFactory.jsonBuilder().startObject().field("field", "value1").endObject())
907+
.get();
908+
909+
client().prepareUpdate("test", "1")
910+
.setDoc(XContentFactory.jsonBuilder().startObject().field("field", "value2").endObject())
911+
.setRetryOnConflict(3)
912+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
913+
.get();
914+
915+
GetResponse getResponse = client().prepareGet("test", "1").get();
916+
assertTrue(getResponse.isExists());
917+
assertEquals("value2", getResponse.getSourceAsMap().get("field"));
918+
919+
UpdateRequest updateRequest = new UpdateRequest("test", "1");
920+
assertFalse(updateRequest.isRetryAttempt());
921+
updateRequest.markAsRetryAttempt();
922+
assertTrue(updateRequest.isRetryAttempt());
923+
}
924+
901925
private static String indexOrAlias() {
902926
return randomBoolean() ? "test" : "alias";
903927
}

server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,8 @@ private void handleUpdateFailureWithRetry(
381381
request.getShardId(),
382382
request.id()
383383
);
384+
// Mark this as a retry attempt to suppress refresh on failure
385+
request.markAsRetryAttempt();
384386
threadPool.executor(executor(request.getShardId()))
385387
.execute(ActionRunnable.wrap(listener, l -> shardOperation(request, l, retryCount + 1)));
386388
return;

server/src/main/java/org/opensearch/action/update/UpdateHelper.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.action.DocWriteResponse;
3939
import org.opensearch.action.delete.DeleteRequest;
4040
import org.opensearch.action.index.IndexRequest;
41+
import org.opensearch.action.support.WriteRequest;
4142
import org.opensearch.common.Nullable;
4243
import org.opensearch.common.collect.Tuple;
4344
import org.opensearch.common.io.stream.BytesStreamOutput;
@@ -180,7 +181,9 @@ Result prepareUpsert(ShardId shardId, UpdateRequest request, final GetResult get
180181

181182
indexRequest.index(request.index())
182183
.id(request.id())
183-
.setRefreshPolicy(request.getRefreshPolicy())
184+
.setRefreshPolicy(request.isRetryAttempt() ?
185+
WriteRequest.RefreshPolicy.NONE :
186+
request.getRefreshPolicy())
184187
.routing(request.routing())
185188
.timeout(request.timeout())
186189
.waitForActiveShards(request.waitForActiveShards())
@@ -255,7 +258,9 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
255258
.setIfPrimaryTerm(getResult.getPrimaryTerm())
256259
.waitForActiveShards(request.waitForActiveShards())
257260
.timeout(request.timeout())
258-
.setRefreshPolicy(request.getRefreshPolicy());
261+
.setRefreshPolicy(request.isRetryAttempt() ?
262+
WriteRequest.RefreshPolicy.NONE :
263+
request.getRefreshPolicy());
259264
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
260265
}
261266
}
@@ -298,7 +303,9 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
298303
.setIfPrimaryTerm(getResult.getPrimaryTerm())
299304
.waitForActiveShards(request.waitForActiveShards())
300305
.timeout(request.timeout())
301-
.setRefreshPolicy(request.getRefreshPolicy());
306+
.setRefreshPolicy(request.isRetryAttempt() ?
307+
WriteRequest.RefreshPolicy.NONE :
308+
request.getRefreshPolicy());
302309
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
303310
case DELETE:
304311
DeleteRequest deleteRequest = Requests.deleteRequest(request.index())
@@ -308,7 +315,9 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
308315
.setIfPrimaryTerm(getResult.getPrimaryTerm())
309316
.waitForActiveShards(request.waitForActiveShards())
310317
.timeout(request.timeout())
311-
.setRefreshPolicy(request.getRefreshPolicy());
318+
.setRefreshPolicy(request.isRetryAttempt() ?
319+
WriteRequest.RefreshPolicy.NONE :
320+
request.getRefreshPolicy());
312321
return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType);
313322
default:
314323
// If it was neither an INDEX or DELETE operation, treat it as a noop

server/src/main/java/org/opensearch/action/update/UpdateRequest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
145145
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
146146

147147
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
148+
private transient boolean isRetryAttempt = false;
148149

149150
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
150151

@@ -616,6 +617,15 @@ public UpdateRequest setRefreshPolicy(RefreshPolicy refreshPolicy) {
616617
public RefreshPolicy getRefreshPolicy() {
617618
return refreshPolicy;
618619
}
620+
621+
public UpdateRequest markAsRetryAttempt() {
622+
this.isRetryAttempt = true;
623+
return this;
624+
}
625+
626+
public boolean isRetryAttempt() {
627+
return this.isRetryAttempt;
628+
}
619629

620630
public ActiveShardCount waitForActiveShards() {
621631
return this.waitForActiveShards;
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.update;
10+
11+
import org.opensearch.action.delete.DeleteRequest;
12+
import org.opensearch.action.index.IndexRequest;
13+
import org.opensearch.action.support.WriteRequest;
14+
import org.opensearch.test.OpenSearchTestCase;
15+
16+
/**
17+
* Tests for refresh policy behavior during update retries (issue #15261).
18+
*/
19+
public class UpdateRefreshPolicyTests extends OpenSearchTestCase {
20+
21+
public void testRefreshPolicySuppression() {
22+
// Normal request should preserve refresh policy
23+
UpdateRequest normalRequest = new UpdateRequest("test", "1");
24+
normalRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
25+
26+
IndexRequest normalIndexRequest = new IndexRequest("test");
27+
normalIndexRequest.setRefreshPolicy(
28+
normalRequest.isRetryAttempt() ?
29+
WriteRequest.RefreshPolicy.NONE :
30+
normalRequest.getRefreshPolicy()
31+
);
32+
33+
assertEquals(
34+
"Normal request should preserve IMMEDIATE refresh policy",
35+
WriteRequest.RefreshPolicy.IMMEDIATE,
36+
normalIndexRequest.getRefreshPolicy()
37+
);
38+
39+
// Retry request should suppress refresh policy
40+
UpdateRequest retryRequest = new UpdateRequest("test", "1");
41+
retryRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
42+
retryRequest.markAsRetryAttempt();
43+
44+
IndexRequest retryIndexRequest = new IndexRequest("test");
45+
retryIndexRequest.setRefreshPolicy(
46+
retryRequest.isRetryAttempt() ?
47+
WriteRequest.RefreshPolicy.NONE :
48+
retryRequest.getRefreshPolicy()
49+
);
50+
51+
assertEquals(
52+
"Retry request should suppress refresh policy to NONE",
53+
WriteRequest.RefreshPolicy.NONE,
54+
retryIndexRequest.getRefreshPolicy()
55+
);
56+
}
57+
58+
public void testRefreshPolicySuppressionForDeleteRequest() {
59+
// Normal delete should preserve refresh policy
60+
UpdateRequest normalRequest = new UpdateRequest("test", "1");
61+
normalRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
62+
63+
DeleteRequest normalDeleteRequest = new DeleteRequest("test", "1");
64+
normalDeleteRequest.setRefreshPolicy(
65+
normalRequest.isRetryAttempt() ?
66+
WriteRequest.RefreshPolicy.NONE :
67+
normalRequest.getRefreshPolicy()
68+
);
69+
70+
assertEquals(
71+
"Normal delete request should preserve WAIT_UNTIL refresh policy",
72+
WriteRequest.RefreshPolicy.WAIT_UNTIL,
73+
normalDeleteRequest.getRefreshPolicy()
74+
);
75+
76+
// Retry delete should suppress refresh policy
77+
UpdateRequest retryRequest = new UpdateRequest("test", "1");
78+
retryRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
79+
retryRequest.markAsRetryAttempt();
80+
81+
DeleteRequest retryDeleteRequest = new DeleteRequest("test", "1");
82+
retryDeleteRequest.setRefreshPolicy(
83+
retryRequest.isRetryAttempt() ?
84+
WriteRequest.RefreshPolicy.NONE :
85+
retryRequest.getRefreshPolicy()
86+
);
87+
88+
assertEquals(
89+
"Retry delete request should suppress refresh policy to NONE",
90+
WriteRequest.RefreshPolicy.NONE,
91+
retryDeleteRequest.getRefreshPolicy()
92+
);
93+
}
94+
95+
public void testAllRefreshPolicyValues() {
96+
WriteRequest.RefreshPolicy[] allPolicies = {
97+
WriteRequest.RefreshPolicy.IMMEDIATE,
98+
WriteRequest.RefreshPolicy.WAIT_UNTIL,
99+
WriteRequest.RefreshPolicy.NONE
100+
};
101+
102+
for (WriteRequest.RefreshPolicy originalPolicy : allPolicies) {
103+
// Normal request - should preserve original policy
104+
UpdateRequest normalRequest = new UpdateRequest("test", "1");
105+
normalRequest.setRefreshPolicy(originalPolicy);
106+
107+
WriteRequest.RefreshPolicy normalResultPolicy = normalRequest.isRetryAttempt() ?
108+
WriteRequest.RefreshPolicy.NONE :
109+
normalRequest.getRefreshPolicy();
110+
111+
assertEquals(
112+
"Normal request should preserve original policy: " + originalPolicy,
113+
originalPolicy,
114+
normalResultPolicy
115+
);
116+
117+
// Retry request - should always be NONE regardless of original policy
118+
UpdateRequest retryRequest = new UpdateRequest("test", "1");
119+
retryRequest.setRefreshPolicy(originalPolicy);
120+
retryRequest.markAsRetryAttempt();
121+
122+
WriteRequest.RefreshPolicy retryResultPolicy = retryRequest.isRetryAttempt() ?
123+
WriteRequest.RefreshPolicy.NONE :
124+
retryRequest.getRefreshPolicy();
125+
126+
assertEquals(
127+
"Retry request should always suppress to NONE, original was: " + originalPolicy,
128+
WriteRequest.RefreshPolicy.NONE,
129+
retryResultPolicy
130+
);
131+
}
132+
}
133+
134+
}

server/src/test/java/org/opensearch/action/update/UpdateRequestTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,4 +709,17 @@ public void testGetChildIndexRequests() {
709709
assertEquals(childRequests.size(), 1);
710710
assertEquals(childRequests.get(0), docRequest);
711711
}
712+
713+
public void testRetryAttemptMarking() {
714+
UpdateRequest request = new UpdateRequest("test", "1");
715+
assertFalse(request.isRetryAttempt());
716+
717+
UpdateRequest returnedRequest = request.markAsRetryAttempt();
718+
assertSame(request, returnedRequest);
719+
assertTrue(request.isRetryAttempt());
720+
721+
UpdateRequest newRequest = new UpdateRequest("test", "1");
722+
newRequest.markAsRetryAttempt();
723+
assertTrue(newRequest.isRetryAttempt());
724+
}
712725
}

0 commit comments

Comments
 (0)