
Question about intended observable api. #278

@Nerdinacan


Thanks again for the great work here. I notice some other commenters are using this to analyze COVID-19. I am also using it in software that is being applied to that problem. Great minds thinking alike, and all that.

I have another question regarding observable support. This is a little complex, so I've tried to break the issue down as carefully as I can.

  • It may be that you did not intend to support the feature I am about to describe.
  • It may be that you do support this feature, and I'm just not using it properly.
  • Or... it may just be a bug.

I'll let you decide. :)

Current Behavior

You say your library makes the use of observables transparent, and yet if I...

  1. create an observable in the main thread
  2. then try to emit 2 values from the main thread into the worker
  3. then, inside the worker, your exposed function gets executed twice and creates 2 separate subscriptions, each to a single element of the source data

Why is this a problem?

As it currently stands, your exposed function inside the worker cannot process a continuous stream of input values, because the state of the subscription is discarded each time the exposed function runs and a new observable is created. As a result, stateful operators such as distinct() and scan() cannot work properly inside the worker: they never see more than one value, so they have no history.

Every exposed function call is necessarily a subscription to a source stream containing only one value.
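The difference can be modelled without rxjs or threads at all. The sketch below is a plain-JavaScript illustration, not the library's actual internals: `makeRunningSum` and `makeDistinct` are hypothetical stand-ins for `scan()` and `distinct()`, `callPerValue` mimics an exposed function that rebuilds its pipeline on every call (the behavior described above), and `callPersistent` keeps one pipeline instance alive across calls.

```javascript
// Hypothetical stand-in for scan((acc, v) => acc + v, 0): keeps a running total.
function makeRunningSum() {
  let total = 0;
  return (value) => {
    total += value;
    return [total]; // one output per input
  };
}

// Hypothetical stand-in for distinct(): drops values it has already seen.
function makeDistinct() {
  const seen = new Set();
  return (value) => {
    if (seen.has(value)) return []; // suppressed duplicate
    seen.add(value);
    return [value];
  };
}

// Current behavior as described: every call builds a fresh operator,
// so each call sees a source of exactly one value and no history.
function callPerValue(makeOperator, values) {
  return values.flatMap((value) => makeOperator()(value));
}

// Desired behavior: one operator instance survives across calls.
function callPersistent(makeOperator, values) {
  const operator = makeOperator();
  return values.flatMap((value) => operator(value));
}

const input = [1, 2, 2, 3];
console.log(callPerValue(makeRunningSum, input));   // [ 1, 2, 2, 3 ]
console.log(callPersistent(makeRunningSum, input)); // [ 1, 3, 5, 8 ]
console.log(callPerValue(makeDistinct, input));     // [ 1, 2, 2, 3 ]
console.log(callPersistent(makeDistinct, input));   // [ 1, 2, 3 ]
```

With a fresh operator per call, the "running sum" just echoes its input and the "distinct" filter lets every duplicate through, which is exactly the loss of history described above.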

It is true that the worker can create an observable that emits multiple values back to the main thread, but only if that observable generates all of its source values inside the worker within that single execution. So your API is "transparent" from the worker to the main thread, but it is not "transparent" from the main thread INTO the worker.

My goal was to have your library "transparently" do some heavy processing inside the worker and return the results as an observable derived from my source stream. But currently I can't do that if I need distinct(), scan(), or any other stateful observable operator.

Workaround

I think I can get the behavior I want in one of two ways: either create a unique subscription ID in the master thread and pass it as part of the payload to your exposed function, or materialize the source stream over the pipe and manually read the subscribe/cancel events. In either case, I would then create a receiver inside the worker that maps each subscription ID I send in to an internal subject connected to a stored instance of the observable I want to run.
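A minimal sketch of the worker-side half of that workaround, in plain JavaScript with no dependencies. The worker keeps a `Map` from a subscription key (chosen by the master thread) to a stateful operator instance, creates the instance on the first value for a key, and frees it when told the subscription has ended. The message shape `{ type, key, value }` and the names `createSessionRegistry` and `handle` are assumptions for illustration only; in a real setup, something like `handle` would be the function passed to `expose()`.

```javascript
// Hypothetical worker-side registry: one stateful operator per subscription key.
function createSessionRegistry(makeOperator) {
  const sessions = new Map(); // key -> operator instance

  return {
    // Handles one materialized event sent from the master thread.
    handle({ type, key, value }) {
      if (type === "next") {
        let operator = sessions.get(key);
        if (!operator) {
          operator = makeOperator(); // first value for this subscription
          sessions.set(key, operator);
        }
        return operator(value);
      }
      if (type === "complete" || type === "unsubscribe") {
        sessions.delete(key); // free the state when the subscription ends
        return [];
      }
      throw new Error(`unknown event type: ${type}`);
    },
    // Number of live subscriptions, useful for spotting leaked state.
    size() {
      return sessions.size;
    },
  };
}

// Stateful operator used for the demo: a running sum, like scan().
const makeRunningSum = () => {
  let total = 0;
  return (value) => [(total += value)];
};

const registry = createSessionRegistry(makeRunningSum);
console.log(registry.handle({ type: "next", key: "a", value: 1 }));  // [ 1 ]
console.log(registry.handle({ type: "next", key: "b", value: 10 })); // [ 10 ]
console.log(registry.handle({ type: "next", key: "a", value: 2 }));  // [ 3 ]
registry.handle({ type: "complete", key: "a" });
console.log(registry.size()); // 1
```

Two interleaved subscriptions ("a" and "b") keep independent histories, and completing one does not disturb the other. The explicit completion event matters: without it, the map would grow for every subscription the master thread ever opens.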

But all that sounds almost exactly like what your code must already be doing, right? That's what led me to think this might actually be a bug.

Suggested strategy

Here is what my dream API would look like. I will have to write the adapter functions I've described unless you already support this behavior and I am just missing the point.

```js
// master.js
import { spawn, Thread, Worker } from "threads"
import { defer, of } from "rxjs"
import { mergeMap } from "rxjs/operators"

const thread = await spawn(new Worker("./workers/counter"))
const { nameOfInternalFunction } = thread;

let nextSubscriptionKey = 0;

// I'll currently have to write this proxy operator myself
const proxyObservableToWorker = exposedFn => source => defer(() => {
    // create one subscription id per subscription (not per value),
    // so the worker can keep state across every value of this stream
    const key = nextSubscriptionKey++;
    return source.pipe(
        // subscribe to the observable result of calling the exposed function
        mergeMap(sourceVal => exposedFn({ key, value: sourceVal }))
    );
});

const someSourceObservable = of(1, 2, 3) // or fromEvent, or whatever your master does

const result = someSourceObservable.pipe(
    proxyObservableToWorker(nameOfInternalFunction),
    doThings() // placeholder for further main-thread operators
);
```

```js
// worker.js
import { expose } from "threads/worker"
import { pipe } from "rxjs"

const proxyIncomingValue = operator => ({ key, value }) => {
   // look up the subject stored for this subscription key (or create one),
   // connect a new subject to a stored instance of the operator on first use,
   // then push `value` into that subject so the operator keeps its history
}

// placeholders for the stateful operators I want to run inside the worker
const internalObservableOperator = pipe(
    theOperations(),
    iWantToHappen(),
    insideTheWorker()
)

expose({
  nameOfInternalFunction: proxyIncomingValue(internalObservableOperator)
})
```
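For the master-thread half, the key detail is that the subscription key is allocated once per subscription to the source, not once per value; allocating it inside the per-value callback would give every value its own key and recreate the original problem. The dependency-free sketch below models a source as an array and the transport to the worker as a plain synchronous function `send`. `proxyToWorker` and `createFakeWorker` are hypothetical helpers for illustration; with rxjs, the same idea would typically be expressed by wrapping the per-value `mergeMap` in `defer()` so the key is created at subscribe time.

```javascript
let nextKey = 0;

// Hypothetical master-thread helper: forwards every value of one subscription
// to the worker under a single key, then signals completion.
function proxyToWorker(send, sourceValues) {
  const key = `sub-${nextKey++}`; // allocated once per subscription, not per value
  const results = [];
  for (const value of sourceValues) {
    results.push(...send({ type: "next", key, value }));
  }
  send({ type: "complete", key }); // lets the worker free this key's state
  return results;
}

// Simulated worker: keeps a running sum per key and forgets it on completion.
function createFakeWorker() {
  const totals = new Map();
  return (message) => {
    if (message.type === "complete") {
      totals.delete(message.key);
      return [];
    }
    const total = (totals.get(message.key) || 0) + message.value;
    totals.set(message.key, total);
    return [total];
  };
}

const send = createFakeWorker();
console.log(proxyToWorker(send, [1, 2, 3])); // [ 1, 3, 6 ]
console.log(proxyToWorker(send, [5, 5]));    // [ 5, 10 ] (new subscription, fresh state)
```

Each call to `proxyToWorker` stands in for one subscription: values within it share history, while a second subscription starts from a clean slate.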

Am I just recreating something you're already doing?

Thanks for reading this far, and sorry for the wall of text, but it's kind of a subtle issue.
