Skip to content

Commit bea1796

Browse files
committed
refactor: improve type checking for Stream and Observable inputs across operators
1 parent 92c5349 commit bea1796

File tree

11 files changed

+68
-55
lines changed

11 files changed

+68
-55
lines changed

src/observable.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { produce, createDraft, finishDraft } from 'limu'
2-
import { Stream } from './stream'
2+
import type { Stream } from './stream'
33
import { safeCallback, isObjectLike, isAsyncFunction, isPromiseLike } from './utils'
44
import {
55
OnFulfilled,

src/operators/combine.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Observable } from '../observable'
22
import { Stream } from '../stream'
33
import { useUnsubscribeCallback } from '../utils'
44
import { StreamTupleValues, PromiseStatus } from '../types'
5-
import { getGlobalFluthFactory } from '../utils'
5+
import { getGlobalFluthFactory, checkStreamOrObservableInput } from '../utils'
66

77
/**
88
* combine takes multiple streams or Observable, and return a stream that emits values from all the input streams.
@@ -12,23 +12,23 @@ import { getGlobalFluthFactory } from '../utils'
1212
* @returns {Stream}
1313
*/
1414
export const combine = <T extends (Stream | Observable)[]>(...args$: T) => {
15+
// check input type
16+
if (!checkStreamOrObservableInput(args$, true)) {
17+
throw new Error('combine operator only accepts Stream or Observable as input')
18+
}
19+
1520
const stream$ = (getGlobalFluthFactory()?.(
16-
args$.map((arg$) => arg$?._getProtectedProperty?.('_v')) as StreamTupleValues<T>,
21+
args$.map((arg$) => arg$._getProtectedProperty('_v')) as StreamTupleValues<T>,
1722
) ||
1823
new Stream<StreamTupleValues<T>>(
19-
args$.map((arg$) => arg$?._getProtectedProperty?.('_v')) as StreamTupleValues<T>,
24+
args$.map((arg$) => arg$._getProtectedProperty('_v')) as StreamTupleValues<T>,
2025
)) as Stream<StreamTupleValues<T>>
2126
const payload: StreamTupleValues<T> = [] as any
2227
const promiseStatus = [...Array(args$.length)].map(() => PromiseStatus.PENDING)
2328
let finishCount = 0
2429
const { unsubscribeCallback } = useUnsubscribeCallback(stream$, args$.length)
2530
const completeCallback = () => (finishCount += 1)
2631

27-
// check input type
28-
if (args$.some((arg$) => !(arg$ instanceof Stream) && !(arg$ instanceof Observable))) {
29-
throw new Error('combine operator only accepts Stream or Observable as input')
30-
}
31-
3232
// if no input, return an empty stream
3333
if (args$.length === 0) {
3434
return stream$

src/operators/concat.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Observable } from '../observable'
22
import { Stream } from '../stream'
33
import { StreamTupleValues, PromiseStatus } from '../types'
4-
import { getGlobalFluthFactory } from '../utils'
4+
import { getGlobalFluthFactory, checkStreamOrObservableInput } from '../utils'
55

66
/**
77
* concat takes multiple streams or Observable, and return a stream that emits values in the order of the input streams.
@@ -12,18 +12,18 @@ import { getGlobalFluthFactory } from '../utils'
1212
* @returns {Stream}
1313
*/
1414
export const concat = <T extends (Stream | Observable)[]>(...args$: T) => {
15-
const stream$ = (getGlobalFluthFactory()?.(args$?.[0]?._getProtectedProperty?.('_v')) ||
15+
// check input type
16+
if (!checkStreamOrObservableInput(args$, true)) {
17+
throw new Error('concat operator only accepts Stream or Observable as input')
18+
}
19+
20+
const stream$ = (getGlobalFluthFactory()?.(args$[0]?._getProtectedProperty('_v')) ||
1621
new Stream<StreamTupleValues<T>[number]>(
17-
args$?.[0]?._getProtectedProperty?.('_v') as StreamTupleValues<T>[number],
22+
args$[0]?._getProtectedProperty('_v') as StreamTupleValues<T>[number],
1823
)) as Stream<StreamTupleValues<T>[number]>
1924
const finishFlag = [...Array(args$.length)].map(() => false)
2025
const unsubscribeFlag = [...Array(args$.length)].map(() => false)
2126

22-
// check input type
23-
if (args$.some((arg$) => !(arg$ instanceof Stream) && !(arg$ instanceof Observable))) {
24-
throw new Error('concat operator only accepts Stream or Observable as input')
25-
}
26-
2727
// check input empty
2828
if (args$.length === 0) {
2929
return stream$

src/operators/finish.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Observable } from '../observable'
22
import { Stream } from '../stream'
33
import { useUnsubscribeCallback } from '../utils'
44
import { StreamTupleValues, PromiseStatus } from '../types'
5-
import { getGlobalFluthFactory } from '../utils'
5+
import { getGlobalFluthFactory, checkStreamOrObservableInput } from '../utils'
66

77
/**
88
* @description
@@ -13,18 +13,18 @@ import { getGlobalFluthFactory } from '../utils'
1313
* @returns {Stream}
1414
*/
1515
export const finish = <T extends (Stream | Observable)[]>(...args$: T) => {
16+
// check input type
17+
if (!checkStreamOrObservableInput(args$, true)) {
18+
throw new Error('finish operator only accepts Stream or Observable as input')
19+
}
20+
1621
const stream$ = (getGlobalFluthFactory()?.() || new Stream<StreamTupleValues<T>>()) as Stream<
1722
StreamTupleValues<T>
1823
>
1924
const payload: StreamTupleValues<T> = [] as any
2025
let finishCount = 0
2126
let rejectFlag = false
2227

23-
// check input type
24-
if (args$.some((arg$) => !(arg$ instanceof Stream) && !(arg$ instanceof Observable))) {
25-
throw new Error('finish operator only accepts Stream or Observable as input')
26-
}
27-
2828
// if no input, return an empty stream
2929
if (args$.length === 0) {
3030
return stream$

src/operators/fork.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Observable } from '../observable'
22
import { Stream } from '../stream'
3-
import { getGlobalFluthFactory } from '../utils'
3+
import { getGlobalFluthFactory, checkStreamOrObservableInput } from '../utils'
44

55
/**
66
* fork takes a stream or Observable, and returns a stream that emits the same value as the input stream.
@@ -13,14 +13,15 @@ import { getGlobalFluthFactory } from '../utils'
1313
**/
1414

1515
export const fork = <T>(arg$: Stream<T> | Observable<T>, autoUnsubscribe = true): Stream<T> => {
16-
const stream$ = (getGlobalFluthFactory()?.() || new Stream<T>()) as Stream<T>
17-
let finishFlag = false
18-
1916
// check input type
20-
if (!(arg$ instanceof Stream) && !(arg$ instanceof Observable)) {
17+
if (!checkStreamOrObservableInput(arg$)) {
2118
throw new Error('fork operator only accepts Stream or Observable as input')
2219
}
2320

21+
const stream$ = (getGlobalFluthFactory()?.(arg$._getProtectedProperty('_v')) ||
22+
new Stream<T>(arg$._getProtectedProperty('_v') as T)) as Stream<T>
23+
let finishFlag = false
24+
2425
// if arg$ is finished, should not fork
2526
if (arg$._getProtectedProperty('_finishFlag')) {
2627
return stream$

src/operators/merge.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Observable } from '../observable'
22
import { Stream } from '../stream'
3-
import { useUnsubscribeCallback } from '../utils'
3+
import { checkStreamOrObservableInput, useUnsubscribeCallback } from '../utils'
44
import { StreamTupleValues } from '../types'
55
import { getGlobalFluthFactory } from '../utils'
66

@@ -12,15 +12,15 @@ import { getGlobalFluthFactory } from '../utils'
1212
* @returns {Stream}
1313
*/
1414
export const merge = <T extends (Stream | Observable)[]>(...args$: T) => {
15-
const stream$ = (getGlobalFluthFactory()?.() ||
16-
new Stream<StreamTupleValues<T>[number]>()) as Stream<StreamTupleValues<T>[number]>
17-
let finishCount = 0
18-
1915
// check input type
20-
if (args$.some((arg$) => !(arg$ instanceof Stream) && !(arg$ instanceof Observable))) {
16+
if (!checkStreamOrObservableInput(args$, true)) {
2117
throw new Error('merge operator only accepts Stream or Observable as input')
2218
}
2319

20+
const stream$ = (getGlobalFluthFactory()?.() ||
21+
new Stream<StreamTupleValues<T>[number]>()) as Stream<StreamTupleValues<T>[number]>
22+
let finishCount = 0
23+
2424
// check input empty
2525
if (args$.length === 0) {
2626
return stream$

src/operators/partition.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Observable } from '../observable'
22
import { Stream } from '../stream'
33
import { finish } from './finish'
44
import { PromiseStatus } from '../types'
5-
import { getGlobalFluthFactory } from '../utils'
5+
import { checkStreamOrObservableInput, getGlobalFluthFactory } from '../utils'
66

77
/**
88
* partition takes a stream or Observable, and a predicate function that takes value and index as arguments.
@@ -20,16 +20,16 @@ export const partition = <T>(
2020
predicate: (this: any, value: any, status: PromiseStatus, index: number) => boolean,
2121
thisArg?: any,
2222
) => {
23+
// check input type
24+
if (!checkStreamOrObservableInput(stream$)) {
25+
throw new Error('partition operator only accepts Stream or Observable as input')
26+
}
27+
2328
const selectedStream$ = (getGlobalFluthFactory()?.() || new Stream<T>()) as Stream<T>
2429
const unselectedStream$ = (getGlobalFluthFactory()?.() || new Stream<T>()) as Stream<T>
2530
let finishFlag = false
2631
let index = 1
2732

28-
// check input type
29-
if (!(stream$ instanceof Stream) && !(stream$ instanceof Observable)) {
30-
throw new Error('partition operator only accepts Stream or Observable as input')
31-
}
32-
3333
// check input finished
3434
if (stream$._getProtectedProperty('_finishFlag')) {
3535
finishFlag = true

src/operators/promiseAll.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import { Observable } from '../observable'
22
import { Stream } from '../stream'
33
import { StreamTupleValues, PromiseStatus } from '../types'
4-
import { useUnsubscribeCallback } from '../utils'
5-
import { getGlobalFluthFactory } from '../utils'
4+
import {
5+
getGlobalFluthFactory,
6+
useUnsubscribeCallback,
7+
checkStreamOrObservableInput,
8+
} from '../utils'
69

710
/**
811
* Internal implementation function for promiseAll variants
@@ -14,6 +17,11 @@ const promiseAllImpl = <T extends (Stream | Observable)[]>(
1417
args$: T,
1518
shouldAwait = true,
1619
): Stream<StreamTupleValues<T>> => {
20+
// check input type
21+
if (!checkStreamOrObservableInput(args$, true)) {
22+
throw new Error('promiseAll operator only accepts Stream or Observable as input')
23+
}
24+
1725
const stream$ = (getGlobalFluthFactory()?.() || new Stream<StreamTupleValues<T>>()) as Stream<
1826
StreamTupleValues<T>
1927
>
@@ -37,11 +45,6 @@ const promiseAllImpl = <T extends (Stream | Observable)[]>(
3745
}
3846
}
3947

40-
// check input type
41-
if (!args$.every((arg$) => arg$ instanceof Stream || arg$ instanceof Observable)) {
42-
throw new Error('promiseAll operator only accepts Stream or Observable as input')
43-
}
44-
4548
// check input empty
4649
if (args$.length === 0) {
4750
return stream$

src/operators/promiseRace.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Observable } from '../observable'
22
import { Stream } from '../stream'
33
import { StreamTupleValues } from '../types'
4-
import { getGlobalFluthFactory } from '../utils'
4+
import { checkStreamOrObservableInput, getGlobalFluthFactory } from '../utils'
55

66
/**
77
* race takes multiple streams or Observable, and returns a stream that emits the first value of all the input streams.
@@ -11,17 +11,17 @@ import { getGlobalFluthFactory } from '../utils'
1111
* @returns {Stream}
1212
*/
1313
export const promiseRace = <T extends (Stream | Observable)[]>(...args$: T) => {
14+
// check input type
15+
if (!checkStreamOrObservableInput(args$, true)) {
16+
throw new Error('promiseRace operator only accepts Stream or Observable as input')
17+
}
18+
1419
const stream$ = (getGlobalFluthFactory()?.() ||
1520
new Stream<StreamTupleValues<T>[number]>()) as Stream<StreamTupleValues<T>[number]>
1621
let finishFlag = false
1722
let finishCount = 0
1823
let firstIndex: number | null = null
1924

20-
// check input type
21-
if (!args$.every((arg$) => arg$ instanceof Stream || arg$ instanceof Observable)) {
22-
throw new Error('promiseRace operator only accepts Stream or Observable as input')
23-
}
24-
2525
// check input empty
2626
if (args$.length === 0) {
2727
return stream$

src/utils.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { isObject, isMap, isSet } from 'limu/lib/support/util'
2-
import { Stream } from './stream'
2+
import type { Stream } from './stream'
3+
import { Observable } from './observable'
34

45
export const safeCallback = (callback: any, errorHandler?: (error: any) => void) => {
56
return (...args: any[]) => {
@@ -39,6 +40,14 @@ export const isAsyncFunction = (fn: any): fn is (...args: any[]) => PromiseLike<
3940
)
4041
}
4142

43+
export const checkStreamOrObservableInput = (arg$: any, isArray = false) => {
44+
if (isArray) {
45+
return Array.isArray(arg$) && arg$.every((arg) => arg instanceof Observable)
46+
} else {
47+
return arg$ instanceof Observable
48+
}
49+
}
50+
4251
export const useUnsubscribeCallback = (stream$: Stream, length: number) => {
4352
let unsubscribeCount = 0
4453
const unsubscribeCallback = () => {

0 commit comments

Comments
 (0)