Skip to content

Commit fd4ab7e

Browse files
Adapt to SDK test suite v3.0 (#458)
1 parent f4f0602 commit fd4ab7e

File tree

16 files changed

+307
-116
lines changed

16 files changed

+307
-116
lines changed

.github/workflows/integration.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ jobs:
106106
- name: Build restatedev/java-test-services image
107107
run: ./gradlew -Djib.console=plain :test-services:jibDockerBuild
108108

109-
- name: Run test tool 2.4
110-
uses: restatedev/sdk-test-suite@v2.4
109+
- name: Run test tool 3.0
110+
uses: restatedev/sdk-test-suite@v3.0
111111
with:
112112
envVars: ${{ inputs.envVars }}
113113
testArtifactOutput: ${{ inputs.testArtifactOutput != '' && inputs.testArtifactOutput || 'sdk-java-integration-test-report' }}

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,13 +460,21 @@ sealed interface Awaitable<T> {
460460
return wrapAllAwaitable(listOf(first) + listOf(second) + others.asList())
461461
}
462462

463+
fun all(awaitables: List<Awaitable<*>>): Awaitable<Unit> {
464+
return wrapAllAwaitable(awaitables)
465+
}
466+
463467
fun any(
464468
first: Awaitable<*>,
465469
second: Awaitable<*>,
466470
vararg others: Awaitable<*>
467471
): Awaitable<Int> {
468472
return wrapAnyAwaitable(listOf(first) + listOf(second) + others.asList())
469473
}
474+
475+
fun any(awaitables: List<Awaitable<*>>): Awaitable<Int> {
476+
return wrapAnyAwaitable(awaitables)
477+
}
470478
}
471479
}
472480

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/awaitables.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,15 @@ internal open class SingleAwaitableImpl<T : Any?>(private val asyncResult: Async
147147
}
148148

149149
internal fun wrapAllAwaitable(awaitables: List<Awaitable<*>>): Awaitable<Unit> {
150+
check(awaitables.isNotEmpty()) { "The awaitables list should be non empty" }
150151
val ctx = (awaitables.get(0) as BaseAwaitableImpl<*>).asyncResult().ctx()
151152
return SingleAwaitableImpl(
152153
ctx.createAllAsyncResult(awaitables.map { (it as BaseAwaitableImpl<*>).asyncResult() }))
153154
.simpleMap {}
154155
}
155156

156157
internal fun wrapAnyAwaitable(awaitables: List<Awaitable<*>>): BaseAwaitableImpl<Int> {
158+
check(awaitables.isNotEmpty()) { "The awaitables list should be non empty" }
157159
val ctx = (awaitables.get(0) as BaseAwaitableImpl<*>).asyncResult().ctx()
158160
return SingleAwaitableImpl(
159161
ctx.createAnyAsyncResult(awaitables.map { (it as BaseAwaitableImpl<*>).asyncResult() }))

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,11 @@ public int send(
329329
@Nullable String idempotencyKey,
330330
@Nullable Collection<Map.Entry<String, String>> headers,
331331
@Nullable Duration delay) {
332-
LOG.debug("Executing 'Send {}'", target);
332+
if (delay != null && !delay.isZero()) {
333+
LOG.debug("Executing 'Delayed send {} with delay {}'", target, delay);
334+
} else {
335+
LOG.debug("Executing 'Send {}'", target);
336+
}
333337
if (idempotencyKey != null && idempotencyKey.isBlank()) {
334338
throw ProtocolException.idempotencyKeyIsEmpty();
335339
}

test-services/src/main/kotlin/dev/restate/sdk/testservices/Main.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import dev.restate.sdk.auth.signing.RestateRequestIdentityVerifier
1212
import dev.restate.sdk.http.vertx.RestateHttpServer
1313
import dev.restate.sdk.kotlin.endpoint.endpoint
1414
import dev.restate.sdk.testservices.contracts.*
15+
import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreterMetadata
1516

1617
val KNOWN_SERVICES_FACTORIES: Map<String, () -> Any> =
1718
mapOf(
@@ -28,6 +29,10 @@ val KNOWN_SERVICES_FACTORIES: Map<String, () -> Any> =
2829
NonDeterministicMetadata.SERVICE_NAME to { NonDeterministicImpl() },
2930
ProxyMetadata.SERVICE_NAME to { ProxyImpl() },
3031
TestUtilsServiceMetadata.SERVICE_NAME to { TestUtilsServiceImpl() },
32+
VirtualObjectCommandInterpreterMetadata.SERVICE_NAME to
33+
{
34+
VirtualObjectCommandInterpreterImpl()
35+
},
3136
interpreterName(0) to { ObjectInterpreterImpl.getInterpreterDefinition(0) },
3237
interpreterName(1) to { ObjectInterpreterImpl.getInterpreterDefinition(1) },
3338
interpreterName(2) to { ObjectInterpreterImpl.getInterpreterDefinition(2) },

test-services/src/main/kotlin/dev/restate/sdk/testservices/ProxyImpl.kt

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,26 @@ class ProxyImpl : Proxy {
3030

3131
override suspend fun call(context: Context, request: ProxyRequest): ByteArray {
3232
return context
33-
.call(Request.of(request.toTarget(), Serde.RAW, Serde.RAW, request.message))
33+
.call(
34+
Request.of(request.toTarget(), Serde.RAW, Serde.RAW, request.message).also {
35+
if (request.idempotencyKey != null) {
36+
it.idempotencyKey = request.idempotencyKey
37+
}
38+
})
3439
.await()
3540
}
3641

37-
override suspend fun oneWayCall(context: Context, request: ProxyRequest): Unit {
38-
val ignored =
39-
context.send(
40-
SendRequest.of(request.toTarget(), Serde.RAW, Serde.SLICE, request.message)
41-
.asSendDelayed((request.delayMillis?.milliseconds ?: Duration.ZERO)))
42-
}
42+
override suspend fun oneWayCall(context: Context, request: ProxyRequest): String =
43+
context
44+
.send(
45+
SendRequest.of(request.toTarget(), Serde.RAW, Serde.SLICE, request.message)
46+
.also {
47+
if (request.idempotencyKey != null) {
48+
it.idempotencyKey = request.idempotencyKey
49+
}
50+
}
51+
.asSendDelayed((request.delayMillis?.milliseconds ?: Duration.ZERO)))
52+
.invocationId()
4353

4454
override suspend fun manyCalls(context: Context, requests: List<ManyCallRequest>) {
4555
val toAwait = mutableListOf<Awaitable<ByteArray>>()
@@ -52,15 +62,25 @@ class ProxyImpl : Proxy {
5262
Serde.RAW,
5363
Serde.SLICE,
5464
request.proxyRequest.message)
65+
.also {
66+
if (request.proxyRequest.idempotencyKey != null) {
67+
it.idempotencyKey = request.proxyRequest.idempotencyKey
68+
}
69+
}
5570
.asSendDelayed((request.proxyRequest.delayMillis?.milliseconds ?: Duration.ZERO)))
5671
} else {
5772
val awaitable =
5873
context.call(
5974
Request.of(
60-
request.proxyRequest.toTarget(),
61-
Serde.RAW,
62-
Serde.RAW,
63-
request.proxyRequest.message))
75+
request.proxyRequest.toTarget(),
76+
Serde.RAW,
77+
Serde.RAW,
78+
request.proxyRequest.message)
79+
.also {
80+
if (request.proxyRequest.idempotencyKey != null) {
81+
it.idempotencyKey = request.proxyRequest.idempotencyKey
82+
}
83+
})
6484
if (request.awaitAtTheEnd) {
6585
toAwait.add(awaitable)
6686
}

test-services/src/main/kotlin/dev/restate/sdk/testservices/TestUtilsServiceImpl.kt

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,6 @@ class TestUtilsServiceImpl : TestUtilsService {
3131
return input
3232
}
3333

34-
override suspend fun createAwakeableAndAwaitIt(
35-
ctx: Context,
36-
req: CreateAwakeableAndAwaitItRequest
37-
): CreateAwakeableAndAwaitItResponse {
38-
val awakeable = ctx.awakeable<String>()
39-
AwakeableHolderClient.fromContext(ctx, req.awakeableKey).hold(awakeable.id)
40-
41-
if (req.awaitTimeout == null) {
42-
return AwakeableResultResponse(awakeable.await())
43-
}
44-
45-
val timeout = ctx.timer(req.awaitTimeout.milliseconds)
46-
return select {
47-
awakeable.onAwait { AwakeableResultResponse(it) }
48-
timeout.onAwait { TimeoutResponse }
49-
}
50-
.await()
51-
}
52-
5334
override suspend fun sleepConcurrently(context: Context, millisDuration: List<Long>) {
5435
val timers = millisDuration.map { context.timer(it.milliseconds) }.toList()
5536

@@ -66,21 +47,7 @@ class TestUtilsServiceImpl : TestUtilsService {
6647
return invokedSideEffects.get()
6748
}
6849

69-
override suspend fun getEnvVariable(context: Context, env: String): String {
70-
return context.runBlock { System.getenv(env) ?: "" }
71-
}
72-
73-
override suspend fun interpretCommands(context: Context, req: InterpretRequest) {
74-
val listClient = ListObjectClient.fromContext(context, req.listName).send()
75-
req.commands.forEach {
76-
when (it) {
77-
is CreateAwakeableAndAwaitIt -> {
78-
val awakeable = context.awakeable<String>()
79-
AwakeableHolderClient.fromContext(context, it.awakeableKey).hold(awakeable.id)
80-
listClient.append(awakeable.await())
81-
}
82-
is GetEnvVariable -> listClient.append(getEnvVariable(context, it.envName))
83-
}
84-
}
50+
override suspend fun cancelInvocation(context: Context, invocationId: String) {
51+
context.invocationHandle<Unit>(invocationId).cancel()
8552
}
8653
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
2+
//
3+
// This file is part of the Restate Java SDK,
4+
// which is released under the MIT license.
5+
//
6+
// You can find a copy of the license in file LICENSE in the root
7+
// directory of this repository or package, or at
8+
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
9+
package dev.restate.sdk.testservices
10+
11+
import dev.restate.sdk.kotlin.*
12+
import dev.restate.sdk.kotlin.get
13+
import dev.restate.sdk.types.TerminalException
14+
import dev.restate.sdk.types.TimeoutException
15+
import dev.restate.sdktesting.contracts.VirtualObjectCommandInterpreter
16+
import kotlin.time.Duration.Companion.milliseconds
17+
import org.apache.logging.log4j.LogManager
18+
import org.apache.logging.log4j.Logger
19+
20+
class VirtualObjectCommandInterpreterImpl : VirtualObjectCommandInterpreter {
21+
22+
companion object {
23+
private val LOG: Logger = LogManager.getLogger(VirtualObjectCommandInterpreterImpl::class.java)
24+
}
25+
26+
override suspend fun interpretCommands(
27+
context: ObjectContext,
28+
req: VirtualObjectCommandInterpreter.InterpretRequest
29+
): String {
30+
LOG.info("Interpreting commands {}", req)
31+
32+
var result = ""
33+
34+
req.commands.forEach {
35+
LOG.info("Start interpreting command {}", it)
36+
when (it) {
37+
is VirtualObjectCommandInterpreter.AwaitAny -> {
38+
val cmds = it.commands.map { it.toAwaitable(context) }
39+
result =
40+
select {
41+
for (cmd in cmds) {
42+
cmd.onAwait { it }
43+
}
44+
}
45+
.await()
46+
}
47+
is VirtualObjectCommandInterpreter.AwaitAnySuccessful -> {
48+
val cmds = it.commands.map { it.toAwaitable(context) }.toMutableList()
49+
50+
while (true) {
51+
@Suppress("UNCHECKED_CAST")
52+
val completed = Awaitable.any(cmds as List<Awaitable<*>>).await()
53+
54+
try {
55+
result = cmds[completed].await()
56+
break
57+
} catch (_: TerminalException) {
58+
// Remove the cmd to make sure we don't fail on it again
59+
cmds.removeAt(completed)
60+
}
61+
}
62+
}
63+
is VirtualObjectCommandInterpreter.AwaitOne -> {
64+
result = it.command.toAwaitable(context).await()
65+
}
66+
is VirtualObjectCommandInterpreter.GetEnvVariable -> {
67+
result = context.runBlock { System.getenv(it.envName) ?: "" }
68+
}
69+
is VirtualObjectCommandInterpreter.ResolveAwakeable -> {
70+
resolveAwakeable(context, it)
71+
result = ""
72+
}
73+
is VirtualObjectCommandInterpreter.RejectAwakeable -> {
74+
rejectAwakeable(context, it)
75+
result = ""
76+
}
77+
is VirtualObjectCommandInterpreter.AwaitAwakeableOrTimeout -> {
78+
val awk = context.awakeable<String>()
79+
context.set("awk-${it.awakeableKey}", awk.id)
80+
try {
81+
result = awk.await(it.timeoutMillis.milliseconds)
82+
} catch (_: TimeoutException) {
83+
throw TerminalException("await-timeout")
84+
}
85+
}
86+
}
87+
LOG.info("Command result {}", result)
88+
appendResult(context, result)
89+
}
90+
91+
return result
92+
}
93+
94+
override suspend fun resolveAwakeable(
95+
context: SharedObjectContext,
96+
resolveAwakeable: VirtualObjectCommandInterpreter.ResolveAwakeable
97+
) {
98+
context
99+
.awakeableHandle(
100+
context.get("awk-${resolveAwakeable.awakeableKey}")
101+
?: throw TerminalException("awakeable is not registerd yet"))
102+
.resolve(resolveAwakeable.value)
103+
}
104+
105+
override suspend fun rejectAwakeable(
106+
context: SharedObjectContext,
107+
rejectAwakeable: VirtualObjectCommandInterpreter.RejectAwakeable
108+
) {
109+
context
110+
.awakeableHandle(
111+
context.get("awk-${rejectAwakeable.awakeableKey}")
112+
?: throw TerminalException("awakeable is not registerd yet"))
113+
.reject(rejectAwakeable.reason)
114+
}
115+
116+
override suspend fun hasAwakeable(context: SharedObjectContext, awakeableKey: String): Boolean =
117+
!context.get<String>("awk-$awakeableKey").isNullOrBlank()
118+
119+
override suspend fun getResults(context: SharedObjectContext): List<String> =
120+
context.get("results") ?: listOf()
121+
122+
private suspend fun VirtualObjectCommandInterpreter.AwaitableCommand.toAwaitable(
123+
ctx: ObjectContext
124+
): Awaitable<String> {
125+
return when (this) {
126+
is VirtualObjectCommandInterpreter.CreateAwakeable -> {
127+
val awk = ctx.awakeable<String>()
128+
ctx.set("awk-${this.awakeableKey}", awk.id)
129+
awk
130+
}
131+
is VirtualObjectCommandInterpreter.RunThrowTerminalException ->
132+
ctx.runAsync<String>("should-fail-with-${this.reason}") {
133+
throw TerminalException(this.reason)
134+
}
135+
is VirtualObjectCommandInterpreter.Sleep ->
136+
ctx.timer(this.timeoutMillis.milliseconds).map { "sleep" }
137+
}
138+
}
139+
140+
private suspend fun appendResult(ctx: ObjectContext, newResult: String) =
141+
ctx.set("results", (ctx.get("results") ?: listOf<String>()) + listOf(newResult))
142+
}

test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/BlockAndWaitWorkflow.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import dev.restate.sdk.annotation.Workflow
1313
import dev.restate.sdk.kotlin.SharedWorkflowContext
1414
import dev.restate.sdk.kotlin.WorkflowContext
1515

16-
@Workflow
16+
@Workflow(name = "BlockAndWaitWorkflow")
1717
interface BlockAndWaitWorkflow {
1818
@Workflow suspend fun run(context: WorkflowContext, input: String): String
1919

test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/CancelTest.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
99
package dev.restate.sdk.testservices.contracts
1010

11-
import dev.restate.sdk.annotation.Exclusive
12-
import dev.restate.sdk.annotation.VirtualObject
11+
import dev.restate.sdk.annotation.*
1312
import dev.restate.sdk.kotlin.ObjectContext
1413
import kotlinx.serialization.Serializable
1514

0 commit comments

Comments
 (0)