@@ -15,6 +15,7 @@ import kotlin.contracts.InvocationKind
1515import kotlin.contracts.contract
1616import kotlin.coroutines.CoroutineContext
1717import kotlin.coroutines.coroutineContext
18+ import kotlin.js.JsName
1819
1920/* *
2021 * Stream scope handles all RPC streams that are launched inside it.
@@ -26,33 +27,46 @@ import kotlin.coroutines.coroutineContext
2627 * Stream scope is a child of the [CoroutineContext] it was created in.
2728 * Failure of one request will not cancel all streams in the others.
2829 */
29- @InternalRPCApi
3030@OptIn(InternalCoroutinesApi ::class )
31- public class StreamScope (
31+ public class StreamScope internal constructor (
3232 parentContext : CoroutineContext ,
3333 internal val role : Role ,
34- ) : CoroutineContext.Element, AutoCloseable {
35- internal companion object Key : CoroutineContext.Key<StreamScope>
34+ ): AutoCloseable {
35+ internal class Element (internal val scope : StreamScope ) : CoroutineContext.Element {
36+ override val key: CoroutineContext .Key <Element > = Key
37+
38+ internal companion object Key : CoroutineContext.Key<Element>
39+ }
3640
37- override val key : CoroutineContext . Key < StreamScope > = Key
41+ internal val contextElement = Element ( this )
3842
3943 private val scopeJob = SupervisorJob (parentContext.job)
4044
4145 private val requests = ConcurrentHashMap <String , CoroutineScope >()
4246
47+ init {
48+ scopeJob.invokeOnCompletion {
49+ close()
50+ }
51+ }
52+
53+ @InternalRPCApi
4354 public fun onScopeCompletion (handler : (Throwable ? ) -> Unit ) {
4455 scopeJob.invokeOnCompletion(handler)
4556 }
4657
58+ @InternalRPCApi
4759 public fun onScopeCompletion (callId : String , handler : (Throwable ? ) -> Unit ) {
4860 getRequestScope(callId).coroutineContext.job.invokeOnCompletion(onCancelling = true , handler = handler)
4961 }
5062
63+ @InternalRPCApi
5164 public fun cancelRequestScopeById (callId : String , message : String , cause : Throwable ? ): Job ? {
5265 return requests.remove(callId)?.apply { cancel(message, cause) }?.coroutineContext?.job
5366 }
5467
5568 // Group stream launches by callId. In case one fails, so do others
69+ @InternalRPCApi
5670 public fun launch (callId : String , block : suspend CoroutineScope .() -> Unit ): Job {
5771 return getRequestScope(callId).launch(block = block)
5872 }
@@ -86,19 +100,19 @@ public fun CoroutineContext.withServerStreamScope(): CoroutineContext = withStre
86100
87101@OptIn(InternalCoroutinesApi ::class )
88102internal fun CoroutineContext.withStreamScope (role : StreamScope .Role ): CoroutineContext {
89- return this + StreamScope (this , role).apply {
90- this @withStreamScope.job.invokeOnCompletion(onCancelling = true ) { close() }
103+ return this + StreamScope (this , role).contextElement. apply {
104+ this @withStreamScope.job.invokeOnCompletion(onCancelling = true ) { scope. close() }
91105 }
92106}
93107
94108@InternalRPCApi
95109public suspend fun streamScopeOrNull (): StreamScope ? {
96- return currentCoroutineContext()[StreamScope .Key ]
110+ return currentCoroutineContext()[StreamScope .Element . Key ]?.scope
97111}
98112
99113@InternalRPCApi
100114public fun streamScopeOrNull (scope : CoroutineScope ): StreamScope ? {
101- return scope.coroutineContext[StreamScope .Key ]
115+ return scope.coroutineContext[StreamScope .Element . Key ]?.scope
102116}
103117
104118internal fun noStreamScopeError (): Nothing {
@@ -165,22 +179,53 @@ public suspend fun <T> streamScoped(block: suspend CoroutineScope.() -> T): T {
165179 }
166180
167181 val context = currentCoroutineContext()
182+ .apply {
183+ checkContextForStreamScope()
184+ }
185+
186+ val streamScope = StreamScope (context, StreamScope .Role .Client )
187+
188+ return withContext(streamScope.contextElement) {
189+ streamScope.use {
190+ block()
191+ }
192+ }
193+ }
168194
169- if (context[StreamScope .Key ] != null ) {
195+ private fun CoroutineContext.checkContextForStreamScope () {
196+ if (this [StreamScope .Element ] != null ) {
170197 error(
171198 " One of the following caused a failure: \n " +
172199 " - nested 'streamScoped' calls are not allowed.\n " +
173200 " - 'streamScoped' calls are not allowed in server RPC services."
174201 )
175202 }
203+ }
176204
177- val streamScope = StreamScope (context, StreamScope .Role .Client )
205+ /* *
206+ * Creates a [StreamScope] entity for manual stream management.
207+ */
208+ @JsName(" StreamScope_fun" )
209+ @ExperimentalRPCApi
210+ public fun StreamScope (parent : CoroutineContext ): StreamScope {
211+ parent.checkContextForStreamScope()
178212
179- return withContext(streamScope) {
180- streamScope.use {
181- block()
182- }
213+ return StreamScope (parent, StreamScope .Role .Client )
214+ }
215+
216+ /* *
217+ * Adds manually managed [StreamScope] to the current context.
218+ */
219+ @OptIn(ExperimentalContracts ::class )
220+ @ExperimentalRPCApi
221+ public suspend fun <T > withStreamScope (scope : StreamScope , block : suspend CoroutineScope .() -> T ): T {
222+ contract {
223+ callsInPlace(block, InvocationKind .EXACTLY_ONCE )
183224 }
225+
226+ currentCoroutineContext().checkContextForStreamScope()
227+
228+ return withContext(scope.contextElement, block)
184229}
185230
186231/* *
0 commit comments