
Feature: Added extra scanning operators to EventStream. #147

Draft
SgtSwagrid wants to merge 5 commits into raquo:master from SgtSwagrid:feature-sliding


@SgtSwagrid
Contributor

SgtSwagrid commented Feb 24, 2026

Note: Deferred until after #153, which aims to strengthen the underlying scanLeft.

Change 1: Refactor of EventStream

Refactored EventStream to move operations related to scanLeft into a new trait ScanLeftEventOps, in a similar manner as was already done for Signal in the form of ScanLeftSignalOps. In particular, the following methods were relocated:

The motivation for the above was a desire to add some additional operators related to scanning without cluttering EventStream.

Change 2: Addition of scanning operators

Added the following operators to EventStream via the trait ScanLeftEventOps:

Many of the above are directly based on or inspired by equivalents from the Scala standard library (e.g. reduceLeft, zipWithIndex, sliding, grouped). The semantics of each is probably exactly as you'd expect, and is well documented in each case.
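For readers less familiar with the standard-library operators named above, here is how they behave on a plain List. This is only the collections analogue, not the PR's implementation; a stream version would presumably produce these results incrementally as events arrive. The value names are made up for illustration.

```scala
// Plain Scala collections, used only to illustrate the semantics being mirrored.
val sampleEvents = List(1, 2, 3, 4, 5)

// Combines elements left to right, starting from the first element.
val total: Int = sampleEvents.reduceLeft(_ + _)

// Pairs each element with its zero-based position.
val indexed: List[(Int, Int)] = sampleEvents.zipWithIndex

// Overlapping windows of three consecutive elements.
val windows: List[List[Int]] = sampleEvents.sliding(3).toList

// Non-overlapping chunks of two; the last chunk may be shorter.
val chunks: List[List[Int]] = sampleEvents.grouped(2).toList
```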

The main character here is sliding. This is something I've needed/used many times in my own projects, and I'd be very surprised if others haven't had the same experience. It just seems like a sensible thing to have. The remainder of the functions here are mostly either variants of sliding, or used in the implementation of sliding.

Example Usage

Consider the case where you need to produce a stream of mouse movement deltas (e.g. for aiming in a first-person game or panning in a map). Unfortunately, MouseEvent (triggered by mouse movement, among other things) will only tell you the current cursor position. Given an EventStream[MouseEvent], we can use pairs to instead obtain a stream of movements:

case class Dir(dx: Double, dy: Double)

val mouseMovements: EventStream[MouseEvent] = ???

val deltas: EventStream[Dir] = mouseMovements.pairs.map((e1, e2) => Dir(e2.clientX - e1.clientX, e2.clientY - e1.clientY))
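The same idea can be tried without Airstream or a browser by treating a List of positions as the "stream". This is a rough sketch: Point, Delta and deltasOf are hypothetical names for illustration, and sliding(2) on a List stands in for the proposed pairs operator.

```scala
// Hypothetical stand-ins for MouseEvent positions and the resulting movement.
final case class Point(x: Double, y: Double)
final case class Delta(dx: Double, dy: Double)

// Pairs each position with its predecessor and emits the difference.
// A list with fewer than two positions yields no deltas.
def deltasOf(positions: List[Point]): List[Delta] =
  positions.sliding(2).collect {
    case List(p1, p2) => Delta(p2.x - p1.x, p2.y - p1.y)
  }.toList
```

Note that the first position produces no output on its own, which is presumably also what one would expect from pairs on a stream.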

Testing

Manual Testing

All code was manually inspected and tested to rule out obvious logic errors. The possibility of more subtle errors is considered unlikely but not entirely ruled out.

Automated Tests

Additionally, ScanLeftSpec was created and filled with automated tests that cover the new functions. All of these new tests pass when using sbt +test. There are some test failures in three-level from-future flatMapSwitch and resetOnStop=true + drop(1), which appear to be unrelated.

Linting

The code was reformatted with sbt scalafmtAll, which also updated some unrelated files.

LLM Usage

All code was written by me by hand, with the exception of ScanLeftSpec which was written entirely by Claude and manually inspected.

@SgtSwagrid SgtSwagrid requested a review from raquo as a code owner February 24, 2026 19:56
@raquo
Owner

raquo commented Feb 24, 2026

Thanks for the PR!

Just to set the expectations straight out – the upcoming V18 release is full, unfortunately I don't have the bandwidth to add more stuff to it (there's more to it than already-made code). With that in mind, some comments:

some test failures in three-level from-future flatMapSwitch

These specific ones are intermittent and should happen very rarely, I wouldn't worry about it. I probably got the test setup wrong.

Refactored EventStream to move operations related to scanLeft into a new trait ScanLeftEventOps, in a similar manner as was already done for Signal in the form of ScanLeftSignalOps

The signal version of this exists for a purpose – it allows StrictSignal to return a StrictSignal (and not just Signal) from scanLeft operators. EventStream has no such need, so I'm not sure if extracting these into a similar trait makes sense. Maybe it does, but that's a question for quite a bit later in V19. I'm still unsure of this new *Ops architecture that I just made for V18. I think I might need to try getting rid of the BaseObservable type (replacing Self type param with a type member) to move forward on it, but I haven't even started on that yet.

Operators in the PR:

drop: I'm wondering if instead of returning this if numEvents <= 0 it should maybe throw, but either way, I don't want to make this change in V18, too many changes already. Need to review how we handle such cases in other ops.

reduceLeft, sliding and pair: yeah I did want to add these to Airstream eventually.

Other operators – not sure. We can't feasibly copy over the entire collections API onto streams, so we need to be selective in where we apply our efforts, mostly for the reasons below.

There are three main issues with the design and implementation of these operators (and any operators really):

  • Designing, accounting for, testing, and documenting what happens when the stream stops and later re-starts. For example, the drop operator has a resetOnStop parameter that determines this. I had to design, implement it and decide that the default should be false to fit with prevailing Airstream semantics.
    • In contrast, your index-related operators for example don't offer this configuration, and don't document their behaviour in this case.
  • Similar concerns with errors. Any observable can emit an error in its error channel, instead of emitting an event. What happens then? scanLeft emits an error and gets permanently broken – it will never emit anything else afterwards. That's why there's scanLeftRecover for cases when you expect errors upstream.
    • But then, the operators you provided are more specialized, so they need a different design approach. For example, the count operator should be able to count events, even if there are errors. Should it also count errors? Maybe there should be a config for that, or more likely, it should probably just say that it doesn't count errors, but it probably shouldn't fail if there was a single error detected.
    • Or, zipWithIndex – this operator works only on events, so IMO it should not be affected by errors on the error channel.
  • Many of these operators should have alternatives for Signal as well, just as scanLeft is also defined on Signal. And again, those need to be designed to match typical Signals behaviour w.r.t errors / onstart-onstop / etc.
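To make the error-channel concern concrete, here is a minimal model in plain Scala, where an upstream observable is represented as a List[Try[A]] (Success for an event, Failure for an error). This is not Airstream code; countEvents is a hypothetical name, and passing the error through unchanged is just one of the designs discussed above.

```scala
import scala.util.{Failure, Success, Try}

// Counts successful events only. Errors are passed through untouched
// and do not affect the running count or break later counting.
def countEvents[A](upstream: List[Try[A]]): List[Try[Int]] = {
  var seen = 0
  upstream.map {
    case Success(_) =>
      seen += 1
      Success(seen)
    case Failure(e) =>
      Failure(e)
  }
}
```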

So as you see it's unfortunately a lot more complicated to add well-behaved operators to Airstream than to just use helpers locally – it's quite possible that your code doesn't care much about starts / stops and errors because it doesn't use either of these, but anything that's in Airstream needs to carefully account for that – when it doesn't, it inevitably becomes a bug report.

I've just finished my last sprint for V18 and I still need to write tons of documentation for everything in that release; I won't have the time budget to spend much time on design / review of new operators any time soon, so right now, this is the extent of the feedback I can provide, I'm sorry that it's not more positive, it's just a non-trivial problem to design, and I'm very low on resources for that. I think some of these operators can make it to V19 but they would need a significant amount of work and testing and it would be likely months before I can dedicate the time to design and review it properly.

The silver lining is that you don't need my blessing to use these operators, you've implemented them all "in userland" so you can simply add them to your Airstream observables as extension methods. If you have bigger ideas for more Airstream operators you could even publish a library with all of them, if you're ok with the issues I mentioned about error handling etc. (it's perfectly fine to be ok with those issues – I sometimes make quick ad-hoc operators for my own app code as well, when I can't afford the time to implement them more carefully in Airstream core).

@SgtSwagrid
Contributor Author

SgtSwagrid commented Feb 25, 2026

Thanks for taking the time to look over my bag of goodies, I appreciate it and your feedback is very helpful!
Long story short, I've got some extra time on my hands, and I'm eager to carry this through to the finish line if I can.
I've laid out a few questions, ideas, and suggestions below. Since you're pressed for time, don't feel obliged to respond quickly, especially since I wrote a lot! I just wanted to put all my thoughts down on the table.

the upcoming V18 release is full, unfortunately I don't have the bandwidth to add more stuff to it

That's absolutely fine, I don't care, and I'm in no rush here, so V19 or later is perfect.

I'm still unsure of this new *Ops architecture

Don't let me tell you what to do, but I've had some success with 'Ops'-ifying things in the past. I think it's a good direction; even where it's not necessary for creating versions with different return types, it still makes it clear which 'helper' operations rely on which underlying functionality. (Since you also want Signal versions of these ops, it may even make sense to introduce a common 'ScanLeftOps' from which the Signal and Event versions both inherit, if we find that the semantics are indeed the same for some of the ops.)

I think I might need to try getting rid of the BaseObservable type (replacing Self type param with a type member) to move forward on it

I can't see why that change would be necessary. Have you tried writing e.g. +Self[+B] <: EventStream[B] instead of +Self[+_] <: EventStream[_] to help with the type inference? Forgive me if I just don't understand the problem fully, on account of being new here.

drop: I'm wondering if instead of returning this if numEvents <= 0 it should maybe throw

For numEvents < 0, up to you of course (although, drop and take are traditionally rather tolerant of out-of-bound numbers, for instance when dropping or taking more elements than exist in a collection). For numEvents == 0 however, one would have to protest; drop(0) should definitely be a well-defined no-op (I assume this is what you had in mind, anyway). For now, this was just intended as a small optimisation so the caller doesn't have to think about avoiding the extra indirection. However, I'm happy to revert it if you'd rather decouple this.
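For reference, this is the tolerance of the standard-library drop and take that the argument above leans on. The value names are only for illustration.

```scala
val items = List(1, 2, 3)

// Zero and negative counts are no-ops rather than errors.
val dropNone: List[Int] = items.drop(0)
val dropNegative: List[Int] = items.drop(-1)

// Counts larger than the collection are clamped rather than rejected.
val dropTooMany: List[Int] = items.drop(10)
val takeTooMany: List[Int] = items.take(10)
```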

reduceLeft, sliding and pair: yeah I did want to add these to Airstream eventually.

Nice! I thought reduceLeft would be one of the more controversial additions, because the interpretation seems a little 'awkward' IMO.

For sliding and pairs, there are some alternatives to consider, so I'll just note them here:

  • For sliding, we have the option of simplifying it if we were to remove the step parameter, but I don't think that would help much:
    • Pros of removing step: it probably wouldn't be used that often.
    • Cons of removing step: having it there mirrors the standard library, and without it grouped would need a separate, standalone implementation.
  • Instead of pairs, we could have a family sliding2, sliding3, ... auto generated in a similar manner to many other functions in Airstream.
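The relationship between the step parameter and grouped can be seen in the standard library, where grouped(n) gives the same result as sliding(n, n). The sketch below uses plain Lists, not the PR's stream implementation.

```scala
val xs = List(1, 2, 3, 4, 5, 6, 7)

// Windows of size 3 that advance by 2, so neighbours overlap by one element.
val overlapping: List[List[Int]] = xs.sliding(3, 2).toList

// When the step equals the window size, the windows no longer overlap...
val viaSliding: List[List[Int]] = xs.sliding(3, 3).toList

// ...which is what grouped produces.
val viaGrouped: List[List[Int]] = xs.grouped(3).toList
```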

Other operators – not sure. We can't feasibly copy over the entire collections API onto streams, so we need to be selective in where we apply our efforts

Remaining are count, zipWithIndex, filterIndex, stride, and grouped.

In my eyes, the most unclear operation here would be stride - it seems perhaps a little bit contrived. It's only there because of sliding, but we could almost as easily just use filterIndex directly or just set stride to private.

I would like to include grouped if at all possible. It matches well with sliding and the semantics are fairly clear.

The remaining three (count, zipWithIndex, filterIndex) are thematically linked so I would say take all 3 or none. If you're unsure, we can also just make them private for the time being. As far as I can tell, there's not yet any concept of event indices in Airstream. I can't say whether this speaks for or against such an inclusion, though.

  • Designing, accounting for, testing, and documenting what happens when the stream stops and later re-starts.

I had briefly considered this issue. However, I quickly brushed it aside as I assumed the semantics of the underlying scanLeft, whatever they were, were probably somewhat reasonable (and this was also undocumented, at least in the code itself). You're right to assume that I've never had to be concerned with restarting streams in my own code. But yeah, you're right, this is important to consider.

So either the counts reset to 0 after a restart or they stay where they are. Would it sound reasonable to you if we were to go with the latter, since this seems to match other behaviour in Airstream? I'd be happy to test for and document this.

  • Similar concerns with errors.

Now this is something that I hadn't thought about at all. Your suggestion sounds reasonable to me: errors shouldn't be counted but they shouldn't block everything either. Again, I'll test and document this. We could also consider adding a matching countErrors to count only the errors. Interestingly, scanLeft is marked as "guarded against exceptions" while scanLeftRecover says "Must not throw!". This seems like the reverse of what your comments would suggest - is this a typo or am I misunderstanding something?

  • Many of these operators should have alternatives for Signal as well

I'd be happy to do this if/where you think it would be helpful to have. Although, many of the ops don't seem to make sense for signals at all. In fact, I would argue that all of these don't make semantic sense for a signal, unless we think of a signal as a kind of "pseudo-event stream" over its updates, rather than strictly as a time-varying value. I guess there is some precedent for thinking of a signal in this way in Airstream, though.

Other Considerations

Something that you didn't mention was overflow. While it seems unlikely to me that someone would ever exceed 9,223,372,036,854,775,807 events on the client, the implementation given technically isn't robust against overflow. Up to you whether that's worth worrying about. (Even at 1,000,000 events per second, it would still take about 300,000 years to reach this many events).
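The back-of-the-envelope figure can be checked quickly. The rate of 1,000,000 events per second is the one assumed in the paragraph above; the value names are illustrative.

```scala
// Assumed emission rate from the paragraph above.
val eventsPerSecond = 1000000L

// Seconds in an average (Julian) year: 365.25 days.
val secondsPerYear: Double = 365.25 * 24 * 60 * 60

// Time needed to emit Long.MaxValue events at that rate, in years.
val yearsToOverflow: Double = Long.MaxValue.toDouble / eventsPerSecond / secondsPerYear
```

This comes out at roughly 292,000 years, consistent with the "about 300,000 years" estimate.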

raquo added a commit that referenced this pull request Feb 26, 2026
No change in behaviour. Borrowed from #147
@raquo
Owner

raquo commented Feb 26, 2026

introduce a common 'ScanLeftOps' from which the Signal and Event versions both inherit, if we find that the semantics are indeed the same for some of the ops.)

Unfortunately the signal version of scanLeft needs a different initial value signature (it's a function), so that operator itself can't be shared. But other operators like count can be shared.

I can't see why that change would be necessary. Have you tried writing e.g. +Self[+B] <: EventStream[B] instead of +Self[+_] <: EventStream[_] to help with the type inference? Forgive me if I just don't understand the problem fully, on account of being new here.

I don't remember why exactly I had to implement BaseObservable like this, but it had something to do with being unable to write a more specific Self type, similar to what you mentioned, that would compile. But I didn't know (enough) about type members back then, so I was blind to the possibility of using them. I have since used type members in Laminar and I think they have a chance of working better for this. But that doesn't really affect your operators aside from simple packaging.

numEvents < 0

Makes sense, protest approved. Matching collections lib is actually a good heuristic here, and the 0 case optimization is nice. Actually, since this doesn't change the API or behaviour, and is only a performance optimization, I'll go ahead and steal this one line into V18.

I thought reduceLeft would be one of the more controversial additions, because the interpretation seems a little 'awkward' IMO.

At first pass the logic seems reasonable, I'm only unsure about the naming. I try to keep naming pattern sort of consistent with the collections lib, so I originally had scanLeft named foldLeft, because it's kinda doing the same thing to streams as foldLeft does to lists, but people were confused, and I ended up naming it scanLeft to match ReactiveX naming. For "reduce" logic, I think the scan operator is probably the closest match (whereas RX reduce works differently, relying on the observable completion feature, which Airstream lacks).

Instead of pairs, we could have a family sliding2, sliding3, ... auto generated in a similar manner to many other functions in Airstream.

Hmm, I think a single pairs operator makes more sense, at least to start with. Wanting to compare with previous value is the most common need, people can still use scanLeft manually for other use cases. For naming, I would suggest "pairwise" to match RX, as I don't see the pairs name in the Scala collections lib.
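As a rough illustration of "comparing with the previous value" being expressible through scanLeft, here is a collections-based sketch. pairwiseViaScanLeft is a hypothetical helper, not the PR's code; the state is the previous and current element, and pairs are emitted only once both are known.

```scala
// State is (previous, current); both start out empty.
// Each element shifts the window by one, and only complete pairs are kept.
def pairwiseViaScanLeft[A](xs: List[A]): List[(A, A)] =
  xs.scanLeft((Option.empty[A], Option.empty[A])) {
    case ((_, prev), x) => (prev, Some(x))
  }.collect {
    case (Some(a), Some(b)) => (a, b)
  }
```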

remove the step parameter?

Let's keep the optional step arg to match the collections lib

In my eyes, the most unclear operation here would be stride - it seems perhaps a little bit contrived. It's only there because of sliding

Aside from implementation details, it does not seem that there is a compelling use-case for the stride operator, right? In that case I would rather skip it.

I would like to include grouped if at all possible. It matches well with sliding and the semantics are fairly clear.

I'm ok to include the grouped functionality as the warmup argument to sliding that you already have. I'm not sure if we should also expose it under the grouped name, this name is present in scala collections but it does not seem to be a common operator name in observable libraries, e.g. in RX it's called slidingBuffer instead, and they use "groupBy" naming for what we call "split".

The remaining three (count, zipWithIndex, filterIndex) are thematically linked so I would say take all 3 or none. If you're unsure, we can also just make them private for the time being. As far as I can tell, there's not yet any concept of event indices in Airstream. I can't say whether this speaks for or against such an inclusion, though.

Currently we only directly expose event indices in *withStatus operators, in the Resolved type. But we do have operators that work with indices internally: DropStream and TakeStream. I would err on the side of none unless there's a compelling use case for them for end users (not just implementation details).

While it seems unlikely to me that someone would ever exceed 9,223,372,036,854,775,807 events on the client, the implementation given technically isn't robust against overflow

Nah, those are crazy numbers. We don't even need a Long, an Int would do fine. We use Int-s for event indices elsewhere. There's zero chance anyone would actually emit 2 billion events in a single observable, let alone want to read the index of all those events. If Airstream was used on the backend I would review this assumption, but as we're working with client side code, Long is way overkill for this. The downside of Long-s is that it's awkward to define them for values that are typically very small, and they are also not very efficient on Scala.js. Currently Airstream does not use Long-s anywhere except in one interface where it's required for interop with Java's FlowPublisher.

So either the counts reset to 0 after a restart or they stay where they are. Would it sound reasonable to you if we were to go with the latter, since this seems to match other behaviour in Airstream?

If we have to choose then the prevailing Airstream semantic is to not reset, so yes, we should follow that. Although it would be good to provide a resetOnStop config parameter, but I guess that could potentially be added later.
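The two restart behaviours can be modelled outside Airstream by treating each started-then-stopped period as a separate list of events. This is only a sketch of the semantics under discussion; countAcrossRestarts and the "sessions" representation are assumptions made for illustration.

```scala
// Each inner list is the events received between one start and the next stop.
// With resetOnStop = false the count carries over; with true it starts again.
def countAcrossRestarts[A](sessions: List[List[A]], resetOnStop: Boolean): List[List[Int]] = {
  var count = 0
  sessions.map { session =>
    if (resetOnStop) count = 0
    session.map { _ =>
      count += 1
      count
    }
  }
}
```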

Error handling

I thought about this for a bit and I think one robust way to solve this is to add two parameters to the existing scanLeft operator:

  • resetOnStop (defaults to false to match current behaviour)
  • ignoreErrors (defaults to false to match current behaviour)

I'm not sure where to add them – in the first argument list or the second. Probably the first. It'll be kinda ugly anyway, if the user actually specified those params, but... seems unavoidable.

if resetOnStop is true, then:

  • When scanLeft is starting after being stopped, it should evaluate the initial value again and update itself
  • (otherwise, behave as-is)

if ignoreErrors is true, then:

  • When scanLeft sees an error in the parent observable, it should ignore / skip that value instead of failing as it does now. Subsequent events should resume normal scanLeft accumulating logic.
  • Except...
    • Per Airstream contract, Signals can't skip their initial value, they must always have a value
    • Thus, if the error is encountered when evaluating scanLeft's initial value, we need special handling
    • Because we're unable to fulfill the ignoreErrors = true intent in this case, this is not a nominal situation, so we should probably set scanLeft value to this error, but wrapped in Airstream's InitialValueError.
    • I'm not yet 100% sure what we should do after this error happens in the initial value.
    • I think, if the parent observable emits another (non-error) event after this error, we probably want to try and evaluate scanLeft's initial value from that event, trying to recover into normal operation. That seems consistent with the ignoreErrors = true intent.
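The intended difference between ignoreErrors = false and ignoreErrors = true can be sketched over a List[Try[A]] standing in for the parent observable. This is a simplified model, not the implementation: scanLeftModel is a hypothetical name, the initial value is a plain value that cannot fail (so the initial-value edge case above is not covered), and resetOnStop is left out.

```scala
import scala.util.{Failure, Success, Try}

// Returns every state the "signal" would hold, starting with the initial value.
def scanLeftModel[A, B](initial: B, ignoreErrors: Boolean)(
    upstream: List[Try[A]]
)(fn: (B, A) => B): List[Try[B]] = {
  val states = upstream.foldLeft(List[Try[B]](Success(initial))) { (acc, event) =>
    (acc.head, event) match {
      // Skip upstream errors entirely: no new state, accumulation continues later.
      case (_, Failure(_)) if ignoreErrors => acc
      // Otherwise combine; once the state is a Failure it stays a Failure.
      case (state, next) => state.flatMap(b => next.map(a => fn(b, a))) :: acc
    }
  }
  states.reverse
}

val boom = new RuntimeException("boom")
val upstream: List[Try[Int]] = List(Success(1), Failure(boom), Success(2))

val strict = scanLeftModel(0, ignoreErrors = false)(upstream)(_ + _)
val lenient = scanLeftModel(0, ignoreErrors = true)(upstream)(_ + _)
```

In this model, strict stays failed after the first upstream error, while lenient skips the error and keeps accumulating.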

This is for the scanLeft public API, but scanLeft is implemented in terms of scanLeftRecover, which should also have the resetOnStop argument, but the public scanLeftRecover method should probably not expose an ignoreErrors argument, since scanLeftRecover already has built-in logic to handle upstream errors (the Failure cases). So maybe both scanLeft and scanLeftRecover should just call into the ScanLeftSignal class.

And scanLeftRecover brings me to...

scanLeftRecover says "Must not throw!". This seems like the reverse of what your comments would suggest - is this a typo or am I misunderstanding something?

Not a typo. When user provides an A => B, we wrap it into Try: upstreamTry.map(A => B) – this code can't throw regardless of what's in A => B. But if user provides the entire Try[A] => Try[B], it's assumed that the user has handled the errors, and we don't do such wrapping. And so, Airstream typically does not allow any Try[A] => Try[B] functions to throw.
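The distinction can be reproduced with the standard library's Try alone. The sketch below is not Airstream's internal code; divide and unguarded are made-up names showing why an A => B function is safe to wrap while a Try[A] => Try[B] function is trusted not to throw.

```scala
import scala.util.{Success, Try}

// A user-supplied A => B that throws for n == 0.
val divide: Int => Int = n => 10 / n

// Try.map catches the exception and turns it into a Failure.
val guarded: Try[Int] = Success(0).map(divide)

// A user-supplied Try[A] => Try[B] is called directly, with no wrapping.
// If it throws, the exception escapes to the caller.
val unguarded: Try[Int] => Try[Int] = t => Success(10 / t.get)
```

Here guarded is a Failure holding the ArithmeticException, whereas calling unguarded(Success(0)) throws it.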

I wouldn't say that there is a good justification for this, I did this as a speculative performance tradeoff, but honestly I don't know if it's even a good tradeoff, especially now years later. I would be open to reviewing this decision throughout Airstream with a careful implementation that is the lowest runtime overhead possible (e.g. ideally don't want to create stray instances of Try on every event, so probably a try-catch based wrapper, unless it notably affects performance in the typical no-catch case), but that's a separate question for which the solution would be an all-encompassing upgrade throughout Airstream.

As far as the operators in this PR are concerned, we can keep using the same assumption that the user code will not throw if provided as a Try[A] => Try[B] function.

Signal versions of these operators

On a high level, we think of Signals as holders of state, and streams as pipelines transforming events, but the line in the sand can get blurry sometimes, and the implementation details, such as Signals having memory of their last value, or streams having async operators, sometimes dictate their use and integration with each other.

I'm trying not to withhold operators from Signals unless they truly don't make sense. I would say that operators that boil down to a variation of scanLeft, which itself is a variation of map + <keeping track of internal state>, should generally make ok sense on any Observable and should be available on signals as well, whereas stuff that has filter semantics or async logic should generally be left for streams only.

reduceLeft aka scan should definitely have a Signal version that returns a Signal, for example. Pretty sure about sliding and pairwise as well. Whereas filterWithIndex – no, because it's a filter.

<aside>

Why no filter on Signals: because signals always need to have a value, so we can't let you filter out the signal's initial value.

Why no async: because it's essentially a filter, and also because pulling from pull-based signals (which we're increasingly adding support for) doesn't really work the way you'd expect with async delays.

"But what if I want filter (or async) ops on a signal and I'm ok with not filter-ing (or async-ing) the signal's initial value:" then you can use signal.composeChanges(_.filter(...)) to explicitly apply those stream operators to the signal except for its initial value.

</aside>

Misc

I ran scalafmtAll on master to reduce unrelated diff

@raquo
Owner

raquo commented Feb 26, 2026

Returning signals vs streams

One more issue, since we went quite deep already.

The scanLeft operator has the semantics of accumulating values into a state. Accumulating state is Signal domain, and so scanLeft returns a Signal even if it's called on a stream.

But, all the operators that are implemented in terms of scanLeft are fundamentally doing the same thing, so, we need to seriously consider if they should return signals rather than streams, even when called on streams.

Take reduceLeft aka scan – it's basically a slightly restricted version of scanLeft that uses the same type for the accumulator as for the events. Its close resemblance to scanLeft means it should probably return a signal too.

The other ones, sliding and pairwise, seem less obvious. Instead of accumulating custom internal state, they simply remember and reproduce recently emitted events.

If you look at the operators available on streams, those of them that have any kind of internal state and can be implemented as signals (i.e. don't have filtering or async logic), are all implemented as signals. The only exceptions are:

  • take, whose internal state is the event index
    • Although technically it could be implemented as a signal, it's conceptually filter-like
  • distinct*, whose internal state is the previous event

In general, stream operators that return streams don't tend to track any significant state, and that's for a good reason – streams aren't well designed to manage state, signals are.

So with pairwise for example, I'm not sure if the output type should be a stream or a signal. On one hand, it's implemented in terms of scanLeft, but on the other hand, it's quite close to distinct in terms of what kind of state it stores, and the minimal processing it applies to incoming values.

So can we reasonably make stream.pairwise return a stream, like the distinct operator? Well, if a stream.distinct stream re-starts after being stopped, it remembers the last value it's seen, and can execute its logic just as well – when the next event comes in, it will emit said event only if it's different from the last event it's seen. Seems to work in practice.

I'm guessing stream.pairwise would work similarly ok as a stream. And I'm guessing we could extend this same logic to sliding – they are very similar conceptually, it would be strange if they behaved differently. But reduceLeft aka scan – this has room for custom state logic much like scanLeft, and I think this one should probably return a signal.

@SgtSwagrid
Contributor Author

SgtSwagrid commented Mar 9, 2026

A quick note on nomenclature:

The reduction-style operators of the Scala stdlib differ along two dimensions:

  • with vs without an initial value
  • keeping only the final value vs keeping every intermediate value

| | Final Value | All Values |
| --- | --- | --- |
| With Initial | fold | scan |
| Without Initial | reduce | - |

In our case, whether or not this should count as keeping all values or only the final one is open to interpretation. I suppose your use of scan should indicate that you've made up your mind here.

If Airstream's fold weren't renamed to scan, then reduce would be the clear choice for the version suggested here. But as it stands, we're in the bottom-right corner, with no stdlib analogue and so no obvious choice.
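The empty bottom-right cell can be filled in with a small helper on plain collections, which may make the naming question more concrete. scanWithoutSeed is a placeholder name chosen for illustration, not a proposal.

```scala
val nums = List(1, 2, 3, 4)

// The three stdlib corners of the table.
val folded: Int = nums.foldLeft(0)(_ + _)
val scanned: List[Int] = nums.scanLeft(0)(_ + _)
val reduced: Int = nums.reduceLeft(_ + _)

// The missing corner: every intermediate value, seeded by the first element.
def scanWithoutSeed[A](xs: List[A])(f: (A, A) => A): List[A] = xs match {
  case Nil          => Nil
  case head :: tail => tail.scanLeft(head)(f)
}

val running: List[Int] = scanWithoutSeed(nums)(_ + _)
```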

Some options include:

  • Variants of reduce: reduce / reduceRolling / reduceScan / reducePrefix / reduceIncremental
  • Variants of scan: scanPartial / scanAssociative / scanWithoutSeed / scanSeedless / scanFromFirst / scanReduce
  • Something else entirely: merge, accumulate, aggregate, cascade

I still feel like reduce is the least bad option, but you may feel differently.

@SgtSwagrid
Contributor Author

SgtSwagrid commented Mar 10, 2026

Except...

  • Per Airstream contract, Signals can't skip their initial value, they must always have a value
  • Thus, if the error is encountered when evaluating scanLeft's initial value, we need special handling
  • Because we're unable to fulfill the ignoreErrors = true intent in this case, this is not a nominal situation, so we should probably set scanLeft value to this error, but wrapped in Airstream's InitialValueError.
  • I'm not yet 100% sure what we should do after this error happens in the initial value.
  • I think, if the parent observable emits another (non-error) event after this error, we probably want to try and evaluate scanLeft's initial value from that event, trying to recover into normal operation. That seems consistent with the ignoreErrors = true intent.

If e.g. an EventStream[A] is scanned to a Signal[B] for some A and B, then the only instance of type B that is provided is that initial value. To make more values of type B with the merging function, you need at least one B to start with. Without that value, you're stuck, with no hope of recovery. You only have As. You may have some luck if the initial value is generated lazily and it only sometimes fails - one can simply try again - but is that worth it?

I would suggest that the initial value should be evaluated eagerly and just throw the error up the current call stack. Otherwise the only other option is to make the fold fail forever (until restart).

@SgtSwagrid
Contributor Author

I thought about this for a bit and I think one robust way to solve this is to add two parameters to the existing scanLeft operator:

  • resetOnStop (defaults to false to match current behaviour)
  • ignoreErrors (defaults to false to match current behaviour)

This has been implemented in #153.
