Skip to content

Commit e61c66c

Browse files
committed
Fix synchronization in ShardFollowNodeTask (#60490)
The leader mapping, settings, and aliases versions in a shard follow-task are updated without proper synchronization, so a callback carrying an older version can overwrite a newer one and the recorded version can go backward.
1 parent 54f1e4a commit e61c66c

File tree

2 files changed

+107
-4
lines changed

2 files changed

+107
-4
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,11 @@ void start(
150150
// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
151151
updateMapping(0L, leaderMappingVersion -> {
152152
synchronized (ShardFollowNodeTask.this) {
153-
currentMappingVersion = leaderMappingVersion;
153+
currentMappingVersion = Math.max(currentMappingVersion, leaderMappingVersion);
154154
}
155155
updateSettings(leaderSettingsVersion -> {
156156
synchronized (ShardFollowNodeTask.this) {
157-
currentSettingsVersion = leaderSettingsVersion;
157+
currentSettingsVersion = Math.max(currentSettingsVersion, leaderSettingsVersion);
158158
}
159159
LOGGER.info(
160160
"{} following leader shard {}, follower global checkpoint=[{}], mapping version=[{}], settings version=[{}]",
@@ -431,7 +431,9 @@ private synchronized void maybeUpdateMapping(long minimumRequiredMappingVersion,
431431
LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]",
432432
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
433433
updateMapping(minimumRequiredMappingVersion, mappingVersion -> {
434-
currentMappingVersion = mappingVersion;
434+
synchronized (ShardFollowNodeTask.this) {
435+
currentMappingVersion = Math.max(currentMappingVersion, mappingVersion);
436+
}
435437
task.run();
436438
});
437439
}
@@ -446,7 +448,9 @@ private synchronized void maybeUpdateSettings(final Long minimumRequiredSettings
446448
LOGGER.trace("{} updating settings, settings version [{}] is lower than minimum required settings version [{}]",
447449
params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion);
448450
updateSettings(settingsVersion -> {
449-
currentSettingsVersion = settingsVersion;
451+
synchronized (ShardFollowNodeTask.this) {
452+
currentSettingsVersion = Math.max(currentSettingsVersion, settingsVersion);
453+
}
450454
task.run();
451455
});
452456
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.elasticsearch.index.translog.Translog;
2121
import org.elasticsearch.test.ESTestCase;
2222
import org.elasticsearch.threadpool.Scheduler;
23+
import org.elasticsearch.threadpool.TestThreadPool;
24+
import org.elasticsearch.threadpool.ThreadPool;
2325
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
2426
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
2527

@@ -33,6 +35,7 @@
3335
import java.util.Map;
3436
import java.util.Queue;
3537
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.Phaser;
3639
import java.util.concurrent.ScheduledFuture;
3740
import java.util.concurrent.ScheduledThreadPoolExecutor;
3841
import java.util.concurrent.TimeUnit;
@@ -1035,6 +1038,102 @@ public void testRetentionLeaseRenewal() throws InterruptedException {
10351038
}
10361039
}
10371040

1041+
public void testUpdateMappingSettingsAndAliasesConcurrently() throws Exception {
1042+
final ShardFollowTask followTask = new ShardFollowTask(
1043+
"test",
1044+
new ShardId("leader_index", "", 0),
1045+
new ShardId("follow_index", "", 0),
1046+
Integer.MAX_VALUE,
1047+
Integer.MAX_VALUE,
1048+
Integer.MAX_VALUE,
1049+
Integer.MAX_VALUE,
1050+
new ByteSizeValue(Long.MAX_VALUE),
1051+
new ByteSizeValue(Long.MAX_VALUE),
1052+
Integer.MAX_VALUE,
1053+
new ByteSizeValue(Long.MAX_VALUE),
1054+
TimeValue.ZERO,
1055+
TimeValue.ZERO,
1056+
Collections.emptyMap()
1057+
);
1058+
final ThreadPool threadPool = new TestThreadPool(getTestClass().getSimpleName());
1059+
final AtomicLong leaderMappingVersion = new AtomicLong(0L);
1060+
final AtomicLong followerMappingVersion = new AtomicLong(0L);
1061+
final AtomicLong leaderSettingsVersion = new AtomicLong(0L);
1062+
final AtomicLong followerSettingsVersion = new AtomicLong(0L);
1063+
1064+
final Phaser updates = new Phaser(1);
1065+
final ShardFollowNodeTask shardFollowNodeTask = new ShardFollowNodeTask(
1066+
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), followTask, scheduler, System::nanoTime) {
1067+
@Override
1068+
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
1069+
updates.register();
1070+
final long fetchedVersion = randomLongBetween(minRequiredMappingVersion, leaderMappingVersion.get());
1071+
followerMappingVersion.updateAndGet(curr -> Math.max(curr, fetchedVersion));
1072+
threadPool.generic().execute(() -> {
1073+
handler.accept(fetchedVersion);
1074+
updates.arriveAndDeregister();
1075+
});
1076+
}
1077+
1078+
@Override
1079+
protected void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler) {
1080+
updates.register();
1081+
final long fetchedVersion = randomLongBetween(0L, leaderSettingsVersion.get());
1082+
followerSettingsVersion.updateAndGet(curr -> Math.max(curr, fetchedVersion));
1083+
threadPool.generic().execute(() -> {
1084+
handler.accept(fetchedVersion);
1085+
updates.arriveAndDeregister();
1086+
});
1087+
}
1088+
1089+
@Override
1090+
protected void innerSendBulkShardOperationsRequest(String followerHistoryUUID,
1091+
List<Translog.Operation> operations,
1092+
long leaderMaxSeqNoOfUpdatesOrDeletes,
1093+
Consumer<BulkShardOperationsResponse> handler,
1094+
Consumer<Exception> errorHandler) {
1095+
1096+
}
1097+
1098+
@Override
1099+
protected void innerSendShardChangesRequest(long from, int maxOperationCount,
1100+
Consumer<ShardChangesAction.Response> handler,
1101+
Consumer<Exception> errorHandler) {
1102+
1103+
}
1104+
1105+
@Override
1106+
protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(LongSupplier followerGlobalCheckpoint) {
1107+
return null;
1108+
}
1109+
1110+
@Override
1111+
synchronized void coordinateReads() {
1112+
1113+
}
1114+
};
1115+
int responses = between(10, 5000);
1116+
for (int i = 0; i < responses; i++) {
1117+
ShardChangesAction.Response response = new ShardChangesAction.Response(
1118+
leaderMappingVersion.addAndGet(between(0, Integer.MAX_VALUE)),
1119+
leaderSettingsVersion.addAndGet(between(0, Integer.MAX_VALUE)),
1120+
SequenceNumbers.NO_OPS_PERFORMED,
1121+
SequenceNumbers.NO_OPS_PERFORMED,
1122+
-1,
1123+
new Translog.Operation[0],
1124+
randomLong()
1125+
);
1126+
shardFollowNodeTask.handleReadResponse(0, -1, response);
1127+
}
1128+
try {
1129+
updates.arriveAndAwaitAdvance();
1130+
final ShardFollowNodeTaskStatus status = shardFollowNodeTask.getStatus();
1131+
assertThat(status.followerMappingVersion(), equalTo(followerMappingVersion.get()));
1132+
assertThat(status.followerSettingsVersion(), equalTo(followerSettingsVersion.get()));
1133+
} finally {
1134+
terminate(threadPool);
1135+
}
1136+
}
10381137

10391138
static final class ShardFollowTaskParams {
10401139
private String remoteCluster = null;

0 commit comments

Comments
 (0)