Skip to content

Commit 0c9f22a

Browse files
authored
[core] Add Channel.takeWith for inline take+map (#1475)
1 parent 306168a commit 0c9f22a

File tree

2 files changed

+38
-3
lines changed

2 files changed

+38
-3
lines changed

kyo-core/shared/src/main/scala/kyo/Channel.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,16 +162,24 @@ object Channel:
162162
* The taken element
163163
*/
164164
def take(using Frame): A < (Abort[Closed] & Async) =
165+
takeWith(identity)
166+
167+
/** Takes an element from the channel and applies an inline function, avoiding a `.map` closure allocation.
168+
*
169+
* @return
170+
* The result of applying the function to the taken element
171+
*/
172+
inline def takeWith[B, S](inline f: A => B < S)(using Frame): B < (S & Abort[Closed] & Async) =
165173
Sync.Unsafe.defer {
166174
self.poll().foldError(
167175
{
168-
case Present(value) => value
169-
case Absent => self.takeFiber().safe.get
176+
case Present(value) => f(value)
177+
case Absent => self.takeFiber().safe.use(f)
170178
},
171179
Abort.error
172180
)
173181
}
174-
end take
182+
end takeWith
175183

176184
/** Takes [[n]] elements from the channel, semantically blocking until enough elements are present. Note that if enough elements are
177185
* not added to the channel it can block indefinitely.

kyo-core/shared/src/test/scala/kyo/ChannelTest.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,33 @@ class ChannelTest extends Test:
122122
v <- f.get
123123
yield assert(!d1 && v == 1)
124124
}
125+
"takeWith" - {
126+
"applies function to taken value" in run {
127+
for
128+
c <- Channel.init[Int](2)
129+
_ <- c.put(1)
130+
v <- c.takeWith(_ * 10)
131+
yield assert(v == 10)
132+
}
133+
"blocks when empty then applies function" in run {
134+
for
135+
c <- Channel.init[Int](2)
136+
f <- Fiber.initUnscoped(c.takeWith(_ + 5))
137+
_ <- Async.sleep(10.millis)
138+
d1 <- f.done
139+
_ <- c.put(3)
140+
_ <- untilTrue(f.done)
141+
v <- f.get
142+
yield assert(!d1 && v == 8)
143+
}
144+
"fails on closed channel" in run {
145+
for
146+
c <- Channel.init[Int](2)
147+
_ <- c.close
148+
r <- Abort.run[Closed](c.takeWith(_ * 2))
149+
yield assert(r.isFailure)
150+
}
151+
}
125152
"putBatch" - {
126153
"non-nested" - {
127154
"should put a batch" in run {

0 commit comments

Comments
 (0)