3434import java .util .HashMap ;
3535import java .util .List ;
3636import java .util .Map ;
37+ import java .util .Objects ;
3738import java .util .Random ;
3839import java .util .Set ;
3940import java .util .TreeMap ;
4041import java .util .TreeSet ;
4142import java .util .UUID ;
43+ import java .util .concurrent .Callable ;
4244import java .util .concurrent .CountDownLatch ;
45+ import java .util .concurrent .ForkJoinPool ;
46+ import java .util .concurrent .ForkJoinTask ;
4347import java .util .concurrent .ThreadLocalRandom ;
4448import java .util .concurrent .TimeUnit ;
4549import java .util .concurrent .atomic .AtomicBoolean ;
4650import java .util .concurrent .atomic .AtomicInteger ;
4751import java .util .concurrent .atomic .AtomicLong ;
4852import java .util .concurrent .atomic .LongAdder ;
53+ import java .util .function .BiConsumer ;
4954import java .util .function .BooleanSupplier ;
5055import java .util .function .Function ;
5156import java .util .function .UnaryOperator ;
97102import org .apache .ignite .internal .processors .cache .distributed .near .GridNearTxLocal ;
98103import org .apache .ignite .internal .processors .cache .persistence .CheckpointState ;
99104import org .apache .ignite .internal .processors .cache .persistence .GridCacheDatabaseSharedManager ;
105+ import org .apache .ignite .internal .processors .cache .persistence .checkpoint .CheckpointListener ;
100106import org .apache .ignite .internal .processors .cache .persistence .db .IgniteCacheGroupsWithRestartsTest ;
101107import org .apache .ignite .internal .processors .cache .persistence .diagnostic .pagelocktracker .dumpprocessors .ToFileDumpProcessor ;
102108import org .apache .ignite .internal .processors .cache .persistence .file .FileIO ;
134140import org .apache .ignite .spi .communication .tcp .TcpCommunicationSpi ;
135141import org .apache .ignite .spi .metric .LongMetric ;
136142import org .apache .ignite .spi .metric .Metric ;
143+ import org .apache .ignite .spi .systemview .view .ComputeJobView ;
144+ import org .apache .ignite .spi .systemview .view .ComputeTaskView ;
137145import org .apache .ignite .testframework .GridTestUtils ;
146+ import org .apache .ignite .testframework .ListeningTestLogger ;
138147import org .apache .ignite .testframework .LogListener ;
139148import org .apache .ignite .testframework .junits .WithSystemProperty ;
140149import org .apache .ignite .transactions .Transaction ;
162171import static org .apache .ignite .internal .commandline .CommandHandler .EXIT_CODE_OK ;
163172import static org .apache .ignite .internal .commandline .CommandHandler .EXIT_CODE_UNEXPECTED_ERROR ;
164173import static org .apache .ignite .internal .encryption .AbstractEncryptionTest .MASTER_KEY_NAME_2 ;
174+ import static org .apache .ignite .internal .management .cache .CacheIdleVerifyCancelTask .TASKS_TO_CANCEL ;
175+ import static org .apache .ignite .internal .management .cache .VerifyBackupPartitionsTaskV2 .CACL_PART_HASH_ERR_MSG ;
176+ import static org .apache .ignite .internal .management .cache .VerifyBackupPartitionsTaskV2 .CP_REASON ;
165177import static org .apache .ignite .internal .processors .cache .persistence .GridCacheDatabaseSharedManager .IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP ;
166178import static org .apache .ignite .internal .processors .cache .persistence .snapshot .AbstractSnapshotSelfTest .doSnapshotCancellationTest ;
167179import static org .apache .ignite .internal .processors .cache .persistence .snapshot .AbstractSnapshotSelfTest .snp ;
170182import static org .apache .ignite .internal .processors .cache .persistence .snapshot .IgniteSnapshotManager .SNAPSHOT_TRANSFER_RATE_DMS_KEY ;
171183import static org .apache .ignite .internal .processors .cache .persistence .snapshot .IgniteSnapshotManager .resolveSnapshotWorkDirectory ;
172184import static org .apache .ignite .internal .processors .cache .persistence .snapshot .SnapshotRestoreProcess .SNAPSHOT_RESTORE_METRICS ;
185+ import static org .apache .ignite .internal .processors .cache .verify .IdleVerifyUtility .CRC_CHECK_ERR_MSG ;
173186import static org .apache .ignite .internal .processors .cache .verify .IdleVerifyUtility .GRID_NOT_IDLE_MSG ;
174187import static org .apache .ignite .internal .processors .diagnostic .DiagnosticProcessor .DEFAULT_TARGET_FOLDER ;
188+ import static org .apache .ignite .internal .processors .job .GridJobProcessor .JOBS_VIEW ;
189+ import static org .apache .ignite .internal .processors .task .GridTaskProcessor .TASKS_VIEW ;
175190import static org .apache .ignite .testframework .GridTestUtils .assertContains ;
176191import static org .apache .ignite .testframework .GridTestUtils .assertNotContains ;
177192import static org .apache .ignite .testframework .GridTestUtils .assertThrows ;
@@ -218,6 +233,9 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
218233 /** */
219234 protected static File customDiagnosticDir ;
220235
236+ /** */
237+ protected ListeningTestLogger listeningLog ;
238+
221239 /** {@inheritDoc} */
222240 @ Override protected void beforeTest () throws Exception {
223241 super .beforeTest ();
@@ -227,13 +245,30 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
227245 cleanDiagnosticDir ();
228246 }
229247
248+ /** {@inheritDoc} */
249+ @ Override protected void afterTest () throws Exception {
250+ super .afterTest ();
251+
252+ listeningLog = null ;
253+ }
254+
230255 /** {@inheritDoc} */
231256 @ Override protected void cleanPersistenceDir () throws Exception {
232257 super .cleanPersistenceDir ();
233258
234259 cleanDiagnosticDir ();
235260 }
236261
262+ /** {@inheritDoc} */
263+ @ Override protected IgniteConfiguration getConfiguration (String igniteInstanceName ) throws Exception {
264+ IgniteConfiguration cfg = super .getConfiguration (igniteInstanceName );
265+
266+ if (listeningLog != null )
267+ cfg .setGridLogger (listeningLog );
268+
269+ return cfg ;
270+ }
271+
237272 /**
238273 * @throws IgniteCheckedException If failed.
239274 */
@@ -747,6 +782,199 @@ public void testIdleVerifyOnInactiveClusterWithPersistence() throws Exception {
747782 assertContains (log , testOut .toString (), "The check procedure has finished, no conflicts have been found." );
748783 }
749784
785+ /** */
786+ @ Test
787+ public void testIdleVerifyCancelOnCheckpoint () throws Exception {
788+ doTestCancelIdleVerify ((beforeCancelLatch , afterCancelLatch ) -> {
789+ G .allGrids ().forEach (grid -> {
790+ if (grid .configuration ().isClientMode ())
791+ return ;
792+
793+ GridCacheDatabaseSharedManager dbMgr =
794+ (GridCacheDatabaseSharedManager )((IgniteEx )grid ).context ().cache ().context ().database ();
795+
796+ dbMgr .addCheckpointListener (new CheckpointListener () {
797+ @ Override public void beforeCheckpointBegin (Context ctx ) {
798+ if (Objects .equals (ctx .progress ().reason (), CP_REASON ))
799+ beforeCancelLatch .countDown ();
800+ }
801+
802+ @ Override public void afterCheckpointEnd (Context ctx ) throws IgniteCheckedException {
803+ if (Objects .equals (ctx .progress ().reason (), CP_REASON )) {
804+ try {
805+ assertTrue (afterCancelLatch .await (getTestTimeout (), TimeUnit .MILLISECONDS ));
806+ }
807+ catch (InterruptedException e ) {
808+ throw new IgniteInterruptedCheckedException (e );
809+ }
810+ }
811+ }
812+
813+ @ Override public void onMarkCheckpointBegin (Context ctx ) {
814+ // No-op.
815+ }
816+
817+ @ Override public void onCheckpointBegin (Context ctx ) {
818+ // No-op.
819+ }
820+ });
821+ });
822+ }, false );
823+ }
824+
825+ /** */
826+ @ Test
827+ public void testIdleVerifyCancelBeforeCalcPartitionHashStarted () throws Exception {
828+ doTestCancelIdleVerify ((beforeCancelLatch , afterCancelLatch ) -> {
829+ ForkJoinPool pool = new ForkJoinPool () {
830+ @ Override public <T > ForkJoinTask <T > submit (Callable <T > task ) {
831+ beforeCancelLatch .countDown ();
832+
833+ ForkJoinTask <T > submitted = super .submit (task );
834+
835+ try {
836+ assertTrue (afterCancelLatch .await (getTestTimeout (), TimeUnit .MILLISECONDS ));
837+ }
838+ catch (InterruptedException e ) {
839+ throw new RuntimeException (e );
840+ }
841+
842+ return submitted ;
843+ }
844+ };
845+
846+ VerifyBackupPartitionsTaskV2 .poolSupplier = () -> pool ;
847+ }, false );
848+ }
849+
850+ /** */
851+ @ Test
852+ public void testIdleVerifyCancelWhileCalcPartitionHashRunning () throws Exception {
853+ for (boolean checkCrc : new boolean [] {false , true }) {
854+ // Can't place assert inside pool, because exceptions from task ignored.
855+ AtomicBoolean interruptedOnCancel = new AtomicBoolean (true );
856+ AtomicBoolean eCatched = new AtomicBoolean (false );
857+
858+ doTestCancelIdleVerify ((beforeCancelLatch , afterCancelLatch ) -> {
859+ ForkJoinPool pool = new ForkJoinPool () {
860+ @ Override public <T > ForkJoinTask <T > submit (Callable <T > task ) {
861+ return super .submit (new Callable <T >() {
862+ @ Override public T call () throws Exception {
863+ beforeCancelLatch .countDown ();
864+
865+ try {
866+ assertTrue (afterCancelLatch .await (getTestTimeout (), TimeUnit .MILLISECONDS ));
867+ }
868+ catch (InterruptedException ignored ) {
869+ interruptedOnCancel .set (false );
870+ }
871+
872+ try {
873+ // Call must fail.
874+ T res = task .call ();
875+
876+ interruptedOnCancel .set (false );
877+
878+ return res ;
879+ }
880+ catch (IgniteException e ) {
881+ if (!e .getMessage ().startsWith (checkCrc ? CRC_CHECK_ERR_MSG : CACL_PART_HASH_ERR_MSG ))
882+ interruptedOnCancel .set (false );
883+
884+ eCatched .set (true );
885+
886+ throw e ;
887+ }
888+ catch (Throwable e ) {
889+ interruptedOnCancel .set (false );
890+
891+ throw e ;
892+ }
893+ }
894+ });
895+ }
896+ };
897+
898+ VerifyBackupPartitionsTaskV2 .poolSupplier = () -> pool ;
899+ }, checkCrc );
900+
901+ assertTrue ("All tasks must be cancelled" , interruptedOnCancel .get ());
902+ assertTrue ("Task must fail with expected exception" , eCatched .get ());
903+
904+ eCatched .set (false );
905+ }
906+ }
907+
908+ /**
909+ * Wrapper for tests for idle verify cancel command.
910+ *
911+ * @param prepare Prepares the test using beforeCancelLatch and afterCancelLatch.
912+ * @param checkCrc If {@code true} then run idle verify with --check-crc argument.
913+ */
914+ private void doTestCancelIdleVerify (BiConsumer <CountDownLatch , CountDownLatch > prepare , boolean checkCrc ) throws Exception {
915+ final int gridsCnt = 4 ;
916+
917+ if (G .allGrids ().isEmpty ()) {
918+ listeningLog = new ListeningTestLogger (log );
919+
920+ IgniteEx srv = startGrids (gridsCnt );
921+
922+ srv .cluster ().state (ACTIVE );
923+
924+ IgniteCache <Integer , Integer > cache = srv .getOrCreateCache (new CacheConfiguration <Integer , Integer >(DEFAULT_CACHE_NAME )
925+ .setBackups (3 )
926+ .setAffinity (new RendezvousAffinityFunction ().setPartitions (3 )));
927+
928+ for (int part = 0 ; part < 3 ; part ++) {
929+ for (Integer key : partitionKeys (cache , part , 3 , 0 )) {
930+ cache .put (key , key );
931+ }
932+ }
933+ }
934+
935+ CountDownLatch beforeCancelLatch = new CountDownLatch (1 );
936+ CountDownLatch afterCancelLatch = new CountDownLatch (1 );
937+
938+ prepare .accept (beforeCancelLatch , afterCancelLatch );
939+
940+ LogListener lsnr = LogListener .matches ("Idle verify was cancelled." ).build ();
941+
942+ listeningLog .registerListener (lsnr );
943+
944+ IgniteInternalFuture <Integer > idleVerifyFut = GridTestUtils .runAsync (() -> {
945+ if (checkCrc )
946+ execute ("--cache" , "idle_verify" , "--check-crc" );
947+ else
948+ execute ("--cache" , "idle_verify" );
949+ });
950+
951+ assertTrue (beforeCancelLatch .await (getTestTimeout (), TimeUnit .MILLISECONDS ));
952+
953+ assertEquals (EXIT_CODE_OK , execute ("--cache" , "idle_verify" , "--cancel" ));
954+
955+ afterCancelLatch .countDown ();
956+
957+ assertTrue (waitForCondition (() -> {
958+ for (int i = 0 ; i < gridsCnt ; i ++) {
959+ for (ComputeTaskView taskView : grid (i ).context ().systemView ().<ComputeTaskView >view (TASKS_VIEW )) {
960+ if (TASKS_TO_CANCEL .contains (taskView .taskName ()))
961+ return false ;
962+ }
963+
964+ for (ComputeJobView jobView : grid (i ).context ().systemView ().<ComputeJobView >view (JOBS_VIEW )) {
965+ if (TASKS_TO_CANCEL .contains (jobView .taskName ()))
966+ return false ;
967+ }
968+ }
969+
970+ return true ;
971+ }, getTestTimeout ()));
972+
973+ idleVerifyFut .get (getTestTimeout (), TimeUnit .MILLISECONDS );
974+
975+ assertTrue (lsnr .check ());
976+ }
977+
750978 /**
751979 * Test deactivation works via control.sh
752980 *
@@ -2252,7 +2480,7 @@ public void testCacheIdleVerifyDumpCrcWithCorruptedPartition() throws Exception
22522480 String outputStr = testOut .toString ();
22532481
22542482 assertContains (log , outputStr , "The check procedure failed on 1 node." );
2255- assertContains (log , outputStr , "CRC check of partition failed" );
2483+ assertContains (log , outputStr , CRC_CHECK_ERR_MSG );
22562484 }
22572485
22582486 /** */
0 commit comments