Skip to content

Commit 1f03913

Browse files
committed
fix(Selector): 避免在 db.observe 之前调用对应的 db.unobserve
Inspired by #94. 使用 observeQuery 函数将 db.(un)observe 的行为封装在 observable 里,其 生命周期完全由生成的 observable 的订阅/退订决定。从而避免 db.(un)observe 的行为与流的生命周期不协同,导致报错说: `Assertion failed: Attempted to unobserve a query that was not observed.` 补充了测试,确保 changes 订阅是允许立即退订的。 另:使用新的写法避开了相关的两个 lovefield issues(它们有单元测试,所以 效果可以确认)。
1 parent 0efd1b4 commit 1f03913

File tree

2 files changed

+88
-60
lines changed

2 files changed

+88
-60
lines changed

src/storage/modules/Selector.ts

Lines changed: 26 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,31 @@
1-
import { Observable, Observer, OperatorFunction, from, asyncScheduler } from 'rxjs'
1+
import { Observable, OperatorFunction, concat, from, asyncScheduler } from 'rxjs'
22
import {
3-
filter,
43
combineAll,
54
debounceTime,
65
map,
76
mergeMap,
87
publishReplay,
98
reduce,
109
refCount,
11-
scan,
10+
startWith,
1211
switchMap,
1312
} from 'rxjs/operators'
1413
import * as lf from 'lovefield'
1514
import * as Exception from '../../exception'
1615
import { predicatableQuery, graph } from '../helper'
17-
import { identity, forEach, assert, warn, isNonNullable } from '../../utils'
16+
import { identity, forEach, assert, warn } from '../../utils'
1817
import { PredicateProvider } from './PredicateProvider'
1918
import { ShapeMatcher, OrderInfo, StatementType } from '../../interface'
2019
import { mapFn } from './mapFn'
2120

21+
const observeQuery = (db: lf.Database, query: lf.query.Select) => {
22+
return new Observable<void>((observer) => {
23+
const listener = () => observer.next()
24+
db.observe(query, listener)
25+
return () => db.unobserve(query, listener)
26+
})
27+
}
28+
2229
export class Selector<T> {
2330
private static concatFactory<U>(...metaDatas: Selector<U>[]) {
2431
const [meta] = metaDatas
@@ -105,24 +112,23 @@ export class Selector<T> {
105112
let { skip } = this
106113
skip = limit && !skip ? 0 : skip
107114

108-
const observeOn = (query: lf.query.Select) =>
109-
Observable.create((observer: Observer<T[]>) => {
110-
const listener = () => {
111-
this.getValue(query)
112-
.then((r) => observer.next(r as T[]))
113-
.catch((e) => observer.error(e))
114-
}
115-
db.observe(query, listener)
116-
listener()
117-
return () => this.db.unobserve(query, listener)
118-
}) as Observable<T[]>
115+
const observeOn = (query: lf.query.Select) => {
116+
const queryOnce = () => from(this.getValue(query))
117+
// 下面的语句针对两个 lovefield issue 做了特殊调整:
118+
// issue#209: 确保 db.observe 之后立即执行一次查询;
119+
// issue#215: 确保 db.observe “不正确地”立即调用回调的行为,不会给消费方造成初始的重复推送。
120+
return observeQuery(db, query).pipe(
121+
startWith(void 0),
122+
switchMap(queryOnce),
123+
)
124+
}
119125

120126
const changesOnQuery =
121127
limit || skip
122128
? this.buildPrefetchingObserve().pipe(switchMap((pks) => observeOn(this.getQuery(this.inPKs(pks)))))
123129
: observeOn(this.getQuery())
124130

125-
return lfIssueFix(changesOnQuery).pipe(
131+
return changesOnQuery.pipe(
126132
publishReplay(1),
127133
refCount(),
128134
)
@@ -278,37 +284,9 @@ export class Selector<T> {
278284
}
279285

280286
private buildPrefetchingObserve(): Observable<(string | number)[]> {
281-
return Observable.create((observer: Observer<(string | number)[]>) => {
282-
const { rangeQuery } = this
283-
const listener = () => {
284-
return rangeQuery
285-
.exec()
286-
.then((r) => {
287-
observer.next(r.map((v) => v[this.shape.pk.name]))
288-
})
289-
.catch((e) => observer.error(e))
290-
}
291-
292-
listener().then(() => {
293-
this.db.observe(rangeQuery, listener)
294-
})
295-
296-
return () => this.db.unobserve(rangeQuery, listener)
297-
})
287+
const { rangeQuery } = this
288+
const queryOnce = () => from(rangeQuery.exec())
289+
const update$ = observeQuery(this.db, rangeQuery).pipe(switchMap(queryOnce))
290+
return concat(queryOnce(), update$).pipe(map((r) => r.map((v) => v[this.shape.pk.name])))
298291
}
299292
}
300-
301-
/**
302-
* Lovefield observe 出来的推送,第一次和第二次在它们的值不为空
303-
* 的时候是重复的,这里做优化,省去重复;但不是简单的 skip(1),因为
304-
* 那样会导致不能推出空结果集。详见:Lovefield issue#215
305-
*/
306-
const lfIssueFix = <T>(changes: Observable<T[]>) => {
307-
const doKeep = (prev: T[] | null, curr: T[] | null, idx: number) =>
308-
idx === 1 && prev && prev.length && curr && curr.length ? null : curr
309-
310-
return changes.pipe(
311-
scan(doKeep, null),
312-
filter(isNonNullable),
313-
)
314-
}

test/specs/storage/modules/Selector.spec.ts

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Subscription, of, defer, range, from, asyncScheduler, asapScheduler } from 'rxjs'
1+
import { Subscription, of, defer, range, from, asyncScheduler, asapScheduler, timer } from 'rxjs'
22
import * as lf from 'lovefield'
33
import { expect, use } from 'chai'
44
import * as sinon from 'sinon'
@@ -199,7 +199,35 @@ export default describe('Selector test', () => {
199199
})
200200

201201
describe('Selector.prototype.changes', () => {
202-
it('observe should ok', (done) => {
202+
it('should allow immediate unsubscribe', () => {
203+
const selector = new Selector(
204+
db,
205+
db.select().from(table),
206+
tableShape,
207+
new PredicateProvider(table, { time: { $gte: 50 } }),
208+
)
209+
210+
const signal = selector.changes().subscribe()
211+
212+
expect(() => signal.unsubscribe()).to.not.throw()
213+
})
214+
215+
it('should allow immediate unsubscribe, for range query (with limit, skip)', () => {
216+
const selector = new Selector(
217+
db,
218+
db.select().from(table),
219+
tableShape,
220+
new PredicateProvider(table, { time: { $gte: 50 } }),
221+
10,
222+
0,
223+
)
224+
225+
const signal = selector.changes().subscribe()
226+
227+
expect(() => signal.unsubscribe()).to.not.throw()
228+
})
229+
230+
it('observe should ok', function*() {
203231
const selector = new Selector(
204232
db,
205233
db.select().from(table),
@@ -209,18 +237,30 @@ export default describe('Selector test', () => {
209237

210238
const newName = 'test name change'
211239

212-
subscription = selector
213-
.changes()
214-
.pipe(skip(1))
215-
.subscribe((r: any[]) => {
216-
expect(r[0].name).to.equal(newName)
217-
done()
218-
})
240+
const signal = selector.changes()
241+
subscription = signal.subscribe()
242+
243+
yield signal.pipe(
244+
take(1),
245+
subscribeOn(asapScheduler),
246+
tap((r: any[]) => {
247+
expect(r[0].name).to.equal('name:50') // before update
248+
}),
249+
)
219250

220-
db.update(table)
251+
yield db
252+
.update(table)
221253
.set(table['name'], newName)
222254
.where(table['_id'].eq('_id:50'))
223255
.exec()
256+
257+
yield signal.pipe(
258+
take(1),
259+
subscribeOn(asapScheduler),
260+
tap((r: any[]) => {
261+
expect(r[0].name).to.equal(newName) // after update
262+
}),
263+
)
224264
})
225265

226266
it('observe asynchronous insertions completely', function*() {
@@ -307,14 +347,22 @@ export default describe('Selector test', () => {
307347

308348
const newName = 'test name change'
309349

310-
const _subscription = selector.changes().subscribe(spy)
350+
const _subscription = selector
351+
.changes()
352+
.pipe(subscribeOn(asapScheduler))
353+
.subscribe(spy)
311354

312355
yield db
313356
.update(table)
314357
.set(table['name'], newName)
315358
.where(table['_id'].eq('_id:50'))
316359
.exec()
317360

361+
yield timer(0) // 从 update 执行完成,到 changes 推出,是异步的,需要等待
362+
363+
const countFollowFirstUpdate = spy.callCount
364+
expect(countFollowFirstUpdate).to.equal(1)
365+
318366
_subscription.unsubscribe()
319367

320368
yield db
@@ -323,7 +371,9 @@ export default describe('Selector test', () => {
323371
.where(table['_id'].eq('_id:50'))
324372
.exec()
325373

326-
expect(spy.callCount).to.equal(1)
374+
yield timer(0) // 从 update 执行完成,到 changes 推出,是异步的,需要等待
375+
376+
expect(spy.callCount).to.equal(countFollowFirstUpdate)
327377
})
328378

329379
it('predicate should be clone before use', function*() {

0 commit comments

Comments
 (0)