2323import java .nio .file .Path ;
2424import java .nio .file .Paths ;
2525import java .util .Collections ;
26+ import java .util .concurrent .atomic .AtomicInteger ;
2627import org .apache .ignite .Ignite ;
2728import org .apache .ignite .IgniteCheckedException ;
2829import org .apache .ignite .IgniteDataStreamer ;
3334import org .apache .ignite .configuration .CacheConfiguration ;
3435import org .apache .ignite .configuration .IgniteConfiguration ;
3536import org .apache .ignite .internal .IgniteEx ;
37+ import org .apache .ignite .internal .TestRecordingCommunicationSpi ;
3638import org .apache .ignite .internal .util .typedef .F ;
3739import org .apache .ignite .internal .util .typedef .internal .U ;
38- import org .apache .ignite .testframework .ListeningTestLogger ;
39- import org .apache .ignite .testframework .LogListener ;
4040import org .junit .Before ;
4141import org .junit .Test ;
4242
@@ -53,9 +53,6 @@ public class IgniteSnapshotRestoreFromRemoteMdcTest extends AbstractSnapshotSelf
5353 /** */
5454 private static final String DC_ID_1 = "DC_ID_1" ;
5555
56- /** */
57- private final ListeningTestLogger listeningLog = new ListeningTestLogger (log );
58-
5956 /** @throws Exception If fails. */
6057 @ Before
6158 public void before () throws Exception {
@@ -67,8 +64,7 @@ public void before() throws Exception {
6764 @ Override protected IgniteConfiguration getConfiguration (String igniteInstanceName ) throws Exception {
6865 IgniteConfiguration cfg = super .getConfiguration (igniteInstanceName );
6966
70- if (listeningLog != null )
71- cfg .setGridLogger (listeningLog );
67+ cfg .setCommunicationSpi (new TestRecordingCommunicationSpi ());
7268
7369 return cfg ;
7470 }
@@ -101,20 +97,30 @@ private void testMdcAwareSnapshot(boolean replicatedCache) throws Exception {
10197
10298 awaitPartitionMapExchange ();
10399
104- startGridWithCustomWorkdir ("demander" , DC_ID_0 );
100+ Ignite demander = startGridWithCustomWorkdir ("demander" , DC_ID_0 );
105101
106102 resetBaselineTopology ();
107103
108- LogListener supLsnr = LogListener .matches (" sec, rmtId=" + supplier .cluster ().localNode ().id ()).build ();
109- LogListener otherLsnr = LogListener .matches (" sec, rmtId=" + other .cluster ().localNode ().id ()).build ();
104+ TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi .spi (demander );
105+
106+ AtomicInteger supReqCnt = new AtomicInteger ();
107+ AtomicInteger otherReqCnt = new AtomicInteger ();
108+
109+ spi .blockMessages ((node , message ) -> {
110+ if (message instanceof SnapshotFilesRequestMessage ) {
111+ if (node .id ().equals (supplier .cluster ().localNode ().id ()))
112+ supReqCnt .incrementAndGet ();
113+ else if (node .id ().equals (other .cluster ().localNode ().id ()))
114+ otherReqCnt .incrementAndGet ();
115+ }
110116
111- listeningLog . registerListener ( supLsnr ) ;
112- listeningLog . registerListener ( otherLsnr );
117+ return false ;
118+ } );
113119
114120 other .snapshot ().restoreSnapshot (SNAPSHOT_NAME , Collections .singleton (CACHE )).get (60_000 );
115121
116- assertTrue (supLsnr . check () );
117- assertEquals (!replicatedCache , otherLsnr . check () );
122+ assertTrue (supReqCnt . get () > 0 );
123+ assertEquals (!replicatedCache , otherReqCnt . get () > 0 );
118124
119125 assertCacheKeys (other .cache (CACHE ), CACHE_KEYS_RANGE );
120126 }
0 commit comments