Skip to content

Commit 11cb1cc

Browse files
committed
refactor(QueryToken): 令 traces() 返回流在重新订阅时照样推值
...满足 Observable 订阅类似于函数调用,二次调用不应该出现与一次调用不 同的情况,否则行为难以预知。将 trace 的概念从 lastEmit 中区分开来,作 为 QueryToken 级的变量,而不受订阅影响。
1 parent 2ac82ef commit 11cb1cc

File tree

2 files changed

+34
-36
lines changed

2 files changed

+34
-36
lines changed

src/storage/modules/QueryToken.ts

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
import { Observable, OperatorFunction, from } from 'rxjs'
2-
import { combineAll, filter, map, publishReplay, refCount, skipWhile, switchMap, take, tap } from 'rxjs/operators'
2+
import {
3+
combineAll,
4+
filter,
5+
map,
6+
pairwise,
7+
publishReplay,
8+
refCount,
9+
skipWhile,
10+
switchMap,
11+
startWith,
12+
take,
13+
tap,
14+
} from 'rxjs/operators'
315
import { Selector } from './Selector'
416
import { ProxySelector } from './ProxySelector'
517
import { assert } from '../../utils/assert'
@@ -14,7 +26,7 @@ function initialTraceResult<T>(list: ReadonlyArray<T>): TraceResult<T> {
1426
return {
1527
type: OpsType.Success,
1628
ops: list.map((_value, index) => ({ type: OpType.New, index })),
17-
result: list
29+
result: list,
1830
}
1931
}
2032

@@ -28,20 +40,19 @@ export class QueryToken<T> {
2840
selector$: Observable<SelectorMeta<T>>
2941

3042
private consumed = false
43+
private lastEmit: ReadonlyArray<T> | undefined
44+
private trace: ReadonlyArray<T> | undefined
3145

32-
constructor(
33-
selector$: Observable<SelectorMeta<T>>,
34-
private lastEmit?: ReadonlyArray<T>
35-
) {
46+
constructor(selector$: Observable<SelectorMeta<T>>, trace?: ReadonlyArray<T>) {
3647
this.selector$ = selector$.pipe(
3748
publishReplay(1),
3849
refCount(),
3950
)
40-
this.lastEmit = lastEmit
51+
this.trace = trace
4152
}
4253

43-
setLastEmit(data: T[]) {
44-
this.lastEmit = data
54+
setTrace(data: T[]) {
55+
this.trace = data
4556
}
4657

4758
map<K>(fn: OperatorFunction<T[], K[]>) {
@@ -68,11 +79,14 @@ export class QueryToken<T> {
6879

6980
traces(pk?: string): Observable<TraceResult<T>> {
7081
return this.changes().pipe(
71-
map((result: T[]) => {
72-
if (!this.lastEmit) {
82+
startWith<undefined | ReadonlyArray<T>>(this.trace),
83+
pairwise(),
84+
map(([prev, curr]) => {
85+
const result = curr!
86+
if (!prev) {
7387
return initialTraceResult(result)
7488
}
75-
const ops = diff(this.lastEmit, result, pk)
89+
const ops = diff(prev, result, pk)
7690
return { result, ...ops }
7791
}),
7892
filter(({ type }) => type !== OpsType.ShouldSkip),

test/specs/storage/modules/QueryToken.spec.ts

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -112,33 +112,17 @@ export default describe('QueryToken Testcase', () => {
112112
note: newNote,
113113
})
114114
})
115-
})
116115

117-
describe('Method: traces', () => {
118-
it('should get traces when updated', (done) => {
119-
const task = tasks[0]
120-
const newNote = 'new task note'
116+
it('should emit result when the returning observable is re-subscribed', function*() {
117+
const data$ = queryToken.traces('_id')
121118

122-
queryToken
123-
.traces('_id')
124-
.pipe(skip(1))
125-
.subscribe((r) => {
126-
const { result, type, ops } = r
127-
expect(result[0].note).to.equal(newNote)
128-
expect(type).to.equal(1)
129-
ops.forEach((op: Op, index: number) => {
130-
if (index === 0) {
131-
expect(op.type).to.equal(1)
132-
} else {
133-
expect(op.type).to.equal(0)
134-
}
135-
})
136-
done()
137-
})
119+
yield data$.pipe(take(1)).toPromise()
138120

139-
MockSelector.update(task._id as string, {
140-
note: newNote,
141-
})
121+
let emittedOnResubscribe = false
122+
123+
yield data$.pipe(take(1)).subscribe(() => (emittedOnResubscribe = true))
124+
125+
expect(emittedOnResubscribe).to.be.true
142126
})
143127
})
144128

0 commit comments

Comments
 (0)