Skip to content

Commit 6c81385

Browse files
committed
Fixed failing unit tests
- Added the ability to use the `on` method with a callback also for `Unit` result type - Move the logic for sending `Client did not provide a result.` before the message is emitted - Perform the `resultProviderRegistry.contains` check when `on` method is called - Always include the catched exception when rethrowing `RuntimeException` - Simplify tests by using a base class and Unconfined dispatcher
1 parent f22ce63 commit 6c81385

File tree

13 files changed

+911
-1119
lines changed

13 files changed

+911
-1119
lines changed

buildSrc/src/main/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationTask.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ open class HubCommunicationTask : DefaultTask() {
307307
returns = Unit::class.asTypeName(),
308308
body = {
309309
if (!reified) {
310-
addStatement("%L", "on(target = target, hasResult = true)")
310+
addStatement("%L", "on(target = target, hasResult = resultType != Unit::class)")
311311
.addCode(
312312
format = "%L",
313313
"""

signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HttpHubConnectionBuilder.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package eu.lepicekmichal.signalrkore
22

33
import io.ktor.client.HttpClient
4+
import kotlinx.coroutines.CoroutineDispatcher
5+
import kotlinx.coroutines.Dispatchers
46
import kotlinx.serialization.json.Json
57
import kotlin.time.Duration
68
import kotlin.time.Duration.Companion.seconds
@@ -12,8 +14,16 @@ class HttpHubConnectionBuilder(private val url: String) {
1214
*/
1315
var transportEnum: TransportEnum = TransportEnum.All
1416

17+
/**
18+
* The [Transport] to be used by the [eu.lepicekmichal.signalrkore.HubConnection]
19+
*/
1520
internal var transport: Transport? = null
1621

22+
/**
23+
* The [CoroutineDispatcher] to be used by the [eu.lepicekmichal.signalrkore.HubConnection]
24+
*/
25+
internal var dispatcher: CoroutineDispatcher = Dispatchers.IO
26+
1727
/**
1828
* The [HttpClient] to be used by the [eu.lepicekmichal.signalrkore.HubConnection]
1929
*/
@@ -78,5 +88,6 @@ class HttpHubConnectionBuilder(private val url: String) {
7888
transportEnum,
7989
json,
8090
logger,
91+
dispatcher
8192
)
8293
}

signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationLink.kt

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import kotlinx.coroutines.CoroutineScope
44
import kotlinx.coroutines.cancel
55
import kotlinx.coroutines.channels.awaitClose
66
import kotlinx.coroutines.flow.Flow
7-
import kotlinx.coroutines.flow.SharedFlow
7+
import kotlinx.coroutines.flow.MutableSharedFlow
88
import kotlinx.coroutines.flow.callbackFlow
99
import kotlinx.coroutines.flow.catch
1010
import kotlinx.coroutines.flow.filter
@@ -13,7 +13,6 @@ import kotlinx.coroutines.flow.map
1313
import kotlinx.coroutines.flow.onCompletion
1414
import kotlinx.coroutines.flow.onEach
1515
import kotlinx.coroutines.flow.onStart
16-
import kotlinx.coroutines.flow.onSubscription
1716
import kotlinx.coroutines.launch
1817
import kotlinx.serialization.InternalSerializationApi
1918
import kotlinx.serialization.SerializationException
@@ -26,9 +25,9 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication()
2625

2726
protected abstract val scope: CoroutineScope
2827

29-
protected abstract val receivedInvocations: SharedFlow<HubMessage.Invocation>
30-
protected abstract val receivedCompletions: SharedFlow<HubMessage.Completion>
31-
protected abstract val receivedStreamItems: SharedFlow<HubMessage.StreamItem>
28+
private val receivedInvocations = MutableSharedFlow<HubMessage.Invocation>()
29+
private val receivedCompletions = MutableSharedFlow<HubMessage.Completion>()
30+
private val receivedStreamItems = MutableSharedFlow<HubMessage.StreamItem>()
3231

3332
private val resultProviderRegistry: MutableSet<String> = mutableSetOf()
3433

@@ -96,9 +95,9 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication()
9695
try {
9796
it.result.fromJson(resultType)
9897
} catch (ex: SerializationException) {
99-
throw RuntimeException("Completion result could not be parsed as ${resultType.simpleName}: ${it.result}")
98+
throw RuntimeException("Completion result could not be parsed as ${resultType.simpleName}: ${it.result}", ex)
10099
} catch (ex: IllegalArgumentException) {
101-
throw RuntimeException("${resultType.simpleName} could not be initialized from the completion result: ${it.result}")
100+
throw RuntimeException("${resultType.simpleName} could not be initialized from the completion result: ${it.result}", ex)
102101
}
103102
},
104103
)
@@ -162,9 +161,9 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication()
162161
try {
163162
it.item.fromJson(itemType)
164163
} catch (ex: SerializationException) {
165-
throw RuntimeException("Completion result could not be parsed as ${itemType.simpleName}: ${it.item}")
164+
throw RuntimeException("Completion result could not be parsed as ${itemType.simpleName}: ${it.item}", ex)
166165
} catch (ex: IllegalArgumentException) {
167-
throw RuntimeException("${itemType.simpleName} could not be initialized from the completion result: ${it.item}")
166+
throw RuntimeException("${itemType.simpleName} could not be initialized from the completion result: ${it.item}", ex)
168167
}
169168
}
170169
.collect { if (!isClosedForSend) send(it) }
@@ -189,6 +188,31 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication()
189188
}
190189
}
191190

191+
protected suspend fun processReceivedInvocation(message: HubMessage.Invocation) {
192+
if (message is HubMessage.Invocation.Blocking && !resultProviderRegistry.contains(message.target)) {
193+
logger.log(
194+
severity = Logger.Severity.WARNING,
195+
message = "There is no result provider for '${message.target}' despite server expecting it.",
196+
cause = null,
197+
)
198+
199+
complete(
200+
HubMessage.Completion.Error(
201+
invocationId = message.invocationId,
202+
error = "Client did not provide a result."
203+
),
204+
)
205+
}
206+
207+
receivedInvocations.emit(message)
208+
}
209+
210+
protected suspend fun processReceivedStreamItem(message: HubMessage.StreamItem) =
211+
receivedStreamItems.emit(message)
212+
213+
protected suspend fun processReceivedCompletion(message: HubMessage.Completion) =
214+
receivedCompletions.emit(message)
215+
192216
final override fun <T : Any> Flow<HubMessage.Invocation>.handleIncomingInvocation(
193217
resultType: KClass<T>,
194218
callback: suspend (HubMessage.Invocation) -> T,
@@ -245,36 +269,16 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication()
245269
}
246270

247271
final override fun on(target: String, hasResult: Boolean): Flow<HubMessage.Invocation> {
248-
if (hasResult && resultProviderRegistry.contains(target)) {
272+
if (hasResult && !resultProviderRegistry.add(target)) {
249273
throw IllegalStateException("There can be only one function for returning result on blocking invocation (method: $target)")
250274
}
251275
return receivedInvocations
252276
.run {
253277
if (!hasResult) this
254278
else this
255-
.onSubscription { resultProviderRegistry.add(target) }
256279
.onCompletion { resultProviderRegistry.remove(target) }
257280
}
258281
.filter { it.target == target }
259-
.run {
260-
if (hasResult) this
261-
else this.onEach {
262-
if (it is HubMessage.Invocation.Blocking) {
263-
logger.log(
264-
severity = Logger.Severity.WARNING,
265-
message = "There is no result provider for ${it.target} despite server expecting it.",
266-
cause = null,
267-
)
268-
269-
complete(
270-
HubMessage.Completion.Error(
271-
invocationId = it.invocationId,
272-
error = "Client did not provide a result."
273-
),
274-
)
275-
}
276-
}
277-
}
278282
.onEach { logger.log(Logger.Severity.INFO, "Received invocation: $it", null) }
279283
}
280284
}

signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import io.ktor.serialization.kotlinx.json.*
1515
import io.ktor.util.*
1616
import io.ktor.utils.io.*
1717
import io.ktor.utils.io.core.*
18+
import kotlinx.coroutines.CoroutineDispatcher
1819
import kotlinx.coroutines.CoroutineScope
1920
import kotlinx.coroutines.Dispatchers
2021
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -55,10 +56,11 @@ class HubConnection private constructor(
5556
private val automaticReconnect: AutomaticReconnect,
5657
override val logger: Logger,
5758
json: Json,
59+
dispatcher: CoroutineDispatcher,
5860
) : HubCommunicationLink(json) {
5961

6062
private val job = SupervisorJob()
61-
override val scope = CoroutineScope(job + Dispatchers.IO)
63+
override val scope = CoroutineScope(job + dispatcher)
6264

6365
private val pingReset = MutableSharedFlow<Unit>(extraBufferCapacity = 1)
6466
private val pingTicker = pingReset
@@ -82,10 +84,6 @@ class HubConnection private constructor(
8284
}
8385
}
8486

85-
override val receivedInvocations = MutableSharedFlow<HubMessage.Invocation>()
86-
override val receivedStreamItems = MutableSharedFlow<HubMessage.StreamItem>()
87-
override val receivedCompletions = MutableSharedFlow<HubMessage.Completion>()
88-
8987
private val _connectionState: MutableStateFlow<HubConnectionState> = MutableStateFlow(HubConnectionState.DISCONNECTED)
9088
val connectionState: StateFlow<HubConnectionState> = _connectionState.asStateFlow()
9189

@@ -103,6 +101,7 @@ class HubConnection private constructor(
103101
transportEnum: TransportEnum,
104102
json: Json,
105103
logger: Logger,
104+
dispatcher: CoroutineDispatcher,
106105
) : this(
107106
baseUrl = url.takeIf { it.isNotBlank() } ?: throw IllegalArgumentException("A valid url is required."),
108107
protocol = protocol,
@@ -118,6 +117,7 @@ class HubConnection private constructor(
118117
automaticReconnect = automaticReconnect,
119118
json = json,
120119
logger = logger,
120+
dispatcher = dispatcher,
121121
) {
122122
if (transport != null) {
123123
this.transport = transport
@@ -394,23 +394,12 @@ class HubConnection private constructor(
394394
else stop(message.error)
395395
}
396396

397-
is HubMessage.Invocation -> {
398-
if (message is HubMessage.Invocation.Blocking && !resultProviderRegistry.contains(message.target)) {
399-
logger.log(Logger.Level.ERROR, "There is no result provider for '${message.target}' despite server expecting it.")
400-
401-
complete(HubMessage.Completion.Error(
402-
invocationId = message.invocationId,
403-
error = "Client did not provide a result."),
404-
)
405-
}
406-
407-
receivedInvocations.emit(message)
408-
}
397+
is HubMessage.Invocation -> processReceivedInvocation(message)
409398
is HubMessage.StreamInvocation -> Unit // not supported yet
410399
is HubMessage.Ping -> Unit
411400
is HubMessage.CancelInvocation -> Unit // this should not happen according to standard
412-
is HubMessage.StreamItem -> receivedStreamItems.emit(message)
413-
is HubMessage.Completion -> receivedCompletions.emit(message)
401+
is HubMessage.StreamItem -> processReceivedStreamItem(message)
402+
is HubMessage.Completion -> processReceivedCompletion(message)
414403
}
415404
}
416405
}

signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/CompletableSubject.kt renamed to signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/Completable.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package eu.lepicekmichal.signalrkore
22

33
import kotlinx.coroutines.Dispatchers
4+
import kotlinx.coroutines.FlowPreview
45
import kotlinx.coroutines.flow.MutableStateFlow
56
import kotlinx.coroutines.flow.filter
67
import kotlinx.coroutines.flow.first
@@ -9,10 +10,11 @@ import kotlinx.coroutines.withContext
910
import kotlin.time.Duration
1011
import kotlin.time.Duration.Companion.seconds
1112

12-
class CompletableSubject {
13+
class Completable {
1314
private val stateFlow = MutableStateFlow(false)
1415

15-
suspend fun waitForCompletion(timeout: Duration = 30.seconds) = withContext(Dispatchers.Default) {
16+
@OptIn(FlowPreview::class)
17+
suspend fun waitForCompletion(timeout: Duration = 5.seconds) = withContext(Dispatchers.Default) {
1618
stateFlow.filter { it }.timeout(timeout).first()
1719
}
1820

0 commit comments

Comments
 (0)