Skip to content

Feature: Added reduceXXX() operators.#153

Open
SgtSwagrid wants to merge 32 commits intoraquo:masterfrom
SgtSwagrid:feature-reduce
Open

Feature: Added reduceXXX() operators.#153
SgtSwagrid wants to merge 32 commits intoraquo:masterfrom
SgtSwagrid:feature-reduce

Conversation

@SgtSwagrid
Copy link
Contributor

@SgtSwagrid SgtSwagrid commented Mar 9, 2026

The goal here is to strengthen Airstream's scanLeft operator (and friends). The semantics should be crystal clear, robust, unsurprising, and well-documented; in general and especially in relation to error handling and stream restarts. Common variants that the user will likely expect should be made available, and the behaviour and interface should be consistent across all of them. The operators should also be flexible enough to handle different use-cases.

This addition was largely motivated by #147, where a desire to include new operators built on top of scanLeft showed that the existing semantics could be a bit inflexible and unclear. That change has now been deferred until this one can be finalised.

Change 1: Refactored Signal and EventSteam so as to move existing reduction operators into traits.

Created new traits ScanLeftStreamOps and ScanLeftOps alongside the already-existing ScanLeftSignalOps. ScanLeftOps is the new common supertype for the two other types. Some repeated logic could be avoided this way.

Moved the following existing functions into the new base traits:

  • scanLeft
  • scanLeftRecover
  • foldLeft (deprecated)
  • foldLeftRecover (deprecated)

All implementations (of the above and the below) are ultimately defined in terms of scanLeftRecover for streams and scanLeftGeneratedRecover for signals, which remain the only operators here whose implementation lies directly in Signal and EventStream.

Additionally, the existing scanLeft and scanLeftRecover on Signal were renamed to scanLeftGenerated and scanLeftGeneratedRecover respectively (names subject to change).

Change 2: Added new reduceLeftXXX() operators for Signal and EventStream.

See here for a brief exposition on nomenclature for the new reduce operators.

See here for a discussion of on what types these operators should be defined and what types they should return.

Operator Defined for Returns Description
reduceLeft both same as parent* similar to scanLeft but without a seed value
reduceLeftOption EventStream only** Signal has value None before first event
reduceLeftDefault EventStream only** Signal has default value before first event
reduceLeftRecover Signal only*** Signal uses Try to recover from errors
scanLeft both Signal new variant on Signal has seed value instead of generator
scanLeftRecover both Signal new variant on Signal has seed value instead of generator

* Streams can't reduce to signals because the value is undefined before the first event.
** Not meaningful for signals as there is never a missing value.
*** Technically possible for streams but implementation proved to be annoying.

Change 3: Added new boolean flags resetOnStop and skipErrors to the above functions.

See here for the discussion which motivated the inclusion of these flags.

These flags are available in all of the above functions, with the exception of XXXRecover() where ignoreErrors doesn't make sense because error handling is done by the caller in these cases.

  • resetOnStop: Whether to reset the accumulator when the observable is restarted due to all subscribers leaving. Defaults to false for consistency with existing behaviour.
  • skipErrors: Whether to ignore errors, simply omitting them from the accumulation. Defaults to false for consistency with existing behaviour, where a single error will render the stream inoperable.

Change 4: Added library of basic "recovery strategies" describing ways to handle errors in reductions.

There are three kinds of error that a recovery strategy must consider:

  • Upstream errors: Those originating from the parent or another ancestor.
  • Seed errors: Those originating from computation of the initial seed value.
  • Combine errors: Those originating from the combine function.

Added three example strategies to scan/Recover.scala to demonstrate:

  • Recover.keepErrors: Keeps all errors, same as legacy behaviour.
  • Reover.skipErrors: Ignores all errors except seed errors. Used by new ignoreErrors flag.
  • Recover.skipUpstreamErrors: Ignores upstream errors but fails on combine errors. Currently unused.

Change 5: Added unit tests to cover the new functionality.

Tests cover both Signal and EventStream, all variants of reduce and scan, and all combinations of the new resetOnStop and ignoreErrors flags. Reorganised the existing scan-related tests. All new tests and existing scan-related tests pass, but there are 5 failures which are assumed to be unrelated.

@SgtSwagrid SgtSwagrid changed the title Feature: Added reduce() operator. Feature: Added reduce() operator and new flags resetOnStop and skipErrors. Mar 10, 2026
@SgtSwagrid SgtSwagrid changed the title Feature: Added reduce() operator and new flags resetOnStop and skipErrors. Feature: Added reduceXXX() operators and new flags resetOnStop and skipErrors. Mar 10, 2026
@SgtSwagrid SgtSwagrid marked this pull request as ready for review March 10, 2026 14:33
@SgtSwagrid SgtSwagrid requested a review from raquo as a code owner March 10, 2026 14:33
Clarify error handling for `scanLeft` operator and provide alternatives for error recovery.
Copy link
Owner

@raquo raquo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that's a good start.

Please see requested changes in code comments.

Also keep in mind my comments about scaladoc in the other PR. TBH I didn't read any of the scaladoc in this PR, it's just too excessive.

I haven't properly read the tests yet due to their sheer volume, but that +2000 test diff is what's concerning me. Do we really need all those tests? At first glance some of them seem to be repetitive / redundant (again, I didn't actually read much of them, so it's a genuine question).

Like, reduceLeftDefault is a method with a two-line implementation. Does it really need what looks like 100 lines of tests? What are we testing in those tests, that isn't tested in the tests for reduceLeftOption? And what are we testing in reduceLeftOption tests that isn't tested in scanLeft. From a cursory glance, the two lines in reduceLeftDefault are pretty simple, they don't justify a 50:1 ratio of tests LOC to code LOC. Tests are not free. They take time to review, update (when desired behaviour changes), run, etc.

Copy link
Owner

@raquo raquo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, now that the overloads were cleaned up, I could see that some of the remaining methods are probably not needed. Please see attached code comments.

@raquo
Copy link
Owner

raquo commented Mar 12, 2026

I'm cautiously onboard with stream.reduceLeft returning a stateful stream, as you have it implemented now. Although in general I would rather avoid stateful streams, I think we can roll with it this once, and see how it pans out in practice.

In terms of naming, this presents a conundrum: traditional observable name for reduceLeft would be scan, however I don't feel great about stream.scan returning a stream while stream.scanLeft returns a signal. Well, maybe that's the lesser evil, I dunno. We can rename later.

@SgtSwagrid SgtSwagrid requested a review from raquo March 12, 2026 14:58
Copy link
Owner

@raquo raquo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heyo, see new code comments below. Since it turned out that implementing skipErrors requires knowledge of core internals, I've implemented it in a separate commit myself to help out, please integrate that into your changes.

@SgtSwagrid SgtSwagrid changed the base branch from master to scanleft-resumeOnError March 15, 2026 12:18
@SgtSwagrid SgtSwagrid changed the title Feature: Added reduceXXX() operators and new flags resetOnStop and skipErrors. Feature: Added reduceXXX() operators ~and new flags resetOnStop and skipErrors~. Mar 15, 2026
@SgtSwagrid SgtSwagrid changed the title Feature: Added reduceXXX() operators ~and new flags resetOnStop and skipErrors~. Feature: Added reduceXXX() operators. Mar 15, 2026
@SgtSwagrid SgtSwagrid requested a review from raquo March 15, 2026 12:24
Copy link
Owner

@raquo raquo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just minor stuff left – see comments.

Aside from that, there should be no outstanding issues, right? Seems fine to me.

Don't worry about the docs in readme, I'll update those later.

Comment on lines +82 to +84
case (Success(current), Success(next)) => Try(combine(current, next))
case (Failure(error), _) => Failure(error)
case (_, Failure(error)) => Failure(error)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try(combine(current.get, next.get)) is shorter and more efficient.

*/
* Concrete classes like MapSignal can extend this trait, but more general types like
* Signal[A] need to remain covariant in A for good ergonomics.
*/
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ Please run scalafmtAll, it will clear this diff and more.

sourceVar.signal
.map(identity)
.scanLeft(_.id) { (acc, foo) => acc + foo.id }
.scanLeftGenerated((foo: Foo) => foo.id) { (acc, foo) => acc + foo.id }
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you've added type ascription here and elsewhere when using scanLeftGenerated. Did type inference break, to need the : Foo type ascription? It should be just _.id.

effects.clear()
}

it("EventStream.scanLeft(resumeOnErrors = true)") {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a merge conflict:

  • I've added new tests to this file in my commit
  • You've added tests to this file AND moved it into the scan package.

As a result, now we have two versions of this file. The tests that were in this file BEFORE me and you added our new tests are duplicated in both this file and your copy of it. For example the test "ScanLeftSignal made with Signal.scanLeft" is currently in both of these files.

To resolve the conflict, please remove (from your file!) the original tests that are duplicated in this file. To keep the diff readable, please do not change / move / rename this file, I'll move it myself after merging the PR eventually. Only change your new test files as needed to remove the duplication.

To be clear, I'm talking about removing actual duplicate tests, like "ScanLeftSignal made with EventStream.scanLeft", of which we have two copies now. I'm not sure if your new tests overlap with my new tests, if that's the case, I can live with it. Let's just get rid of the obvious duplicates.

@raquo raquo changed the base branch from scanleft-resumeOnError to master March 15, 2026 23:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants