Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/modules/ObservableEither.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Added in v0.6.8
- [apSecond](#apsecond)
- [chainFirst](#chainfirst)
- [flatten](#flatten)
- [liftOperator](#liftoperator)
- [orElse](#orelse)
- [swap](#swap)
- [constructors](#constructors)
Expand Down Expand Up @@ -255,6 +256,21 @@ export declare const flatten: <E, A>(mma: ObservableEither<E, ObservableEither<E

Added in v0.6.0

## liftOperator

Lifts an OperatorFunction into an ObservableEither context
Allows e.g. filter to be used on on ObservableEither

**Signature**

```ts
export declare function liftOperator<E, A, B>(
f: OperatorFunction<A, B>
): (obs: ObservableEither<E, A>) => ObservableEither<E, B>
```

Added in v0.6.12

## orElse

**Signature**
Expand Down
16 changes: 16 additions & 0 deletions docs/modules/ObservableThese.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Added in v0.6.12
- [Functor](#functor)
- [map](#map)
- [combinators](#combinators)
- [liftOperator](#liftoperator)
- [swap](#swap)
- [constructors](#constructors)
- [both](#both)
Expand Down Expand Up @@ -103,6 +104,21 @@ Added in v0.6.12

# combinators

## liftOperator

Lifts an OperatorFunction into an ObservableThese context
Allows e.g. filter to be used on on ObservableThese

**Signature**

```ts
export declare function liftOperator<E, A, B>(
f: OperatorFunction<A, B>
): (obs: ObservableThese<E, A>) => ObservableThese<E, B>
```

Added in v0.6.12

## swap

**Signature**
Expand Down
16 changes: 16 additions & 0 deletions docs/modules/ReaderObservableEither.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Added in v0.6.10
- [apSecond](#apsecond)
- [chainFirst](#chainfirst)
- [flatten](#flatten)
- [liftOperator](#liftoperator)
- [local](#local)
- [constructors](#constructors)
- [ask](#ask)
Expand Down Expand Up @@ -246,6 +247,21 @@ export declare const flatten: <R, E, A>(

Added in v0.6.10

## liftOperator

Lifts an OperatorFunction into a ReaderObservableEither context
Allows e.g. filter to be used on on ReaderObservableEither

**Signature**

```ts
export declare function liftOperator<R, E, A, B>(
f: OperatorFunction<A, B>
): (obs: ReaderObservableEither<R, E, A>) => ReaderObservableEither<R, E, B>
```

Added in v0.6.12

## local

**Signature**
Expand Down
16 changes: 16 additions & 0 deletions docs/modules/StateReaderObservableEither.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Added in v0.6.10
- [apSecond](#apsecond)
- [chainFirst](#chainfirst)
- [flatten](#flatten)
- [liftOperator](#liftoperator)
- [constructors](#constructors)
- [fromIO](#fromio)
- [fromObservable](#fromobservable)
Expand Down Expand Up @@ -233,6 +234,21 @@ export declare const flatten: <S, R, E, A>(

Added in v0.6.10

## liftOperator

Lifts an OperatorFunction into a StateReaderObservableEither context
Allows e.g. filter to be used on on StateReaderObservableEither

**Signature**

```ts
export declare function liftOperator<S, R, E, A, B>(
f: OperatorFunction<[A, S], [B, S]>
): (obs: StateReaderObservableEither<S, R, E, A>) => StateReaderObservableEither<S, R, E, B>
```

Added in v0.6.12

# constructors

## fromIO
Expand Down
25 changes: 23 additions & 2 deletions src/ObservableEither.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import { MonadThrow2 } from 'fp-ts/lib/MonadThrow'
import { Option } from 'fp-ts/lib/Option'
import { pipe } from 'fp-ts/lib/pipeable'
import * as TE from 'fp-ts/lib/TaskEither'
import { Observable } from 'rxjs'
import { catchError } from 'rxjs/operators'
import { defer, merge, Observable, OperatorFunction, Subject } from 'rxjs'
import { finalize, catchError } from 'rxjs/operators'
import { MonadObservable2 } from './MonadObservable'
import * as R from './Observable'

Expand Down Expand Up @@ -166,6 +166,27 @@ export const swap: <E, A>(ma: ObservableEither<E, A>) => ObservableEither<A, E>
/*#__PURE__*/
R.map(E.swap)

/**
* Lifts an OperatorFunction into an ObservableEither context
* Allows e.g. filter to be used on on ObservableEither
*
* @category combinators
* @since 0.6.12
*/
export function liftOperator<E, A, B>(
f: OperatorFunction<A, B>
): (obs: ObservableEither<E, A>) => ObservableEither<E, B> {
return obs => {
const subj = new Subject<E.Either<E, A>>()
return merge(
pipe(subj, R.separate, ({ left, right }) => merge(pipe(left, R.map(E.left)), pipe(right, f, R.map(E.right)))),
defer(() => {
obs.pipe(finalize(() => subj.complete())).subscribe(subj)
})
)
}
}

// -------------------------------------------------------------------------------------
// type class members
// -------------------------------------------------------------------------------------
Expand Down
52 changes: 51 additions & 1 deletion src/ObservableThese.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import { pipe } from 'fp-ts/lib/pipeable'
import { Semigroup } from 'fp-ts/lib/Semigroup'
import * as TT from 'fp-ts/lib/TaskThese'
import * as TH from 'fp-ts/lib/These'
import { Observable } from 'rxjs'
import { defer, merge, Observable, OperatorFunction, Subject } from 'rxjs'
import { finalize, withLatestFrom } from 'rxjs/operators'
import * as R from './Observable'

// -------------------------------------------------------------------------------------
Expand Down Expand Up @@ -142,6 +143,55 @@ export const swap: <E, A>(ma: ObservableThese<E, A>) => ObservableThese<A, E> =
/*#__PURE__*/
R.map(TH.swap)

/**
* Lifts an OperatorFunction into an ObservableThese context
* Allows e.g. filter to be used on on ObservableThese
*
* @category combinators
* @since 0.6.12
*/
export function liftOperator<E, A, B>(
f: OperatorFunction<A, B>
): (obs: ObservableThese<E, A>) => ObservableThese<E, B> {
return obs => {
const subj = new Subject<TH.These<E, A>>()
return merge(
pipe(
merge(
pipe(subj, R.filter(TH.isLeft)),
pipe(
subj,
R.filter(TH.isRight),
R.map(({ right }) => right),
f,
R.map(TH.right)
),
pipe(
subj,
R.filter(TH.isBoth),
a =>
pipe(
a,
R.map(({ right }) => right),
f,
withLatestFrom(
pipe(
a,
R.map(({ left }) => left)
)
)
),
R.map(([b, e]) => TH.both(e, b))
)
)
),
defer(() => {
obs.pipe(finalize(() => subj.complete())).subscribe(subj)
})
)
}
}

// -------------------------------------------------------------------------------------
// type class members
// -------------------------------------------------------------------------------------
Expand Down
14 changes: 14 additions & 0 deletions src/ReaderObservableEither.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { MonadThrow3 } from 'fp-ts/lib/MonadThrow'
import { Option } from 'fp-ts/lib/Option'
import { pipe } from 'fp-ts/lib/pipeable'
import * as R from 'fp-ts/lib/Reader'
import { OperatorFunction } from 'rxjs'
import { MonadObservable3 } from './MonadObservable'
import * as OE from './ObservableEither'

Expand Down Expand Up @@ -103,6 +104,19 @@ export const local: <R2, R1>(
f: (d: R2) => R1
) => <E, A>(ma: ReaderObservableEither<R1, E, A>) => ReaderObservableEither<R2, E, A> = R.local

/**
* Lifts an OperatorFunction into a ReaderObservableEither context
* Allows e.g. filter to be used on on ReaderObservableEither
*
* @category combinators
* @since 0.6.12
*/
export function liftOperator<R, E, A, B>(
f: OperatorFunction<A, B>
): (obs: ReaderObservableEither<R, E, A>) => ReaderObservableEither<R, E, B> {
return obs => r => OE.liftOperator<E, A, B>(f)(obs(r))
}

// -------------------------------------------------------------------------------------
// type class members
// -------------------------------------------------------------------------------------
Expand Down
15 changes: 15 additions & 0 deletions src/StateReaderObservableEither.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import { MonadTask4 } from 'fp-ts/lib/MonadTask'
import { MonadThrow4 } from 'fp-ts/lib/MonadThrow'
import { Option } from 'fp-ts/lib/Option'
import { pipe } from 'fp-ts/lib/pipeable'
import { OperatorFunction } from 'rxjs'
import { MonadObservable4 } from './MonadObservable'
import * as OB from './Observable'
import * as OE from './ObservableEither'
import * as ROE from './ReaderObservableEither'

// -------------------------------------------------------------------------------------
Expand Down Expand Up @@ -193,6 +195,19 @@ export const apSecond = <S, R, E, B>(
ap(fb)
)

/**
* Lifts an OperatorFunction into a StateReaderObservableEither context
* Allows e.g. filter to be used on on StateReaderObservableEither
*
* @category combinators
* @since 0.6.12
*/
export function liftOperator<S, R, E, A, B>(
f: OperatorFunction<[A, S], [B, S]>
): (obs: StateReaderObservableEither<S, R, E, A>) => StateReaderObservableEither<S, R, E, B> {
return obs => s => r => OE.liftOperator<E, [A, S], [B, S]>(f)(obs(s)(r))
}

/**
* @category Bifunctor
* @since 0.6.10
Expand Down
17 changes: 16 additions & 1 deletion test/ObservableEither.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import { pipe } from 'fp-ts/lib/pipeable'
import { bufferTime } from 'rxjs/operators'
import * as O from 'fp-ts/lib/Option'
import * as _ from '../src/ObservableEither'
import { of as rxOf, Observable, throwError as rxThrowError } from 'rxjs'
import { of as rxOf, Observable, from, throwError as rxThrowError } from 'rxjs'
import { filter } from '../src/Observable'

describe('ObservableEither', () => {
it('rightIO', async () => {
Expand Down Expand Up @@ -125,6 +126,20 @@ describe('ObservableEither', () => {
assert.deepStrictEqual(e, [E.left(1)])
})

it('liftOperator (left)', async () => {
const e = await pipe(from(['error1', 'error2']), _.leftObservable, _.liftOperator(filter(x => x % 2 === 0)))
.pipe(bufferTime(10))
.toPromise()
assert.deepStrictEqual(e, [E.left('error1'), E.left('error2')])
})

it('liftOperator (right)', async () => {
const e = await pipe(from([1, 2, 3, 4]), _.rightObservable, _.liftOperator(filter(x => x % 2 === 0)))
.pipe(bufferTime(10))
.toPromise()
assert.deepStrictEqual(e, [E.right(2), E.right(4)])
})

describe('Monad', () => {
it('of', async () => {
const fea = _.of(1)
Expand Down
27 changes: 26 additions & 1 deletion test/ObservableThese.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import { pipe } from 'fp-ts/lib/pipeable'
import { bufferTime } from 'rxjs/operators'
import { monoidString } from 'fp-ts/lib/Monoid'
import * as _ from '../src/ObservableThese'
import { of as rxOf, Observable } from 'rxjs'
import { of as rxOf, Observable, from } from 'rxjs'
import { filter } from '../src/Observable'

describe('ObservableThese', () => {
it('rightIO', async () => {
Expand Down Expand Up @@ -93,6 +94,30 @@ describe('ObservableThese', () => {
assert.deepStrictEqual(e, [TH.both(2, 1)])
})

it('liftOperator (left)', async () => {
const e = await pipe(from(['error1', 'error2']), _.leftObservable, _.liftOperator(filter(x => x % 2 === 0)))
.pipe(bufferTime(10))
.toPromise()
assert.deepStrictEqual(e, [TH.left('error1'), TH.left('error2')])
})

it('liftOperator (right)', async () => {
const e = await pipe(from([1, 2, 3, 4]), _.rightObservable, _.liftOperator(filter(x => x % 2 === 0)))
.pipe(bufferTime(10))
.toPromise()
assert.deepStrictEqual(e, [TH.right(2), TH.right(4)])
})

it('liftOperator (both)', async () => {
const e = await pipe(
from([TH.both('error1', 1), TH.both('error2', 2), TH.both('error3', 3), TH.both('error4', 4)]),
_.liftOperator(filter(x => x % 2 === 0))
)
.pipe(bufferTime(10))
.toPromise()
assert.deepStrictEqual(e, [TH.both('error2', 2), TH.both('error4', 4)])
})

it('map', async () => {
const f = (n: number): number => n * 2
assert.deepStrictEqual(await pipe(_.right(1), _.map(f)).toPromise(), TH.right(2))
Expand Down
25 changes: 25 additions & 0 deletions test/ReaderObservableEither.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as O from 'fp-ts/lib/Option'
import * as E from 'fp-ts/lib/Either'
import * as R from 'fp-ts/lib/Reader'
import * as T from 'fp-ts/lib/Task'
import { from } from 'rxjs'

// test helper to dry up LOC.
export const buffer = flow(R.map(bufferTime(10)), R.map(OB.toTask))
Expand Down Expand Up @@ -100,6 +101,30 @@ describe('ReaderObservable', () => {
assert.deepStrictEqual(e, [E.right(4)])
})

it('liftOperator (left)', async () => {
const robe = pipe(
from(['error1', 'error2']),
OBE.leftObservable,
_.fromObservableEither,
_.liftOperator(OB.filter(x => x % 2 === 0)),
buffer
)
const e = await robe({})()
assert.deepStrictEqual(e, [E.left('error1'), E.left('error2')])
})

it('liftOperator (right)', async () => {
const robe = pipe(
from([1, 2, 3, 4]),
OBE.rightObservable,
_.fromObservableEither,
_.liftOperator(OB.filter(x => x % 2 === 0)),
buffer
)
const e = await robe({})()
assert.deepStrictEqual(e, [E.right(2), E.right(4)])
})

it('fromTask', async () => {
const robe = pipe(_.fromTask(T.of(1)), buffer)
const x = await robe({})()
Expand Down
Loading