Skip to content

Commit 3e61ede

Browse files
committed
feat(embedded): decide on whether commands are executed sync or async
Introduce EngineCommandExecutor to centralise how the four core engine calls (correlateMessage, sendSignal, startProcess, deploy) are dispatched. The default executor submits work to ForkJoinPool.commonPool(), mirroring the behaviour of a remote engine and keeping adapters portable across engine implementations (embedded ↔ remote, Camunda 7 ↔ Zeebe). Users override the Spring bean with any java.util.concurrent.Executor to change the dispatch strategy — most commonly a same-thread executor for @transactional coupling. Closes #156
1 parent 7ad67bf commit 3e61ede

File tree

14 files changed

+251
-50
lines changed

14 files changed

+251
-50
lines changed

docs/reference-c7-embedded.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,33 @@ the real type, you want to access if reading the field. For this purpose, `TaskI
5151
| tenantId | String | Tenant Id | my_tenant |
5252
| topicName | String | Topic name (from BPMN) for external task | topic_approve |
5353
| creationDate | OffsetDateTime | Time stamp of task creation formatted as ISO-8601 in UTC | 2025-05-01T10:00:00.000Z |
54+
55+
## Engine Command Executor
56+
57+
`EngineCommandExecutor` is an embedded-adapter-specific class that controls how the four core API calls
58+
(`correlateMessage`, `sendSignal`, `startProcess`, `deploy`) are dispatched to the embedded Camunda 7 engine.
59+
60+
By default, all engine calls are submitted asynchronously to `ForkJoinPool.commonPool()`. This means they run on a
61+
**different thread** from the caller and do **not** participate in the caller's `@Transactional` context — a rollback
62+
on the calling thread will **not** roll back the engine operation.
63+
64+
### Customizing execution
65+
66+
Provide a Spring bean of type `EngineCommandExecutor` to override the default. The
67+
auto-configured default is annotated with `@ConditionalOnMissingBean`, so your bean takes precedence automatically.
68+
69+
**Same-thread (synchronous) execution** — engine calls run on the calling thread and honour `@Transactional`:
70+
71+
```kotlin
72+
@Bean
73+
fun engineCommandExecutor(): EngineCommandExecutor =
74+
EngineCommandExecutor(Executor { it.run() })
75+
```
76+
77+
**Virtual-thread execution** — lightweight concurrency without pinning platform threads:
78+
79+
```kotlin
80+
@Bean
81+
fun engineCommandExecutor(): EngineCommandExecutor =
82+
EngineCommandExecutor(Executors.newVirtualThreadPerTaskExecutor())
83+
```

engine-adapter/c7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/correlation/CorrelationApiImpl.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import dev.bpmcrafters.processengineapi.Empty
55
import dev.bpmcrafters.processengineapi.MetaInfo
66
import dev.bpmcrafters.processengineapi.MetaInfoAware
77
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation.CorrelationApiImpl.Restrictions.USE_GLOBAL_CORRELATION_KEY
8+
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
89
import dev.bpmcrafters.processengineapi.correlation.CorrelateMessageCmd
910
import dev.bpmcrafters.processengineapi.correlation.CorrelationApi
1011
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -18,14 +19,15 @@ private val logger = KotlinLogging.logger {}
1819
*/
1920
class CorrelationApiImpl(
2021
private val runtimeService: RuntimeService,
22+
private val commandExecutor: EngineCommandExecutor,
2123
) : CorrelationApi {
2224

2325
object Restrictions {
2426
const val USE_GLOBAL_CORRELATION_KEY = "useGlobalCorrelationKey"
2527
}
2628

2729
override fun correlateMessage(cmd: CorrelateMessageCmd): CompletableFuture<Empty> {
28-
return CompletableFuture.supplyAsync {
30+
return commandExecutor.execute {
2931
val correlation = cmd.correlation.get()
3032
val globalCorrelation = cmd.restrictions.containsKey(USE_GLOBAL_CORRELATION_KEY)
3133
&& cmd.restrictions.getValue(USE_GLOBAL_CORRELATION_KEY).toBoolean()

engine-adapter/c7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/correlation/SignalApiImpl.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import dev.bpmcrafters.processengineapi.CommonRestrictions
44
import dev.bpmcrafters.processengineapi.Empty
55
import dev.bpmcrafters.processengineapi.MetaInfo
66
import dev.bpmcrafters.processengineapi.MetaInfoAware
7+
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
78
import dev.bpmcrafters.processengineapi.correlation.SendSignalCmd
89
import dev.bpmcrafters.processengineapi.correlation.SignalApi
910
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -17,12 +18,13 @@ private val logger = KotlinLogging.logger {}
1718
* Implementation of Signal API using local runtime service.
1819
*/
1920
class SignalApiImpl(
20-
private val runtimeService: RuntimeService
21+
private val runtimeService: RuntimeService,
22+
private val commandExecutor: EngineCommandExecutor,
2123
) : SignalApi {
2224

2325
override fun sendSignal(cmd: SendSignalCmd): CompletableFuture<Empty> {
2426
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-002: sending signal ${cmd.signalName}." }
25-
return CompletableFuture.supplyAsync {
27+
return commandExecutor.execute {
2628
runtimeService
2729
.createSignalEvent(cmd.signalName)
2830
.applyRestrictions(ensureSupported(cmd.restrictions))

engine-adapter/c7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/deploy/DeploymentApiImpl.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dev.bpmcrafters.processengineapi.adapter.c7.embedded.deploy
22

33
import dev.bpmcrafters.processengineapi.MetaInfo
44
import dev.bpmcrafters.processengineapi.MetaInfoAware
5+
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
56
import dev.bpmcrafters.processengineapi.deploy.DeployBundleCommand
67
import dev.bpmcrafters.processengineapi.deploy.DeploymentApi
78
import dev.bpmcrafters.processengineapi.deploy.DeploymentInformation
@@ -16,13 +17,14 @@ private val logger = KotlinLogging.logger {}
1617
* Implementation for deployment API using repository service.
1718
*/
1819
class DeploymentApiImpl(
19-
private val repositoryService: RepositoryService
20+
private val repositoryService: RepositoryService,
21+
private val commandExecutor: EngineCommandExecutor,
2022
) : DeploymentApi {
2123

2224
override fun deploy(cmd: DeployBundleCommand): CompletableFuture<DeploymentInformation> {
2325
require(cmd.resources.isNotEmpty()) { "Resources must not be empty, at least one resource must be provided." }
2426
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-003: executing a bundle deployment with ${cmd.resources.size} resources." }
25-
return CompletableFuture.supplyAsync {
27+
return commandExecutor.execute {
2628
repositoryService
2729
.createDeployment()
2830
.apply {

engine-adapter/c7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/process/StartProcessApiImpl.kt

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import dev.bpmcrafters.processengineapi.CommonRestrictions
44
import dev.bpmcrafters.processengineapi.MetaInfo
55
import dev.bpmcrafters.processengineapi.MetaInfoAware
66
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation.applyTenantRestrictions
7+
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
78
import dev.bpmcrafters.processengineapi.process.*
89
import io.github.oshai.kotlinlogging.KotlinLogging
910
import org.camunda.bpm.engine.RepositoryService
@@ -19,30 +20,32 @@ private val logger = KotlinLogging.logger {}
1920
class StartProcessApiImpl(
2021
private val runtimeService: RuntimeService,
2122
private val repositoryService: RepositoryService,
23+
private val commandExecutor: EngineCommandExecutor,
2224
) : StartProcessApi {
2325

2426
override fun startProcess(cmd: StartProcessCommand): CompletableFuture<ProcessInformation> {
2527
return when (cmd) {
2628
is StartProcessByDefinitionCmd ->
27-
CompletableFuture.supplyAsync {
29+
commandExecutor.execute {
2830
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-004: starting a new process instance by definition ${cmd.definitionKey}." }
2931
ensureSupported(cmd.restrictions)
3032
val payload = cmd.payloadSupplier.get()
3133
val tenantId = cmd.restrictions[CommonRestrictions.TENANT_ID]
3234
if (!tenantId.isNullOrBlank()) {
33-
repositoryService
34-
.createProcessDefinitionQuery()
35-
.processDefinitionKey(cmd.definitionKey)
36-
.tenantIdIn(tenantId)
37-
.active()
38-
.latestVersion()
39-
.singleResult()?.let { processDefinition ->
40-
runtimeService.startProcessInstanceById(
41-
processDefinition.id,
42-
payload[CommonRestrictions.BUSINESS_KEY]?.toString(),
43-
payload,
44-
).toProcessInformation()
45-
}
35+
val processDefinition = requireNotNull(
36+
repositoryService
37+
.createProcessDefinitionQuery()
38+
.processDefinitionKey(cmd.definitionKey)
39+
.tenantIdIn(tenantId)
40+
.active()
41+
.latestVersion()
42+
.singleResult()
43+
)
44+
runtimeService.startProcessInstanceById(
45+
processDefinition.id,
46+
payload[CommonRestrictions.BUSINESS_KEY]?.toString(),
47+
payload,
48+
).toProcessInformation()
4649
} else {
4750
runtimeService.startProcessInstanceByKey(
4851
cmd.definitionKey,
@@ -53,7 +56,7 @@ class StartProcessApiImpl(
5356
}
5457

5558
is StartProcessByMessageCmd ->
56-
CompletableFuture.supplyAsync {
59+
commandExecutor.execute {
5760
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-005: starting a new process instance by message ${cmd.messageName}." }
5861
val payload = cmd.payloadSupplier.get()
5962
var correlationBuilder = runtimeService
@@ -69,7 +72,7 @@ class StartProcessApiImpl(
6972
}
7073

7174
is StartProcessByDefinitionAtElementCmd ->
72-
CompletableFuture.supplyAsync {
75+
commandExecutor.execute {
7376
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-006: starting a new process instance by definition ${cmd.definitionKey} at element ${cmd.elementId}" }
7477
val startProcessCommand = StartProcessByDefinitionCmd(
7578
definitionKey = cmd.definitionKey,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared
2+
3+
import java.util.concurrent.CompletableFuture
4+
import java.util.concurrent.Executor
5+
import java.util.concurrent.ForkJoinPool
6+
7+
/**
8+
* Controls how engine commands are dispatched.
9+
* By default, commands run asynchronously on [ForkJoinPool.commonPool] —
10+
* mirroring the behavior of a remote engine.
11+
*
12+
* This design choice was made on purpose to keep adapters portable across engine implementations
13+
* (embedded ↔ remote, Camunda 7 ↔ Zeebe).
14+
*
15+
* However, engine calls do not join the caller's transaction — a rollback triggered elsewhere
16+
* will not undo what the engine already executed, risking data inconsistencies between your service and the engine.
17+
*
18+
* Override this bean with any [Executor] to change the dispatch strategy.
19+
* The most common use case for this is transactional coupling via a same-thread executor:
20+
* ```
21+
* @Bean fun engineCommandExecutor() = EngineCommandExecutor { it.run() }
22+
* ```
23+
*/
24+
class EngineCommandExecutor(
25+
private val executor: Executor = ForkJoinPool.commonPool()
26+
) {
27+
fun <T> execute(
28+
supplier: () -> T
29+
): CompletableFuture<T> {
30+
return CompletableFuture.supplyAsync({ supplier() }, executor)
31+
}
32+
}

engine-adapter/c7-embedded-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/testing/AbstractC7EmbeddedStage.kt

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import dev.bpmcrafters.processengineapi.CommonRestrictions
66
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation.CorrelationApiImpl
77
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation.SignalApiImpl
88
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.deploy.DeploymentApiImpl
9+
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
910
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.process.CachingProcessDefinitionMetaDataResolver
1011
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.process.StartProcessApiImpl
1112
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.completion.C7ServiceTaskCompletionApiImpl
@@ -120,12 +121,17 @@ abstract class AbstractC7EmbeddedStage<SUBTYPE : AbstractC7EmbeddedStage<SUBTYPE
120121
this.workerId = self().javaClass.simpleName
121122

122123
val subscriptionRepository = InMemSubscriptionRepository()
124+
val commandExecutor = EngineCommandExecutor { it.run() }
123125

124126
startProcessApi = StartProcessApiImpl(
125127
runtimeService = processEngineServices.runtimeService,
126128
repositoryService = processEngineServices.repositoryService,
129+
commandExecutor = commandExecutor,
130+
)
131+
deploymentApi = DeploymentApiImpl(
132+
repositoryService = processEngineServices.repositoryService,
133+
commandExecutor = commandExecutor,
127134
)
128-
deploymentApi = DeploymentApiImpl(processEngineServices.repositoryService)
129135
userTaskCompletionApi = C7UserTaskCompletionApiImpl(processEngineServices.taskService, subscriptionRepository)
130136
serviceTaskCompletionApi = C7ServiceTaskCompletionApiImpl(
131137
workerId, processEngineServices.externalTaskService, subscriptionRepository, LinearMemoryFailureRetrySupplier(3, 3L)
@@ -152,8 +158,14 @@ abstract class AbstractC7EmbeddedStage<SUBTYPE : AbstractC7EmbeddedStage<SUBTYPE
152158
taskSubscriptionApi, restrictions, null, null
153159
)
154160

155-
signalApi = SignalApiImpl(processEngineServices.runtimeService)
156-
correlationApi = CorrelationApiImpl(processEngineServices.runtimeService)
161+
signalApi = SignalApiImpl(
162+
runtimeService = processEngineServices.runtimeService,
163+
commandExecutor = commandExecutor,
164+
)
165+
correlationApi = CorrelationApiImpl(
166+
runtimeService = processEngineServices.runtimeService,
167+
commandExecutor = commandExecutor,
168+
)
157169

158170
initialize()
159171

engine-adapter/c7-embedded-core/src/test/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/correlation/CorrelationApiImplTest.kt

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,25 @@
11
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation
22

33
import dev.bpmcrafters.processengineapi.CommonRestrictions
4+
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
45
import dev.bpmcrafters.processengineapi.correlation.CorrelateMessageCmd
56
import dev.bpmcrafters.processengineapi.correlation.Correlation
67
import org.camunda.bpm.engine.RuntimeService
78
import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder
89
import org.camunda.community.mockito.ProcessExpressions
910
import org.junit.jupiter.api.BeforeEach
1011
import org.junit.jupiter.api.Test
11-
import org.junit.jupiter.api.extension.ExtendWith
12-
import org.mockito.InjectMocks
1312
import org.mockito.Mockito.mock
14-
import org.mockito.junit.jupiter.MockitoExtension
1513
import org.mockito.kotlin.verify
1614
import org.mockito.kotlin.verifyNoMoreInteractions
1715

18-
@ExtendWith(MockitoExtension::class)
1916
class CorrelationApiImplTest {
2017

2118
private val runtimeService: RuntimeService = mock()
22-
23-
@InjectMocks
24-
private lateinit var correlationApi: CorrelationApiImpl
19+
private val correlationApi = CorrelationApiImpl(
20+
runtimeService = runtimeService,
21+
commandExecutor = EngineCommandExecutor { it.run() }
22+
)
2523

2624
private lateinit var correlation: MessageCorrelationBuilder
2725

@@ -64,6 +62,5 @@ class CorrelationApiImplTest {
6462
verify(correlation).setVariables(mapOf("some" to 1L))
6563
verify(correlation).processInstanceVariableEquals("myCorrelation", "varValue")
6664
verifyNoMoreInteractions(correlation)
67-
6865
}
69-
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation
2+
3+
import dev.bpmcrafters.processengineapi.Empty
4+
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
5+
import dev.bpmcrafters.processengineapi.correlation.SendSignalCmd
6+
import org.assertj.core.api.Assertions.assertThat
7+
import org.camunda.bpm.engine.RuntimeService
8+
import org.camunda.bpm.engine.runtime.SignalEventReceivedBuilder
9+
import org.junit.jupiter.api.BeforeEach
10+
import org.junit.jupiter.api.Test
11+
import org.junit.jupiter.api.extension.ExtendWith
12+
import org.mockito.Mock
13+
import org.mockito.junit.jupiter.MockitoExtension
14+
import org.mockito.kotlin.any
15+
import org.mockito.kotlin.doAnswer
16+
import org.mockito.kotlin.mock
17+
import org.mockito.kotlin.verify
18+
import org.mockito.kotlin.verifyNoMoreInteractions
19+
import org.mockito.kotlin.whenever
20+
21+
@ExtendWith(MockitoExtension::class)
22+
class SignalApiImplTest {
23+
24+
@Mock
25+
private lateinit var runtimeService: RuntimeService
26+
27+
private lateinit var signalApi: SignalApiImpl
28+
29+
@BeforeEach
30+
fun setUp() {
31+
signalApi = SignalApiImpl(
32+
runtimeService = runtimeService,
33+
commandExecutor = EngineCommandExecutor { it.run() }
34+
)
35+
}
36+
37+
@Test
38+
fun `should send signal and return completedFuture`() {
39+
val signalBuilder = mock<SignalEventReceivedBuilder>()
40+
val payload = mapOf("key" to "value")
41+
val cmd = SendSignalCmd(signalName = "mySignal", payloadSupplier = { payload })
42+
whenever(runtimeService.createSignalEvent(any())).thenReturn(signalBuilder)
43+
whenever(signalBuilder.setVariables(any())).thenReturn(signalBuilder)
44+
doAnswer { }.whenever(signalBuilder).send()
45+
46+
val future = signalApi.sendSignal(cmd = cmd).get()
47+
48+
assertThat(future).isEqualTo(Empty)
49+
verify(runtimeService).createSignalEvent("mySignal")
50+
verify(signalBuilder).setVariables(payload)
51+
verify(signalBuilder).send()
52+
verifyNoMoreInteractions(signalBuilder)
53+
}
54+
}

engine-adapter/c7-embedded-core/src/test/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/embedded/deploy/DeploymentApiImplTest.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.deploy
22

3+
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
34
import dev.bpmcrafters.processengineapi.deploy.DeployBundleCommand
45
import dev.bpmcrafters.processengineapi.deploy.NamedResource
56
import org.assertj.core.api.Assertions.assertThat
@@ -17,7 +18,10 @@ import java.util.*
1718
class DeploymentApiImplTest {
1819

1920
private val repositoryService: RepositoryService = mock()
20-
private val deploymentApiImpl = DeploymentApiImpl(repositoryService)
21+
private val deploymentApiImpl = DeploymentApiImpl(
22+
repositoryService = repositoryService,
23+
commandExecutor = EngineCommandExecutor { it.run() }
24+
)
2125

2226
@Test
2327
fun `empty tenant id was not set`() {

0 commit comments

Comments
 (0)