|
| 1 | +import { DifferenceStreamWriter } from "./graph.js" |
| 2 | +import type { |
| 3 | + BinaryOperator, |
| 4 | + DifferenceStreamReader, |
| 5 | + UnaryOperator, |
| 6 | +} from "./graph.js" |
| 7 | +import type { MultiSet, MultiSetArray } from "./multiset.js" |
| 8 | +import type { ID2, IStreamBuilder, PipedOperator } from "./types.js" |
| 9 | + |
| 10 | +export class D2 implements ID2 { |
| 11 | + #streams: Array<DifferenceStreamReader<any>> = [] |
| 12 | + #operators: Array<UnaryOperator<any> | BinaryOperator<any>> = [] |
| 13 | + #nextOperatorId = 0 |
| 14 | + #finalized = false |
| 15 | + |
| 16 | + constructor() {} |
| 17 | + |
| 18 | + #checkNotFinalized(): void { |
| 19 | + if (this.#finalized) { |
| 20 | + throw new Error(`Graph already finalized`) |
| 21 | + } |
| 22 | + } |
| 23 | + |
| 24 | + getNextOperatorId(): number { |
| 25 | + this.#checkNotFinalized() |
| 26 | + return this.#nextOperatorId++ |
| 27 | + } |
| 28 | + |
| 29 | + newInput<T>(): RootStreamBuilder<T> { |
| 30 | + this.#checkNotFinalized() |
| 31 | + const writer = new DifferenceStreamWriter<T>() |
| 32 | + // Use the root stream builder that exposes the sendData and sendFrontier methods |
| 33 | + const streamBuilder = new RootStreamBuilder<T>(this, writer) |
| 34 | + this.#streams.push(streamBuilder.connectReader()) |
| 35 | + return streamBuilder |
| 36 | + } |
| 37 | + |
| 38 | + addOperator(operator: UnaryOperator<any> | BinaryOperator<any>): void { |
| 39 | + this.#checkNotFinalized() |
| 40 | + this.#operators.push(operator) |
| 41 | + } |
| 42 | + |
| 43 | + addStream(stream: DifferenceStreamReader<any>): void { |
| 44 | + this.#checkNotFinalized() |
| 45 | + this.#streams.push(stream) |
| 46 | + } |
| 47 | + |
| 48 | + finalize() { |
| 49 | + this.#checkNotFinalized() |
| 50 | + this.#finalized = true |
| 51 | + } |
| 52 | + |
| 53 | + step(): void { |
| 54 | + if (!this.#finalized) { |
| 55 | + throw new Error(`Graph not finalized`) |
| 56 | + } |
| 57 | + for (const op of this.#operators) { |
| 58 | + op.run() |
| 59 | + } |
| 60 | + } |
| 61 | + |
| 62 | + pendingWork(): boolean { |
| 63 | + return this.#operators.some((op) => op.hasPendingWork()) |
| 64 | + } |
| 65 | + |
| 66 | + run(): void { |
| 67 | + while (this.pendingWork()) { |
| 68 | + this.step() |
| 69 | + } |
| 70 | + } |
| 71 | +} |
| 72 | + |
| 73 | +export class StreamBuilder<T> implements IStreamBuilder<T> { |
| 74 | + #graph: ID2 |
| 75 | + #writer: DifferenceStreamWriter<T> |
| 76 | + |
| 77 | + constructor(graph: ID2, writer: DifferenceStreamWriter<T>) { |
| 78 | + this.#graph = graph |
| 79 | + this.#writer = writer |
| 80 | + } |
| 81 | + |
| 82 | + connectReader(): DifferenceStreamReader<T> { |
| 83 | + return this.#writer.newReader() |
| 84 | + } |
| 85 | + |
| 86 | + get writer(): DifferenceStreamWriter<T> { |
| 87 | + return this.#writer |
| 88 | + } |
| 89 | + |
| 90 | + get graph(): ID2 { |
| 91 | + return this.#graph |
| 92 | + } |
| 93 | + |
| 94 | + // Don't judge, this is the only way to type this function. |
| 95 | + // rxjs has very similar code to type its pipe function |
| 96 | + // https://github.com/ReactiveX/rxjs/blob/master/packages/rxjs/src/internal/util/pipe.ts |
| 97 | + // We go to 20 operators deep, because surly that's enough for anyone... |
| 98 | + // A user can always split the pipe into multiple pipes to get around this. |
| 99 | + pipe<O>(o1: PipedOperator<T, O>): IStreamBuilder<O> |
| 100 | + // prettier-ignore |
| 101 | + pipe<T2, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, O>): IStreamBuilder<O> |
| 102 | + // prettier-ignore |
| 103 | + pipe<T2, T3, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, O>): IStreamBuilder<O> |
| 104 | + // prettier-ignore |
| 105 | + pipe<T2, T3, T4, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, O>): IStreamBuilder<O> |
| 106 | + // prettier-ignore |
| 107 | + pipe<T2, T3, T4, T5, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, O>): IStreamBuilder<O> |
| 108 | + // prettier-ignore |
| 109 | + pipe<T2, T3, T4, T5, T6, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, O>): IStreamBuilder<O> |
| 110 | + // prettier-ignore |
| 111 | + pipe<T2, T3, T4, T5, T6, T7, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, O>): IStreamBuilder<O> |
| 112 | + // prettier-ignore |
| 113 | + pipe<T2, T3, T4, T5, T6, T7, T8, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, O>): IStreamBuilder<O> |
| 114 | + // prettier-ignore |
| 115 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, O>): IStreamBuilder<O> |
| 116 | + // prettier-ignore |
| 117 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, O>): IStreamBuilder<O> |
| 118 | + // prettier-ignore |
| 119 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, O>): IStreamBuilder<O> |
| 120 | + // prettier-ignore |
| 121 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, O>): IStreamBuilder<O> |
| 122 | + // prettier-ignore |
| 123 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, O>): IStreamBuilder<O> |
| 124 | + // prettier-ignore |
| 125 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, O>): IStreamBuilder<O> |
| 126 | + // prettier-ignore |
| 127 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, O>): IStreamBuilder<O> |
| 128 | + // prettier-ignore |
| 129 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, O>): IStreamBuilder<O> |
| 130 | + // prettier-ignore |
| 131 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, O>): IStreamBuilder<O> |
| 132 | + // prettier-ignore |
| 133 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, T18>, o18: PipedOperator<T18, O>): IStreamBuilder<O> |
| 134 | + // prettier-ignore |
| 135 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, T18>, o18: PipedOperator<T18, T19>, o19: PipedOperator<T19, O>): IStreamBuilder<O> |
| 136 | + // prettier-ignore |
| 137 | + pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, T18>, o18: PipedOperator<T18, T19>, o19: PipedOperator<T19, T20>, o20: PipedOperator<T20, O>): IStreamBuilder<O> |
| 138 | + |
| 139 | + pipe(...operators: Array<PipedOperator<any, any>>): IStreamBuilder<any> { |
| 140 | + return operators.reduce((stream, operator) => { |
| 141 | + return operator(stream) |
| 142 | + }, this as IStreamBuilder<any>) |
| 143 | + } |
| 144 | +} |
| 145 | + |
| 146 | +export class RootStreamBuilder<T> extends StreamBuilder<T> { |
| 147 | + sendData(collection: MultiSet<T> | MultiSetArray<T>): void { |
| 148 | + this.writer.sendData(collection) |
| 149 | + } |
| 150 | +} |
0 commit comments