Skip to content

Commit fc3c8b9

Browse files
committed
Use kotlinx datetime instead of java.time
1 parent 3ac3749 commit fc3c8b9

File tree

15 files changed

+77
-95
lines changed

15 files changed

+77
-95
lines changed

krescent-core/src/main/kotlin/dev/helight/krescent/checkpoint/CheckpointStrategy.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package dev.helight.krescent.checkpoint
22

33
import dev.helight.krescent.event.EventMessage
4-
import java.time.Instant
4+
import kotlinx.datetime.Clock
55
import kotlin.concurrent.atomics.AtomicLong
66
import kotlin.concurrent.atomics.ExperimentalAtomicApi
77
import kotlin.concurrent.atomics.incrementAndFetch
@@ -58,9 +58,9 @@ class FixedTimeRateCheckpointStrategy(
5858

5959
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
6060
if (lastCheckpoint == null) return true
61-
val currentTime = Instant.now()
62-
val duration = currentTime.minusMillis(lastCheckpoint.timestamp.toEpochMilli()).toEpochMilli()
63-
return duration >= rate.inWholeMilliseconds
61+
val currentTime = Clock.System.now()
62+
val duration = currentTime - (lastCheckpoint.timestamp)
63+
return duration >= rate
6464
}
6565
}
6666

krescent-core/src/main/kotlin/dev/helight/krescent/checkpoint/CheckpointingEventSourceConsumer.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import dev.helight.krescent.source.EventSourceConsumer
55
import dev.helight.krescent.source.EventSourcingStrategy
66
import dev.helight.krescent.source.StreamingEventSource
77
import dev.helight.krescent.source.StreamingToken
8-
import java.time.Instant
8+
import kotlinx.datetime.Clock
99

1010
class CheckpointingEventSourceConsumer(
1111
val namespace: String,
@@ -73,7 +73,7 @@ class CheckpointingEventSourceConsumer(
7373
namespace = namespace,
7474
version = version,
7575
position = position.serialize(),
76-
timestamp = Instant.now(),
76+
timestamp = Clock.System.now(),
7777
data = bucket
7878
)
7979
}
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
package dev.helight.krescent.checkpoint
22

3-
import dev.helight.krescent.serialization.InstantSerializer
3+
import kotlinx.datetime.Instant
44
import kotlinx.serialization.Serializable
5-
import java.time.Instant
65

76
@Serializable
87
data class StoredCheckpoint(
98
val namespace: String,
109
val version: String,
1110
val position: String,
12-
@Serializable(with = InstantSerializer::class)
1311
val timestamp: Instant,
1412
val data: CheckpointBucket,
1513
)

krescent-core/src/main/kotlin/dev/helight/krescent/event/Event.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package dev.helight.krescent.event
22

33
import dev.helight.krescent.source.StreamingToken
4-
import java.time.Instant
4+
import kotlinx.datetime.Instant
55

66
/**
77
* Base class for all events in the Krescent framework.

krescent-core/src/main/kotlin/dev/helight/krescent/event/EventMessage.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package dev.helight.krescent.event
22

3-
import dev.helight.krescent.serialization.InstantSerializer
3+
import kotlinx.datetime.Clock
4+
import kotlinx.datetime.Instant
45
import kotlinx.serialization.Serializable
56
import kotlinx.serialization.json.JsonElement
6-
import java.time.Instant
77
import java.util.*
88

99
/**
@@ -18,8 +18,7 @@ import java.util.*
1818
@Serializable
1919
data class EventMessage(
2020
val id: String = UUID.randomUUID().toString(),
21-
@Serializable(with = InstantSerializer::class)
22-
val timestamp: Instant = Instant.now(),
21+
val timestamp: Instant = Clock.System.now(),
2322
val type: String,
2423
val payload: JsonElement,
2524
)

krescent-core/src/main/kotlin/dev/helight/krescent/serialization/InstantSerializer.kt

Lines changed: 0 additions & 25 deletions
This file was deleted.

krescent-core/src/main/kotlin/dev/helight/krescent/source/StreamingEventSource.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package dev.helight.krescent.source
22

33
import dev.helight.krescent.event.EventMessage
44
import kotlinx.coroutines.flow.Flow
5-
import java.time.Instant
5+
import kotlinx.datetime.Instant
66

77
/**
88
* Interface for an event source that provides a replayable stream of events.

krescent-core/src/main/kotlin/dev/helight/krescent/source/impl/InMemoryEventStore.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ import dev.helight.krescent.source.SubscribingEventSource
99
import kotlinx.coroutines.flow.*
1010
import kotlinx.coroutines.sync.Mutex
1111
import kotlinx.coroutines.sync.withLock
12+
import kotlinx.datetime.Instant
1213
import kotlinx.serialization.Serializable
1314
import kotlinx.serialization.encodeToString
1415
import kotlinx.serialization.json.Json
1516
import java.nio.charset.StandardCharsets
16-
import java.time.Instant
1717

1818
class InMemoryEventStore(
1919
private val events: MutableList<EventMessage> = mutableListOf(),

krescent-core/src/main/kotlin/dev/helight/krescent/source/impl/MergingStreamingEventSource.kt

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ import kotlinx.coroutines.*
77
import kotlinx.coroutines.flow.*
88
import kotlinx.coroutines.sync.Mutex
99
import kotlinx.coroutines.sync.withLock
10+
import kotlinx.datetime.Clock
11+
import kotlinx.datetime.Instant
1012
import kotlinx.serialization.encodeToString
1113
import kotlinx.serialization.json.Json
12-
import java.time.Instant
1314
import java.util.*
15+
import kotlin.time.Duration.Companion.milliseconds
16+
1417

1518
/**
1619
* A composite streaming event source that merges and manages multiple [StreamingEventSource] instances
@@ -81,14 +84,14 @@ class MergingStreamingEventSource(
8184
val buffers = keys.associateWith { LinkedList<Pair<EventMessage, StreamingToken<*>>>() }
8285
val cursors = initialToken.positions.toMutableMap()
8386
val terminated = mutableSetOf<String>()
84-
val deadline = Instant.now().minusMillis(minAge)!!
87+
val deadline = Clock.System.now().minus(minAge.milliseconds)
8588
val mutex = Mutex()
8689
var token: CompositeStreamingToken = initialToken
8790

8891
suspend fun fetchBatch(key: String) {
8992
val cursor = cursors[key]
9093
val events = sources[key]?.fetchEventsAfter(cursor, batchSize)?.filter { (event, _) ->
91-
event.timestamp.isBefore(deadline) // Remove events that are too recent
94+
event.timestamp < deadline // Remove events that are too recent
9295
}?.toList() ?: emptyList()
9396

9497
mutex.withLock {
@@ -143,7 +146,7 @@ class MergingStreamingEventSource(
143146
val mutex = Mutex()
144147
var token: CompositeStreamingToken = initialToken
145148
var incomingBuffer = PriorityQueue<IncomingMessage>()
146-
var lastAcknowledged: Instant = Instant.ofEpochMilli(0)
149+
var lastAcknowledged: Instant = Instant.fromEpochSeconds(0)
147150

148151
suspend fun perform(): Flow<Pair<EventMessage, StreamingToken<*>>> = coroutineScope {
149152
channelFlow {
@@ -175,12 +178,12 @@ class MergingStreamingEventSource(
175178
val outgoing = ArrayDeque<Pair<EventMessage, CompositeStreamingToken>>()
176179
while (true) {
177180
// Poll old enough messages, tick internal cursors and then add them to the outgoing queue
178-
val deadline = Instant.now().minusMillis(minAge)
181+
val deadline = Clock.System.now() - minAge.milliseconds
179182
mutex.withLock {
180-
while (incomingBuffer.peek()?.message?.timestamp?.isBefore(deadline) ?: false) {
183+
while (incomingBuffer.peek()?.message?.timestamp?.let { it < deadline } ?: false) {
181184
val oldest = incomingBuffer.poll()
182185
val timestamp = oldest.message.timestamp
183-
if (timestamp.isBefore(lastAcknowledged)) {
186+
if (timestamp < lastAcknowledged) {
184187
throw OutOfOrderStreamException(
185188
sourceId = oldest.source, sourceToken = oldest.position, timestamp = timestamp
186189
)

krescent-core/src/test/kotlin/dev/helight/krescent/source/impl/ChronoBufferedMergeStreamTest.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package dev.helight.krescent.source.impl
33
import dev.helight.krescent.event.EventMessage
44
import kotlinx.coroutines.flow.toList
55
import kotlinx.coroutines.runBlocking
6+
import kotlinx.datetime.Instant
67
import kotlinx.serialization.json.JsonNull
78
import org.junit.jupiter.api.Assertions
8-
import java.time.Instant
99
import kotlin.test.Test
1010

1111
class ChronoBufferedMergeStreamTest {
@@ -14,22 +14,22 @@ class ChronoBufferedMergeStreamTest {
1414
fun test() = runBlocking {
1515
val a = InMemoryEventStore(
1616
mutableListOf(
17-
EventMessage(type = "1", payload = JsonNull, timestamp = Instant.ofEpochMilli(1)),
18-
EventMessage(type = "5", payload = JsonNull, timestamp = Instant.ofEpochMilli(5)),
17+
EventMessage(type = "1", payload = JsonNull, timestamp = Instant.fromEpochMilliseconds(1)),
18+
EventMessage(type = "5", payload = JsonNull, timestamp = Instant.fromEpochMilliseconds(5)),
1919
)
2020
)
2121

2222
val b = InMemoryEventStore(
2323
mutableListOf(
24-
EventMessage(type = "2", payload = JsonNull, timestamp = Instant.ofEpochMilli(2)),
25-
EventMessage(type = "6", payload = JsonNull, timestamp = Instant.ofEpochMilli(6)),
24+
EventMessage(type = "2", payload = JsonNull, timestamp = Instant.fromEpochMilliseconds(2)),
25+
EventMessage(type = "6", payload = JsonNull, timestamp = Instant.fromEpochMilliseconds(6)),
2626
)
2727
)
2828

2929
val c = InMemoryEventStore(
3030
mutableListOf(
31-
EventMessage(type = "3", payload = JsonNull, timestamp = Instant.ofEpochMilli(3)),
32-
EventMessage(type = "4", payload = JsonNull, timestamp = Instant.ofEpochMilli(4)),
31+
EventMessage(type = "3", payload = JsonNull, timestamp = Instant.fromEpochMilliseconds(3)),
32+
EventMessage(type = "4", payload = JsonNull, timestamp = Instant.fromEpochMilliseconds(4)),
3333
)
3434
)
3535
val merged = ChronoBufferedMergeStreamEventSource.Companion.create(listOf(a, b, c)).fetchEventsAfter().toList()

0 commit comments

Comments
 (0)