Skip to content

Commit e65d1f2

Browse files
authored
feat: TokenSource implementation (#769)
* TokenSource implementation * more testing * spotless * fix naming to match protobuf * changeset * add in room name and participant name values to response * Add invalidate and cachedResponse functions to CachingTokenSource * spotless * remove roomconfig from token request options and replace with user facing options * Expose more details in TokenPayload * fix tests
1 parent a122b1a commit e65d1f2

File tree

19 files changed

+1087
-6
lines changed

19 files changed

+1087
-6
lines changed

.changeset/sixty-baboons-yawn.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": minor
3+
---
4+
5+
Add TokenSource implementation for use with token servers

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ dagger = "2.46"
1414
groupie = "2.9.0"
1515
junit-lib = "4.13.2"
1616
junit-jupiter = "5.5.0"
17+
jwtdecode = "2.0.2"
1718
klaxon = "5.5"
1819
kotlinx-serialization = "1.5.0"
1920
leakcanaryAndroid = "2.8.1"
@@ -49,6 +50,7 @@ dagger-lib = { module = "com.google.dagger:dagger", version.ref = "dagger" }
4950
dagger-compiler = { module = "com.google.dagger:dagger-compiler", version.ref = "dagger" }
5051
groupie = { module = "com.github.lisawray.groupie:groupie", version.ref = "groupie" }
5152
groupie-viewbinding = { module = "com.github.lisawray.groupie:groupie-viewbinding", version.ref = "groupie" }
53+
jwtdecode = { module = "com.auth0.android:jwtdecode", version.ref = "jwtdecode" }
5254
klaxon = { module = "com.beust:klaxon", version.ref = "klaxon" }
5355
noise = { module = "com.github.paramsen:noise", version.ref = "noise" }
5456
androidx-lifecycle-common-java8 = { module = "androidx.lifecycle:lifecycle-common-java8", version.ref = "androidx-lifecycle" }

livekit-android-sdk/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ dependencies {
149149
implementation libs.okhttp.coroutines
150150
api libs.audioswitch
151151
implementation libs.klaxon
152+
implementation libs.jwtdecode
152153

153154
implementation libs.androidx.annotation
154155
implementation libs.androidx.core

livekit-android-sdk/src/main/java/io/livekit/android/dagger/WebModule.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ internal object WebModule {
4343
@Named(InjectionNames.OVERRIDE_OKHTTP)
4444
okHttpClientOverride: OkHttpClient?,
4545
): OkHttpClient {
46-
return okHttpClientOverride ?: OkHttpClient()
46+
return okHttpClientOverride ?: globalOkHttpClient
4747
}
4848

4949
@Provides
@@ -79,3 +79,8 @@ internal object WebModule {
7979
}
8080
}
8181
}
82+
83+
/**
84+
* Singleton okhttpclient to avoid recreating each time a new Room is created.
85+
*/
86+
internal val globalOkHttpClient by lazy { OkHttpClient() }

livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,6 +1270,10 @@ internal constructor(
12701270
-> {
12711271
LKLog.v { "invalid value for data packet" }
12721272
}
1273+
1274+
LivekitModels.DataPacket.ValueCase.ENCRYPTED_PACKET -> {
1275+
// TODO
1276+
}
12731277
}
12741278
}
12751279

livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,10 @@ constructor(
796796
LivekitRtc.SignalResponse.MessageCase.MEDIA_SECTIONS_REQUIREMENT -> {
797797
// TODO
798798
}
799+
800+
LivekitRtc.SignalResponse.MessageCase.SUBSCRIBED_AUDIO_CODEC_UPDATE -> {
801+
// TODO
802+
}
799803
}
800804
}
801805

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright 2025 LiveKit, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.livekit.android.token
18+
19+
import io.livekit.android.util.LKLog
20+
import java.util.Date
21+
import kotlin.time.Duration
22+
import kotlin.time.Duration.Companion.seconds
23+
24+
abstract class BaseCachingTokenSource(
25+
private val store: TokenStore,
26+
private val validator: TokenValidator,
27+
) {
28+
29+
/**
30+
* The entrypoint for the caching store; subclasses should call this from their fetch methods.
31+
*
32+
* If a new token is needed, [fetchFromSource] will be called.
33+
*/
34+
internal suspend fun fetchImpl(options: TokenRequestOptions?): TokenSourceResponse {
35+
val cached = store.retrieve()
36+
37+
if (cached != null && cached.options == options && validator.invoke(cached.options, cached.response)) {
38+
return cached.response
39+
}
40+
41+
val response = fetchFromSource(options)
42+
store.store(options, response)
43+
return response
44+
}
45+
46+
/**
47+
* Implement this to fetch the [TokenSourceResponse] from the token source.
48+
*/
49+
abstract suspend fun fetchFromSource(options: TokenRequestOptions?): TokenSourceResponse
50+
51+
/**
52+
* Invalidate the cached credentials, forcing a fresh fetch on the next request.
53+
*/
54+
suspend fun invalidate() {
55+
store.clear()
56+
}
57+
58+
/**
59+
* Get the cached credentials if one exists.
60+
*/
61+
suspend fun cachedResponse(): TokenSourceResponse? {
62+
return store.retrieve()?.response
63+
}
64+
}
65+
66+
class CachingFixedTokenSource(
67+
private val source: FixedTokenSource,
68+
store: TokenStore,
69+
validator: TokenValidator,
70+
) : BaseCachingTokenSource(store, validator), FixedTokenSource {
71+
override suspend fun fetchFromSource(options: TokenRequestOptions?): TokenSourceResponse {
72+
return source.fetch()
73+
}
74+
75+
override suspend fun fetch(): TokenSourceResponse {
76+
return fetchImpl(null)
77+
}
78+
}
79+
80+
class CachingConfigurableTokenSource(
81+
private val source: ConfigurableTokenSource,
82+
store: TokenStore,
83+
validator: TokenValidator,
84+
) : BaseCachingTokenSource(store, validator), ConfigurableTokenSource {
85+
override suspend fun fetchFromSource(options: TokenRequestOptions?): TokenSourceResponse {
86+
return source.fetch(options ?: TokenRequestOptions())
87+
}
88+
89+
override suspend fun fetch(options: TokenRequestOptions): TokenSourceResponse {
90+
return fetchImpl(options)
91+
}
92+
}
93+
94+
/**
95+
* Wraps the token store with a cache so that it reuses the token as long as it is valid.
96+
*/
97+
fun FixedTokenSource.cached(
98+
store: TokenStore = InMemoryTokenStore(),
99+
validator: TokenValidator = defaultValidator,
100+
) = CachingFixedTokenSource(this, store, validator)
101+
102+
/**
103+
* Wraps the token store with a cache so that it reuses the token as long as it is valid.
104+
*
105+
* If the request options passed to [ConfigurableTokenSource.fetch] change, a new token
106+
* will be fetched.
107+
*/
108+
fun ConfigurableTokenSource.cached(
109+
store: TokenStore = InMemoryTokenStore(),
110+
validator: TokenValidator = defaultValidator,
111+
) = CachingConfigurableTokenSource(this, store, validator)
112+
113+
typealias TokenValidator = (options: TokenRequestOptions?, response: TokenSourceResponse) -> Boolean
114+
115+
interface TokenStore {
116+
suspend fun retrieve(): Item?
117+
suspend fun store(options: TokenRequestOptions?, response: TokenSourceResponse)
118+
suspend fun clear()
119+
120+
data class Item(
121+
val options: TokenRequestOptions?,
122+
val response: TokenSourceResponse,
123+
)
124+
}
125+
126+
internal class InMemoryTokenStore() : TokenStore {
127+
var item: TokenStore.Item? = null
128+
override suspend fun retrieve(): TokenStore.Item? = item
129+
130+
override suspend fun store(options: TokenRequestOptions?, response: TokenSourceResponse) {
131+
item = TokenStore.Item(options, response)
132+
}
133+
134+
override suspend fun clear() {
135+
item = null
136+
}
137+
}
138+
139+
private val defaultValidator: TokenValidator = { options, response ->
140+
response.hasValidToken()
141+
}
142+
143+
/**
144+
* Validates whether the JWT token is still valid.
145+
*/
146+
fun TokenSourceResponse.hasValidToken(tolerance: Duration = 60.seconds, date: Date = Date()): Boolean {
147+
try {
148+
val jwt = TokenPayload(participantToken)
149+
val now = Date()
150+
val expiresAt = jwt.expiresAt
151+
val nbf = jwt.notBefore
152+
153+
val isBefore = nbf != null && now.before(nbf)
154+
val hasExpired = expiresAt != null && now.after(Date(expiresAt.time + tolerance.inWholeMilliseconds))
155+
156+
return !isBefore && !hasExpired
157+
} catch (e: Exception) {
158+
LKLog.i(e) { "Could not validate existing token" }
159+
return false
160+
}
161+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2025 LiveKit, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.livekit.android.token
18+
19+
internal class CustomTokenSource(val block: suspend (options: TokenRequestOptions) -> TokenSourceResponse) : ConfigurableTokenSource {
20+
override suspend fun fetch(options: TokenRequestOptions): TokenSourceResponse {
21+
return block(options)
22+
}
23+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2025 LiveKit, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.livekit.android.token
18+
19+
import io.livekit.android.dagger.globalOkHttpClient
20+
import kotlinx.coroutines.suspendCancellableCoroutine
21+
import kotlinx.serialization.ExperimentalSerializationApi
22+
import kotlinx.serialization.decodeFromString
23+
import kotlinx.serialization.encodeToString
24+
import kotlinx.serialization.json.Json
25+
import kotlinx.serialization.json.JsonNamingStrategy
26+
import okhttp3.Call
27+
import okhttp3.Callback
28+
import okhttp3.Request
29+
import okhttp3.RequestBody.Companion.toRequestBody
30+
import okhttp3.Response
31+
import java.io.IOException
32+
import java.net.URL
33+
import kotlin.coroutines.resume
34+
import kotlin.coroutines.resumeWithException
35+
36+
internal class EndpointTokenSourceImpl(
37+
override val url: URL,
38+
override val method: String,
39+
override val headers: Map<String, String>,
40+
) : EndpointTokenSource
41+
42+
data class SandboxTokenServerOptions(
43+
val baseUrl: String? = null,
44+
)
45+
46+
internal class SandboxTokenSource(sandboxId: String, options: SandboxTokenServerOptions) : EndpointTokenSource {
47+
override val url: URL = URL("${options.baseUrl ?: "https://cloud-api.livekit.io"}/api/v2/sandbox/connection-details")
48+
override val headers: Map<String, String> = mapOf(
49+
"X-Sandbox-ID" to sandboxId,
50+
)
51+
}
52+
53+
internal interface EndpointTokenSource : ConfigurableTokenSource {
54+
/** The url to fetch the token from */
55+
val url: URL
56+
57+
/** The HTTP method to use (defaults to "POST") */
58+
val method: String
59+
get() = "POST"
60+
61+
/** Additional HTTP headers to include with the request */
62+
val headers: Map<String, String>
63+
64+
@OptIn(ExperimentalSerializationApi::class)
65+
override suspend fun fetch(options: TokenRequestOptions): TokenSourceResponse = suspendCancellableCoroutine { continuation ->
66+
try {
67+
val okHttpClient = globalOkHttpClient
68+
69+
val snakeCaseJson = Json {
70+
namingStrategy = JsonNamingStrategy.SnakeCase
71+
ignoreUnknownKeys = true
72+
explicitNulls = false
73+
}
74+
75+
// v1 token server returns camelCase keys
76+
val camelCaseJson = Json {
77+
ignoreUnknownKeys = true
78+
explicitNulls = false
79+
}
80+
val body = snakeCaseJson.encodeToString(options.toRequest())
81+
82+
val request = Request.Builder()
83+
.url(url)
84+
.method(method, body.toRequestBody())
85+
.addHeader("Content-Type", "application/json")
86+
.apply {
87+
headers.forEach { (key, value) ->
88+
addHeader(key, value)
89+
}
90+
}
91+
.build()
92+
93+
val call = okHttpClient.newCall(request)
94+
call.enqueue(
95+
object : Callback {
96+
override fun onResponse(call: Call, response: Response) {
97+
val bodyStr = response.body?.string()
98+
if (bodyStr == null) {
99+
continuation.resumeWithException(NullPointerException("No response returned from server"))
100+
return
101+
}
102+
103+
var tokenResponse: TokenSourceResponse? = null
104+
try {
105+
tokenResponse = snakeCaseJson.decodeFromString<TokenSourceResponse>(bodyStr)
106+
} catch (e: Exception) {
107+
}
108+
109+
if (tokenResponse == null) {
110+
// snake_case decoding failed, try camelCase decoding for v1 back compatibility
111+
try {
112+
tokenResponse = camelCaseJson.decodeFromString<TokenSourceResponse>(bodyStr)
113+
} catch (e: Exception) {
114+
continuation.resumeWithException(IllegalArgumentException("Failed to decode response from token server", e))
115+
return
116+
}
117+
}
118+
119+
continuation.resume(tokenResponse)
120+
}
121+
122+
override fun onFailure(call: Call, e: IOException) {
123+
continuation.resumeWithException(e)
124+
}
125+
},
126+
)
127+
} catch (e: Exception) {
128+
continuation.resumeWithException(e)
129+
}
130+
}
131+
}

0 commit comments

Comments
 (0)