Skip to content

Commit b7f013f

Browse files
Adding version checks to remote entities using bytestream ser/de (#20080)
* Adding version checks to remote entities using bytestream ser/de Signed-off-by: Harsh Garg <[email protected]>
1 parent d47931e commit b7f013f

23 files changed

+284
-111
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
9595
- Fix node bootstrap error when enable stream transport and remote cluster state ([#19948](https://github.com/opensearch-project/OpenSearch/pull/19948))
9696
- Keep track and release Reactor Netty 4 Transport accepted Http Channels during the Node shutdown ([#20106](https://github.com/opensearch-project/OpenSearch/pull/20106))
9797
- Fix deletion failure/error of unused index template; case when an index template matches a data stream but has a lower priority. ([#20102](https://github.com/opensearch-project/OpenSearch/pull/20102))
98+
- Fixed version incompatibility in remote state entities using bytestream for ser/de ([#20080](https://github.com/opensearch-project/OpenSearch/pull/20080))
9899
- Fix toBuilder method in EngineConfig to include mergedSegmentTransferTracker([#20105](https://github.com/opensearch-project/OpenSearch/pull/20105))
99100
- Fixed handling of property index in BulkRequest during deserialization ([#20132](https://github.com/opensearch-project/OpenSearch/pull/20132))
100101
- Fix negative CPU usage values in node stats ([#19120](https://github.com/opensearch-project/OpenSearch/issues/19120))

server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.opensearch.Version;
1415
import org.opensearch.action.LatchedActionListener;
1516
import org.opensearch.cluster.Diff;
1617
import org.opensearch.cluster.routing.IndexRoutingTable;
@@ -182,15 +183,16 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
182183
public void getAsyncIndexRoutingReadAction(
183184
String clusterUUID,
184185
String uploadedFilename,
185-
LatchedActionListener<IndexRoutingTable> latchedActionListener
186+
LatchedActionListener<IndexRoutingTable> latchedActionListener,
187+
Version version
186188
) {
187189

188190
ActionListener<IndexRoutingTable> actionListener = ActionListener.wrap(
189191
latchedActionListener::onResponse,
190192
latchedActionListener::onFailure
191193
);
192194

193-
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);
195+
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor, version);
194196

195197
remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
196198
}
@@ -199,14 +201,15 @@ public void getAsyncIndexRoutingReadAction(
199201
public void getAsyncIndexRoutingTableDiffReadAction(
200202
String clusterUUID,
201203
String uploadedFilename,
202-
LatchedActionListener<Diff<RoutingTable>> latchedActionListener
204+
LatchedActionListener<Diff<RoutingTable>> latchedActionListener,
205+
Version version
203206
) {
204207
ActionListener<Diff<RoutingTable>> actionListener = ActionListener.wrap(
205208
latchedActionListener::onResponse,
206209
latchedActionListener::onFailure
207210
);
208211

209-
RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor);
212+
RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor, version);
210213
remoteRoutingTableDiffStore.readAsync(remoteRoutingTableDiff, actionListener);
211214
}
212215

server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.cluster.routing.remote;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.action.LatchedActionListener;
1213
import org.opensearch.cluster.Diff;
1314
import org.opensearch.cluster.routing.IndexRoutingTable;
@@ -71,7 +72,8 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
7172
public void getAsyncIndexRoutingReadAction(
7273
String clusterUUID,
7374
String uploadedFilename,
74-
LatchedActionListener<IndexRoutingTable> latchedActionListener
75+
LatchedActionListener<IndexRoutingTable> latchedActionListener,
76+
Version version
7577
) {
7678
// noop
7779
}
@@ -80,7 +82,8 @@ public void getAsyncIndexRoutingReadAction(
8082
public void getAsyncIndexRoutingTableDiffReadAction(
8183
String clusterUUID,
8284
String uploadedFilename,
83-
LatchedActionListener<Diff<RoutingTable>> latchedActionListener
85+
LatchedActionListener<Diff<RoutingTable>> latchedActionListener,
86+
Version version
8487
) {
8588
// noop
8689
}

server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.cluster.routing.remote;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.action.LatchedActionListener;
1213
import org.opensearch.cluster.Diff;
1314
import org.opensearch.cluster.routing.IndexRoutingTable;
@@ -31,13 +32,15 @@ public interface RemoteRoutingTableService extends LifecycleComponent {
3132
void getAsyncIndexRoutingReadAction(
3233
String clusterUUID,
3334
String uploadedFilename,
34-
LatchedActionListener<IndexRoutingTable> latchedActionListener
35+
LatchedActionListener<IndexRoutingTable> latchedActionListener,
36+
Version version
3537
);
3638

3739
void getAsyncIndexRoutingTableDiffReadAction(
3840
String clusterUUID,
3941
String uploadedFilename,
40-
LatchedActionListener<Diff<RoutingTable>> latchedActionListener
42+
LatchedActionListener<Diff<RoutingTable>> latchedActionListener,
43+
Version version
4144
);
4245

4346
List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.opensearch.Version;
1415
import org.opensearch.action.LatchedActionListener;
1516
import org.opensearch.cluster.ClusterName;
1617
import org.opensearch.cluster.ClusterState;
@@ -1260,6 +1261,14 @@ ClusterState readClusterStateInParallel(
12601261
AtomicReference<Diff<RoutingTable>> readIndexRoutingTableDiffResults = new AtomicReference<>();
12611262
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks));
12621263

1264+
if (manifest.getOpensearchVersion() != Version.CURRENT) {
1265+
logger.info(
1266+
"Reading cluster state on version {} from manifest uploaded with version {}",
1267+
Version.CURRENT,
1268+
manifest.getOpensearchVersion()
1269+
);
1270+
}
1271+
12631272
LatchedActionListener<RemoteReadResult> listener = new LatchedActionListener<>(ActionListener.wrap(response -> {
12641273
logger.debug("Successfully read cluster state component from remote");
12651274
readResults.add(response);
@@ -1296,7 +1305,8 @@ ClusterState readClusterStateInParallel(
12961305
remoteRoutingTableService.getAsyncIndexRoutingReadAction(
12971306
clusterUUID,
12981307
indexRouting.getUploadedFilename(),
1299-
routingTableLatchedActionListener
1308+
routingTableLatchedActionListener,
1309+
manifest.getOpensearchVersion()
13001310
);
13011311
}
13021312

@@ -1315,7 +1325,8 @@ ClusterState readClusterStateInParallel(
13151325
remoteRoutingTableService.getAsyncIndexRoutingTableDiffReadAction(
13161326
clusterUUID,
13171327
manifest.getDiffManifest().getIndicesRoutingDiffPath(),
1318-
routingTableDiffLatchedActionListener
1328+
routingTableDiffLatchedActionListener,
1329+
manifest.getOpensearchVersion()
13191330
);
13201331
}
13211332

@@ -1392,7 +1403,8 @@ ClusterState readClusterStateInParallel(
13921403
new RemoteDiscoveryNodes(
13931404
manifest.getDiscoveryNodesMetadata().getUploadedFilename(),
13941405
clusterUUID,
1395-
blobStoreRepository.getCompressor()
1406+
blobStoreRepository.getCompressor(),
1407+
manifest.getOpensearchVersion()
13961408
),
13971409
listener
13981410
);
@@ -1404,7 +1416,8 @@ ClusterState readClusterStateInParallel(
14041416
new RemoteClusterBlocks(
14051417
manifest.getClusterBlocksMetadata().getUploadedFilename(),
14061418
clusterUUID,
1407-
blobStoreRepository.getCompressor()
1419+
blobStoreRepository.getCompressor(),
1420+
manifest.getOpensearchVersion()
14081421
),
14091422
listener
14101423
);
@@ -1416,7 +1429,8 @@ ClusterState readClusterStateInParallel(
14161429
new RemoteHashesOfConsistentSettings(
14171430
manifest.getHashesOfConsistentSettings().getUploadedFilename(),
14181431
clusterUUID,
1419-
blobStoreRepository.getCompressor()
1432+
blobStoreRepository.getCompressor(),
1433+
manifest.getOpensearchVersion()
14201434
),
14211435
listener
14221436
);
@@ -1431,7 +1445,8 @@ ClusterState readClusterStateInParallel(
14311445
entry.getValue().getAttributeName(),
14321446
clusterUUID,
14331447
blobStoreRepository.getCompressor(),
1434-
namedWriteableRegistry
1448+
namedWriteableRegistry,
1449+
manifest.getOpensearchVersion()
14351450
),
14361451
listener
14371452
);

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.gateway.remote.model;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.cluster.block.ClusterBlocks;
1213
import org.opensearch.common.io.Streams;
1314
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
@@ -32,10 +33,7 @@
3233
public class RemoteClusterBlocks extends AbstractClusterMetadataWriteableBlobEntity<ClusterBlocks> {
3334

3435
public static final String CLUSTER_BLOCKS = "blocks";
35-
public static final ChecksumWritableBlobStoreFormat<ClusterBlocks> CLUSTER_BLOCKS_FORMAT = new ChecksumWritableBlobStoreFormat<>(
36-
"blocks",
37-
ClusterBlocks::readFrom
38-
);
36+
public final ChecksumWritableBlobStoreFormat<ClusterBlocks> clusterBlocksFormat;
3937

4038
private ClusterBlocks clusterBlocks;
4139
private long stateVersion;
@@ -44,11 +42,13 @@ public RemoteClusterBlocks(final ClusterBlocks clusterBlocks, long stateVersion,
4442
super(clusterUUID, compressor, null);
4543
this.clusterBlocks = clusterBlocks;
4644
this.stateVersion = stateVersion;
45+
this.clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>("blocks", ClusterBlocks::readFrom);
4746
}
4847

49-
public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor) {
48+
public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor, final Version version) {
5049
super(clusterUUID, compressor, null);
5150
this.blobName = blobName;
51+
this.clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>("blocks", ClusterBlocks::readFrom, version);
5252
}
5353

5454
@Override
@@ -83,11 +83,11 @@ public UploadedMetadata getUploadedMetadata() {
8383

8484
@Override
8585
public InputStream serialize() throws IOException {
86-
return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput();
86+
return this.clusterBlocksFormat.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput();
8787
}
8888

8989
@Override
9090
public ClusterBlocks deserialize(final InputStream inputStream) throws IOException {
91-
return CLUSTER_BLOCKS_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
91+
return this.clusterBlocksFormat.deserialize(blobName, Streams.readFully(inputStream));
9292
}
9393
}

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.gateway.remote.model;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.cluster.ClusterState;
1213
import org.opensearch.cluster.ClusterState.Custom;
1314
import org.opensearch.common.io.Streams;
@@ -65,15 +66,17 @@ public RemoteClusterStateCustoms(
6566
final String customType,
6667
final String clusterUUID,
6768
final Compressor compressor,
68-
final NamedWriteableRegistry namedWriteableRegistry
69+
final NamedWriteableRegistry namedWriteableRegistry,
70+
final Version version
6971
) {
7072
super(clusterUUID, compressor, null);
7173
this.blobName = blobName;
7274
this.customType = customType;
7375
this.namedWriteableRegistry = namedWriteableRegistry;
7476
this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>(
7577
"cluster-state-custom",
76-
is -> readFrom(is, namedWriteableRegistry, customType)
78+
is -> readFrom(is, namedWriteableRegistry, customType),
79+
version
7780
);
7881
}
7982

server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,11 @@ public RemoteCustomMetadata(
7575
this.blobName = blobName;
7676
this.customType = customType;
7777
this.namedWriteableRegistry = namedWriteableRegistry;
78-
this.customBlobStoreFormat = new ChecksumWritableBlobStoreFormat<>("custom", is -> {
79-
is.setVersion(version);
80-
return readFrom(is, namedWriteableRegistry, customType);
81-
});
78+
this.customBlobStoreFormat = new ChecksumWritableBlobStoreFormat<>(
79+
"custom",
80+
is -> readFrom(is, namedWriteableRegistry, customType),
81+
version
82+
);
8283
}
8384

8485
@Override

server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.gateway.remote.model;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.cluster.node.DiscoveryNodes;
1213
import org.opensearch.common.io.Streams;
1314
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
@@ -32,10 +33,7 @@
3233
public class RemoteDiscoveryNodes extends AbstractClusterMetadataWriteableBlobEntity<DiscoveryNodes> {
3334

3435
public static final String DISCOVERY_NODES = "nodes";
35-
public static final ChecksumWritableBlobStoreFormat<DiscoveryNodes> DISCOVERY_NODES_FORMAT = new ChecksumWritableBlobStoreFormat<>(
36-
"nodes",
37-
is -> DiscoveryNodes.readFrom(is, null)
38-
);
36+
public final ChecksumWritableBlobStoreFormat<DiscoveryNodes> discoveryNodesFormat;
3937

4038
private DiscoveryNodes discoveryNodes;
4139
private long stateVersion;
@@ -49,11 +47,13 @@ public RemoteDiscoveryNodes(
4947
super(clusterUUID, compressor, null);
5048
this.discoveryNodes = discoveryNodes;
5149
this.stateVersion = stateVersion;
50+
this.discoveryNodesFormat = new ChecksumWritableBlobStoreFormat<>("nodes", is -> DiscoveryNodes.readFrom(is, null));
5251
}
5352

54-
public RemoteDiscoveryNodes(final String blobName, final String clusterUUID, final Compressor compressor) {
53+
public RemoteDiscoveryNodes(final String blobName, final String clusterUUID, final Compressor compressor, final Version version) {
5554
super(clusterUUID, compressor, null);
5655
this.blobName = blobName;
56+
this.discoveryNodesFormat = new ChecksumWritableBlobStoreFormat<>("nodes", is -> DiscoveryNodes.readFrom(is, null), version);
5757
}
5858

5959
@Override
@@ -88,7 +88,7 @@ public UploadedMetadata getUploadedMetadata() {
8888

8989
@Override
9090
public InputStream serialize() throws IOException {
91-
return DISCOVERY_NODES_FORMAT.serialize(
91+
return discoveryNodesFormat.serialize(
9292
(out, discoveryNode) -> discoveryNode.writeToWithAttribute(out),
9393
discoveryNodes,
9494
generateBlobFileName(),
@@ -98,6 +98,6 @@ public InputStream serialize() throws IOException {
9898

9999
@Override
100100
public DiscoveryNodes deserialize(final InputStream inputStream) throws IOException {
101-
return DISCOVERY_NODES_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
101+
return discoveryNodesFormat.deserialize(blobName, Streams.readFully(inputStream));
102102
}
103103
}

server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.gateway.remote.model;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.cluster.metadata.DiffableStringMap;
1213
import org.opensearch.common.io.Streams;
1314
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
@@ -30,8 +31,7 @@
3031
*/
3132
public class RemoteHashesOfConsistentSettings extends AbstractClusterMetadataWriteableBlobEntity<DiffableStringMap> {
3233
public static final String HASHES_OF_CONSISTENT_SETTINGS = "hashes-of-consistent-settings";
33-
public static final ChecksumWritableBlobStoreFormat<DiffableStringMap> HASHES_OF_CONSISTENT_SETTINGS_FORMAT =
34-
new ChecksumWritableBlobStoreFormat<>("hashes-of-consistent-settings", DiffableStringMap::readFrom);
34+
public final ChecksumWritableBlobStoreFormat<DiffableStringMap> hashesOfConsistentSettingsFormat;
3535

3636
private DiffableStringMap hashesOfConsistentSettings;
3737
private long metadataVersion;
@@ -45,11 +45,25 @@ public RemoteHashesOfConsistentSettings(
4545
super(clusterUUID, compressor, null);
4646
this.metadataVersion = metadataVersion;
4747
this.hashesOfConsistentSettings = hashesOfConsistentSettings;
48+
this.hashesOfConsistentSettingsFormat = new ChecksumWritableBlobStoreFormat<>(
49+
"hashes-of-consistent-settings",
50+
DiffableStringMap::readFrom
51+
);
4852
}
4953

50-
public RemoteHashesOfConsistentSettings(final String blobName, final String clusterUUID, final Compressor compressor) {
54+
public RemoteHashesOfConsistentSettings(
55+
final String blobName,
56+
final String clusterUUID,
57+
final Compressor compressor,
58+
final Version version
59+
) {
5160
super(clusterUUID, compressor, null);
5261
this.blobName = blobName;
62+
this.hashesOfConsistentSettingsFormat = new ChecksumWritableBlobStoreFormat<>(
63+
"hashes-of-consistent-settings",
64+
DiffableStringMap::readFrom,
65+
version
66+
);
5367
}
5468

5569
@Override
@@ -83,12 +97,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {
8397

8498
@Override
8599
public InputStream serialize() throws IOException {
86-
return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor())
100+
return hashesOfConsistentSettingsFormat.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor())
87101
.streamInput();
88102
}
89103

90104
@Override
91105
public DiffableStringMap deserialize(final InputStream inputStream) throws IOException {
92-
return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
106+
return hashesOfConsistentSettingsFormat.deserialize(blobName, Streams.readFully(inputStream));
93107
}
94108
}

0 commit comments

Comments
 (0)