Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"@types/node": "7.0.4",
"docs-ts": "^0.3.4",
"fp-ts": "^2.4.3",
"fp-ts-laws": "^0.2.1",
"import-path-rewrite": "github:gcanti/import-path-rewrite",
"jest": "^24.8.0",
"mocha": "^5.2.0",
Expand Down
47 changes: 30 additions & 17 deletions src/Observable.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
/**
* @since 0.6.0
*/
import { Alternative1 } from 'fp-ts/lib/Alternative'
import * as E from 'fp-ts/lib/Either'
import { Filterable1 } from 'fp-ts/lib/Filterable'
import { identity, Predicate } from 'fp-ts/lib/function'
import { Monad1 } from 'fp-ts/lib/Monad'
import { Monoid } from 'fp-ts/lib/Monoid'
import * as O from 'fp-ts/lib/Option'
import { pipe, pipeable } from 'fp-ts/lib/pipeable'
import { combineLatest, EMPTY, merge, Observable, of as rxOf, defer } from 'rxjs'
import { map as rxMap, mergeMap } from 'rxjs/operators'
import { pipeable } from 'fp-ts/lib/pipeable'
import { combineLatest, EMPTY, merge, Observable, of as rxOf, defer, Subscriber, Operator } from 'rxjs'
import { map as rxMap, switchMap, filter as rxFilter } from 'rxjs/operators'
import { IO } from 'fp-ts/lib/IO'
import { Task } from 'fp-ts/lib/Task'
import { MonadObservable1 } from './MonadObservable'
import { Alt1 } from 'fp-ts/lib/Alt'
import { Plus1 } from './Plus'

declare module 'fp-ts/lib/HKT' {
interface URItoKind<A> {
Expand Down Expand Up @@ -79,12 +80,12 @@ export function toTask<A>(o: Observable<A>): Task<A> {
/**
* @since 0.6.0
*/
export const observable: Monad1<URI> & Alternative1<URI> & Filterable1<URI> & MonadObservable1<URI> = {
export const observable: Monad1<URI> & Alt1<URI> & Plus1<URI> & Filterable1<URI> & MonadObservable1<URI> = {
URI,
map: (fa, f) => fa.pipe(rxMap(f)),
of,
ap: (fab, fa) => combineLatest([fab, fa]).pipe(rxMap(([f, a]) => f(a))),
chain: (fa, f) => fa.pipe(mergeMap(f)),
chain: (fa, f) => fa.pipe(switchMap(f)),
Copy link
Collaborator

@mlegenhausen mlegenhausen Apr 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretical we could replace switchMap with exhaustMap and would also get the memory problem solved and hold all laws right?

In practice you are totally right that switchMap should be the default chain. Should we add for completeness alternative Monad instances like mergeMonad and exhaustMonad?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK exhaustMap waits for passed Observable to complete before subscribing to the new created one (result of chain). Looks like an incorrect behavior and I can't think of a use case for that.

Replacing switchMap with mergeMap is a matter of const m = {...observable, chain: (fa, f) => fa.pipe(mergeMap(f))} - is it really necessary to put it into the lib? What about other possible operator replacements? IMO switchMap covers 99% usecases.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK exhaustMap waits for passed Observable to complete before subscribing to the new created one (result of chain). Looks like an incorrect behavior and I can't think of a use case for that.

Just a theorical question sure for the use cases we are looking at it does different things.

Replacing switchMap with mergeMap is a matter of const m = {...observable, chain: (fa, f) => fa.pipe(mergeMap(f))} - is it really necessary to put it into the lib? What about other possible operator replacements? IMO switchMap covers 99% usecases.

For the ReaderObservable this approach would not be practical and it would follow the implementation style of fp-ts like done for taskSeq. But of cause it could be part of another PR. Totally agree with the fact that switchMap covers most of the usecases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

follow the implementation style of fp-ts like done for taskSeq

Do you mean that there should be additional mergeReaderObservable and the same for any other instance using Observable as base?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats what I would like to discuss to keep up with the fp-ts coding style. @gcanti any opinions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's tricky :) There's no way to test .toNotBe on TestScheduler's expectObservable, so we need to pass assert.notDeepStrictEqual as equality assertion function. Then we intentionally test that resulting observable does not equal marbles, because they are "incorrect" but should be the same as in success' RIGHT SIDE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waaaaaaaait... Let me recheck o_O

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right, there's an error in the test. How could I miss it?... Anyway I updated the test and it still shows distributivity does not hold

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a proposal: for what I can undestand switching to switchMap is important to you, while a discussion about Alt / Alternative instances, albeit being very interesting from a theoretical point of view, is way less important in practice (it's just merge in the end), and currently is a blocker.

So what about:

  • replace mergeMap with switchMap (breaking change)
  • optimize filterMap implementation
  • remove Alt / Alternative instances from Observable, ObservableEither and ReaderObservable (it's a breaking change but we are already doing a breaking change for switchMap)
  • release v0.7
  • (maybe) continue the debate about Alternative in the future with less rush

@raveclassic @mlegenhausen what do you think?

Copy link
Contributor Author

@raveclassic raveclassic Apr 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gcanti We can keep zero as part of Plus typeclass, it doesn't require alt to be distributive. So that Observable is a Plus & Alt, but not an Alternative. This is not a breaking change, everything stays structurally equal.
But I'm open to any decision.

zero: () => EMPTY,
alt: (fx, f) => merge(fx, f()),
compact: fa => observable.filterMap(fa, identity),
Expand All @@ -94,17 +95,8 @@ export const observable: Monad1<URI> & Alternative1<URI> & Filterable1<URI> & Mo
right: observable.filterMap(fa, a => O.fromEither(f(a)))
}),
partition: <A>(fa: Observable<A>, p: Predicate<A>) => observable.partitionMap(fa, E.fromPredicate(p, identity)),
filterMap: <A, B>(fa: Observable<A>, f: (a: A) => O.Option<B>) =>
fa.pipe(
mergeMap(a =>
pipe(
f(a),
// tslint:disable-next-line: deprecation
O.fold<B, Observable<B>>(() => EMPTY, of)
)
)
),
filter: <A>(fa: Observable<A>, p: Predicate<A>) => observable.filterMap(fa, O.fromPredicate(p)),
filterMap: (fa, f) => fa.lift(new FilterMapOperator(f)),
filter: <A>(fa: Observable<A>, p: Predicate<A>) => fa.pipe(rxFilter(p)),
fromIO,
fromTask,
fromObservable: identity
Expand Down Expand Up @@ -185,3 +177,24 @@ export {
*/
separate
}

class FilterMapOperator<A, B> implements Operator<A, B> {
constructor(private readonly f: (a: A) => O.Option<B>) {}

call(subscriber: Subscriber<B>, source: Observable<A>) {
return source.subscribe(new FilterMapSubscriber(subscriber, this.f))
}
}

class FilterMapSubscriber<A, B> extends Subscriber<A> {
constructor(destination: Subscriber<B>, private readonly f: (a: A) => O.Option<B>) {
super(destination)
}

protected _next(value: A): void {
const b = this.f(value)
if (O.isSome(b) && this.destination && this.destination.next) {
this.destination.next(b.value)
}
}
}
20 changes: 20 additions & 0 deletions src/Plus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Kind, URIS } from 'fp-ts/lib/HKT'
import { Alt1 } from 'fp-ts/lib/Alt'

/**
* The `Plus` type class extends the `Alt` type class with a value that should be the left and right identity for `alt`.
*
* It is similar to `Monoid`, except that it applies to types of kind `* -> *`, like `Array` or `Option`, rather than
* concrete types like `string` or `number`.
*
* `Plus` instances should satisfy the following laws:
*
* 1. Left identity: `A.alt(zero, fa) == fa`
* 2. Right identity: `A.alt(fa, zero) == fa`
* 3. Annihilation: `A.map(zero, f) == zero`
*
* @since 0.7.0
*/
export interface Plus1<F extends URIS> extends Alt1<F> {
readonly zero: <A>() => Kind<F, A>
}
158 changes: 155 additions & 3 deletions test/Observable.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,166 @@
import * as assert from 'assert'
import { from } from 'rxjs'
import { bufferTime } from 'rxjs/operators'
import { from, Observable } from 'rxjs'
import { bufferTime, subscribeOn } from 'rxjs/operators'
import * as O from 'fp-ts/lib/Option'
import * as E from 'fp-ts/lib/Either'
import * as T from 'fp-ts/lib/Task'
import { identity } from 'fp-ts/lib/function'
import { constFalse, constTrue, identity } from 'fp-ts/lib/function'
import * as laws from 'fp-ts-laws'

import { observable as R } from '../src'
import { TestScheduler } from 'rxjs/testing'
import { Eq, eqNumber, eqString } from 'fp-ts/lib/Eq'
import { getEq } from 'fp-ts/lib/Array'

const liftE = <A>(E: Eq<A>): Eq<Observable<A>> => {
const arrayE = getEq(E)
return {
equals: (x, y) => {
const scheduler = new TestScheduler(assert.deepStrictEqual)
const xas: Array<A> = []
x.pipe(subscribeOn(scheduler)).subscribe(a => xas.push(a))
const yas: Array<A> = []
y.pipe(subscribeOn(scheduler)).subscribe(a => yas.push(a))
scheduler.flush()
assert.deepStrictEqual(xas, yas)
return arrayE.equals(xas, yas)
}
}
}

describe('Observable', () => {
describe('laws', () => {
const f = (n: number): string => `map(${n})`
const a = R.observable.of(1)
const b = R.observable.of(2)
const c = R.observable.of(3)
it('Monad', () => {
laws.monad(R.observable)(liftE)
})
describe('Alt', () => {
it('associativity', () => {
const left = R.observable.alt(
R.observable.alt(a, () => b),
() => c
)
const right = R.observable.alt(a, () => R.observable.alt(b, () => c))
assert.ok(liftE(eqNumber).equals(left, right))
})
it('distributivity', () => {
const left = R.observable.map(
R.observable.alt(a, () => b),
f
)
const right = R.observable.alt(R.observable.map(a, f), () => R.observable.map(b, f))
assert.ok(liftE(eqString).equals(left, right))
})
})
describe('Plus', () => {
it('right identity', () => {
const left = R.observable.alt(a, () => R.observable.zero())
assert.ok(liftE(eqNumber).equals(left, a))
})
it('left identity', () => {
const left = R.observable.alt(R.observable.zero<number>(), () => a)
assert.ok(liftE(eqNumber).equals(left, a))
})
it('annihilation', () => {
const left = R.observable.map(R.observable.zero<number>(), f)
const right = R.observable.zero<string>()
assert.ok(liftE(eqString).equals(left, right))
})
})
describe('Observable is not an Alternative', () => {
describe('no distributivity', () => {
const a = 1
const f = (n: number) => n + 1
const g = (n: number) => n / 2
const result = { b: f(a), c: g(a) }
const success = {
fa: ' ------------a----------------|',
fab: ' ---f-------------------------|',
gac: ' ----------------g------------|',

'LEFT SIDE': '',
'alt(fab, gac)': ' ---f------------g------------|',
'ap(alt(fab, gac), fa)': ' ------------b---c------------|',

'RIGHT SIDE': '',
'ap(fab, fa)': ' ------------b----------------|',
'ap(gac, fa)': ' ----------------c------------|',
'alt(ap(fab, fa), ap(gac, fa))': '------------b---c------------|'
}
const failure = {
fa: ' ------------a----------------|',
fab: ' ---f-------------------------|',
gac: ' --------g--------------------|',

'LEFT SIDE': '',
'alt(fab, gac)': ' ---f----g--------------------|',
'ap(alt(fab, gac), fa)': ' ------------c----------------|',

'RIGHT SIDE': '',
'ap(fab, fa)': ' ------------b----------------|',
'ap(gac, fa)': ' ----------------c------------|',
'alt(ap(fab, fa), ap(gac, fa))': '------------b---c------------|'
}
it('left sides are not equal but they should be', () => {
assert.notDeepStrictEqual(success['ap(alt(fab, gac), fa)'], failure['ap(alt(fab, gac), fa)'])
})
it('right sides should be equal', () => {
assert.deepStrictEqual(success['alt(ap(fab, fa), ap(gac, fa))'], failure['alt(ap(fab, fa), ap(gac, fa))'])
})
it('success', () => {
new TestScheduler(assert.deepStrictEqual).run(({ cold, expectObservable }) => {
const fa = cold(success.fa, { a })
const fab = cold(success.fab, { f })
const gac = cold(success.gac, { g })
const left = R.observable.ap(
R.observable.alt(fab, () => gac),
fa
)
const right = R.observable.alt(R.observable.ap(fab, fa), () => R.observable.ap(gac, fa))
expectObservable(left).toBe(success['ap(alt(fab, gac), fa)'], result)
expectObservable(right).toBe(success['alt(ap(fab, fa), ap(gac, fa))'], result)
})
})
it('failure', () => {
// use assert.notDeepStrictEqual as assert
new TestScheduler(assert.notDeepStrictEqual).run(({ cold, expectObservable }) => {
const fa = cold(failure.fa, { a })
const fab = cold(failure.fab, { f })
const gac = cold(failure.gac, { g })
const left = R.observable.ap(
R.observable.alt(fab, () => gac),
fa
)
const right = R.observable.alt(R.observable.ap(fab, fa), () => R.observable.ap(gac, fa))
expectObservable(left).toBe(success['ap(alt(fab, gac), fa)'], result)
expectObservable(right).toBe(success['alt(ap(fab, fa), ap(gac, fa))'], result)
})
})
})
})
describe('Filterable', () => {
const p = (n: number) => n > 0
const q = (n: number) => n < 10
const v = from([-5, 5, 15])
it('distributivity', () => {
const left = R.observable.filter(v, x => p(x) && q(x))
const right = R.observable.filter(R.observable.filter(v, p), q)
assert.ok(liftE(eqNumber).equals(left, right))
})
it('identity', () => {
const left = R.observable.filter(v, constTrue)
assert.ok(liftE(eqNumber).equals(left, v))
})
it('annihilation', () => {
const left = R.observable.filter(from([1, 2, 3]), constFalse)
const right = R.observable.filter(from([-1,-2,-3]), constFalse)
assert.ok(liftE(eqNumber).equals(left, right))
})
})
})
it('of', () => {
const fa = R.observable.of(1)
return fa
Expand Down