Skip to content
This repository was archived by the owner on Aug 17, 2020. It is now read-only.

Commit d874ea9

Browse files
committed
Merge pull request #90 from square/jw/scheduler
Take an emission scheduler when creating wrapper types.
2 parents 3c815e3 + 9595ea7 commit d874ea9

File tree

10 files changed

+155
-19
lines changed

10 files changed

+155
-19
lines changed

sample/src/main/java/com/example/sqlbrite/todo/db/DbModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import dagger.Module;
2323
import dagger.Provides;
2424
import javax.inject.Singleton;
25+
import rx.schedulers.Schedulers;
2526
import timber.log.Timber;
2627

2728
@Module(complete = false, library = true)
@@ -39,7 +40,7 @@ public final class DbModule {
3940
}
4041

4142
@Provides @Singleton BriteDatabase provideDatabase(SqlBrite sqlBrite, SQLiteOpenHelper helper) {
42-
BriteDatabase db = sqlBrite.wrapDatabaseHelper(helper);
43+
BriteDatabase db = sqlBrite.wrapDatabaseHelper(helper, Schedulers.io());
4344
db.setLoggingEnabled(true);
4445
return db;
4546
}

sample/src/main/java/com/example/sqlbrite/todo/ui/ItemsFragment.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@ public void onViewCreated(View view, @Nullable Bundle savedInstanceState) {
187187
return listName + " (" + itemCount + ")";
188188
}
189189
})
190-
.subscribeOn(Schedulers.io())
191190
.observeOn(AndroidSchedulers.mainThread())
192191
.subscribe(new Action1<String>() {
193192
@Override public void call(String title) {
@@ -197,7 +196,6 @@ public void onViewCreated(View view, @Nullable Bundle savedInstanceState) {
197196

198197
subscriptions.add(db.createQuery(TodoItem.TABLE, LIST_QUERY, listId)
199198
.mapToList(TodoItem.MAPPER)
200-
.subscribeOn(Schedulers.io())
201199
.observeOn(AndroidSchedulers.mainThread())
202200
.subscribe(adapter));
203201
}

sample/src/main/java/com/example/sqlbrite/todo/ui/ListsFragment.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import javax.inject.Inject;
3737
import rx.Subscription;
3838
import rx.android.schedulers.AndroidSchedulers;
39-
import rx.schedulers.Schedulers;
4039

4140
import static android.support.v4.view.MenuItemCompat.SHOW_AS_ACTION_IF_ROOM;
4241
import static android.support.v4.view.MenuItemCompat.SHOW_AS_ACTION_WITH_TEXT;
@@ -110,7 +109,6 @@ public void onViewCreated(View view, @Nullable Bundle savedInstanceState) {
110109

111110
subscription = db.createQuery(ListsItem.TABLES, ListsItem.QUERY)
112111
.mapToList(ListsItem.MAPPER)
113-
.subscribeOn(Schedulers.io())
114112
.observeOn(AndroidSchedulers.mainThread())
115113
.subscribe(adapter);
116114
}

sqlbrite/src/androidTest/java/com/squareup/sqlbrite/BriteContentResolverTest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public final class BriteContentResolverTest
4040

4141
private final List<String> logs = new ArrayList<>();
4242
private final RecordingObserver o = new BlockingRecordingObserver();
43+
private final TestScheduler scheduler = new TestScheduler();
4344

4445
private ContentResolver contentResolver;
4546
private BriteContentResolver db;
@@ -59,7 +60,7 @@ public BriteContentResolverTest() {
5960
logs.add(message);
6061
}
6162
};
62-
db = new BriteContentResolver(contentResolver, logger);
63+
db = new BriteContentResolver(contentResolver, logger, scheduler);
6364

6465
getProvider().init(getContext().getContentResolver());
6566
}
@@ -169,6 +170,20 @@ public void testBackpressureSupported() {
169170
o.assertNoMoreEvents();
170171
}
171172

173+
public void testInitialValueAndTriggerUsesScheduler() {
174+
scheduler.runTasksImmediately(false);
175+
176+
subscription = db.createQuery(TABLE, null, null, null, null, false).subscribe(o);
177+
o.assertNoMoreEvents();
178+
scheduler.triggerActions();
179+
o.assertCursor().isExhausted();
180+
181+
contentResolver.insert(TABLE, values("key1", "val1"));
182+
o.assertNoMoreEvents();
183+
scheduler.triggerActions();
184+
o.assertCursor().hasRow("key1", "val1").isExhausted();
185+
}
186+
172187
private ContentValues values(String key, String value) {
173188
ContentValues result = new ContentValues();
174189
result.put(KEY, key);

sqlbrite/src/androidTest/java/com/squareup/sqlbrite/BriteDatabaseTest.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
public final class BriteDatabaseTest {
5959
private final List<String> logs = new ArrayList<>();
6060
private final RecordingObserver o = new RecordingObserver();
61+
private final TestScheduler scheduler = new TestScheduler();
6162

6263
private TestDb helper;
6364
private SQLiteDatabase real;
@@ -72,7 +73,7 @@ public final class BriteDatabaseTest {
7273
logs.add(message);
7374
}
7475
};
75-
db = new BriteDatabase(helper, logger);
76+
db = new BriteDatabase(helper, logger, scheduler);
7677
}
7778

7879
@After public void tearDown() {
@@ -162,6 +163,29 @@ public final class BriteDatabaseTest {
162163
.isExhausted();
163164
}
164165

166+
@Test public void queryInitialValueAndTriggerUsesScheduler() {
167+
scheduler.runTasksImmediately(false);
168+
169+
db.createQuery(TABLE_EMPLOYEE, SELECT_EMPLOYEES).subscribe(o);
170+
o.assertNoMoreEvents();
171+
scheduler.triggerActions();
172+
o.assertCursor()
173+
.hasRow("alice", "Alice Allison")
174+
.hasRow("bob", "Bob Bobberson")
175+
.hasRow("eve", "Eve Evenson")
176+
.isExhausted();
177+
178+
db.insert(TABLE_EMPLOYEE, employee("john", "John Johnson"));
179+
o.assertNoMoreEvents();
180+
scheduler.triggerActions();
181+
o.assertCursor()
182+
.hasRow("alice", "Alice Allison")
183+
.hasRow("bob", "Bob Bobberson")
184+
.hasRow("eve", "Eve Evenson")
185+
.hasRow("john", "John Johnson")
186+
.isExhausted();
187+
}
188+
165189
@Test public void queryNotNotifiedWhenInsertFails() {
166190
db.createQuery(TABLE_EMPLOYEE, SELECT_EMPLOYEES).subscribe(o);
167191
o.assertCursor()

sqlbrite/src/androidTest/java/com/squareup/sqlbrite/QueryTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.junit.Test;
2525
import rx.functions.Func1;
2626
import rx.observables.BlockingObservable;
27+
import rx.schedulers.Schedulers;
2728

2829
import static com.google.common.truth.Truth.assertThat;
2930
import static com.squareup.sqlbrite.TestDb.SELECT_EMPLOYEES;
@@ -35,7 +36,7 @@ public final class QueryTest {
3536
@Before public void setUp() {
3637
SqlBrite sqlBrite = SqlBrite.create();
3738
TestDb helper = new TestDb(InstrumentationRegistry.getContext());
38-
db = sqlBrite.wrapDatabaseHelper(helper);
39+
db = sqlBrite.wrapDatabaseHelper(helper, Schedulers.immediate());
3940
}
4041

4142
@Test public void mapToOne() {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (C) 2016 Square, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.squareup.sqlbrite;
17+
18+
import java.util.concurrent.TimeUnit;
19+
import rx.Scheduler;
20+
import rx.Subscription;
21+
import rx.functions.Action0;
22+
23+
final class TestScheduler extends Scheduler {
24+
private final rx.schedulers.TestScheduler delegate = new rx.schedulers.TestScheduler();
25+
private boolean runTasksImmediately = true;
26+
27+
public void runTasksImmediately(boolean runTasksImmediately) {
28+
this.runTasksImmediately = runTasksImmediately;
29+
}
30+
31+
public void triggerActions() {
32+
delegate.triggerActions();
33+
}
34+
35+
@Override public Worker createWorker() {
36+
return new TestWorker();
37+
}
38+
39+
class TestWorker extends Worker {
40+
private final Worker delegateWorker = delegate.createWorker();
41+
42+
@Override public Subscription schedule(Action0 action) {
43+
Subscription subscription = delegateWorker.schedule(action);
44+
if (runTasksImmediately) {
45+
triggerActions();
46+
}
47+
return subscription;
48+
}
49+
50+
@Override public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
51+
Subscription subscription = delegateWorker.schedule(action, delayTime, unit);
52+
if (runTasksImmediately) {
53+
triggerActions();
54+
}
55+
return subscription;
56+
}
57+
58+
@Override public void unsubscribe() {
59+
delegateWorker.unsubscribe();
60+
}
61+
62+
@Override public boolean isUnsubscribed() {
63+
return delegateWorker.isUnsubscribed();
64+
}
65+
}
66+
}

sqlbrite/src/main/java/com/squareup/sqlbrite/BriteContentResolver.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Arrays;
2828
import rx.Observable;
2929
import rx.Observable.OnSubscribe;
30+
import rx.Scheduler;
3031
import rx.Subscriber;
3132
import rx.functions.Action0;
3233
import rx.subscriptions.Subscriptions;
@@ -45,12 +46,14 @@ public final class BriteContentResolver {
4546

4647
final ContentResolver contentResolver;
4748
private final Logger logger;
49+
private final Scheduler scheduler;
4850

4951
volatile boolean logging;
5052

51-
BriteContentResolver(@NonNull ContentResolver contentResolver, @NonNull Logger logger) {
53+
BriteContentResolver(ContentResolver contentResolver, Logger logger, Scheduler scheduler) {
5254
this.contentResolver = contentResolver;
5355
this.logger = logger;
56+
this.scheduler = scheduler;
5457
}
5558

5659
/** Control whether debug logging is enabled. */
@@ -67,6 +70,11 @@ public void setLoggingEnabled(boolean enabled) {
6770
* notifications for when the supplied {@code uri}'s data changes. Unsubscribe when you no longer
6871
* want updates to a query.
6972
* <p>
73+
* Since content resolver triggers are inherently asynchronous, items emitted from the returned
74+
* observable use the {@link Scheduler} supplied to {@link SqlBrite#wrapContentProvider}. For
75+
* consistency, the immediate notification sent on subscribe also uses this scheduler. As such,
76+
* calling {@link Observable#subscribeOn subscribeOn} on the returned observable has no effect.
77+
* <p>
7078
* Note: To skip the immediate notification and only receive subsequent notifications when data
7179
* has changed call {@code skip(1)} on the returned observable.
7280
* <p>
@@ -113,6 +121,7 @@ public QueryObservable createQuery(@NonNull final Uri uri, @Nullable final Strin
113121
};
114122
Observable<Query> queryObservable = Observable.create(subscribe) //
115123
.startWith(query) //
124+
.observeOn(scheduler) //
116125
.onBackpressureLatest();
117126
return new QueryObservable(queryObservable);
118127
}

sqlbrite/src/main/java/com/squareup/sqlbrite/BriteDatabase.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Set;
3434
import java.util.concurrent.TimeUnit;
3535
import rx.Observable;
36+
import rx.Scheduler;
3637
import rx.functions.Action0;
3738
import rx.functions.Func1;
3839
import rx.subjects.PublishSubject;
@@ -52,6 +53,8 @@
5253
* the result of a query. Create using a {@link SqlBrite} instance.
5354
*/
5455
public final class BriteDatabase implements Closeable {
56+
private static final Set<String> INITIAL = Collections.emptySet();
57+
5558
private final SQLiteOpenHelper helper;
5659
private final SqlBrite.Logger logger;
5760

@@ -99,12 +102,15 @@ public final class BriteDatabase implements Closeable {
99102
private volatile SQLiteDatabase writeableDatabase;
100103
private final Object databaseLock = new Object();
101104

105+
private final Scheduler scheduler;
106+
102107
// Package-private to avoid synthetic accessor method for 'transaction' instance.
103108
volatile boolean logging;
104109

105-
BriteDatabase(@NonNull SQLiteOpenHelper helper, @NonNull SqlBrite.Logger logger) {
110+
BriteDatabase(SQLiteOpenHelper helper, SqlBrite.Logger logger, Scheduler scheduler) {
106111
this.helper = helper;
107112
this.logger = logger;
113+
this.scheduler = scheduler;
108114
}
109115

110116
/**
@@ -223,6 +229,11 @@ public Transaction newTransaction() {
223229
* {@code update}, and {@code delete} methods of this class. Unsubscribe when you no longer want
224230
* updates to a query.
225231
* <p>
232+
* Since database triggers are inherently asynchronous, items emitted from the returned
233+
* observable use the {@link Scheduler} supplied to {@link SqlBrite#wrapDatabaseHelper}. For
234+
* consistency, the immediate notification sent on subscribe also uses this scheduler. As such,
235+
* calling {@link Observable#subscribeOn subscribeOn} on the returned observable has no effect.
236+
* <p>
226237
* Note: To skip the immediate notification and only receive subsequent notifications when data
227238
* has changed call {@code skip(1)} on the returned observable.
228239
* <p>
@@ -236,7 +247,7 @@ public QueryObservable createQuery(@NonNull final String table, @NonNull String
236247
@NonNull String... args) {
237248
Func1<Set<String>, Boolean> tableFilter = new Func1<Set<String>, Boolean>() {
238249
@Override public Boolean call(Set<String> triggers) {
239-
return triggers.contains(table);
250+
return triggers == INITIAL || triggers.contains(table);
240251
}
241252

242253
@Override public String toString() {
@@ -257,6 +268,9 @@ public QueryObservable createQuery(@NonNull final Iterable<String> tables, @NonN
257268
@NonNull String... args) {
258269
Func1<Set<String>, Boolean> tableFilter = new Func1<Set<String>, Boolean>() {
259270
@Override public Boolean call(Set<String> triggers) {
271+
if (triggers == INITIAL) {
272+
return true;
273+
}
260274
for (String table : tables) {
261275
if (triggers.contains(table)) {
262276
return true;
@@ -304,13 +318,14 @@ private QueryObservable createQuery(final Func1<Set<String>, Boolean> tableFilte
304318
};
305319

306320
Observable<Query> queryObservable = triggers //
321+
.startWith(INITIAL) // Immediately trigger the query for initial value.
322+
.observeOn(scheduler) //
307323
.filter(tableFilter) // Only trigger on tables we care about.
308324
.map(new Func1<Set<String>, Query>() {
309325
@Override public Query call(Set<String> trigger) {
310326
return query;
311327
}
312328
}) //
313-
.startWith(query) // Immediately trigger the query for initial value.
314329
.onBackpressureLatest() //
315330
.doOnSubscribe(new Action0() {
316331
@Override public void call() {

sqlbrite/src/main/java/com/squareup/sqlbrite/SqlBrite.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525
import rx.Observable;
2626
import rx.Observable.Operator;
27+
import rx.Scheduler;
2728
import rx.Subscriber;
2829
import rx.functions.Func1;
2930

@@ -59,16 +60,24 @@ private SqlBrite(@NonNull Logger logger) {
5960
* interacting with the underlying {@link SQLiteOpenHelper} and it is required for automatic
6061
* notifications of table changes to work. See {@linkplain BriteDatabase#createQuery the
6162
* <code>query</code> method} for more information on that behavior.
63+
*
64+
* @param scheduler The {@link Scheduler} on which items from {@link BriteDatabase#createQuery}
65+
* will be emitted.
6266
*/
63-
@CheckResult @NonNull
64-
public BriteDatabase wrapDatabaseHelper(@NonNull SQLiteOpenHelper helper) {
65-
return new BriteDatabase(helper, logger);
67+
@CheckResult @NonNull public BriteDatabase wrapDatabaseHelper(@NonNull SQLiteOpenHelper helper,
68+
@NonNull Scheduler scheduler) {
69+
return new BriteDatabase(helper, logger, scheduler);
6670
}
6771

68-
/** Wrap a {@link ContentResolver} for observable queries. */
69-
@CheckResult @NonNull
70-
public BriteContentResolver wrapContentProvider(@NonNull ContentResolver contentResolver) {
71-
return new BriteContentResolver(contentResolver, logger);
72+
/**
73+
* Wrap a {@link ContentResolver} for observable queries.
74+
*
75+
* @param scheduler The {@link Scheduler} on which items from
76+
* {@link BriteContentResolver#createQuery} will be emitted.
77+
*/
78+
@CheckResult @NonNull public BriteContentResolver wrapContentProvider(
79+
@NonNull ContentResolver contentResolver, @NonNull Scheduler scheduler) {
80+
return new BriteContentResolver(contentResolver, logger, scheduler);
7281
}
7382

7483
/** An executable query. */

0 commit comments

Comments
 (0)