 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
-import org.elasticsearch.indices.recovery.RecoveryFilesInfoRequest;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.TransportService;
 
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -78,14 +72,16 @@ public void testCancelRecoveryAndResume() throws Exception {
         // we use 2 nodes a lucky and unlucky one
         // the lucky one holds the primary
         // the unlucky one gets the replica and the truncated leftovers
-        String primariesNode = dataNodeStats.get(0).getNode().getName();
-        String unluckyNode = dataNodeStats.get(1).getNode().getName();
+        NodeStats primariesNode = dataNodeStats.get(0);
+        NodeStats unluckyNode = dataNodeStats.get(1);
 
         // create the index and prevent allocation on any other nodes than the lucky one
         // we have no replicas so far and make sure that we allocate the primary on the lucky node
         assertAcked(
             prepareCreate("test").setMapping("field1", "type=text", "the_id", "type=text")
-                .setSettings(indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode))
+                .setSettings(
+                    indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode.getNode().getName())
+                )
         ); // only allocate on the lucky node
 
         // index some docs and check if they are coming back
@@ -106,54 +102,20 @@ public void testCancelRecoveryAndResume() throws Exception {
         indicesAdmin().prepareFlush().setForce(true).get(); // double flush to create safe commit in case of async durability
         indicesAdmin().prepareForceMerge().setMaxNumSegments(1).setFlush(true).get();
 
-        // We write some garbage into the shard directory so that we can verify that it is cleaned up before we resend.
-        // Cleanup helps prevent recovery from failing due to lack of space from garbage left over from a previous
-        // recovery that crashed during file transmission. #104473
-        // We can't look for the presence of the recovery temp files themselves because they are automatically
-        // cleaned up on clean shutdown by MultiFileWriter.
-        final String GARBAGE_PREFIX = "recovery.garbage.";
-
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicBoolean truncate = new AtomicBoolean(true);
-
-        IndicesService unluckyIndices = internalCluster().getInstance(IndicesService.class, unluckyNode);
-        Function<ShardId, Path> getUnluckyIndexPath = (shardId) -> unluckyIndices.indexService(shardId.getIndex())
-            .getShard(shardId.getId())
-            .shardPath()
-            .resolveIndex();
-
         for (NodeStats dataNode : dataNodeStats) {
             MockTransportService.getInstance(dataNode.getNode().getName())
                 .addSendBehavior(
-                    internalCluster().getInstance(TransportService.class, unluckyNode),
+                    internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
                     (connection, requestId, action, request, options) -> {
                         if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
                             RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
                             logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk());
-                            // During the first recovery attempt (when truncate is set), write an extra garbage file once for each
-                            // file transmitted. We get multiple chunks per file but only one is the last.
-                            if (truncate.get() && req.lastChunk()) {
-                                final var shardPath = getUnluckyIndexPath.apply(req.shardId());
-                                final var garbagePath = Files.createTempFile(shardPath, GARBAGE_PREFIX, null);
-                                logger.info("writing garbage at: {}", garbagePath);
-                            }
                             if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) {
                                 latch.countDown();
                                 throw new RuntimeException("Caused some truncated files for fun and profit");
                             }
-                        } else if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) {
-                            // verify there are no garbage files present at the FILES_INFO stage of recovery. This precedes FILES_CHUNKS
-                            // and so will run before garbage has been introduced on the first attempt, and before post-transfer cleanup
-                            // has been performed on the second.
-                            final var shardPath = getUnluckyIndexPath.apply(((RecoveryFilesInfoRequest) request).shardId());
-                            try (var list = Files.list(shardPath).filter(path -> path.getFileName().startsWith(GARBAGE_PREFIX))) {
-                                final var garbageFiles = list.toArray();
-                                assertArrayEquals(
-                                    "garbage files should have been cleaned before file transmission",
-                                    new Path[0],
-                                    garbageFiles
-                                );
-                            }
                         }
                         connection.sendRequest(requestId, action, request, options);
                     }
@@ -166,14 +128,14 @@ public void testCancelRecoveryAndResume() throws Exception {
                 .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                 .put(
                     "index.routing.allocation.include._name", // now allow allocation on all nodes
-                    primariesNode + "," + unluckyNode
+                    primariesNode.getNode().getName() + "," + unluckyNode.getNode().getName()
                 ),
             "test"
         );
 
         latch.await();
 
-        // at this point we got some truncated leftovers on the replica on the unlucky node
+        // at this point we got some truncated left overs on the replica on the unlucky node
         // now we are allowing the recovery to allocate again and finish to see if we wipe the truncated files
         truncate.compareAndSet(true, false);
         ensureGreen("test");
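
For context, the fault-injection pattern this test is built around (and which the patch keeps) can be condensed as below. This is a sketch rather than code from the patch: the helper name, its parameters, and the exception message are illustrative, and it assumes an `ESIntegTestCase` subclass with `MockTransportService.TestPlugin` registered via `nodePlugins()`, using the same imports as the file above.

```java
// Sketch: fail the first FILE_CHUNK sent from sourceNodeName to targetNodeName,
// leaving a truncated file on the recovery target that a later retry must discard.
private void failFirstFileChunk(String sourceNodeName, String targetNodeName) {
    final AtomicBoolean failOnce = new AtomicBoolean(true);
    MockTransportService.getInstance(sourceNodeName)
        .addSendBehavior(
            internalCluster().getInstance(TransportService.class, targetNodeName),
            (connection, requestId, action, request, options) -> {
                if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK) && failOnce.compareAndSet(true, false)) {
                    // The target now holds a partial file; the next recovery attempt
                    // has to clean it up before re-sending the chunks.
                    throw new RuntimeException("simulated failure during file transfer");
                }
                connection.sendRequest(requestId, action, request, options);
            }
        );
}
```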