Review and suggestions #4

@akarnokd

Although I don't currently see a use for such multi-observables in my own work, the idea of higher-arity dataflows made me (scientifically) interested in this library. Below are some observations and suggestions about it.

Approach

First, instead of creating Bi- and Tri- Observables by hand, you can explore code templating and static code generation, similar to how many of the primitive-collections libraries (such as Trove or fastutil) do it.
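As a rough illustration of the static code generation idea, the sketch below builds the source text of an N-ary subscriber interface from an arity and a name prefix. The class `AritySourceGenerator` and its method are hypothetical names chosen for this example; the generated shape follows the `{Arity}Subscriber` template in the Primitives section, and a real build would more likely use a template engine or a build plugin than string concatenation.

```java
import java.util.StringJoiner;

/** Hypothetical generator: emits Java source for an N-ary subscriber interface. */
final class AritySourceGenerator {

    /** Builds e.g. "interface BiSubscriber<T1, T2> extends SubscriberBase { ... }". */
    static String subscriberSource(String prefix, int arity) {
        StringJoiner typeParams = new StringJoiner(", ");
        StringJoiner args = new StringJoiner(", ");
        for (int i = 1; i <= arity; i++) {
            typeParams.add("T" + i);          // T1, T2, ...
            args.add("T" + i + " t" + i);     // T1 t1, T2 t2, ...
        }
        return "interface " + prefix + "Subscriber<" + typeParams + "> extends SubscriberBase {\n"
             + "    void onNext(" + args + ");\n"
             + "}\n";
    }

    public static void main(String[] argv) {
        // Print the Bi- and Tri- variants from the same template
        System.out.print(subscriberSource("Bi", 2));
        System.out.print(subscriberSource("Tri", 3));
    }
}
```

The same loop could drive the publisher, processor, and queue templates, so adding a new arity would not require hand-written classes.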

Second, I suggest adopting the reactive-streams idioms instead of the RxJava 1.x idioms. I don't see much opportunity for direct code reuse, and since you'll need wrappers and converters anyway, you have the opportunity to avoid a few less-than-optimal decisions the RxJava API has made.

Primitives

I'd start a new type hierarchy based on the reactive-streams idioms:

interface {Arity}Publisher<Ti, ...> {
    void subscribe({Arity}Subscriber<Ti, ...> s);
}
interface SubscriberBase {
    void onStart(Subscription s);
    void onError(Throwable e);
    void onCompleted();
}
interface {Arity}Subscriber<Ti, ...> extends SubscriberBase {
    void onNext(T1 t1, ...);
}
interface Subscription {  // or import reactive-streams directly
    void request(long n);
    void cancel();
}
interface {Arity}Processor<Ti, ..., Ri, ...> extends
    {Arity}Subscriber<Ti,...>, {Arity}Publisher<Ri, ...> {
} 
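To make the template concrete, here is a minimal sketch of what the arity-2 instantiation could look like, together with a trivial source. `BiJust` is a hypothetical class invented for this example, and it is intentionally single-threaded: it ignores reentrancy and concurrent `request` calls that a real implementation would have to handle.

```java
interface Subscription {
    void request(long n);
    void cancel();
}
interface SubscriberBase {
    void onStart(Subscription s);
    void onError(Throwable e);
    void onCompleted();
}
interface BiSubscriber<T1, T2> extends SubscriberBase {
    void onNext(T1 t1, T2 t2);
}
interface BiPublisher<T1, T2> {
    void subscribe(BiSubscriber<T1, T2> s);
}

/** Hypothetical source: emits one pair on the first positive request, then completes. */
final class BiJust<T1, T2> implements BiPublisher<T1, T2> {
    private final T1 first;
    private final T2 second;

    BiJust(T1 first, T2 second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void subscribe(BiSubscriber<T1, T2> s) {
        s.onStart(new Subscription() {
            private boolean done; // not thread-safe; sketch only

            @Override
            public void request(long n) {
                if (n > 0 && !done) {
                    done = true;
                    s.onNext(first, second); // both values travel in one call, no tuple
                    s.onCompleted();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}
```

A subscriber would typically call `s.request(1)` from `onStart` and then receive both values in a single `onNext(t1, t2)` call.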

Queues

Most operators need some SPSC (single-producer, single-consumer) queue to operate, and the classical queues are designed to transmit one element at a time, so you'll need to extend their logic and API.

interface {Arity}Queue {
    boolean offer(Ti, ...);
    boolean poll(Action{k}<Ti, ...> out);  // true if element was available
    boolean peek(Action{k}<Ti, ...> out);
    boolean isEmpty();
    int size();
}

Using callbacks does add some overhead, but I think it is less than allocating a tuple per element for the classical queues.
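The callback-based API can be tried out with a deliberately simple, single-threaded arity-2 queue. This is only a sketch of the API shape: `SimpleBiQueue` is a hypothetical name, the local `Action2` interface stands in for the `Action{k}` callbacks, and the `ArrayDeque` backing store is an assumption made for brevity (it is not an SPSC queue and it rejects null values).

```java
import java.util.ArrayDeque;
import java.util.Iterator;

interface Action2<T1, T2> {
    void call(T1 t1, T2 t2);
}

/** Hypothetical single-threaded BiQueue; values are stored flat, two entries per element. */
final class SimpleBiQueue<T1, T2> {
    private final ArrayDeque<Object> items = new ArrayDeque<>();

    boolean offer(T1 t1, T2 t2) {
        items.addLast(t1); // ArrayDeque throws NullPointerException on null
        items.addLast(t2);
        return true;       // unbounded in this sketch
    }

    /** Hands the head element to the callback and removes it; false if empty. */
    @SuppressWarnings("unchecked")
    boolean poll(Action2<? super T1, ? super T2> out) {
        if (items.isEmpty()) {
            return false;
        }
        T1 t1 = (T1) items.pollFirst();
        T2 t2 = (T2) items.pollFirst();
        out.call(t1, t2);
        return true;
    }

    /** Hands the head element to the callback without removing it; false if empty. */
    @SuppressWarnings("unchecked")
    boolean peek(Action2<? super T1, ? super T2> out) {
        if (items.isEmpty()) {
            return false;
        }
        Iterator<Object> it = items.iterator();
        out.call((T1) it.next(), (T2) it.next());
        return true;
    }

    boolean isEmpty() {
        return items.isEmpty();
    }

    int size() {
        return items.size() / 2; // two stored objects per logical element
    }
}
```

A drain loop then looks like `while (queue.poll(subscriber::onNext)) { }`, with no intermediate tuple object per element.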

In addition, the ring-buffer based SPSC queue implementation needs some slight changes regarding the element store: you can keep the current indexing logic, but the slot index is now multiplied by k to get the offset into the underlying array (the slot capacity remains a power of 2), roughly:

// AtomicReferenceArray supplies the ordered element writes (lazySet) that a plain Object[] lacks
AtomicReferenceArray<Object> array = new AtomicReferenceArray<>(powerOf2Capacity * 3);

void offer(T1 t1, T2 t2, T3 t3) {
    // capacity check omitted in this sketch
    int offset = ((int)(producerIndex) & mask) * 3;
    array.lazySet(offset, t1);
    array.lazySet(offset + 1, t2);
    array.lazySet(offset + 2, t3); // memory order: release, publishes the whole slot
    producerIndex++;
}
boolean poll(Action3<T1, T2, T3> out) {
    int offset = ((int)(consumerIndex) & mask) * 3;
    Object o3 = array.get(offset + 2); // memory order: acquire
    if (o3 == null) { return false; }  // t3 not published yet: slot is empty
    Object o2 = array.get(offset + 1); // read in opposite order
    Object o1 = array.get(offset);
    array.lazySet(offset, null);
    array.lazySet(offset + 1, null);
    array.lazySet(offset + 2, null);   // frees the slot last
    consumerIndex++;
    out.call((T1)o1, (T2)o2, (T3)o3);
    return true;
}
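The ring-buffer idea above can be fleshed out into a compilable class. The following is a sketch under a few assumptions that are not in the original outline: the class name `SpscTriArrayQueue` is hypothetical, `offer` returns `false` when the target slot has not been consumed yet, all three values must be non-null (the third one doubles as the "slot is full" marker), and padding against false sharing is left out.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceArray;

interface Action3<T1, T2, T3> {
    void call(T1 t1, T2 t2, T3 t3);
}

/** Hypothetical bounded SPSC queue storing three values per slot in one flat array. */
final class SpscTriArrayQueue<T1, T2, T3> {
    private final AtomicReferenceArray<Object> array;
    private final int mask;
    private long producerIndex; // touched only by the producer thread
    private long consumerIndex; // touched only by the consumer thread

    SpscTriArrayQueue(int powerOf2Capacity) {
        if (powerOf2Capacity <= 0 || Integer.bitCount(powerOf2Capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a positive power of 2");
        }
        this.mask = powerOf2Capacity - 1;
        this.array = new AtomicReferenceArray<>(powerOf2Capacity * 3);
    }

    boolean offer(T1 t1, T2 t2, T3 t3) {
        Objects.requireNonNull(t1);
        Objects.requireNonNull(t2);
        Objects.requireNonNull(t3);
        int offset = ((int) producerIndex & mask) * 3;
        if (array.get(offset + 2) != null) {
            return false;              // slot still occupied: queue is full
        }
        array.lazySet(offset, t1);
        array.lazySet(offset + 1, t2);
        array.lazySet(offset + 2, t3); // written last: publishes the slot
        producerIndex++;
        return true;
    }

    @SuppressWarnings("unchecked")
    boolean poll(Action3<? super T1, ? super T2, ? super T3> out) {
        int offset = ((int) consumerIndex & mask) * 3;
        Object o3 = array.get(offset + 2); // volatile read: sees t1 and t2 if t3 is visible
        if (o3 == null) {
            return false;                  // nothing published in this slot
        }
        Object o2 = array.get(offset + 1);
        Object o1 = array.get(offset);
        array.lazySet(offset, null);
        array.lazySet(offset + 1, null);
        array.lazySet(offset + 2, null);   // cleared last: hands the slot back to the producer
        consumerIndex++;
        out.call((T1) o1, (T2) o2, (T3) o3);
        return true;
    }
}
```

Using the third cell as the occupancy marker avoids reading the other thread's index on every call, at the cost of forbidding null values.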

Callbacks on the stream

Since Java functions can't return more than one result, one can return a tuple for each 'column' and split the operators into many, or use the same trick as with queues and have an output callback:

public final <R1, R2> BiObservable<R1, R2> map(Action3<? super T1, ? super T2, Action2<R1, R2>> mapper) {
    return lift(s -> {
        return new AbstractBiSubscriber<T1, T2>(s) {
            @Override
            public void onNext(T1 t1, T2 t2) {
                // the mapper emits its two results through the output callback
                mapper.call(t1, t2, s::onNext);
            }
            // ...
        };
    });
}
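The output-callback trick can be exercised without the rest of the library. The sketch below is an assumption-laden reduction: `BiSubscriber` is trimmed to three methods, `Action2` and `Action3` are local stand-ins for the callback types, and `BiMap.map` is a hypothetical helper that wraps a downstream subscriber instead of going through `lift`.

```java
interface Action2<T1, T2> {
    void call(T1 t1, T2 t2);
}
interface Action3<T1, T2, T3> {
    void call(T1 t1, T2 t2, T3 t3);
}
/** Trimmed-down subscriber for this sketch (no onStart/Subscription). */
interface BiSubscriber<T1, T2> {
    void onNext(T1 t1, T2 t2);
    void onError(Throwable e);
    void onCompleted();
}

final class BiMap {
    /** Wraps downstream so that each incoming pair is passed through mapper. */
    static <T1, T2, R1, R2> BiSubscriber<T1, T2> map(
            BiSubscriber<R1, R2> downstream,
            Action3<? super T1, ? super T2, Action2<R1, R2>> mapper) {
        // The output callback: the mapper "returns" two values by calling it
        Action2<R1, R2> emit = downstream::onNext;
        return new BiSubscriber<T1, T2>() {
            @Override
            public void onNext(T1 t1, T2 t2) {
                mapper.call(t1, t2, emit);
            }

            @Override
            public void onError(Throwable e) {
                downstream.onError(e);
            }

            @Override
            public void onCompleted() {
                downstream.onCompleted();
            }
        };
    }
}
```

A mapper such as `(name, n, out) -> out.call(name.toUpperCase(), n + 1)` produces both output columns in one step. Nothing in this shape forces the mapper to call `out` exactly once, so a real `map` would probably need to document or enforce that.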
