Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@

# Scala Steward: Reformat with scalafmt 3.10.1
e20c58d38034869ac908a00d87dbb975f2cac2cc

# Scala Steward: Reformat with scalafmt 3.10.2
097d85055d194076e79b37dac018093ab252efe0
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = 3.10.1
version = 3.10.2

maxColumn = 120

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ class CounterSpec extends AsyncFunSuite with ActorSuite with Matchers {
val expect = (n: Int) =>
for {
a <- probe.expect[Int]
} yield for {
a <- a
} yield a.msg shouldEqual n
} yield
for {
a <- a
} yield a.msg shouldEqual n
for {
a <- expect(1)
_ <- inc
Expand Down
18 changes: 10 additions & 8 deletions actor/src/main/scala/com/evolutiongaming/akkaeffect/Ask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,21 @@ object Ask {
for {
a <- af(msg)
b <- self(a, timeout, sender)
} yield for {
b <- b
b <- bf(b)
} yield b
} yield
for {
b <- b
b <- bf(b)
} yield b
}

def narrow[A1 <: A, B1](f: B => F[B1])(implicit F: FlatMap[F]): Ask[F, A1, B1] = { (msg, timeout, sender) =>
for {
b <- self(msg, timeout, sender)
} yield for {
b <- b
b <- f(b)
} yield b
} yield
for {
b <- b
b <- f(b)
} yield b
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ class ActorVarTest extends AsyncFunSuite with Matchers {
actorVar.receive { state0 =>
for {
_ <- actions.add(Action.Updated(state0, state))
} yield for {
state <- state
} yield {
val release = actions.add(Action.Released(state))
Releasable(state, release.some)
}
} yield
for {
state <- state
} yield {
val release = actions.add(Action.Released(state))
Releasable(state, release.some)
}
}
}
_ <- GenSpawn[F].cede
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,25 +83,26 @@ class ClusterShardingLocalTest extends AsyncFunSuite with ActorSuite with Matche
)

actorEffect = ActorEffect.fromActor[IO](actorRef)
} yield for {
a <- probe.expect[Int]
b <- actorEffect.ask(ShardedMsg("id", 0), 1.second)
a <- a
_ <- IO(a.msg shouldEqual 0)
_ <- IO(a.from.tell(a.msg.toString, ActorRef.noSender))
b <- b
_ <- IO(b shouldEqual "0")
a <- probe.expect[HandOffStopMsg.type]
_ <- clusterShardingLocal.rebalance
a <- a
_ <- IO(a.msg shouldEqual HandOffStopMsg)
r <- clusterShardingLocal.clusterSharding.regions
_ <- IO(r shouldEqual Set(typeName))
s <- clusterShardingLocal.clusterSharding.shards(r.head)
_ <- IO(s shouldEqual Set(ShardState("1", Set.empty)))
r <- clusterShardingLocal.clusterSharding.shardRegion(typeName)
_ <- IO(r shouldEqual actorRef)
} yield {}
} yield
for {
a <- probe.expect[Int]
b <- actorEffect.ask(ShardedMsg("id", 0), 1.second)
a <- a
_ <- IO(a.msg shouldEqual 0)
_ <- IO(a.from.tell(a.msg.toString, ActorRef.noSender))
b <- b
_ <- IO(b shouldEqual "0")
a <- probe.expect[HandOffStopMsg.type]
_ <- clusterShardingLocal.rebalance
a <- a
_ <- IO(a.msg shouldEqual HandOffStopMsg)
r <- clusterShardingLocal.clusterSharding.regions
_ <- IO(r shouldEqual Set(typeName))
s <- clusterShardingLocal.clusterSharding.shards(r.head)
_ <- IO(s shouldEqual Set(ShardState("1", Set.empty)))
r <- clusterShardingLocal.clusterSharding.shardRegion(typeName)
_ <- IO(r shouldEqual actorRef)
} yield {}

result
.use(identity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ class ClusterShardingTest extends AsyncFunSuite with ActorSuite with Matchers {
new LeastShardAllocationStrategy(1, 1),
HandOffStopMessage,
)
} yield for {
a <- probe.expect[Unit]
_ <- IO(shardRegion.tell((), probe.actorEffect.toUnsafe))
a <- a
} yield a.msg
} yield
for {
a <- probe.expect[Unit]
_ <- IO(shardRegion.tell((), probe.actorEffect.toUnsafe))
a <- a
} yield a.msg
result
.use(identity)
.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,11 @@ object Engine {
d <- Deferred[F, Either[Throwable, A]]
f <- loadOf(load, d).start
_ <- offer(f)
} yield for {
a <- d.get
a <- a.liftTo[F]
} yield a
} yield
for {
a <- d.get
a <- a.liftTo[F]
} yield a
}
}
engine <- fenced(engine)
Expand Down Expand Up @@ -312,10 +313,11 @@ object Engine {
fv <- load.start // fork `load` stage to allow multiple independent executions
fu = execute(fv.joinWithNever, d)
_ <- queue(Key.validate.some)(fu)
} yield for {
e <- d.get
a <- e.liftTo[F]
} yield a
} yield
for {
e <- d.get
a <- e.liftTo[F]
} yield a

/** Execute `load` with respect to:
* 1. failure on `load` or `validate` will be propagated to user 2. stopped Engine will not persist any events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ object Journaller {
for {
d <- MeasureDuration[F].start
r <- self.deleteTo(seqNr)
} yield for {
r <- r
d <- d
_ <- log.info(s"delete events to $seqNr in ${d.toMillis}ms")
} yield r
} yield
for {
r <- r
d <- d
_ <- log.info(s"delete events to $seqNr in ${d.toMillis}ms")
} yield r
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,12 @@ object Append {
for {
d <- MeasureDuration[F].start
r <- self(events)
} yield for {
r <- r
d <- d
_ <- log.debug(s"append ${events.size} events in ${d.toMillis}ms")
} yield r
} yield
for {
r <- r
d <- d
_ <- log.debug(s"append ${events.size} events in ${d.toMillis}ms")
} yield r

def withFail(fail: Fail[F])(implicit F: MonadThrowable[F]): Append[F, A] = { events =>
fail.adapt(s"failed to append $events")(self(events))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ object DeleteEventsTo {
for {
d <- MeasureDuration[F].start
r <- self(seqNr)
} yield for {
r <- r
d <- d
_ <- log.info(s"delete events to $seqNr in ${d.toMillis}ms")
} yield r
} yield
for {
r <- r
d <- d
_ <- log.info(s"delete events to $seqNr in ${d.toMillis}ms")
} yield r
}

def withFail(fail: Fail[F])(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,31 +99,34 @@ object Snapshotter {
for {
d <- MeasureDuration[F].start
r <- self.save(seqNr, snapshot)
} yield for {
r <- r
d <- d
_ <- log.info(s"save snapshot at $seqNr in ${d.toMillis}ms")
} yield r
} yield
for {
r <- r
d <- d
_ <- log.info(s"save snapshot at $seqNr in ${d.toMillis}ms")
} yield r

def delete(seqNr: SeqNr) =
for {
d <- MeasureDuration[F].start
r <- self.delete(seqNr)
} yield for {
r <- r
d <- d
_ <- log.info(s"delete snapshot at $seqNr in ${d.toMillis}ms")
} yield r
} yield
for {
r <- r
d <- d
_ <- log.info(s"delete snapshot at $seqNr in ${d.toMillis}ms")
} yield r

def delete(criteria: SnapshotSelectionCriteria) =
for {
d <- MeasureDuration[F].start
r <- self.delete(criteria)
} yield for {
r <- r
d <- d
_ <- log.info(s"delete snapshots for $criteria in ${d.toMillis}ms")
} yield r
} yield
for {
r <- r
d <- d
_ <- log.info(s"delete snapshots for $criteria in ${d.toMillis}ms")
} yield r
}

def withFail(fail: Fail[F])(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,22 @@ object InstrumentEventSourced {
_ <- record(Action.AppendEvents(events))
seqNr <- journaller.append(events)
_ <- record(Action.AppendEventsOuter)
} yield for {
seqNr <- seqNr
_ <- record(Action.AppendEventsInner(seqNr))
} yield seqNr
} yield
for {
seqNr <- seqNr
_ <- record(Action.AppendEventsInner(seqNr))
} yield seqNr

def deleteTo = (seqNr: SeqNr) =>
for {
_ <- record(Action.DeleteEventsTo(seqNr))
a <- journaller.deleteTo(seqNr)
_ <- record(Action.DeleteEventsToOuter)
} yield for {
a <- a
_ <- record(Action.DeleteEventsToInner)
} yield a
} yield
for {
a <- a
_ <- record(Action.DeleteEventsToInner)
} yield a
}

val snapshotter1 = new Instrument with Snapshotter[F, S] {
Expand All @@ -86,30 +88,33 @@ object InstrumentEventSourced {
_ <- record(Action.SaveSnapshot(seqNr, snapshot))
a <- snapshotter.save(seqNr, snapshot)
_ <- record(Action.SaveSnapshotOuter)
} yield for {
a <- a
_ <- record(Action.SaveSnapshotInner)
} yield a
} yield
for {
a <- a
_ <- record(Action.SaveSnapshotInner)
} yield a

def delete(seqNr: SeqNr) =
for {
_ <- record(Action.DeleteSnapshot(seqNr))
a <- snapshotter.delete(seqNr)
_ <- record(Action.DeleteSnapshotOuter)
} yield for {
a <- a
_ <- record(Action.DeleteSnapshotInner)
} yield a
} yield
for {
a <- a
_ <- record(Action.DeleteSnapshotInner)
} yield a

def delete(criteria: SnapshotSelectionCriteria) =
for {
_ <- record(Action.DeleteSnapshots(criteria))
a <- snapshotter.delete(criteria)
_ <- record(Action.DeleteSnapshotsOuter)
} yield for {
a <- a
_ <- record(Action.DeleteSnapshotsInner)
} yield a
} yield
for {
a <- a
_ <- record(Action.DeleteSnapshotsInner)
} yield a
}

for {
Expand Down