Skip to content

Commit 40fda3d

Browse files
committed
Add StreamBuilder and StreamMatcher for event stream testing
1 parent e40ee24 commit 40fda3d

File tree

3 files changed

+387
-0
lines changed

3 files changed

+387
-0
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package dev.helight.krescent.test
2+
3+
import dev.helight.krescent.event.Event
4+
import dev.helight.krescent.event.EventCatalog
5+
import dev.helight.krescent.event.EventMessage
6+
import dev.helight.krescent.source.StoredEventSource
7+
import dev.helight.krescent.source.impl.InMemoryEventStore
8+
import kotlinx.coroutines.flow.map
9+
import kotlinx.datetime.Instant
10+
import kotlin.time.Duration
11+
12+
suspend fun buildStream(catalog: EventCatalog, block: StreamBuilder.() -> Unit): InMemoryEventStore {
13+
val builder = StreamBuilder(catalog)
14+
builder.block()
15+
return builder.build()
16+
}
17+
18+
class StreamBuilder(
19+
val catalog: EventCatalog,
20+
) {
21+
private var list: MutableList<EventMessage> = mutableListOf()
22+
private var time = 0L
23+
24+
fun <T : Event> event(event: T): StreamBuilder {
25+
val message = catalog.create(event).copy(timestamp = Instant.fromEpochMilliseconds(time))
26+
list.add(message)
27+
return this
28+
}
29+
30+
operator fun Event.unaryPlus() {
31+
val messsage = catalog.create(this).copy(timestamp = Instant.fromEpochMilliseconds(time))
32+
list.add(messsage)
33+
}
34+
35+
fun at(time: Instant): StreamBuilder {
36+
this.time = time.toEpochMilliseconds()
37+
return this
38+
}
39+
40+
fun sleep(millis: Long): StreamBuilder {
41+
time += millis
42+
return this
43+
}
44+
45+
fun sleep(duration: Duration): StreamBuilder {
46+
time += duration.inWholeMilliseconds
47+
return this
48+
}
49+
50+
fun include(vararg events: EventMessage): StreamBuilder {
51+
list.addAll(events)
52+
return this
53+
}
54+
55+
suspend fun include(source: StoredEventSource): StreamBuilder {
56+
source.fetchEventsAfter().map { it.first }.collect {
57+
list.add(it.copy(timestamp = Instant.fromEpochMilliseconds(time)))
58+
}
59+
return this
60+
}
61+
62+
suspend fun combine(source: StoredEventSource): StreamBuilder {
63+
source.fetchEventsAfter().map { it.first }.collect {
64+
list.add(it)
65+
}
66+
return this
67+
}
68+
69+
internal suspend fun build(): InMemoryEventStore = InMemoryEventStore().also {
70+
list.sortBy { message -> message.timestamp }
71+
it.publishAll(list)
72+
}
73+
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package dev.helight.krescent.test
2+
3+
import dev.helight.krescent.event.Event
4+
import dev.helight.krescent.event.EventCatalog
5+
import dev.helight.krescent.event.SystemEvent
6+
import dev.helight.krescent.source.StoredEventSource
7+
import kotlinx.coroutines.flow.filter
8+
import kotlinx.coroutines.flow.mapNotNull
9+
import kotlinx.coroutines.flow.toList
10+
import kotlin.test.assertFails
11+
import kotlin.test.asserter
12+
13+
internal suspend inline fun assertStreamFails(
14+
catalog: EventCatalog,
15+
source: StoredEventSource,
16+
crossinline block: suspend StreamMatcher.() -> Unit,
17+
) = assertFails {
18+
assertStream(catalog, source, includeSystemEvents = false, block = block)
19+
}
20+
21+
suspend inline fun assertStream(
22+
catalog: EventCatalog,
23+
source: StoredEventSource,
24+
includeSystemEvents: Boolean = false,
25+
crossinline block: suspend StreamMatcher.() -> Unit,
26+
) {
27+
val list = source.fetchEventsAfter()
28+
.mapNotNull { catalog.decode(it.first, it.second) }
29+
.filter { includeSystemEvents || it !is SystemEvent }
30+
.toList()
31+
32+
val matcher = StreamMatcher(list, catalog)
33+
matcher.block()
34+
}
35+
36+
class StreamMatcher(
37+
val events: List<Event>,
38+
private val catalog: EventCatalog,
39+
) {
40+
@PublishedApi
41+
internal var cursor = 0
42+
43+
@PublishedApi
44+
internal val size: Int
45+
get() = events.size
46+
47+
@PublishedApi
48+
internal val available: Int
49+
get() = events.size - cursor
50+
51+
fun countExact(count: Int): List<Event> {
52+
asserter.assertTrue({
53+
"Expected $count events, but got ${events.size}"
54+
}, count == events.size)
55+
return events.subList(cursor, cursor + count)
56+
}
57+
58+
fun countAtLeast(count: Int): List<Event> {
59+
asserter.assertTrue({
60+
"Expected at least $count more event(s), but got $available"
61+
}, available >= count)
62+
return events.subList(cursor, cursor + count)
63+
}
64+
65+
fun countAvailableExact(count: Int): List<Event> {
66+
asserter.assertTrue({
67+
"Expected $count available events, but got $available"
68+
}, available == count)
69+
return events.subList(cursor, cursor + count)
70+
}
71+
72+
fun countAvailableAtLeast(count: Int): List<Event> {
73+
asserter.assertTrue({
74+
"Expected at least $count available events, but got $available"
75+
}, available >= count)
76+
return events.subList(cursor, cursor + count)
77+
}
78+
79+
fun peek(): Event? {
80+
val hasNext = available >= 1
81+
if (!hasNext) return null
82+
return events[cursor]
83+
}
84+
85+
fun nextEvent(): Event {
86+
countAtLeast(1)
87+
val event = events[cursor]
88+
cursor++
89+
return event
90+
}
91+
92+
@JvmName("nextEventTyped")
93+
inline fun <reified T : Event> nextEvent(): T {
94+
val evt = nextEvent()
95+
asserter.assertTrue(
96+
{ "Expected an event of type ${T::class.java.simpleName} but got ${evt::class.java.simpleName}" },
97+
evt is T
98+
)
99+
return evt as T
100+
}
101+
102+
@JvmName("nextEventTyped")
103+
inline fun <reified T : Event> nextEvent(message: String? = null, crossinline matcher: (T) -> Boolean): T {
104+
val evt = nextEvent<T>()
105+
asserter.assertTrue(
106+
{ message ?: "Event of type ${T::class.java.simpleName} did not match the expected criteria." },
107+
matcher(evt)
108+
)
109+
return evt
110+
}
111+
112+
inline fun <reified T : Event> ifNextEvent(message: String? = null, crossinline matcher: (T) -> Boolean): T? {
113+
if (available == 0) return null
114+
val evt = nextEvent<T>()
115+
asserter.assertTrue(
116+
{ message ?: "Event of type ${T::class.java.simpleName} did not match the expected criteria." },
117+
matcher(evt)
118+
)
119+
return evt
120+
}
121+
122+
fun hasEvent() {
123+
asserter.assertTrue(
124+
{ "Expected at least one event, but none was found." },
125+
events.isNotEmpty()
126+
)
127+
}
128+
129+
@JvmName("hasEventTyped")
130+
inline fun <reified T : Event> hasEvent() {
131+
val found = events.any { it is T }
132+
asserter.assertTrue(
133+
{ "Expected at least one event of type ${T::class.java.simpleName}, but none was found." },
134+
found
135+
)
136+
}
137+
138+
@JvmName("hasEventTyped")
139+
inline fun <reified T : Event> hasEvent(message: String? = null, crossinline matcher: (T) -> Boolean) {
140+
hasEvent<T>()
141+
val found = events.any { it is T && matcher(it) }
142+
asserter.assertTrue(
143+
{ message ?: "No event of type ${T::class.java.simpleName} matched the expected criteria." },
144+
found
145+
)
146+
}
147+
148+
fun hasNoEvent() {
149+
asserter.assertTrue(
150+
{ "Expected no events, but some were found." },
151+
events.isEmpty()
152+
)
153+
}
154+
155+
@JvmName("hasNoEventTyped")
156+
inline fun <reified T : Event> hasNoEvent() {
157+
val found = events.any { it is T }
158+
asserter.assertTrue(
159+
{ "Expected no events of type ${T::class.java.simpleName}, but some were found." },
160+
!found
161+
)
162+
}
163+
164+
@JvmName("hasNoEventTyped")
165+
inline fun <reified T : Event> hasNoEvent(message: String? = null, crossinline matcher: (T) -> Boolean) {
166+
val found = events.any { it is T && matcher(it) }
167+
asserter.assertTrue(
168+
{ message ?: "Some events of type ${T::class.java.simpleName} matched the unexpected criteria." },
169+
!found
170+
)
171+
}
172+
173+
fun isTail() {
174+
asserter.assertTrue(
175+
{ "Expected to be at the end of the stream, but $available event(s) are still available." },
176+
available == 0
177+
)
178+
}
179+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package dev.helight.krescent.test
2+
3+
import dev.helight.krescent.event.Event
4+
import dev.helight.krescent.event.buildEventCatalog
5+
import kotlinx.coroutines.runBlocking
6+
import kotlinx.serialization.Serializable
7+
import kotlin.test.Test
8+
import kotlin.test.assertEquals
9+
10+
class StreamAssertionTest {
11+
12+
@Test
13+
fun `Check next event and event counting`(): Unit = runBlocking {
14+
val stream = buildStream(sampleCatalog) {
15+
+EventA
16+
+EventB
17+
+EventC(42)
18+
}
19+
20+
assertStream(sampleCatalog, stream) {
21+
val a = countAtLeast(2)
22+
assertEquals(2, a.size)
23+
24+
val b = countExact(3)
25+
assertEquals(3, b.size)
26+
27+
nextEvent<EventA>()
28+
val c = countAvailableExact(2)
29+
assertEquals(2, c.size)
30+
nextEvent<EventB>()
31+
nextEvent<EventC>()
32+
}
33+
34+
// Wrong count
35+
assertStreamFails(sampleCatalog, stream) {
36+
countExact(2)
37+
}
38+
39+
// Wrong order
40+
assertStreamFails(sampleCatalog, stream) {
41+
nextEvent<EventB>()
42+
nextEvent<EventA>()
43+
nextEvent<EventC>()
44+
}
45+
46+
// Over-read
47+
assertStreamFails(sampleCatalog, stream) {
48+
nextEvent<EventA>()
49+
nextEvent<EventB>()
50+
nextEvent<EventC>()
51+
nextEvent<EventC>()
52+
}
53+
}
54+
55+
@Test
56+
fun `Empty stream checks`(): Unit = runBlocking {
57+
val stream = buildStream(sampleCatalog) {}
58+
59+
assertStream(sampleCatalog, stream) {
60+
countExact(0)
61+
countAvailableExact(0)
62+
countAtLeast(0)
63+
countAvailableAtLeast(0)
64+
hasNoEvent()
65+
}
66+
67+
assertStreamFails(sampleCatalog, stream) { countAtLeast(1) }
68+
assertStreamFails(sampleCatalog, stream) { hasEvent() }
69+
assertStreamFails(sampleCatalog, stream) { nextEvent<EventA>() }
70+
}
71+
72+
@Test
73+
fun `Verify event presence checks`(): Unit = runBlocking {
74+
val stream = buildStream(sampleCatalog) {
75+
+EventA
76+
+EventB
77+
}
78+
79+
assertStream(sampleCatalog, stream) {
80+
hasEvent()
81+
hasEvent<EventA>()
82+
hasEvent<EventB>()
83+
hasNoEvent<EventC>()
84+
}
85+
86+
assertStreamFails(sampleCatalog, stream) { hasNoEvent() }
87+
assertStreamFails(sampleCatalog, stream) { hasEvent<EventC>() }
88+
}
89+
90+
@Test
91+
fun `Check matchers`(): Unit = runBlocking {
92+
val stream = buildStream(sampleCatalog) {
93+
+EventC(10)
94+
+EventC(20)
95+
+EventC(25)
96+
}
97+
98+
assertStream(sampleCatalog, stream) {
99+
hasEvent<EventC> { it.value == 10 }
100+
hasEvent<EventC> { it.value == 20 }
101+
hasEvent<EventC> { it.value == 25 }
102+
hasNoEvent<EventC> { it.value >= 30 }
103+
}
104+
105+
assertStream(sampleCatalog, stream) {
106+
nextEvent<EventC> { it.value == 10 }
107+
nextEvent<EventC> { it.value == 20 }
108+
nextEvent<EventC> { it.value == 25 }
109+
}
110+
111+
assertStream(sampleCatalog, stream) {
112+
nextEvent<EventC> { it.value == 10 }
113+
nextEvent<EventC> { it.value == 20 }
114+
ifNextEvent<EventC> { it.value == 25 }
115+
ifNextEvent<EventC> { it.value == 25 }
116+
}
117+
118+
}
119+
120+
}
121+
122+
val sampleCatalog = buildEventCatalog(1) {
123+
event<EventA>("event-a")
124+
event<EventB>("event-b")
125+
event<EventC>("event-c")
126+
}
127+
128+
@Serializable
129+
object EventA : Event()
130+
131+
@Serializable
132+
object EventB : Event()
133+
134+
@Serializable
135+
class EventC(val value: Int) : Event()

0 commit comments

Comments
 (0)