20
20
import org .elasticsearch .common .settings .Settings ;
21
21
import org .elasticsearch .core .Strings ;
22
22
import org .elasticsearch .core .TimeValue ;
23
+ import org .elasticsearch .core .Tuple ;
23
24
import org .elasticsearch .reservedstate .action .ReservedClusterSettingsAction ;
24
25
import org .elasticsearch .snapshots .AbstractSnapshotIntegTestCase ;
25
26
import org .elasticsearch .snapshots .SnapshotState ;
@@ -85,9 +86,10 @@ private void writeJSONFile(String node, String json) throws Exception {
85
86
Files .move (tempFilePath , fileSettingsService .operatorSettingsFile (), StandardCopyOption .ATOMIC_MOVE );
86
87
}
87
88
88
- private CountDownLatch setupClusterStateListener (String node ) {
89
+ private Tuple < CountDownLatch , AtomicLong > setupClusterStateListener (String node ) {
89
90
ClusterService clusterService = internalCluster ().clusterService (node );
90
91
CountDownLatch savedClusterState = new CountDownLatch (1 );
92
+ AtomicLong metadataVersion = new AtomicLong (-1 );
91
93
clusterService .addListener (new ClusterStateListener () {
92
94
@ Override
93
95
public void clusterChanged (ClusterChangedEvent event ) {
@@ -99,20 +101,21 @@ public void clusterChanged(ClusterChangedEvent event) {
99
101
}
100
102
if (handlerMetadata .keys ().contains ("indices.recovery.max_bytes_per_sec" )) {
101
103
clusterService .removeListener (this );
104
+ metadataVersion .set (event .state ().metadata ().version ());
102
105
savedClusterState .countDown ();
103
106
}
104
107
}
105
108
}
106
109
});
107
110
108
- return savedClusterState ;
111
+ return new Tuple <>( savedClusterState , metadataVersion ) ;
109
112
}
110
113
111
- private ClusterStateResponse assertClusterStateSaveOK (CountDownLatch savedClusterState ) throws Exception {
114
+ private ClusterStateResponse assertClusterStateSaveOK (CountDownLatch savedClusterState , AtomicLong metadataVersion ) throws Exception {
112
115
boolean awaitSuccessful = savedClusterState .await (20 , TimeUnit .SECONDS );
113
116
assertTrue (awaitSuccessful );
114
117
115
- return clusterAdmin ().state (new ClusterStateRequest ()).actionGet ();
118
+ return clusterAdmin ().state (new ClusterStateRequest (). waitForMetadataVersion ( metadataVersion . get ()) ).actionGet ();
116
119
}
117
120
118
121
public void testRestoreWithRemovedFileSettings () throws Exception {
@@ -138,7 +141,7 @@ public void testRestoreWithRemovedFileSettings() throws Exception {
138
141
139
142
logger .info ("--> write some file based settings, putting some reserved state" );
140
143
writeJSONFile (masterNode , testFileSettingsJSON );
141
- final ClusterStateResponse savedStateResponse = assertClusterStateSaveOK (savedClusterState );
144
+ final ClusterStateResponse savedStateResponse = assertClusterStateSaveOK (savedClusterState . v1 (), savedClusterState . v2 () );
142
145
assertThat (
143
146
savedStateResponse .getState ().metadata ().persistentSettings ().get (INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING .getKey ()),
144
147
equalTo ("50mb" )
@@ -195,26 +198,29 @@ public void testRestoreWithRemovedFileSettings() throws Exception {
195
198
}
196
199
}
197
200
198
- private CountDownLatch removedReservedClusterStateListener (String node ) {
201
+ private Tuple < CountDownLatch , AtomicLong > removedReservedClusterStateListener (String node ) {
199
202
ClusterService clusterService = internalCluster ().clusterService (node );
200
203
CountDownLatch savedClusterState = new CountDownLatch (1 );
204
+ AtomicLong metadataVersion = new AtomicLong (-1 );
201
205
clusterService .addListener (new ClusterStateListener () {
202
206
@ Override
203
207
public void clusterChanged (ClusterChangedEvent event ) {
204
208
ReservedStateMetadata reservedState = event .state ().metadata ().reservedStateMetadata ().get (FileSettingsService .NAMESPACE );
205
209
if (reservedState != null && reservedState .version () == 0L ) {
206
210
clusterService .removeListener (this );
211
+ metadataVersion .set (event .state ().metadata ().version ());
207
212
savedClusterState .countDown ();
208
213
}
209
214
}
210
215
});
211
216
212
- return savedClusterState ;
217
+ return new Tuple <>( savedClusterState , metadataVersion ) ;
213
218
}
214
219
215
- private CountDownLatch cleanedClusterStateListener (String node ) {
220
+ private Tuple < CountDownLatch , AtomicLong > cleanedClusterStateListener (String node ) {
216
221
ClusterService clusterService = internalCluster ().clusterService (node );
217
222
CountDownLatch savedClusterState = new CountDownLatch (1 );
223
+ AtomicLong metadataVersion = new AtomicLong (-1 );
218
224
clusterService .addListener (new ClusterStateListener () {
219
225
@ Override
220
226
public void clusterChanged (ClusterChangedEvent event ) {
@@ -226,13 +232,14 @@ public void clusterChanged(ClusterChangedEvent event) {
226
232
}
227
233
if (handlerMetadata .keys ().isEmpty ()) {
228
234
clusterService .removeListener (this );
235
+ metadataVersion .set (event .state ().metadata ().version ());
229
236
savedClusterState .countDown ();
230
237
}
231
238
}
232
239
}
233
240
});
234
241
235
- return savedClusterState ;
242
+ return new Tuple <>( savedClusterState , metadataVersion ) ;
236
243
}
237
244
238
245
public void testRestoreWithPersistedFileSettings () throws Exception {
@@ -258,7 +265,7 @@ public void testRestoreWithPersistedFileSettings() throws Exception {
258
265
259
266
logger .info ("--> write some file based settings, putting some reserved state" );
260
267
writeJSONFile (masterNode , testFileSettingsJSON );
261
- final ClusterStateResponse savedStateResponse = assertClusterStateSaveOK (savedClusterState );
268
+ final ClusterStateResponse savedStateResponse = assertClusterStateSaveOK (savedClusterState . v1 (), savedClusterState . v2 () );
262
269
assertThat (
263
270
savedStateResponse .getState ().metadata ().persistentSettings ().get (INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING .getKey ()),
264
271
equalTo ("50mb" )
@@ -289,12 +296,14 @@ public void testRestoreWithPersistedFileSettings() throws Exception {
289
296
// cluster state for file based settings, but instead we reset the version to 0 and 'touch' the operator file
290
297
// so that it gets re-processed.
291
298
logger .info ("--> reserved state version will be reset to 0, because of snapshot restore" );
292
- assertTrue (removedReservedState .await (20 , TimeUnit .SECONDS ));
299
+ assertTrue (removedReservedState .v1 (). await (20 , TimeUnit .SECONDS ));
293
300
294
301
logger .info ("--> reserved state would be restored" );
295
- assertTrue (restoredReservedState .await (20 , TimeUnit .SECONDS ));
302
+ assertTrue (restoredReservedState .v1 (). await (20 , TimeUnit .SECONDS ));
296
303
297
- final ClusterStateResponse clusterStateResponse = clusterAdmin ().state (new ClusterStateRequest ().metadata (true )).actionGet ();
304
+ final ClusterStateResponse clusterStateResponse = clusterAdmin ().state (
305
+ new ClusterStateRequest ().metadata (true ).waitForMetadataVersion (restoredReservedState .v2 ().get ())
306
+ ).actionGet ();
298
307
299
308
assertNotNull (clusterStateResponse .getState ().metadata ().reservedStateMetadata ().get (FileSettingsService .NAMESPACE ));
300
309
@@ -313,7 +322,7 @@ public void testRestoreWithPersistedFileSettings() throws Exception {
313
322
314
323
logger .info ("--> clear the file based settings" );
315
324
writeJSONFile (masterNode , emptyFileSettingsJSON );
316
- assertClusterStateSaveOK (cleanupReservedState );
325
+ assertClusterStateSaveOK (cleanupReservedState . v1 (), cleanupReservedState . v2 () );
317
326
} finally {
318
327
// cleanup
319
328
assertAcked (
0 commit comments