Skip to content

Commit 03d2ff7

Browse files
qwwdfsadelizarov
authored andcommitted
Rename DelayChannel to ticker
Make factory methods consistent Make ticker hibernation aware, call nanoTime only once per produced item Add optional coroutine context Change initialDelay default value to delay
1 parent 1dbc25e commit 03d2ff7

File tree

9 files changed

+234
-161
lines changed

9 files changed

+234
-161
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,15 @@ public final class kotlinx/coroutines/experimental/channels/SubscriptionReceiveC
848848
public static fun close (Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;)V
849849
}
850850

851+
public final class kotlinx/coroutines/experimental/channels/TickerChannelsKt {
852+
public static final fun adjustingTicker (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
853+
public static synthetic fun adjustingTicker$default (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
854+
public static final fun fixedTicker (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
855+
public static synthetic fun fixedTicker$default (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
856+
public static final fun ticker (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;Z)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
857+
public static synthetic fun ticker$default (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;ZILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
858+
}
859+
851860
public final class kotlinx/coroutines/experimental/intrinsics/CancellableKt {
852861
public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)V
853862
public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function2;Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)V

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/DelayChannel.kt

Lines changed: 0 additions & 72 deletions
This file was deleted.
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package kotlinx.coroutines.experimental.channels
2+
3+
import kotlinx.coroutines.experimental.*
4+
import kotlinx.coroutines.experimental.timeunit.*
5+
import kotlin.coroutines.experimental.*
6+
7+
8+
/**
9+
* Creates rendezvous channel which produces the first item after the given initial delay and subsequent items with the
10+
* given delay between them. Backpressure is guaranteed by [RendezvousChannel].
11+
*
12+
* This channel stops producing elements immediately after [ReceiveChannel.cancel] invocation.
13+
* **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
14+
*
15+
* @param initialDelay delay after which the first item will be produced
16+
* @param delay delay between each element
17+
* @param unit unit of time that applies to [initialDelay] and [delay]
18+
* @param coroutineContext context of the producing coroutine
19+
* @param fixedPeriod whether producer should use fixed delay or should attempt to adjust delay when consumer cannot keep up
20+
*/
21+
public fun ticker(
22+
delay: Long,
23+
unit: TimeUnit = TimeUnit.MILLISECONDS,
24+
initialDelay: Long = delay,
25+
coroutineContext: CoroutineContext = Unconfined,
26+
fixedPeriod: Boolean = false
27+
): ReceiveChannel<Unit> {
28+
return if (fixedPeriod) fixedTicker(delay, unit, initialDelay, coroutineContext)
29+
else adjustingTicker(delay, unit, initialDelay, coroutineContext)
30+
}
31+
32+
/**
33+
* Creates rendezvous channel which produces the first item after the given initial delay and subsequent items after
34+
* given delay.
35+
*
36+
* Producer to resulting channel tries to adjust delay if consumer cannot keep up:
37+
* ```
38+
* val channel = adjustingTicker(delay = 100)
39+
* delay(350) // 250 ms late
40+
* println(channel.poll()) // prints Unit
41+
* println(channel.poll()) // prints null
42+
*
43+
* delay(50)
44+
* println(channel.poll() // prints Unit, delay was adjusted
45+
* delay(50)
46+
* println(channel.poll() // prints null, we'are not late relatively to previous element
47+
* ```
48+
*
49+
* This channel stops producing elements immediately after [ReceiveChannel.cancel] invocation.
50+
* **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
51+
*
52+
* @param initialDelay delay after which the first item will be produced
53+
* @param delay delay between each elements
54+
* @param unit unit of time that applies to [initialDelay] and [delay]
55+
* @param coroutineContext context of the producing coroutine
56+
*/
57+
public fun adjustingTicker(
58+
delay: Long,
59+
unit: TimeUnit = TimeUnit.MILLISECONDS,
60+
initialDelay: Long = delay,
61+
coroutineContext: CoroutineContext = Unconfined
62+
): ReceiveChannel<Unit> {
63+
require(delay >= 0) { "Expected non-negative delay, but has $delay" }
64+
require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
65+
66+
val result = RendezvousChannel<Unit>()
67+
launch(coroutineContext) {
68+
delay(initialDelay, unit)
69+
70+
val delayNs = unit.toNanos(delay)
71+
var deadline = timeSource.nanoTime()
72+
while (true) {
73+
deadline += delayNs
74+
result.send(Unit)
75+
val now = timeSource.nanoTime()
76+
val nextDelay = (deadline - now).coerceAtLeast(0)
77+
if (nextDelay == 0L && delayNs != 0L) {
78+
val adjustedDelay = delayNs - (now - deadline) % delayNs
79+
deadline = now + adjustedDelay
80+
delay(adjustedDelay, java.util.concurrent.TimeUnit.NANOSECONDS)
81+
} else {
82+
delay(nextDelay, java.util.concurrent.TimeUnit.NANOSECONDS)
83+
}
84+
}
85+
}
86+
87+
return result
88+
}
89+
90+
/**
91+
* Creates rendezvous channel which produces the first item after the given initial delay and subsequent items after
92+
* given delay **after** consumption of the previous item.
93+
*
94+
* This channel stops producing elements immediately after [ReceiveChannel.cancel] invocation.
95+
* **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
96+
*
97+
* @param initialDelay delay after which the first item will be produced
98+
* @param delay delay between each elements
99+
* @param unit unit of time that applies to [initialDelay] and [delay]
100+
* @param coroutineContext context of the producing coroutine
101+
*/
102+
public fun fixedTicker(
103+
delay: Long,
104+
unit: TimeUnit = TimeUnit.MILLISECONDS,
105+
initialDelay: Long = delay,
106+
coroutineContext: CoroutineContext = Unconfined
107+
): ReceiveChannel<Unit> {
108+
require(delay >= 0) { "Expected non-negative delay, but has $delay" }
109+
require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
110+
111+
val result = RendezvousChannel<Unit>()
112+
launch(coroutineContext) {
113+
delay(initialDelay, unit)
114+
while (true) {
115+
result.send(Unit)
116+
delay(delay, unit)
117+
}
118+
}
119+
120+
return result
121+
}

core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,27 @@ package guide.channel.example10
1919

2020
import kotlinx.coroutines.experimental.*
2121
import kotlinx.coroutines.experimental.channels.*
22-
import kotlin.coroutines.experimental.*
2322

2423
fun main(args: Array<String>) = runBlocking<Unit> {
25-
val delayChannel = DelayChannel(delay = 100, initialDelay = 0) // create delay channel
26-
var nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
27-
println("Initial element is available immediately: $nextElement") // Initial delay haven't passed yet
24+
val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
25+
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
26+
println("Initial element is available immediately: $nextElement") // Initial delay hasn't passed yet
2827

29-
nextElement = withTimeoutOrNull(50) { delayChannel.receive() } // All subsequent elements has 100ms delay
28+
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // All subsequent elements has 100ms delay
3029
println("Next element is not ready in 50 ms: $nextElement")
3130

32-
nextElement = withTimeoutOrNull(51) { delayChannel.receive() }
31+
nextElement = withTimeoutOrNull(51) { tickerChannel.receive() }
3332
println("Next element is ready in 100 ms: $nextElement")
3433

3534
// Emulate large consumption delays
3635
println("Consumer pause in 150ms")
3736
delay(150)
3837
// Next element is available immediately
39-
nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
38+
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
4039
println("Next element is available immediately after large consumer delay: $nextElement")
4140
// Note that the pause between `receive` calls is taken into account and next element arrives faster
42-
nextElement = withTimeoutOrNull(60) { delayChannel.receive() } // 60 instead of 50 to mitigate scheduler delays
41+
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } // 60 instead of 50 to mitigate scheduler delays
4342
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
4443

45-
delayChannel.cancel() // indicate that no more elements are needed
44+
tickerChannel.cancel() // indicate that no more elements are needed
4645
}

core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -361,16 +361,6 @@ class GuideTest {
361361
)
362362
}
363363

364-
@Test
365-
fun testGuideChannelExample09() {
366-
test("GuideChannelExample09") { guide.channel.example09.main(emptyArray()) }.verifyLines(
367-
"ping Ball(hits=1)",
368-
"pong Ball(hits=2)",
369-
"ping Ball(hits=3)",
370-
"pong Ball(hits=4)"
371-
)
372-
}
373-
374364
@Test
375365
fun testGuideChannelExample10() {
376366
test("GuideChannelExample10") { guide.channel.example10.main(emptyArray()) }.verifyLines(
@@ -383,6 +373,16 @@ class GuideTest {
383373
)
384374
}
385375

376+
@Test
377+
fun testGuideChannelExample09() {
378+
test("GuideChannelExample09") { guide.channel.example09.main(emptyArray()) }.verifyLines(
379+
"ping Ball(hits=1)",
380+
"pong Ball(hits=2)",
381+
"ping Ball(hits=3)",
382+
"pong Ball(hits=4)"
383+
)
384+
}
385+
386386
@Test
387387
fun testGuideSyncExample01() {
388388
test("GuideSyncExample01") { guide.sync.example01.main(emptyArray()) }.verifyLinesStart(

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelTest.kt

Lines changed: 0 additions & 39 deletions
This file was deleted.

0 commit comments

Comments
 (0)