Skip to content

Commit 7284dd2

Browse files
Deduplicate Heavy CCR Repository CS Requests (#91398) (#95372)
We run the same request back to back for each put-follower call during the restore. Also, concurrent put-follower calls will all run the same full CS request concurrently. In older versions prior to #87235 the concurrency was limited by the size of the snapshot pool. With that fix though, they are run at almost arbitry concurrency when many put-follow requests are executed concurrently. -> fixed by using the existing deduplicator to only run a single remote CS request at a time for each CCR repository. Also, this removes the needless forking in the put-follower action that is not necessary any longer now that we have the CCR repository non-blocking (we do the same for normal restores that can safely be started from a transport thread), which should fix some bad-ux situations where the snapshot threads are busy on master, making the put-follower requests not go through in time.
1 parent 2b42144 commit 7284dd2

File tree

4 files changed

+126
-33
lines changed

4 files changed

+126
-33
lines changed

docs/changelog/91398.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 91398
2+
summary: Deduplicate Heavy CCR Repository CS Requests
3+
area: CCR
4+
type: bug
5+
issues: []
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.transport;
10+
11+
import org.apache.lucene.util.SetOnce;
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.SingleResultDeduplicator;
14+
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.common.util.concurrent.ThreadContext;
16+
import org.elasticsearch.test.ESTestCase;
17+
18+
public class SingleResultDeduplicatorTests extends ESTestCase {
19+
20+
public void testDeduplicatesWithoutShowingStaleData() {
21+
final SetOnce<ActionListener<Object>> firstListenerRef = new SetOnce<>();
22+
final SetOnce<ActionListener<Object>> secondListenerRef = new SetOnce<>();
23+
final SingleResultDeduplicator<Object> deduplicator = new SingleResultDeduplicator<>(new ThreadContext(Settings.EMPTY), l -> {
24+
if (firstListenerRef.trySet(l) == false) {
25+
secondListenerRef.set(l);
26+
}
27+
});
28+
final Object result1 = new Object();
29+
final Object result2 = new Object();
30+
31+
final int totalListeners = randomIntBetween(2, 10);
32+
final boolean[] called = new boolean[totalListeners];
33+
deduplicator.execute(new ActionListener<Object>() {
34+
@Override
35+
public void onResponse(Object response) {
36+
assertFalse(called[0]);
37+
called[0] = true;
38+
assertEquals(result1, response);
39+
}
40+
41+
@Override
42+
public void onFailure(Exception e) {
43+
throw new AssertionError(e);
44+
}
45+
});
46+
47+
for (int i = 1; i < totalListeners; i++) {
48+
final int index = i;
49+
deduplicator.execute(new ActionListener<Object>() {
50+
51+
@Override
52+
public void onResponse(Object response) {
53+
assertFalse(called[index]);
54+
called[index] = true;
55+
assertEquals(result2, response);
56+
}
57+
58+
@Override
59+
public void onFailure(Exception e) {
60+
throw new AssertionError(e);
61+
}
62+
});
63+
}
64+
for (int i = 0; i < totalListeners; i++) {
65+
assertFalse(called[i]);
66+
}
67+
firstListenerRef.get().onResponse(result1);
68+
assertTrue(called[0]);
69+
for (int i = 1; i < totalListeners; i++) {
70+
assertFalse(called[i]);
71+
}
72+
secondListenerRef.get().onResponse(result2);
73+
for (int i = 0; i < totalListeners; i++) {
74+
assertTrue(called[i]);
75+
}
76+
}
77+
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.action.ActionListener;
2121
import org.elasticsearch.action.ActionResponse;
2222
import org.elasticsearch.action.ActionRunnable;
23+
import org.elasticsearch.action.SingleResultDeduplicator;
2324
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
2425
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
2526
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
@@ -143,6 +144,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
143144

144145
private final CounterMetric throttledTime = new CounterMetric();
145146

147+
private final SingleResultDeduplicator<ClusterState> csDeduplicator;
148+
146149
public CcrRepository(
147150
RepositoryMetadata metadata,
148151
Client client,
@@ -159,6 +162,17 @@ public CcrRepository(
159162
this.ccrLicenseChecker = ccrLicenseChecker;
160163
this.client = client;
161164
this.threadPool = threadPool;
165+
csDeduplicator = new SingleResultDeduplicator<>(
166+
threadPool.getThreadContext(),
167+
l -> getRemoteClusterClient().admin()
168+
.cluster()
169+
.prepareState()
170+
.clear()
171+
.setMetadata(true)
172+
.setNodes(true)
173+
.setMasterNodeTimeout(TimeValue.MAX_VALUE)
174+
.execute(l.map(ClusterStateResponse::getState))
175+
);
162176
}
163177

164178
@Override
@@ -191,27 +205,22 @@ public void getSnapshotInfo(GetSnapshotInfoContext context) {
191205
assert snapshotIds.size() == 1 && SNAPSHOT_ID.equals(snapshotIds.iterator().next())
192206
: "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId but saw " + snapshotIds;
193207
try {
194-
getRemoteClusterClient().admin()
195-
.cluster()
196-
.prepareState()
197-
.clear()
198-
.setMetadata(true)
199-
.setNodes(true)
200-
// fork to the snapshot meta pool because the context expects to run on it and asserts that it does
201-
.execute(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SNAPSHOT_META, context.map(response -> {
202-
Metadata responseMetadata = response.getState().metadata();
208+
csDeduplicator.execute(
209+
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SNAPSHOT_META, context.map(response -> {
210+
Metadata responseMetadata = response.metadata();
203211
ImmutableOpenMap<String, IndexMetadata> indicesMap = responseMetadata.indices();
204-
List<String> indices = new ArrayList<>(indicesMap.keySet());
205212
return new SnapshotInfo(
206213
new Snapshot(this.metadata.name(), SNAPSHOT_ID),
207-
indices,
208-
new ArrayList<>(responseMetadata.dataStreams().keySet()),
209-
Collections.emptyList(),
210-
response.getState().getNodes().getMaxNodeVersion(),
214+
org.elasticsearch.core.List.copyOf(indicesMap.keySet()),
215+
org.elasticsearch.core.List.copyOf(responseMetadata.dataStreams().keySet()),
216+
org.elasticsearch.core.List.of(),
217+
response.getNodes().getMaxNodeVersion(),
211218
SnapshotState.SUCCESS
212219
);
213-
}), false));
220+
}), false)
221+
);
214222
} catch (Exception e) {
223+
assert false : e;
215224
context.onFailure(e);
216225
}
217226
}
@@ -275,8 +284,8 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna
275284
@Override
276285
public void getRepositoryData(ActionListener<RepositoryData> listener) {
277286
try {
278-
getRemoteClusterClient().admin().cluster().prepareState().clear().setMetadata(true).execute(listener.map(response -> {
279-
final Metadata remoteMetadata = response.getState().getMetadata();
287+
csDeduplicator.execute(listener.map(response -> {
288+
final Metadata remoteMetadata = response.getMetadata();
280289
final String[] concreteAllIndices = remoteMetadata.getConcreteAllIndices();
281290
final Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
282291
final Map<String, RepositoryData.SnapshotDetails> snapshotsDetails = new HashMap<>();
@@ -308,6 +317,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
308317
);
309318
}));
310319
} catch (Exception e) {
320+
assert false;
311321
listener.onFailure(e);
312322
}
313323
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRepositoryRetentionLeaseTests.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.common.settings.ClusterSettings;
1616
import org.elasticsearch.common.settings.Setting;
1717
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.common.util.concurrent.ThreadContext;
1819
import org.elasticsearch.index.Index;
1920
import org.elasticsearch.index.seqno.RetentionLeaseActions;
2021
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
@@ -55,14 +56,7 @@ public void testWhenRetentionLeaseAlreadyExistsWeTryToRenewIt() {
5556
CcrSettings.getSettings().stream().filter(Setting::hasNodeScope)
5657
).collect(Collectors.toSet());
5758

58-
final CcrRepository repository = new CcrRepository(
59-
repositoryMetadata,
60-
mock(Client.class),
61-
new CcrLicenseChecker(() -> true, () -> true),
62-
Settings.EMPTY,
63-
new CcrSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, settings)),
64-
mock(ThreadPool.class)
65-
);
59+
final CcrRepository repository = createCcrRepository(repositoryMetadata, settings);
6660

6761
final ShardId followerShardId = new ShardId(new Index("follower-index-name", "follower-index-uuid"), 0);
6862
final ShardId leaderShardId = new ShardId(new Index("leader-index-name", "leader-index-uuid"), 0);
@@ -112,6 +106,20 @@ public void testWhenRetentionLeaseAlreadyExistsWeTryToRenewIt() {
112106
verifyNoMoreInteractions(remoteClient);
113107
}
114108

109+
private static CcrRepository createCcrRepository(RepositoryMetadata repositoryMetadata, Set<Setting<?>> settings) {
110+
final ThreadPool threadPool = mock(ThreadPool.class);
111+
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
112+
when(threadPool.getThreadContext()).thenReturn(threadContext);
113+
return new CcrRepository(
114+
repositoryMetadata,
115+
mock(Client.class),
116+
new CcrLicenseChecker(() -> true, () -> true),
117+
Settings.EMPTY,
118+
new CcrSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, settings)),
119+
threadPool
120+
);
121+
}
122+
115123
public void testWhenRetentionLeaseExpiresBeforeWeCanRenewIt() {
116124
final RepositoryMetadata repositoryMetadata = mock(RepositoryMetadata.class);
117125
when(repositoryMetadata.name()).thenReturn(CcrRepository.NAME_PREFIX);
@@ -120,14 +128,7 @@ public void testWhenRetentionLeaseExpiresBeforeWeCanRenewIt() {
120128
CcrSettings.getSettings().stream().filter(Setting::hasNodeScope)
121129
).collect(Collectors.toSet());
122130

123-
final CcrRepository repository = new CcrRepository(
124-
repositoryMetadata,
125-
mock(Client.class),
126-
new CcrLicenseChecker(() -> true, () -> true),
127-
Settings.EMPTY,
128-
new CcrSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, settings)),
129-
mock(ThreadPool.class)
130-
);
131+
final CcrRepository repository = createCcrRepository(repositoryMetadata, settings);
131132

132133
final ShardId followerShardId = new ShardId(new Index("follower-index-name", "follower-index-uuid"), 0);
133134
final ShardId leaderShardId = new ShardId(new Index("leader-index-name", "leader-index-uuid"), 0);

0 commit comments

Comments
 (0)