Skip to content

Commit b5328a7

Browse files
committed
Leave only public ticker function, introduce TickerMode enum
1 parent 03d2ff7 commit b5328a7

File tree

7 files changed

+112
-124
lines changed

7 files changed

+112
-124
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -849,12 +849,15 @@ public final class kotlinx/coroutines/experimental/channels/SubscriptionReceiveC
849849
}
850850

851851
public final class kotlinx/coroutines/experimental/channels/TickerChannelsKt {
852-
public static final fun adjustingTicker (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
853-
public static synthetic fun adjustingTicker$default (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
854-
public static final fun fixedTicker (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
855-
public static synthetic fun fixedTicker$default (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
856-
public static final fun ticker (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;Z)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
857-
public static synthetic fun ticker$default (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;ZILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
852+
public static final fun ticker (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/channels/TickerMode;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
853+
public static synthetic fun ticker$default (JLjava/util/concurrent/TimeUnit;JLkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/channels/TickerMode;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
854+
}
855+
856+
public final class kotlinx/coroutines/experimental/channels/TickerMode : java/lang/Enum {
857+
public static final field FIXED_DELAY Lkotlinx/coroutines/experimental/channels/TickerMode;
858+
public static final field FIXED_PERIOD Lkotlinx/coroutines/experimental/channels/TickerMode;
859+
public static fun valueOf (Ljava/lang/String;)Lkotlinx/coroutines/experimental/channels/TickerMode;
860+
public static fun values ()[Lkotlinx/coroutines/experimental/channels/TickerMode;
858861
}
859862

860863
public final class kotlinx/coroutines/experimental/intrinsics/CancellableKt {

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/TickerChannels.kt

Lines changed: 75 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -4,118 +4,103 @@ import kotlinx.coroutines.experimental.*
44
import kotlinx.coroutines.experimental.timeunit.*
55
import kotlin.coroutines.experimental.*
66

7-
87
/**
9-
* Creates rendezvous channel which produces the first item after the given initial delay and subsequent items with the
10-
* given delay between them. Backpressure is guaranteed by [RendezvousChannel].
11-
*
12-
* This channel stops producing elements immediately after [ReceiveChannel.cancel] invocation.
13-
* **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
14-
*
15-
* @param initialDelay delay after which the first item will be produced
16-
* @param delay delay between each element
17-
* @param unit unit of time that applies to [initialDelay] and [delay]
18-
* @param coroutineContext context of the producing coroutine
19-
* @param fixedPeriod whether producer should use fixed delay or should attempt to adjust delay when consumer cannot keep up
8+
* Mode for [ticker] function.
209
*/
21-
public fun ticker(
22-
delay: Long,
23-
unit: TimeUnit = TimeUnit.MILLISECONDS,
24-
initialDelay: Long = delay,
25-
coroutineContext: CoroutineContext = Unconfined,
26-
fixedPeriod: Boolean = false
27-
): ReceiveChannel<Unit> {
28-
return if (fixedPeriod) fixedTicker(delay, unit, initialDelay, coroutineContext)
29-
else adjustingTicker(delay, unit, initialDelay, coroutineContext)
10+
enum class TickerMode {
11+
/**
12+
* Adjust delay to maintain fixed period if consumer cannot keep up or is otherwise slow.
13+
* **This is a default mode.**
14+
*
15+
* ```
16+
* val channel = ticker(delay = 100)
17+
* delay(350) // 250 ms late
18+
* println(channel.poll()) // prints Unit
19+
* println(channel.poll()) // prints null
20+
*
21+
* delay(50)
22+
* println(channel.poll()) // prints Unit, delay was adjusted
23+
* delay(50)
24+
* println(channel.poll()) // prints null, we'are not late relatively to previous element
25+
* ```
26+
*/
27+
FIXED_PERIOD,
28+
29+
/**
30+
* Maintains fixed delay between produced elements if consumer cannot keep up or it otherwise slow.
31+
*/
32+
FIXED_DELAY
3033
}
3134

3235
/**
33-
* Creates rendezvous channel which produces the first item after the given initial delay and subsequent items after
34-
* given delay.
36+
* Creates a channel that produces the first item after the given initial delay and subsequent items with the
37+
* given delay between them.
3538
*
36-
* Producer to resulting channel tries to adjust delay if consumer cannot keep up:
37-
* ```
38-
* val channel = adjustingTicker(delay = 100)
39-
* delay(350) // 250 ms late
40-
* println(channel.poll()) // prints Unit
41-
* println(channel.poll()) // prints null
42-
*
43-
* delay(50)
44-
* println(channel.poll() // prints Unit, delay was adjusted
45-
* delay(50)
46-
* println(channel.poll() // prints null, we'are not late relatively to previous element
47-
* ```
39+
* The resulting channel is a [rendezvous channel][RendezvousChannel]. When receiver from this channel does not keep
40+
* up with receiving the elements from this channel, they are not being being send due to backpressure. The actual
41+
* timing behavior of ticker in this case is controlled by [mode] parameter which
42+
* is set to [TickerMode.FIXED_PERIOD] by default. See [TickerMode] for other details.
4843
*
4944
* This channel stops producing elements immediately after [ReceiveChannel.cancel] invocation.
50-
* **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
5145
*
52-
* @param initialDelay delay after which the first item will be produced
53-
* @param delay delay between each elements
54-
* @param unit unit of time that applies to [initialDelay] and [delay]
55-
* @param coroutineContext context of the producing coroutine
46+
* **Note** producer to this channel is dispatched via [Unconfined] dispatcher by default and started eagerly.
47+
*
48+
* @param delay delay between each element.
49+
* @param unit unit of time that applies to [initialDelay] and [delay] (in milliseconds by default).
50+
* @param initialDelay delay after which the first element will be produced (it is equal to [delay] by default).
51+
* @param context context of the producing coroutine.
52+
* @param mode specifies behavior when elements are not received ([FIXED_PERIOD][TickerMode.FIXED_PERIOD] by default).
5653
*/
57-
public fun adjustingTicker(
54+
public fun ticker(
5855
delay: Long,
5956
unit: TimeUnit = TimeUnit.MILLISECONDS,
6057
initialDelay: Long = delay,
61-
coroutineContext: CoroutineContext = Unconfined
58+
context: CoroutineContext = EmptyCoroutineContext,
59+
mode: TickerMode = TickerMode.FIXED_PERIOD
6260
): ReceiveChannel<Unit> {
6361
require(delay >= 0) { "Expected non-negative delay, but has $delay" }
6462
require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
65-
66-
val result = RendezvousChannel<Unit>()
67-
launch(coroutineContext) {
68-
delay(initialDelay, unit)
69-
70-
val delayNs = unit.toNanos(delay)
71-
var deadline = timeSource.nanoTime()
72-
while (true) {
73-
deadline += delayNs
74-
result.send(Unit)
75-
val now = timeSource.nanoTime()
76-
val nextDelay = (deadline - now).coerceAtLeast(0)
77-
if (nextDelay == 0L && delayNs != 0L) {
78-
val adjustedDelay = delayNs - (now - deadline) % delayNs
79-
deadline = now + adjustedDelay
80-
delay(adjustedDelay, java.util.concurrent.TimeUnit.NANOSECONDS)
81-
} else {
82-
delay(nextDelay, java.util.concurrent.TimeUnit.NANOSECONDS)
83-
}
63+
return produce(Unconfined + context, capacity = 0) {
64+
when(mode) {
65+
TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delay, unit, initialDelay, channel)
66+
TickerMode.FIXED_DELAY -> fixedDelayTicker(delay, unit, initialDelay, channel)
8467
}
8568
}
86-
87-
return result
8869
}
8970

90-
/**
91-
* Creates rendezvous channel which produces the first item after the given initial delay and subsequent items after
92-
* given delay **after** consumption of the previous item.
93-
*
94-
* This channel stops producing elements immediately after [ReceiveChannel.cancel] invocation.
95-
* **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
96-
*
97-
* @param initialDelay delay after which the first item will be produced
98-
* @param delay delay between each elements
99-
* @param unit unit of time that applies to [initialDelay] and [delay]
100-
* @param coroutineContext context of the producing coroutine
101-
*/
102-
public fun fixedTicker(
71+
private suspend fun fixedPeriodTicker(
10372
delay: Long,
104-
unit: TimeUnit = TimeUnit.MILLISECONDS,
105-
initialDelay: Long = delay,
106-
coroutineContext: CoroutineContext = Unconfined
107-
): ReceiveChannel<Unit> {
108-
require(delay >= 0) { "Expected non-negative delay, but has $delay" }
109-
require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
110-
111-
val result = RendezvousChannel<Unit>()
112-
launch(coroutineContext) {
113-
delay(initialDelay, unit)
114-
while (true) {
115-
result.send(Unit)
116-
delay(delay, unit)
73+
unit: TimeUnit,
74+
initialDelay: Long,
75+
channel: SendChannel<Unit>
76+
) {
77+
var deadline = timeSource.nanoTime() + unit.toNanos(initialDelay)
78+
delay(initialDelay, unit)
79+
val delayNs = unit.toNanos(delay)
80+
while (true) {
81+
deadline += delayNs
82+
channel.send(Unit)
83+
val now = timeSource.nanoTime()
84+
val nextDelay = (deadline - now).coerceAtLeast(0)
85+
if (nextDelay == 0L && delayNs != 0L) {
86+
val adjustedDelay = delayNs - (now - deadline) % delayNs
87+
deadline = now + adjustedDelay
88+
delay(adjustedDelay, java.util.concurrent.TimeUnit.NANOSECONDS)
89+
} else {
90+
delay(nextDelay, java.util.concurrent.TimeUnit.NANOSECONDS)
11791
}
11892
}
93+
}
11994

120-
return result
95+
private suspend fun fixedDelayTicker(
96+
delay: Long,
97+
unit: TimeUnit,
98+
initialDelay: Long,
99+
channel: SendChannel<Unit>
100+
) {
101+
delay(initialDelay, unit)
102+
while (true) {
103+
channel.send(Unit)
104+
delay(delay, unit)
105+
}
121106
}

core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,24 @@ import kotlinx.coroutines.experimental.*
2121
import kotlinx.coroutines.experimental.channels.*
2222

2323
fun main(args: Array<String>) = runBlocking<Unit> {
24-
val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
24+
val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
2525
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
2626
println("Initial element is available immediately: $nextElement") // Initial delay hasn't passed yet
2727

2828
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // All subsequent elements has 100ms delay
2929
println("Next element is not ready in 50 ms: $nextElement")
3030

31-
nextElement = withTimeoutOrNull(51) { tickerChannel.receive() }
31+
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
3232
println("Next element is ready in 100 ms: $nextElement")
3333

3434
// Emulate large consumption delays
35-
println("Consumer pause in 150ms")
35+
println("Consumer pauses for 150ms")
3636
delay(150)
3737
// Next element is available immediately
3838
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
3939
println("Next element is available immediately after large consumer delay: $nextElement")
4040
// Note that the pause between `receive` calls is taken into account and next element arrives faster
41-
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } // 60 instead of 50 to mitigate scheduler delays
41+
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
4242
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
4343

4444
tickerChannel.cancel() // indicate that no more elements are needed

core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ class GuideTest {
367367
"Initial element is available immediately: kotlin.Unit",
368368
"Next element is not ready in 50 ms: null",
369369
"Next element is ready in 100 ms: kotlin.Unit",
370-
"Consumer pause in 150ms",
370+
"Consumer pauses for 150ms",
371371
"Next element is available immediately after large consumer delay: kotlin.Unit",
372372
"Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit"
373373
)

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TickerChannelCommonTest.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import kotlin.test.*
99

1010
@RunWith(Parameterized::class)
1111
class TimerChannelCommonTest(private val channelFactory: Channel) : TestBase() {
12-
1312
companion object {
1413
@Parameterized.Parameters(name = "{0}")
1514
@JvmStatic
@@ -18,12 +17,14 @@ class TimerChannelCommonTest(private val channelFactory: Channel) : TestBase() {
1817
}
1918

2019
enum class Channel {
21-
DELAY {
22-
override fun invoke(delay: Long, initialDelay: Long) = adjustingTicker(delay, initialDelay = initialDelay)
20+
FIXED_PERIOD {
21+
override fun invoke(delay: Long, initialDelay: Long) =
22+
ticker(delay, initialDelay = initialDelay, mode = TickerMode.FIXED_PERIOD)
2323
},
2424

2525
FIXED_DELAY {
26-
override fun invoke(delay: Long, initialDelay: Long) = fixedTicker(delay, initialDelay = initialDelay)
26+
override fun invoke(delay: Long, initialDelay: Long) =
27+
ticker(delay, initialDelay = initialDelay, mode = TickerMode.FIXED_DELAY)
2728
};
2829

2930
abstract operator fun invoke(delay: Long, initialDelay: Long = 0): ReceiveChannel<Unit>

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TickerChannelTest.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@ import kotlinx.coroutines.experimental.*
44
import org.junit.*
55

66
class TickerChannelTest : TestBase() {
7-
87
@Test
98
fun testFixedDelayChannelBackpressure() = runTest {
10-
val delayChannel = fixedTicker(delay = 100, initialDelay = 0)
9+
val delayChannel = ticker(delay = 100, initialDelay = 0, mode = TickerMode.FIXED_DELAY)
1110
delayChannel.checkNotEmpty()
1211
delayChannel.checkEmpty()
1312

coroutines-guide.md

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1669,36 +1669,36 @@ The first four elements are added to the buffer and the sender suspends when try
16691669

16701670
### Ticker channels
16711671

1672-
Ticker channel is a special rendezvous channel, which produces `Unit` every time given delay passes since last consumption from this channel.
1673-
Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce] operators, using this channel as one of [select] clauses and performing "timeout" action in its [onReceive][ReceiveChannel.onReceive].
1672+
Ticker channel is a special rendezvous channel that produces `Unit` every time given delay passes since last consumption from this channel.
1673+
Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce]
1674+
pipelines and operators that do windowing and other time-dependend processing.
1675+
Ticker channel can be used in [select] to perform "on tick" action.
16741676

1675-
To create such channel, use factory method [ticker] and to indicate that no further elements are needed use [ReceiveChannel.cancel] method on it.
1677+
To create such channel use a factory method [ticker].
1678+
To indicate that no further elements are needed use [ReceiveChannel.cancel] method on it.
16761679

16771680
Now let's see how it works in practice:
1678-
<!--- INCLUDE
1679-
import kotlinx.coroutines.experimental.channels.*
1680-
-->
16811681

16821682
```kotlin
16831683
fun main(args: Array<String>) = runBlocking<Unit> {
1684-
val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
1684+
val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
16851685
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
16861686
println("Initial element is available immediately: $nextElement") // Initial delay hasn't passed yet
16871687

16881688
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // All subsequent elements has 100ms delay
16891689
println("Next element is not ready in 50 ms: $nextElement")
16901690

1691-
nextElement = withTimeoutOrNull(51) { tickerChannel.receive() }
1691+
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
16921692
println("Next element is ready in 100 ms: $nextElement")
16931693

16941694
// Emulate large consumption delays
1695-
println("Consumer pause in 150ms")
1695+
println("Consumer pauses for 150ms")
16961696
delay(150)
16971697
// Next element is available immediately
16981698
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
16991699
println("Next element is available immediately after large consumer delay: $nextElement")
17001700
// Note that the pause between `receive` calls is taken into account and next element arrives faster
1701-
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } // 60 instead of 50 to mitigate scheduler delays
1701+
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
17021702
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
17031703

17041704
tickerChannel.cancel() // indicate that no more elements are needed
@@ -1713,16 +1713,18 @@ It prints following lines:
17131713
Initial element is available immediately: kotlin.Unit
17141714
Next element is not ready in 50 ms: null
17151715
Next element is ready in 100 ms: kotlin.Unit
1716-
Consumer pause in 150ms
1716+
Consumer pauses for 150ms
17171717
Next element is available immediately after large consumer delay: kotlin.Unit
17181718
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
17191719
```
17201720

17211721
<!--- TEST -->
17221722

1723-
Note that [ticker] is aware of possible consumer pauses and adapts next element delay if a pause occurs.
1724-
[fixedTicker] doesn't do so and produces elements with fixed delay after consumption, but both have built-in backpressure.
1725-
via [RendezvousChannel]
1723+
Note that [ticker] is aware of possible consumer pauses and, by default, adjusts next produced element
1724+
delay if a pause occurs, trying to maintain a fixed rate of produced elements.
1725+
1726+
Optionally, a `mode` parameters equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed
1727+
delay between elements.
17261728

17271729
### Channels are fair
17281730

@@ -2502,12 +2504,10 @@ Channel was closed
25022504
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
25032505
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
25042506
[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel.html
2505-
[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive.html
25062507
[ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/ticker.html
25072508
[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/cancel.html
2508-
[fixedTicker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/fixed-ticker.html
2509-
[RendezvousChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-rendezvous-channel/index.html
25102509
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
2510+
[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive.html
25112511
[ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive-or-null.html
25122512
[SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/on-send.html
25132513
<!--- INDEX kotlinx.coroutines.experimental.selects -->

0 commit comments

Comments
 (0)