Skip to content

Commit 1de9204

Browse files
liftOperator
1 parent d3c804e commit 1de9204

12 files changed

+254
-5
lines changed

docs/modules/ObservableEither.ts.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Added in v0.6.8
3131
- [apSecond](#apsecond)
3232
- [chainFirst](#chainfirst)
3333
- [flatten](#flatten)
34+
- [liftOperator](#liftoperator)
3435
- [orElse](#orelse)
3536
- [swap](#swap)
3637
- [constructors](#constructors)
@@ -255,6 +256,21 @@ export declare const flatten: <E, A>(mma: ObservableEither<E, ObservableEither<E
255256
256257
Added in v0.6.0
257258
259+
## liftOperator
260+
261+
Lifts an OperatorFunction into an ObservableEither context
262+
Allows e.g. filter to be used on on ObservableEither
263+
264+
**Signature**
265+
266+
```ts
267+
export declare function liftOperator<E, A, B>(
268+
f: OperatorFunction<A, B>
269+
): (obs: ObservableEither<E, A>) => ObservableEither<E, B>
270+
```
271+
272+
Added in v0.6.12
273+
258274
## orElse
259275

260276
**Signature**

docs/modules/ObservableThese.ts.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Added in v0.6.12
2020
- [Functor](#functor)
2121
- [map](#map)
2222
- [combinators](#combinators)
23+
- [liftOperator](#liftoperator)
2324
- [swap](#swap)
2425
- [constructors](#constructors)
2526
- [both](#both)
@@ -103,6 +104,21 @@ Added in v0.6.12
103104
104105
# combinators
105106
107+
## liftOperator
108+
109+
Lifts an OperatorFunction into an ObservableThese context
110+
Allows e.g. filter to be used on on ObservableThese
111+
112+
**Signature**
113+
114+
```ts
115+
export declare function liftOperator<E, A, B>(
116+
f: OperatorFunction<A, B>
117+
): (obs: ObservableThese<E, A>) => ObservableThese<E, B>
118+
```
119+
120+
Added in v0.6.12
121+
106122
## swap
107123

108124
**Signature**

docs/modules/ReaderObservableEither.ts.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Added in v0.6.10
3131
- [apSecond](#apsecond)
3232
- [chainFirst](#chainfirst)
3333
- [flatten](#flatten)
34+
- [liftOperator](#liftoperator)
3435
- [local](#local)
3536
- [constructors](#constructors)
3637
- [ask](#ask)
@@ -246,6 +247,21 @@ export declare const flatten: <R, E, A>(
246247
247248
Added in v0.6.10
248249
250+
## liftOperator
251+
252+
Lifts an OperatorFunction into a ReaderObservableEither context
253+
Allows e.g. filter to be used on on ReaderObservableEither
254+
255+
**Signature**
256+
257+
```ts
258+
export declare function liftOperator<R, E, A, B>(
259+
f: OperatorFunction<A, B>
260+
): (obs: ReaderObservableEither<R, E, A>) => ReaderObservableEither<R, E, B>
261+
```
262+
263+
Added in v0.6.12
264+
249265
## local
250266

251267
**Signature**

docs/modules/StateReaderObservableEither.ts.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Added in v0.6.10
2929
- [apSecond](#apsecond)
3030
- [chainFirst](#chainfirst)
3131
- [flatten](#flatten)
32+
- [liftOperator](#liftoperator)
3233
- [constructors](#constructors)
3334
- [fromIO](#fromio)
3435
- [fromObservable](#fromobservable)
@@ -233,6 +234,21 @@ export declare const flatten: <S, R, E, A>(
233234
234235
Added in v0.6.10
235236
237+
## liftOperator
238+
239+
Lifts an OperatorFunction into a StateReaderObservableEither context
240+
Allows e.g. filter to be used on on StateReaderObservableEither
241+
242+
**Signature**
243+
244+
```ts
245+
export declare function liftOperator<S, R, E, A, B>(
246+
f: OperatorFunction<[A, S], [B, S]>
247+
): (obs: StateReaderObservableEither<S, R, E, A>) => StateReaderObservableEither<S, R, E, B>
248+
```
249+
250+
Added in v0.6.12
251+
236252
# constructors
237253

238254
## fromIO

src/ObservableEither.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ import { MonadThrow2 } from 'fp-ts/lib/MonadThrow'
1717
import { Option } from 'fp-ts/lib/Option'
1818
import { pipe } from 'fp-ts/lib/pipeable'
1919
import * as TE from 'fp-ts/lib/TaskEither'
20-
import { Observable } from 'rxjs'
21-
import { catchError } from 'rxjs/operators'
20+
import { defer, merge, Observable, OperatorFunction, Subject } from 'rxjs'
21+
import { finalize, catchError } from 'rxjs/operators'
2222
import { MonadObservable2 } from './MonadObservable'
2323
import * as R from './Observable'
2424

@@ -166,6 +166,27 @@ export const swap: <E, A>(ma: ObservableEither<E, A>) => ObservableEither<A, E>
166166
/*#__PURE__*/
167167
R.map(E.swap)
168168

169+
/**
170+
* Lifts an OperatorFunction into an ObservableEither context
171+
* Allows e.g. filter to be used on on ObservableEither
172+
*
173+
* @category combinators
174+
* @since 0.6.12
175+
*/
176+
export function liftOperator<E, A, B>(
177+
f: OperatorFunction<A, B>
178+
): (obs: ObservableEither<E, A>) => ObservableEither<E, B> {
179+
return obs => {
180+
const subj = new Subject<E.Either<E, A>>()
181+
return merge(
182+
pipe(subj, R.separate, ({ left, right }) => merge(pipe(left, R.map(E.left)), pipe(right, f, R.map(E.right)))),
183+
defer(() => {
184+
obs.pipe(finalize(() => subj.complete())).subscribe(subj)
185+
})
186+
)
187+
}
188+
}
189+
169190
// -------------------------------------------------------------------------------------
170191
// type class members
171192
// -------------------------------------------------------------------------------------

src/ObservableThese.ts

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import { pipe } from 'fp-ts/lib/pipeable'
1515
import { Semigroup } from 'fp-ts/lib/Semigroup'
1616
import * as TT from 'fp-ts/lib/TaskThese'
1717
import * as TH from 'fp-ts/lib/These'
18-
import { Observable } from 'rxjs'
18+
import { defer, merge, Observable, OperatorFunction, Subject } from 'rxjs'
19+
import { finalize, withLatestFrom } from 'rxjs/operators'
1920
import * as R from './Observable'
2021

2122
// -------------------------------------------------------------------------------------
@@ -142,6 +143,55 @@ export const swap: <E, A>(ma: ObservableThese<E, A>) => ObservableThese<A, E> =
142143
/*#__PURE__*/
143144
R.map(TH.swap)
144145

146+
/**
147+
* Lifts an OperatorFunction into an ObservableThese context
148+
* Allows e.g. filter to be used on on ObservableThese
149+
*
150+
* @category combinators
151+
* @since 0.6.12
152+
*/
153+
export function liftOperator<E, A, B>(
154+
f: OperatorFunction<A, B>
155+
): (obs: ObservableThese<E, A>) => ObservableThese<E, B> {
156+
return obs => {
157+
const subj = new Subject<TH.These<E, A>>()
158+
return merge(
159+
pipe(
160+
merge(
161+
pipe(subj, R.filter(TH.isLeft)),
162+
pipe(
163+
subj,
164+
R.filter(TH.isRight),
165+
R.map(({ right }) => right),
166+
f,
167+
R.map(TH.right)
168+
),
169+
pipe(
170+
subj,
171+
R.filter(TH.isBoth),
172+
a =>
173+
pipe(
174+
a,
175+
R.map(({ right }) => right),
176+
f,
177+
withLatestFrom(
178+
pipe(
179+
a,
180+
R.map(({ left }) => left)
181+
)
182+
)
183+
),
184+
R.map(([b, e]) => TH.both(e, b))
185+
)
186+
)
187+
),
188+
defer(() => {
189+
obs.pipe(finalize(() => subj.complete())).subscribe(subj)
190+
})
191+
)
192+
}
193+
}
194+
145195
// -------------------------------------------------------------------------------------
146196
// type class members
147197
// -------------------------------------------------------------------------------------

src/ReaderObservableEither.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { MonadThrow3 } from 'fp-ts/lib/MonadThrow'
1414
import { Option } from 'fp-ts/lib/Option'
1515
import { pipe } from 'fp-ts/lib/pipeable'
1616
import * as R from 'fp-ts/lib/Reader'
17+
import { OperatorFunction } from 'rxjs'
1718
import { MonadObservable3 } from './MonadObservable'
1819
import * as OE from './ObservableEither'
1920

@@ -103,6 +104,19 @@ export const local: <R2, R1>(
103104
f: (d: R2) => R1
104105
) => <E, A>(ma: ReaderObservableEither<R1, E, A>) => ReaderObservableEither<R2, E, A> = R.local
105106

107+
/**
108+
* Lifts an OperatorFunction into a ReaderObservableEither context
109+
* Allows e.g. filter to be used on on ReaderObservableEither
110+
*
111+
* @category combinators
112+
* @since 0.6.12
113+
*/
114+
export function liftOperator<R, E, A, B>(
115+
f: OperatorFunction<A, B>
116+
): (obs: ReaderObservableEither<R, E, A>) => ReaderObservableEither<R, E, B> {
117+
return obs => r => OE.liftOperator<E, A, B>(f)(obs(r))
118+
}
119+
106120
// -------------------------------------------------------------------------------------
107121
// type class members
108122
// -------------------------------------------------------------------------------------

src/StateReaderObservableEither.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import { MonadTask4 } from 'fp-ts/lib/MonadTask'
1313
import { MonadThrow4 } from 'fp-ts/lib/MonadThrow'
1414
import { Option } from 'fp-ts/lib/Option'
1515
import { pipe } from 'fp-ts/lib/pipeable'
16+
import { OperatorFunction } from 'rxjs'
1617
import { MonadObservable4 } from './MonadObservable'
1718
import * as OB from './Observable'
19+
import * as OE from './ObservableEither'
1820
import * as ROE from './ReaderObservableEither'
1921

2022
// -------------------------------------------------------------------------------------
@@ -193,6 +195,19 @@ export const apSecond = <S, R, E, B>(
193195
ap(fb)
194196
)
195197

198+
/**
199+
* Lifts an OperatorFunction into a StateReaderObservableEither context
200+
* Allows e.g. filter to be used on on StateReaderObservableEither
201+
*
202+
* @category combinators
203+
* @since 0.6.12
204+
*/
205+
export function liftOperator<S, R, E, A, B>(
206+
f: OperatorFunction<[A, S], [B, S]>
207+
): (obs: StateReaderObservableEither<S, R, E, A>) => StateReaderObservableEither<S, R, E, B> {
208+
return obs => s => r => OE.liftOperator<E, [A, S], [B, S]>(f)(obs(s)(r))
209+
}
210+
196211
/**
197212
* @category Bifunctor
198213
* @since 0.6.10

test/ObservableEither.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import { pipe } from 'fp-ts/lib/pipeable'
77
import { bufferTime } from 'rxjs/operators'
88
import * as O from 'fp-ts/lib/Option'
99
import * as _ from '../src/ObservableEither'
10-
import { of as rxOf, Observable, throwError as rxThrowError } from 'rxjs'
10+
import { of as rxOf, Observable, from, throwError as rxThrowError } from 'rxjs'
11+
import { filter } from '../src/Observable'
1112

1213
describe('ObservableEither', () => {
1314
it('rightIO', async () => {
@@ -125,6 +126,20 @@ describe('ObservableEither', () => {
125126
assert.deepStrictEqual(e, [E.left(1)])
126127
})
127128

129+
it('liftOperator (left)', async () => {
130+
const e = await pipe(from(['error1', 'error2']), _.leftObservable, _.liftOperator(filter(x => x % 2 === 0)))
131+
.pipe(bufferTime(10))
132+
.toPromise()
133+
assert.deepStrictEqual(e, [E.left('error1'), E.left('error2')])
134+
})
135+
136+
it('liftOperator (right)', async () => {
137+
const e = await pipe(from([1, 2, 3, 4]), _.rightObservable, _.liftOperator(filter(x => x % 2 === 0)))
138+
.pipe(bufferTime(10))
139+
.toPromise()
140+
assert.deepStrictEqual(e, [E.right(2), E.right(4)])
141+
})
142+
128143
describe('Monad', () => {
129144
it('of', async () => {
130145
const fea = _.of(1)

test/ObservableThese.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import { pipe } from 'fp-ts/lib/pipeable'
77
import { bufferTime } from 'rxjs/operators'
88
import { monoidString } from 'fp-ts/lib/Monoid'
99
import * as _ from '../src/ObservableThese'
10-
import { of as rxOf, Observable } from 'rxjs'
10+
import { of as rxOf, Observable, from } from 'rxjs'
11+
import { filter } from '../src/Observable'
1112

1213
describe('ObservableThese', () => {
1314
it('rightIO', async () => {
@@ -93,6 +94,30 @@ describe('ObservableThese', () => {
9394
assert.deepStrictEqual(e, [TH.both(2, 1)])
9495
})
9596

97+
it('liftOperator (left)', async () => {
98+
const e = await pipe(from(['error1', 'error2']), _.leftObservable, _.liftOperator(filter(x => x % 2 === 0)))
99+
.pipe(bufferTime(10))
100+
.toPromise()
101+
assert.deepStrictEqual(e, [TH.left('error1'), TH.left('error2')])
102+
})
103+
104+
it('liftOperator (right)', async () => {
105+
const e = await pipe(from([1, 2, 3, 4]), _.rightObservable, _.liftOperator(filter(x => x % 2 === 0)))
106+
.pipe(bufferTime(10))
107+
.toPromise()
108+
assert.deepStrictEqual(e, [TH.right(2), TH.right(4)])
109+
})
110+
111+
it('liftOperator (both)', async () => {
112+
const e = await pipe(
113+
from([TH.both('error1', 1), TH.both('error2', 2), TH.both('error3', 3), TH.both('error4', 4)]),
114+
_.liftOperator(filter(x => x % 2 === 0))
115+
)
116+
.pipe(bufferTime(10))
117+
.toPromise()
118+
assert.deepStrictEqual(e, [TH.both('error2', 2), TH.both('error4', 4)])
119+
})
120+
96121
it('map', async () => {
97122
const f = (n: number): number => n * 2
98123
assert.deepStrictEqual(await pipe(_.right(1), _.map(f)).toPromise(), TH.right(2))

0 commit comments

Comments
 (0)