1919
2020import android .support .annotation .NonNull ;
2121
22+ import java .util .concurrent .Executor ;
23+ import java .util .concurrent .atomic .AtomicReference ;
24+
2225import com .couchbase .lite .internal .support .Log ;
2326
24- import java .util .Locale ;
25- import java .util .concurrent .Executor ;
2627
2728/**
2829 * A Query subclass that automatically refreshes the result rows every time the database changes.
30+ * <p>
31+ * Be careful with the state machine here:
32+ * A query that has been STOPPED can be STARTED again!
33+ * In particular, a query that is stopping when it receives a request to restart
34+ * should suspend the restart request, finish stopping, and then restart.
2935 */
3036final class LiveQuery implements DatabaseChangeListener {
3137 //---------------------------------------------
3238 // static variables
3339 //---------------------------------------------
34- private final static LogDomain DOMAIN = LogDomain .QUERY ;
35- private final static long kDefaultLiveQueryUpdateInterval = 200 ; // 0.2sec (200ms)
40+ private static final LogDomain DOMAIN = LogDomain .QUERY ;
41+ private static final long LIVE_QUERY_UPDATE_INTERVAL_MS = 200 ; // 0.2sec (200ms)
42+
43+ private enum State {STOPPED , STARTED , SCHEDULED }
3644
3745 //---------------------------------------------
3846 // member variables
3947 //---------------------------------------------
4048
41- private ChangeNotifier <QueryChange > changeNotifier ;
49+ private final ChangeNotifier <QueryChange > changeNotifier = new ChangeNotifier <>();
50+
51+ private final AtomicReference <State > state = new AtomicReference <>(State .STOPPED );
52+
53+ private ResultSet previousResults ;
54+
55+
56+ @ NonNull
4257 private final AbstractQuery query ;
43- private ResultSet resultSet ;
44- private boolean observing ;
45- private boolean willUpdate ;
46- private long lastUpdatedAt ;
58+
59+ private final Object lock = new Object ();
60+
4761 private ListenerToken dbListenerToken ;
48- private final Object lock = new Object (); // lock for thread-safety
4962
5063 //---------------------------------------------
5164 // Constructors
5265 //---------------------------------------------
5366
54- LiveQuery (AbstractQuery query ) {
55- if (query == null )
56- throw new IllegalArgumentException ("query should not be null." );
57-
67+ LiveQuery (@ NonNull AbstractQuery query ) {
68+ if (query == null ) { throw new IllegalArgumentException ("query cannot be null." ); }
5869 this .query = query ;
59- this .changeNotifier = new ChangeNotifier <>();
60- this .resultSet = null ;
61- this .observing = false ;
62- this .willUpdate = false ;
63- this .lastUpdatedAt = 0L ;
6470 }
6571
6672 //---------------------------------------------
@@ -69,91 +75,39 @@ final class LiveQuery implements DatabaseChangeListener {
6975
7076 @ NonNull
7177 @ Override
72- public String toString () {
73- return String .format (Locale .ENGLISH , "%s[%s]" , this .getClass ().getSimpleName (), query .toString ());
74- }
78+ public String toString () { return "LiveQuery[" + query .toString () + "]" ; }
7579
7680 //---------------------------------------------
7781 // Implementation of DatabaseChangeListener
7882 //---------------------------------------------
7983
8084 @ Override
81- public void changed (@ NonNull DatabaseChange change ) {
82- synchronized (lock ) {
83- if (willUpdate )
84- return ; // Already a pending update scheduled
85-
86- if (!observing )
87- return ;
88-
89- // Schedule an update, respecting the updateInterval:
90- long updateDelay = lastUpdatedAt + kDefaultLiveQueryUpdateInterval - System .currentTimeMillis ();
91- updateDelay = Math .max (0 , Math .min (this .kDefaultLiveQueryUpdateInterval , updateDelay ));
92- update (updateDelay );
93- }
94- }
85+ public void changed (@ NonNull DatabaseChange change ) { update (); }
9586
9687 //---------------------------------------------
9788 // protected methods
9889 //---------------------------------------------
9990
91+ @ SuppressWarnings ("NoFinalizer" )
10092 @ Override
10193 protected void finalize () throws Throwable {
102- stop (true );
94+ stop ();
10395 super .finalize ();
10496 }
10597
10698 //---------------------------------------------
10799 // package
108100 //---------------------------------------------
109101
110- /**
111- * Starts observing database changes and reports changes in the query result.
112- */
113- void start () {
114- synchronized (lock ) {
115- if (query .getDatabase () == null )
116- throw new IllegalArgumentException ("associated database should not be null." );
117-
118- observing = true ;
119- releaseResultSet ();
120- query .getDatabase ().getActiveLiveQueries ().add (this );
121- // NOTE: start() method could be called during LiveQuery is running.
122- // Ex) Query.setParameters() with LiveQuery.
123- if (dbListenerToken == null )
124- dbListenerToken = query .getDatabase ().addChangeListener (this );
125- update (0 );
126- }
127- }
128-
129- /**
130- * Stops observing database changes.
131- */
132- void stop (boolean removeFromList ) {
133- synchronized (lock ) {
134- observing = false ;
135- willUpdate = false ; // cancels the delayed update started by -databaseChanged
136- if (query != null && query .getDatabase () != null && dbListenerToken != null ) {
137- query .getDatabase ().removeChangeListener (dbListenerToken );
138- dbListenerToken = null ;
139- }
140- if (removeFromList && query != null && query .getDatabase () != null )
141- query .getDatabase ().getActiveLiveQueries ().remove (this );
142- releaseResultSet ();
143- }
144- }
145-
146102 /**
147103 * Adds a change listener.
148104 * <p>
149105 * NOTE: this method is synchronized with Query level.
150106 */
151107 ListenerToken addChangeListener (Executor executor , QueryChangeListener listener ) {
152- synchronized (lock ) {
153- if (!observing )
154- start ();
155- return changeNotifier .addChangeListener (executor , listener );
156- }
108+ final ChangeListenerToken token = changeNotifier .addChangeListener (executor , listener );
109+ start (false );
110+ return token ;
157111 }
158112
159113 /**
@@ -162,77 +116,97 @@ ListenerToken addChangeListener(Executor executor, QueryChangeListener listener)
162116 * NOTE: this method is synchronized with Query level.
163117 */
164118 void removeChangeListener (ListenerToken token ) {
119+ if (changeNotifier .removeChangeListener (token ) <= 0 ) { stop (); }
120+ }
121+
122+ /**
123+ * Starts observing database changes and reports changes in the query result.
124+ */
125+ void start (boolean shouldClearResults ) {
126+ final Database db = query .getDatabase ();
127+ if (db == null ) { throw new IllegalArgumentException ("live query database cannot be null." ); }
128+
165129 synchronized (lock ) {
166- if (changeNotifier .removeChangeListener (token ) == 0 )
167- stop (true );
130+ if (state .compareAndSet (State .STOPPED , State .STARTED )) {
131+ db .getActiveLiveQueries ().add (this );
132+ dbListenerToken = db .addChangeListener (this );
133+ }
134+ else {
135+ // Here if the live query was already running. This can happen in two ways:
136+ // 1) when adding another listener
137+ // 2) when the query parameters have changed.
138+ // In either case we may want to kick off a new query.
139+ // In the latter case the current query results are irrelevant.
140+ if (shouldClearResults ) { releaseResultSetSynchronized (); }
141+ }
168142 }
143+
144+ update ();
169145 }
170146
171147 //---------------------------------------------
172148 // Private (in class only)
173149 //---------------------------------------------
174150
175- /**
176- * NOTE: update(long delay) is only called from synchronzied LiveQuery methods by lock.
177- *
178- * @param delay millisecond
179- */
180- private void update (long delay ) {
181- if (willUpdate )
182- return ; // Already a pending update scheduled
183-
184- if (!observing )
185- return ;
186-
187- willUpdate = true ;
151+ private void stop () {
152+ synchronized (lock ) {
153+ final State oldState = state .getAndSet (State .STOPPED );
154+ if (State .STOPPED == oldState ) { return ; }
188155
189- query .getDatabase ().scheduleOnQueryExecutor (new Runnable () {
190- @ Override
191- public void run () {
192- update ();
156+ final Database db = query .getDatabase ();
157+ if (db != null ) {
158+ db .getActiveLiveQueries ().remove (this );
159+ db .removeChangeListener (dbListenerToken );
160+ dbListenerToken = null ;
193161 }
194- }, delay );
162+
163+ releaseResultSetSynchronized ();
164+ }
195165 }
196166
197- /**
198- * NOTE: update() method is called from only ExecutorService for LiveQuery which is
199- * a single thread. But update changes and refers some instant variables
200- */
201167 private void update () {
202- synchronized (lock ) {
203- if (!observing )
204- return ;
205-
206- try {
207- Log .i (DOMAIN , "%s: Querying..." , this );
208- ResultSet oldResultSet = resultSet ;
209- ResultSet newResultSet ;
210- if (oldResultSet == null )
211- newResultSet = query .execute ();
212- else
213- newResultSet = oldResultSet .refresh ();
214-
215- willUpdate = false ;
216- lastUpdatedAt = System .currentTimeMillis ();
217-
218- if (newResultSet != null ) {
219- if (oldResultSet != null )
220- Log .i (DOMAIN , "%s: Changed!" , this );
221- resultSet = newResultSet ;
222- changeNotifier .postChange (new QueryChange (this .query , resultSet , null ));
223- } else {
224- Log .i (DOMAIN , "%s: ...no change" , this );
168+ if (!state .compareAndSet (State .STARTED , State .SCHEDULED )) { return ; }
169+ query .getDatabase ().scheduleOnQueryExecutor (
170+ new Runnable () {
171+ @ Override
172+ public void run () { refreshResults (); }
173+ },
174+ LIVE_QUERY_UPDATE_INTERVAL_MS );
175+ }
176+
177+ // Runs on the query.database.queryExecutor
178+ // Assumes that call to `previousResults.refresh` is safe, even if previousResults has been freed.
179+ private void refreshResults () {
180+ try {
181+ final ResultSet prevResults ;
182+ synchronized (lock ) {
183+ if (!state .compareAndSet (State .SCHEDULED , State .STARTED )) { return ; }
184+ prevResults = previousResults ;
185+ }
186+
187+ final ResultSet newResults = (prevResults == null ) ? query .execute () : prevResults .refresh ();
188+ Log .i (DOMAIN , "LiveQuery refresh: %s > %s" , prevResults , newResults );
189+ if (newResults == null ) { return ; }
190+
191+ boolean update = false ;
192+ synchronized (lock ) {
193+ if (state .get () != State .STOPPED ) {
194+ previousResults = newResults ;
195+ update = true ;
225196 }
226- } catch (CouchbaseLiteException e ) {
227- changeNotifier .postChange (new QueryChange (this .query , null , e ));
228197 }
198+
199+ // Listeners may be notified even after the LiveQuery has been stopped.
200+ if (update ) { changeNotifier .postChange (new QueryChange (query , newResults , null )); }
201+ }
202+ catch (CouchbaseLiteException err ) {
203+ changeNotifier .postChange (new QueryChange (query , null , err ));
229204 }
230205 }
231206
232- private void releaseResultSet () {
233- if (resultSet != null ) {
234- resultSet .free ();
235- resultSet = null ;
236- }
207+ private void releaseResultSetSynchronized () {
208+ if (previousResults == null ) { return ; }
209+ previousResults .free ();
210+ previousResults = null ;
237211 }
238212}
0 commit comments