Skip to content

Commit d0d4710

Browse files
authored
Merge pull request #905 from sproctor/io-dispatcher
Use Dispatchers.IO by default on multi-threaded platforms
2 parents 0563bbb + a21113a commit d0d4710

File tree

19 files changed

+117
-14
lines changed

19 files changed

+117
-14
lines changed

Auth/src/commonMain/kotlin/io/github/jan/supabase/auth/AuthConfig.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import io.github.jan.supabase.SupabaseSerializer
55
import io.github.jan.supabase.plugins.CustomSerializationConfig
66
import io.github.jan.supabase.plugins.MainConfig
77
import kotlinx.coroutines.CoroutineDispatcher
8-
import kotlinx.coroutines.Dispatchers
98
import kotlin.time.Duration
109
import kotlin.time.Duration.Companion.seconds
1110

@@ -52,7 +51,8 @@ open class AuthConfigDefaults : MainConfig() {
5251
/**
5352
* The dispatcher used for all auth related network requests
5453
*/
55-
var coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default
54+
@Deprecated("SupabaseClientBuilder.coroutineDispatcher should be used instead")
55+
var coroutineDispatcher: CoroutineDispatcher? = null
5656

5757
/**
5858
* The type of login flow to use. Defaults to [FlowType.IMPLICIT]

Auth/src/commonMain/kotlin/io/github/jan/supabase/auth/AuthImpl.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import io.ktor.http.HttpMethod
4040
import io.ktor.http.HttpStatusCode
4141
import kotlinx.coroutines.CoroutineScope
4242
import kotlinx.coroutines.Job
43+
import kotlinx.coroutines.SupervisorJob
4344
import kotlinx.coroutines.cancel
4445
import kotlinx.coroutines.delay
4546
import kotlinx.coroutines.ensureActive
@@ -72,7 +73,7 @@ internal class AuthImpl(
7273

7374
private val _sessionStatus = MutableStateFlow<SessionStatus>(SessionStatus.Initializing)
7475
override val sessionStatus: StateFlow<SessionStatus> = _sessionStatus.asStateFlow()
75-
internal val authScope = CoroutineScope(config.coroutineDispatcher)
76+
internal val authScope = CoroutineScope((config.coroutineDispatcher ?: supabaseClient.coroutineDispatcher) + SupervisorJob())
7677
override val sessionManager = config.sessionManager ?: createDefaultSessionManager()
7778
override val codeVerifierCache = config.codeVerifierCache ?: createDefaultCodeVerifierCache()
7879

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import io.ktor.http.path
1919
import io.ktor.util.decodeBase64String
2020
import kotlinx.atomicfu.atomic
2121
import kotlinx.coroutines.CoroutineScope
22-
import kotlinx.coroutines.Dispatchers
2322
import kotlinx.coroutines.Job
2423
import kotlinx.coroutines.SupervisorJob
2524
import kotlinx.coroutines.delay
@@ -49,7 +48,7 @@ import kotlin.io.encoding.ExperimentalEncodingApi
4948
override val status: StateFlow<Realtime.Status> = _status.asStateFlow()
5049
private val _subscriptions = AtomicMutableMap<String, RealtimeChannel>()
5150
override val subscriptions: Map<String, RealtimeChannel> = _subscriptions
52-
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
51+
private val scope = CoroutineScope(supabaseClient.coroutineDispatcher + SupervisorJob())
5352
private val mutex = Mutex()
5453
internal var accessToken by atomic<String?>(null)
5554
var heartbeatJob: Job? = null

Storage/src/commonMain/kotlin/io/github/jan/supabase/storage/resumable/ResumableClient.kt

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,20 @@ internal class ResumableClientImpl(private val storageApi: BucketApi, private va
125125
val fingerprint = Fingerprint(source, size)
126126
val cacheEntry = ResumableCacheEntry(uploadUrl, path, storageApi.bucketId, Clock.System.now() + 1.days, uploadOptions.upsert, uploadOptions.contentType.toString())
127127
cache.set(fingerprint, cacheEntry)
128-
return ResumableUploadImpl(fingerprint, path, cacheEntry, channel, 0, chunkSize, uploadUrl, httpClient, storageApi, { retrieveServerOffset(uploadUrl, path) }) {
129-
cache.remove(fingerprint)
130-
}
128+
return ResumableUploadImpl(
129+
fingerprint = fingerprint,
130+
path = path,
131+
cacheEntry = cacheEntry,
132+
createDataStream = channel,
133+
offset = 0,
134+
chunkSize = chunkSize,
135+
locationUrl = uploadUrl,
136+
httpClient = httpClient,
137+
storageApi = storageApi,
138+
retrieveServerOffset = { retrieveServerOffset(uploadUrl, path) },
139+
removeFromCache = { cache.remove(fingerprint) },
140+
coroutineDispatcher = storageApi.supabaseClient.coroutineDispatcher
141+
)
131142
}
132143

133144
private suspend fun resumeUpload(channel: suspend (Long) -> ByteReadChannel, entry: ResumableCacheEntry, source: String, path: String, size: Long): ResumableUploadImpl {
@@ -142,9 +153,20 @@ internal class ResumableClientImpl(private val storageApi: BucketApi, private va
142153
}
143154
val offset = retrieveServerOffset(entry.url, path)
144155
if(offset < size) {
145-
return ResumableUploadImpl(fingerprint, path, entry, channel, offset, chunkSize, entry.url, httpClient, storageApi, { retrieveServerOffset(entry.url, path)}) {
146-
cache.remove(fingerprint)
147-
}
156+
return ResumableUploadImpl(
157+
fingerprint = fingerprint,
158+
path = path,
159+
cacheEntry = entry,
160+
createDataStream = channel,
161+
offset = offset,
162+
chunkSize = chunkSize,
163+
locationUrl = entry.url,
164+
httpClient = httpClient,
165+
storageApi = storageApi,
166+
retrieveServerOffset = { retrieveServerOffset(entry.url, path)},
167+
removeFromCache = { cache.remove(fingerprint) },
168+
coroutineDispatcher = storageApi.supabaseClient.coroutineDispatcher
169+
)
148170
} else error("File already uploaded")
149171
}
150172

Storage/src/commonMain/kotlin/io/github/jan/supabase/storage/resumable/ResumableUpload.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import io.ktor.utils.io.cancel
2424
import io.ktor.utils.io.readFully
2525
import io.ktor.utils.io.writeFully
2626
import kotlinx.atomicfu.atomic
27+
import kotlinx.coroutines.CoroutineDispatcher
2728
import kotlinx.coroutines.CoroutineScope
28-
import kotlinx.coroutines.Dispatchers
2929
import kotlinx.coroutines.cancel
3030
import kotlinx.coroutines.delay
3131
import kotlinx.coroutines.ensureActive
@@ -84,7 +84,8 @@ internal class ResumableUploadImpl(
8484
private val httpClient: HttpClient,
8585
private val storageApi: BucketApi,
8686
private val retrieveServerOffset: suspend () -> Long,
87-
private val removeFromCache: suspend () -> Unit
87+
private val removeFromCache: suspend () -> Unit,
88+
coroutineDispatcher: CoroutineDispatcher
8889
): ResumableUpload {
8990

9091
private val size = fingerprint.size
@@ -93,7 +94,7 @@ internal class ResumableUploadImpl(
9394
private var serverOffset = 0L
9495
private val _stateFlow = MutableStateFlow<ResumableUploadState>(ResumableUploadState(fingerprint, cacheEntry, UploadStatus.Progress(offset, size), paused))
9596
override val stateFlow: StateFlow<ResumableUploadState> = _stateFlow.asStateFlow()
96-
private val scope = CoroutineScope(Dispatchers.Default)
97+
private val scope = CoroutineScope(coroutineDispatcher)
9798
private val config = storageApi.supabaseClient.storage.config.resumable
9899
private lateinit var dataStream: ByteReadChannel
99100

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.github.jan.supabase
2+
3+
import kotlinx.coroutines.Dispatchers
4+
5+
actual val defaultDispatcher = Dispatchers.IO
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.github.jan.supabase
2+
3+
import kotlinx.coroutines.CoroutineDispatcher
4+
5+
internal expect val defaultDispatcher: CoroutineDispatcher

Supabase/src/commonMain/kotlin/io/github/jan/supabase/SupabaseClient.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import io.github.jan.supabase.logging.i
99
import io.github.jan.supabase.network.KtorSupabaseHttpClient
1010
import io.github.jan.supabase.plugins.PluginManager
1111
import io.github.jan.supabase.plugins.SupabasePlugin
12+
import kotlinx.coroutines.CoroutineDispatcher
1213

1314
/**
1415
* The main class to interact with Supabase.
@@ -58,6 +59,12 @@ interface SupabaseClient {
5859
@SupabaseInternal
5960
val accessToken: AccessTokenProvider?
6061

62+
/**
63+
* The default dispatcher used for launching long running jobs
64+
*/
65+
@SupabaseInternal
66+
val coroutineDispatcher: CoroutineDispatcher
67+
6168
/**
6269
* Releases all resources held by the [httpClient] and all plugins the [pluginManager]
6370
*/
@@ -93,6 +100,7 @@ internal class SupabaseClientImpl(
93100
override val supabaseUrl: String = config.supabaseUrl
94101
override val supabaseKey: String = config.supabaseKey
95102
override val useHTTPS: Boolean = config.networkConfig.useHTTPS
103+
override val coroutineDispatcher: CoroutineDispatcher = config.coroutineDispatcher
96104

97105
init {
98106
SupabaseClient.LOGGER.i {

Supabase/src/commonMain/kotlin/io/github/jan/supabase/SupabaseClientBuilder.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import io.github.jan.supabase.serializer.KotlinXSerializer
1010
import io.ktor.client.HttpClientConfig
1111
import io.ktor.client.engine.HttpClientEngine
1212
import io.ktor.client.plugins.HttpRequestTimeoutException
13+
import kotlinx.coroutines.CoroutineDispatcher
1314
import kotlinx.serialization.json.Json
1415
import kotlin.time.Duration
1516
import kotlin.time.Duration.Companion.seconds
@@ -69,6 +70,13 @@ class SupabaseClientBuilder @PublishedApi internal constructor(private val supab
6970
*/
7071
var defaultSerializer: SupabaseSerializer = KotlinXSerializer(Json { ignoreUnknownKeys = true })
7172

73+
/**
74+
* The CoroutineDispatcher used for launching long running jobs.
75+
*
76+
* Default: [defaultDispatcher], the IO Dispatcher on supported targets.
77+
*/
78+
var coroutineDispatcher: CoroutineDispatcher = defaultDispatcher
79+
7280
/**
7381
* Optional function for using a third-party authentication system with
7482
* Supabase. The function should return an access token or ID token (JWT) by
@@ -114,6 +122,7 @@ class SupabaseClientBuilder @PublishedApi internal constructor(private val supab
114122
requestTimeout = requestTimeout
115123
),
116124
defaultSerializer = defaultSerializer,
125+
coroutineDispatcher = coroutineDispatcher,
117126
accessToken = accessToken,
118127
plugins = plugins
119128
)

Supabase/src/commonMain/kotlin/io/github/jan/supabase/SupabaseClientConfig.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package io.github.jan.supabase
22

33
import io.github.jan.supabase.logging.LogLevel
44
import io.ktor.client.engine.HttpClientEngine
5+
import kotlinx.coroutines.CoroutineDispatcher
56
import kotlin.time.Duration
67

78
internal data class SupabaseClientConfig(
@@ -10,6 +11,7 @@ internal data class SupabaseClientConfig(
1011
val defaultLogLevel: LogLevel,
1112
val networkConfig: SupabaseNetworkConfig,
1213
val defaultSerializer: SupabaseSerializer,
14+
val coroutineDispatcher: CoroutineDispatcher,
1315
val accessToken: AccessTokenProvider?,
1416
val plugins: Map<String, PluginProvider>
1517
)

0 commit comments

Comments
 (0)