1111import java .net .URI ;
1212import java .net .URISyntaxException ;
1313import java .util .concurrent .CountDownLatch ;
14+ import java .util .concurrent .ExecutorService ;
15+ import java .util .concurrent .Executors ;
1416import java .util .concurrent .TimeUnit ;
1517
1618import static com .couchbase .lite .ReplicatorConfiguration .ReplicatorType .PULL ;
@@ -26,6 +28,7 @@ public class BaseReplicatorTest extends BaseTest {
2628 Database otherDB ;
2729 Replicator repl ;
2830 long timeout ; // seconds
31+ ExecutorService executor = null ;
2932
3033 protected URLEndpoint getRemoteEndpoint (String dbName , boolean secure ) throws URISyntaxException {
3134 String uri = (secure ? "wss://" : "ws://" ) + config .remoteHost () + ":" + config .remotePort () + "/" + dbName ;
@@ -60,13 +63,13 @@ protected Replicator run(final ReplicatorConfiguration config, final int code, f
6063 throws InterruptedException {
6164 repl = new Replicator (config );
6265 final CountDownLatch latch = new CountDownLatch (1 );
63- repl .addChangeListener (new ReplicatorChangeListener () {
66+ ListenerToken token = repl .addChangeListener (executor , new ReplicatorChangeListener () {
6467 @ Override
6568 public void changed (ReplicatorChange change ) {
6669 Replicator .Status status = change .getStatus ();
6770 CouchbaseLiteException error = status .getError ();
6871 final String kActivityNames [] = {"stopped" , "offline" , "connecting" , "idle" , "busy" };
69- Log .e (TAG , "---Status: %s (%d / %d), lastError = %s" ,
72+ Log .e (TAG , "--- Status: %s (%d / %d), lastError = %s" ,
7073 kActivityNames [status .getActivityLevel ().getValue ()],
7174 status .getProgress ().getCompleted (), status .getProgress ().getTotal (),
7275 error );
@@ -108,16 +111,24 @@ public void changed(ReplicatorChange change) {
108111 }
109112 });
110113 repl .start ();
111- assertTrue (latch .await (timeout , TimeUnit .SECONDS ));
114+ boolean ret = latch .await (timeout , TimeUnit .SECONDS );
115+ repl .removeChangeListener (token );
116+ assertTrue (ret );
112117 return repl ;
113118 }
114119
115120 void stopContinuousReplicator (Replicator repl ) throws InterruptedException {
116121 final CountDownLatch latch = new CountDownLatch (1 );
117- ListenerToken token = repl .addChangeListener (new ReplicatorChangeListener () {
122+ ListenerToken token = repl .addChangeListener (executor , new ReplicatorChangeListener () {
118123 @ Override
119124 public void changed (ReplicatorChange change ) {
120125 Replicator .Status status = change .getStatus ();
126+ CouchbaseLiteException error = status .getError ();
127+ final String kActivityNames [] = {"stopped" , "offline" , "connecting" , "idle" , "busy" };
128+ Log .e (TAG , "--- stopContinuousReplicator() -> Status: %s (%d / %d), lastError = %s" ,
129+ kActivityNames [status .getActivityLevel ().getValue ()],
130+ status .getProgress ().getCompleted (), status .getProgress ().getTotal (),
131+ error );
121132 if (status .getActivityLevel () == Replicator .ActivityLevel .STOPPED ) {
122133 latch .countDown ();
123134 }
@@ -144,6 +155,8 @@ public void setUp() throws Exception {
144155 assertTrue (otherDB .isOpen ());
145156 assertNotNull (otherDB );
146157
158+ executor = Executors .newSingleThreadExecutor ();
159+
147160 try {
148161 Thread .sleep (500 );
149162 } catch (Exception e ) {
@@ -159,11 +172,32 @@ public void tearDown() throws Exception {
159172 }
160173 deleteDatabase (kOtherDatabaseName );
161174
175+ shutdownAndAwaitTermination (executor );
176+ executor = null ;
177+
162178 super .tearDown ();
163179
164180 try {
165181 Thread .sleep (500 );
166182 } catch (Exception e ) {
167183 }
168184 }
185+
186+ void shutdownAndAwaitTermination (ExecutorService pool ) {
187+ pool .shutdown (); // Disable new tasks from being submitted
188+ try {
189+ // Wait a while for existing tasks to terminate
190+ if (!pool .awaitTermination (60 , TimeUnit .SECONDS )) {
191+ pool .shutdownNow (); // Cancel currently executing tasks
192+ // Wait a while for tasks to respond to being cancelled
193+ if (!pool .awaitTermination (60 , TimeUnit .SECONDS ))
194+ System .err .println ("Pool did not terminate" );
195+ }
196+ } catch (InterruptedException ie ) {
197+ // (Re-)Cancel if current thread also interrupted
198+ pool .shutdownNow ();
199+ // Preserve interrupt status
200+ Thread .currentThread ().interrupt ();
201+ }
202+ }
169203}
0 commit comments