diff --git a/docs/modules/ObservableEither.ts.md b/docs/modules/ObservableEither.ts.md index 71f09b4..6caed1d 100644 --- a/docs/modules/ObservableEither.ts.md +++ b/docs/modules/ObservableEither.ts.md @@ -31,6 +31,7 @@ Added in v0.6.8 - [apSecond](#apsecond) - [chainFirst](#chainfirst) - [flatten](#flatten) + - [liftOperator](#liftoperator) - [orElse](#orelse) - [swap](#swap) - [constructors](#constructors) @@ -255,6 +256,21 @@ export declare const flatten: (mma: ObservableEither( + f: OperatorFunction +): (obs: ObservableEither) => ObservableEither +``` + +Added in v0.6.12 + ## orElse **Signature** diff --git a/docs/modules/ObservableThese.ts.md b/docs/modules/ObservableThese.ts.md index e5e94c0..f01a590 100644 --- a/docs/modules/ObservableThese.ts.md +++ b/docs/modules/ObservableThese.ts.md @@ -20,6 +20,7 @@ Added in v0.6.12 - [Functor](#functor) - [map](#map) - [combinators](#combinators) + - [liftOperator](#liftoperator) - [swap](#swap) - [constructors](#constructors) - [both](#both) @@ -103,6 +104,21 @@ Added in v0.6.12 # combinators +## liftOperator + +Lifts an OperatorFunction into an ObservableThese context +Allows e.g. filter to be used on on ObservableThese + +**Signature** + +```ts +export declare function liftOperator( + f: OperatorFunction +): (obs: ObservableThese) => ObservableThese +``` + +Added in v0.6.12 + ## swap **Signature** diff --git a/docs/modules/ReaderObservableEither.ts.md b/docs/modules/ReaderObservableEither.ts.md index e8ffe7b..9e7eb02 100644 --- a/docs/modules/ReaderObservableEither.ts.md +++ b/docs/modules/ReaderObservableEither.ts.md @@ -31,6 +31,7 @@ Added in v0.6.10 - [apSecond](#apsecond) - [chainFirst](#chainfirst) - [flatten](#flatten) + - [liftOperator](#liftoperator) - [local](#local) - [constructors](#constructors) - [ask](#ask) @@ -246,6 +247,21 @@ export declare const flatten: ( Added in v0.6.10 +## liftOperator + +Lifts an OperatorFunction into a ReaderObservableEither context +Allows e.g. filter to be used on on ReaderObservableEither + +**Signature** + +```ts +export declare function liftOperator( + f: OperatorFunction +): (obs: ReaderObservableEither) => ReaderObservableEither +``` + +Added in v0.6.12 + ## local **Signature** diff --git a/docs/modules/StateReaderObservableEither.ts.md b/docs/modules/StateReaderObservableEither.ts.md index 319964a..d31c343 100644 --- a/docs/modules/StateReaderObservableEither.ts.md +++ b/docs/modules/StateReaderObservableEither.ts.md @@ -29,6 +29,7 @@ Added in v0.6.10 - [apSecond](#apsecond) - [chainFirst](#chainfirst) - [flatten](#flatten) + - [liftOperator](#liftoperator) - [constructors](#constructors) - [fromIO](#fromio) - [fromObservable](#fromobservable) @@ -233,6 +234,21 @@ export declare const flatten: ( Added in v0.6.10 +## liftOperator + +Lifts an OperatorFunction into a StateReaderObservableEither context +Allows e.g. filter to be used on on StateReaderObservableEither + +**Signature** + +```ts +export declare function liftOperator( + f: OperatorFunction<[A, S], [B, S]> +): (obs: StateReaderObservableEither) => StateReaderObservableEither +``` + +Added in v0.6.12 + # constructors ## fromIO diff --git a/src/ObservableEither.ts b/src/ObservableEither.ts index 7bd53a5..e31d192 100644 --- a/src/ObservableEither.ts +++ b/src/ObservableEither.ts @@ -17,8 +17,8 @@ import { MonadThrow2 } from 'fp-ts/lib/MonadThrow' import { Option } from 'fp-ts/lib/Option' import { pipe } from 'fp-ts/lib/pipeable' import * as TE from 'fp-ts/lib/TaskEither' -import { Observable } from 'rxjs' -import { catchError } from 'rxjs/operators' +import { defer, merge, Observable, OperatorFunction, Subject } from 'rxjs' +import { finalize, catchError } from 'rxjs/operators' import { MonadObservable2 } from './MonadObservable' import * as R from './Observable' @@ -166,6 +166,27 @@ export const swap: (ma: ObservableEither) => ObservableEither /*#__PURE__*/ R.map(E.swap) +/** + * Lifts an OperatorFunction into an ObservableEither context + * Allows e.g. filter to be used on on ObservableEither + * + * @category combinators + * @since 0.6.12 + */ +export function liftOperator( + f: OperatorFunction +): (obs: ObservableEither) => ObservableEither { + return obs => { + const subj = new Subject>() + return merge( + pipe(subj, R.separate, ({ left, right }) => merge(pipe(left, R.map(E.left)), pipe(right, f, R.map(E.right)))), + defer(() => { + obs.pipe(finalize(() => subj.complete())).subscribe(subj) + }) + ) + } +} + // ------------------------------------------------------------------------------------- // type class members // ------------------------------------------------------------------------------------- diff --git a/src/ObservableThese.ts b/src/ObservableThese.ts index bb4166d..812aa40 100644 --- a/src/ObservableThese.ts +++ b/src/ObservableThese.ts @@ -15,7 +15,8 @@ import { pipe } from 'fp-ts/lib/pipeable' import { Semigroup } from 'fp-ts/lib/Semigroup' import * as TT from 'fp-ts/lib/TaskThese' import * as TH from 'fp-ts/lib/These' -import { Observable } from 'rxjs' +import { defer, merge, Observable, OperatorFunction, Subject } from 'rxjs' +import { finalize, withLatestFrom } from 'rxjs/operators' import * as R from './Observable' // ------------------------------------------------------------------------------------- @@ -142,6 +143,55 @@ export const swap: (ma: ObservableThese) => ObservableThese = /*#__PURE__*/ R.map(TH.swap) +/** + * Lifts an OperatorFunction into an ObservableThese context + * Allows e.g. filter to be used on on ObservableThese + * + * @category combinators + * @since 0.6.12 + */ +export function liftOperator( + f: OperatorFunction +): (obs: ObservableThese) => ObservableThese { + return obs => { + const subj = new Subject>() + return merge( + pipe( + merge( + pipe(subj, R.filter(TH.isLeft)), + pipe( + subj, + R.filter(TH.isRight), + R.map(({ right }) => right), + f, + R.map(TH.right) + ), + pipe( + subj, + R.filter(TH.isBoth), + a => + pipe( + a, + R.map(({ right }) => right), + f, + withLatestFrom( + pipe( + a, + R.map(({ left }) => left) + ) + ) + ), + R.map(([b, e]) => TH.both(e, b)) + ) + ) + ), + defer(() => { + obs.pipe(finalize(() => subj.complete())).subscribe(subj) + }) + ) + } +} + // ------------------------------------------------------------------------------------- // type class members // ------------------------------------------------------------------------------------- diff --git a/src/ReaderObservableEither.ts b/src/ReaderObservableEither.ts index a1e8d03..dad4695 100644 --- a/src/ReaderObservableEither.ts +++ b/src/ReaderObservableEither.ts @@ -14,6 +14,7 @@ import { MonadThrow3 } from 'fp-ts/lib/MonadThrow' import { Option } from 'fp-ts/lib/Option' import { pipe } from 'fp-ts/lib/pipeable' import * as R from 'fp-ts/lib/Reader' +import { OperatorFunction } from 'rxjs' import { MonadObservable3 } from './MonadObservable' import * as OE from './ObservableEither' @@ -103,6 +104,19 @@ export const local: ( f: (d: R2) => R1 ) => (ma: ReaderObservableEither) => ReaderObservableEither = R.local +/** + * Lifts an OperatorFunction into a ReaderObservableEither context + * Allows e.g. filter to be used on on ReaderObservableEither + * + * @category combinators + * @since 0.6.12 + */ +export function liftOperator( + f: OperatorFunction +): (obs: ReaderObservableEither) => ReaderObservableEither { + return obs => r => OE.liftOperator(f)(obs(r)) +} + // ------------------------------------------------------------------------------------- // type class members // ------------------------------------------------------------------------------------- diff --git a/src/StateReaderObservableEither.ts b/src/StateReaderObservableEither.ts index 2ec1d09..73a6300 100644 --- a/src/StateReaderObservableEither.ts +++ b/src/StateReaderObservableEither.ts @@ -13,8 +13,10 @@ import { MonadTask4 } from 'fp-ts/lib/MonadTask' import { MonadThrow4 } from 'fp-ts/lib/MonadThrow' import { Option } from 'fp-ts/lib/Option' import { pipe } from 'fp-ts/lib/pipeable' +import { OperatorFunction } from 'rxjs' import { MonadObservable4 } from './MonadObservable' import * as OB from './Observable' +import * as OE from './ObservableEither' import * as ROE from './ReaderObservableEither' // ------------------------------------------------------------------------------------- @@ -193,6 +195,19 @@ export const apSecond = ( ap(fb) ) +/** + * Lifts an OperatorFunction into a StateReaderObservableEither context + * Allows e.g. filter to be used on on StateReaderObservableEither + * + * @category combinators + * @since 0.6.12 + */ +export function liftOperator( + f: OperatorFunction<[A, S], [B, S]> +): (obs: StateReaderObservableEither) => StateReaderObservableEither { + return obs => s => r => OE.liftOperator(f)(obs(s)(r)) +} + /** * @category Bifunctor * @since 0.6.10 diff --git a/test/ObservableEither.ts b/test/ObservableEither.ts index 06a050c..b6549f8 100644 --- a/test/ObservableEither.ts +++ b/test/ObservableEither.ts @@ -7,7 +7,8 @@ import { pipe } from 'fp-ts/lib/pipeable' import { bufferTime } from 'rxjs/operators' import * as O from 'fp-ts/lib/Option' import * as _ from '../src/ObservableEither' -import { of as rxOf, Observable, throwError as rxThrowError } from 'rxjs' +import { of as rxOf, Observable, from, throwError as rxThrowError } from 'rxjs' +import { filter } from '../src/Observable' describe('ObservableEither', () => { it('rightIO', async () => { @@ -125,6 +126,20 @@ describe('ObservableEither', () => { assert.deepStrictEqual(e, [E.left(1)]) }) + it('liftOperator (left)', async () => { + const e = await pipe(from(['error1', 'error2']), _.leftObservable, _.liftOperator(filter(x => x % 2 === 0))) + .pipe(bufferTime(10)) + .toPromise() + assert.deepStrictEqual(e, [E.left('error1'), E.left('error2')]) + }) + + it('liftOperator (right)', async () => { + const e = await pipe(from([1, 2, 3, 4]), _.rightObservable, _.liftOperator(filter(x => x % 2 === 0))) + .pipe(bufferTime(10)) + .toPromise() + assert.deepStrictEqual(e, [E.right(2), E.right(4)]) + }) + describe('Monad', () => { it('of', async () => { const fea = _.of(1) diff --git a/test/ObservableThese.ts b/test/ObservableThese.ts index 23aff01..c99ac4f 100644 --- a/test/ObservableThese.ts +++ b/test/ObservableThese.ts @@ -7,7 +7,8 @@ import { pipe } from 'fp-ts/lib/pipeable' import { bufferTime } from 'rxjs/operators' import { monoidString } from 'fp-ts/lib/Monoid' import * as _ from '../src/ObservableThese' -import { of as rxOf, Observable } from 'rxjs' +import { of as rxOf, Observable, from } from 'rxjs' +import { filter } from '../src/Observable' describe('ObservableThese', () => { it('rightIO', async () => { @@ -93,6 +94,30 @@ describe('ObservableThese', () => { assert.deepStrictEqual(e, [TH.both(2, 1)]) }) + it('liftOperator (left)', async () => { + const e = await pipe(from(['error1', 'error2']), _.leftObservable, _.liftOperator(filter(x => x % 2 === 0))) + .pipe(bufferTime(10)) + .toPromise() + assert.deepStrictEqual(e, [TH.left('error1'), TH.left('error2')]) + }) + + it('liftOperator (right)', async () => { + const e = await pipe(from([1, 2, 3, 4]), _.rightObservable, _.liftOperator(filter(x => x % 2 === 0))) + .pipe(bufferTime(10)) + .toPromise() + assert.deepStrictEqual(e, [TH.right(2), TH.right(4)]) + }) + + it('liftOperator (both)', async () => { + const e = await pipe( + from([TH.both('error1', 1), TH.both('error2', 2), TH.both('error3', 3), TH.both('error4', 4)]), + _.liftOperator(filter(x => x % 2 === 0)) + ) + .pipe(bufferTime(10)) + .toPromise() + assert.deepStrictEqual(e, [TH.both('error2', 2), TH.both('error4', 4)]) + }) + it('map', async () => { const f = (n: number): number => n * 2 assert.deepStrictEqual(await pipe(_.right(1), _.map(f)).toPromise(), TH.right(2)) diff --git a/test/ReaderObservableEither.ts b/test/ReaderObservableEither.ts index a4a3089..797273a 100644 --- a/test/ReaderObservableEither.ts +++ b/test/ReaderObservableEither.ts @@ -8,6 +8,7 @@ import * as O from 'fp-ts/lib/Option' import * as E from 'fp-ts/lib/Either' import * as R from 'fp-ts/lib/Reader' import * as T from 'fp-ts/lib/Task' +import { from } from 'rxjs' // test helper to dry up LOC. export const buffer = flow(R.map(bufferTime(10)), R.map(OB.toTask)) @@ -100,6 +101,30 @@ describe('ReaderObservable', () => { assert.deepStrictEqual(e, [E.right(4)]) }) + it('liftOperator (left)', async () => { + const robe = pipe( + from(['error1', 'error2']), + OBE.leftObservable, + _.fromObservableEither, + _.liftOperator(OB.filter(x => x % 2 === 0)), + buffer + ) + const e = await robe({})() + assert.deepStrictEqual(e, [E.left('error1'), E.left('error2')]) + }) + + it('liftOperator (right)', async () => { + const robe = pipe( + from([1, 2, 3, 4]), + OBE.rightObservable, + _.fromObservableEither, + _.liftOperator(OB.filter(x => x % 2 === 0)), + buffer + ) + const e = await robe({})() + assert.deepStrictEqual(e, [E.right(2), E.right(4)]) + }) + it('fromTask', async () => { const robe = pipe(_.fromTask(T.of(1)), buffer) const x = await robe({})() diff --git a/test/StateReaderObservableEither.ts b/test/StateReaderObservableEither.ts index e840b37..f318422 100644 --- a/test/StateReaderObservableEither.ts +++ b/test/StateReaderObservableEither.ts @@ -8,6 +8,8 @@ import { bufferTime } from 'rxjs/operators' import { observable as OB, stateReaderObservableEither as _ } from '../src' import { buffer as _buffer } from './ReaderObservableEither' import * as ROE from '../src/ReaderObservableEither' +import { from } from 'rxjs' +import { leftObservable } from '../src/ObservableEither' function buffer( srobe: _.StateReaderObservableEither @@ -135,6 +137,24 @@ describe('stateReaderObservableEither', () => { }) }) + it('liftOperator (left)', async () => { + const srobe = pipe( + leftObservable(from(['error1', 'error2'])), + ROE.fromObservableEither, + _.fromReaderObservableEither, + _.liftOperator(OB.filter(([x]) => x % 2 === 0)), + buffer + ) + const e = await srobe(1)(2)() + assert.deepStrictEqual(e, [E.left('error1'), E.left('error2')]) + }) + + it('liftOperator (right)', async () => { + const srobe = pipe(from([1, 2, 3, 4]), _.fromObservable, _.liftOperator(OB.filter(([x]) => x % 2 === 0)), buffer) + const e = await srobe(1)(2)() + assert.deepStrictEqual(e, [E.right([2, 1]), E.right([4, 1])]) + }) + it('chainFirst', async () => { const f = (a: string) => _.of(a.length) const e1 = await pipe(