Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import dev.restate.admin.api.DeploymentApi
import dev.restate.admin.client.ApiClient
import dev.restate.admin.client.ApiException
import dev.restate.admin.model.RegisterDeploymentRequest
import dev.restate.admin.model.RegisterDeploymentRequestAnyOf
import dev.restate.admin.model.RegisterHttpDeploymentRequest
import dev.restate.sdk.endpoint.Endpoint
import dev.restate.sdk.http.vertx.RestateHttpServer
import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions
Expand Down Expand Up @@ -391,7 +391,7 @@ private constructor(
}

private fun discoverDeployment(client: DeploymentApi, uri: String) {
val request = RegisterDeploymentRequest(RegisterDeploymentRequestAnyOf().uri(uri).force(false))
val request = RegisterDeploymentRequest(RegisterHttpDeploymentRequest().uri(uri).force(false))

val response =
Unreliables.retryUntilSuccess(20, TimeUnit.SECONDS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,14 @@ class FrontCompatibilityTest {
.dryRun(false))

try {
adminApi.updateDeployment(deployment.id, updateRequest)
adminApi.updateDeployment(deployment.httpDeploymentResponse.id, updateRequest)
LOG.info(
"Successfully updated deployment {} to use URI {}", deployment.id, localEndpointURI)
"Successfully updated deployment {} to use URI {}",
deployment.httpDeploymentResponse.id,
localEndpointURI)
} catch (e: Exception) {
LOG.error("Failed to update deployment {}: {}", deployment.id, e.message)
LOG.error(
"Failed to update deployment {}: {}", deployment.httpDeploymentResponse.id, e.message)
throw e
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import dev.restate.sdk.kotlin.set
import dev.restate.sdktesting.infra.InjectAdminURI
import dev.restate.sdktesting.infra.InjectClient
import dev.restate.sdktesting.infra.RestateDeployerExtension
import kotlinx.serialization.json.Json
import java.net.URI
import kotlinx.serialization.json.Json
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.kotlin.await
import org.awaitility.kotlin.withAlias
Expand Down Expand Up @@ -73,9 +73,11 @@ class StatePatchingTest {

// Patch the state
val newState = "patched-state"
val request = ModifyServiceStateRequest()
.objectKey("test-key")
.newState(mapOf("state" to Json.encodeToString(newState).toByteArray().map { it.toInt() }))
val request =
ModifyServiceStateRequest()
.objectKey("test-key")
.newState(
mapOf("state" to Json.encodeToString(newState).toByteArray().map { it.toInt() }))

serviceApi.modifyServiceState("StateObject", request)

Expand Down
157 changes: 157 additions & 0 deletions src/main/kotlin/dev/restate/sdktesting/tests/TimeTravelTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate SDK Test suite tool,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE
package dev.restate.sdktesting.tests

import dev.restate.admin.api.InvocationApi
import dev.restate.admin.client.ApiClient
import dev.restate.client.Client
import dev.restate.client.kotlin.attachSuspend
import dev.restate.sdk.annotation.Handler
import dev.restate.sdk.annotation.Name
import dev.restate.sdk.annotation.Service
import dev.restate.sdk.annotation.VirtualObject
import dev.restate.sdk.endpoint.Endpoint
import dev.restate.sdk.kotlin.*
import dev.restate.sdktesting.infra.InjectAdminURI
import dev.restate.sdktesting.infra.InjectClient
import dev.restate.sdktesting.infra.RestateDeployerExtension
import java.net.URI
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.time.Duration.Companion.seconds
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.kotlin.await
import org.awaitility.kotlin.withAlias
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension

class TimeTravelTest {

@VirtualObject
@Name("TimeObject")
class TimeObject {
companion object {
val shouldFail = AtomicBoolean(true)
}

@Handler
suspend fun getState(ctx: ObjectContext): String? {
return ctx.get<String>("state")
}

@Handler
suspend fun testHandler(ctx: ObjectContext): String {
// Call another service, but don't await the response yet
val firstMethodResponse = TimeTravelTestCalleeServiceClient.fromContext(ctx).firstMethod()

// Load if we should fail
val shouldFail = shouldFail.get()

// Set a different state
ctx.set("state", if (shouldFail) "a" else "b")

// Execute a different context run
val runResult = ctx.runAsync("my-run") { if (shouldFail) "a" else "b" }

// Call the second method
val secondMethodResponse =
TimeTravelTestCalleeServiceClient.fromContext(ctx).secondMethod(shouldFail)

// Await all calls and return
return listOf(firstMethodResponse, secondMethodResponse, runResult)
.awaitAll()
.joinToString("-")
}
}

@Service
@Name("CalleeService")
class CalleeService {
companion object {
val firstMethod = AtomicInteger(0)
}

@Handler
suspend fun firstMethod(ctx: Context): String {
ctx.sleep(2.seconds)
// Assert it's called just once
assertThat(firstMethod.incrementAndGet()).isEqualTo(1)
return "first"
}

@Handler
fun secondMethod(ctx: Context, shouldFail: Boolean): String {
if (shouldFail) {
throw RuntimeException("Failing!")
}
return "second"
}
}

companion object {
@RegisterExtension
val deployerExt: RestateDeployerExtension = RestateDeployerExtension {
withEndpoint(Endpoint.bind(TimeObject()).bind(CalleeService()))
}
}

@Test
fun testTimeTravel(
@InjectClient ingressClient: Client,
@InjectAdminURI adminURI: URI,
) = runTest {
// Create clients for the services
val timeClient = TimeTravelTestTimeObjectClient.fromClient(ingressClient, "test-key")

// Send request
val sendResult = timeClient.send().testHandler(init = idempotentCallOptions)
val invocationId = sendResult.invocationId()

// Wait for the invocation to reach the error state
await withAlias
"invocation has all the journal entries" untilAsserted
{
assertThat(getJournal(adminURI, invocationId).rows)
.map<String> { it.entryType }
.containsExactlyInAnyOrder(
"Command: Input",
"Command: Call",
"Notification: CallInvocationId",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own understanding: When are we returning from val firstMethodResponse = TimeTravelTestCalleeServiceClient.fromContext(ctx).firstMethod()? Before or after the invocation id was decided?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before, getting the invocation id is an async operation

"Notification: Call",
"Command: SetState",
"Command: Run",
"Notification: Run",
"Command: Call",
Comment on lines +129 to +130
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own understanding: The notification is allowed to move past Command: Call, right? Is it generated once the run closure completes and persists the value?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's why containsExactlyInAnyOrder

Comment on lines +127 to +130
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own understanding: Will the SDK send the commands in creation order (as the awaitables are created in the code)? I would assume so because otherwise it's probably hard to match them on replay with existing journal entries.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the SDK send the commands in creation order

yes but the notification order is established by the runtime

"Notification: CallInvocationId",
)
}

// Find the trim index
val trimIndex =
getJournal(adminURI, invocationId).rows.find { it.entryType == "Command: SetState" }!!.index

// Set shouldFail to false so the handler will succeed after time travel
TimeObject.shouldFail.set(false)

// Use the time travel API to trim entry index 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to trim trimIndex and not a fixed index 2.

val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port)
val invocationApi = InvocationApi(adminClient)
invocationApi.timeTravelInvocation(invocationId, trimIndex)

// Wait for the response to be sent back
assertThat(sendResult.attachSuspend().response()).isEqualTo("first-second-b")

// Wait for state to be b
await withAlias
"response is sent back" untilAsserted
{
assertThat(timeClient.getState()).isEqualTo("b")
}
}
}
47 changes: 47 additions & 0 deletions src/main/kotlin/dev/restate/sdktesting/tests/utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@
package dev.restate.sdktesting.tests

import dev.restate.common.RequestBuilder
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.util.UUID
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.kotlin.additionalLoggingContext
import org.awaitility.core.ConditionFactory
Expand All @@ -40,3 +48,42 @@ fun runTest(timeout: Duration = 60.seconds, testBody: suspend TestScope.() -> Un
val idempotentCallOptions: RequestBuilder<*, *>.() -> Unit = {
idempotencyKey = UUID.randomUUID().toString()
}

/** Data classes for sys_invocation query result */
@Serializable data class JournalQueryResult(val rows: List<SysInvocationEntry> = emptyList())

@Serializable
data class SysInvocationEntry(val index: Int, @SerialName("entry_type") val entryType: String)

/** JSON parser with configuration for sys_invocation query result */
private val sysJournalJson = Json {
ignoreUnknownKeys = true
coerceInputValues = true
}

/**
* Queries the sys_journal table for a given invocation ID and returns the parsed result.
*
* @param invocationId The ID of the invocation to query
* @param adminURI The URI of the Restate admin API
* @return The parsed result of the query
*/
suspend fun getJournal(adminURI: URI, invocationId: String): JournalQueryResult {
// Create the HTTP request to query sys_invocation
val request =
HttpRequest.newBuilder()
.uri(URI.create("http://${adminURI.host}:${adminURI.port}/query"))
.header("accept", "application/json")
.header("content-type", "application/json")
.POST(
HttpRequest.BodyPublishers.ofString(
"""{"query": "SELECT index, entry_type FROM sys_journal WHERE id = '$invocationId'"}"""))
.build()

// Send the request and get the response
val response =
HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString()).await()

// Parse the response using Kotlin serialization
return sysJournalJson.decodeFromString<JournalQueryResult>(response.body())
}
Loading
Loading