Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/reference-c7-embedded.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,33 @@ the real type, you want to access if reading the field. For this purpose, `TaskI
| tenantId | String | Tenant Id | my_tenant |
| topicName | String | Topic name (from BPMN) for external task | topic_approve |
| creationDate | OffsetDateTime | Time stamp of task creation formatted as ISO-8601 in UTC | 2025-05-01T10:00:00.000Z |

## Engine Command Executor

`EngineCommandExecutor` is an embedded-adapter-specific class that controls how the four core API calls
(`correlateMessage`, `sendSignal`, `startProcess`, `deploy`) are dispatched to the embedded Camunda 7 engine.

By default, all engine calls are submitted asynchronously to `ForkJoinPool.commonPool()`. This means they run on a
**different thread** from the caller and do **not** participate in the caller's `@Transactional` context — a rollback
on the calling thread will **not** roll back the engine operation.

### Customizing execution

Provide a Spring bean of type `EngineCommandExecutor` to override the default. The
auto-configured default is annotated with `@ConditionalOnMissingBean`, so your bean takes precedence automatically.

**Same-thread (synchronous) execution** — engine calls run on the calling thread and honour `@Transactional`:

```kotlin
@Bean
fun engineCommandExecutor(): EngineCommandExecutor =
EngineCommandExecutor(Executor { it.run() })
```

**Virtual-thread execution** — lightweight concurrency without pinning platform threads:

```kotlin
@Bean
fun engineCommandExecutor(): EngineCommandExecutor =
EngineCommandExecutor(Executors.newVirtualThreadPerTaskExecutor())
```
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import dev.bpmcrafters.processengineapi.Empty
import dev.bpmcrafters.processengineapi.MetaInfo
import dev.bpmcrafters.processengineapi.MetaInfoAware
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation.CorrelationApiImpl.Restrictions.USE_GLOBAL_CORRELATION_KEY
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
import dev.bpmcrafters.processengineapi.correlation.CorrelateMessageCmd
import dev.bpmcrafters.processengineapi.correlation.CorrelationApi
import io.github.oshai.kotlinlogging.KotlinLogging
Expand All @@ -18,14 +19,15 @@ private val logger = KotlinLogging.logger {}
*/
class CorrelationApiImpl(
private val runtimeService: RuntimeService,
private val commandExecutor: EngineCommandExecutor,
) : CorrelationApi {

object Restrictions {
const val USE_GLOBAL_CORRELATION_KEY = "useGlobalCorrelationKey"
}

override fun correlateMessage(cmd: CorrelateMessageCmd): CompletableFuture<Empty> {
return CompletableFuture.supplyAsync {
return commandExecutor.execute {
val correlation = cmd.correlation.get()
val globalCorrelation = cmd.restrictions.containsKey(USE_GLOBAL_CORRELATION_KEY)
&& cmd.restrictions.getValue(USE_GLOBAL_CORRELATION_KEY).toBoolean()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.Empty
import dev.bpmcrafters.processengineapi.MetaInfo
import dev.bpmcrafters.processengineapi.MetaInfoAware
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
import dev.bpmcrafters.processengineapi.correlation.SendSignalCmd
import dev.bpmcrafters.processengineapi.correlation.SignalApi
import io.github.oshai.kotlinlogging.KotlinLogging
Expand All @@ -17,12 +18,13 @@ private val logger = KotlinLogging.logger {}
* Implementation of Signal API using local runtime service.
*/
class SignalApiImpl(
private val runtimeService: RuntimeService
private val runtimeService: RuntimeService,
private val commandExecutor: EngineCommandExecutor,
) : SignalApi {

override fun sendSignal(cmd: SendSignalCmd): CompletableFuture<Empty> {
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-002: sending signal ${cmd.signalName}." }
return CompletableFuture.supplyAsync {
return commandExecutor.execute {
runtimeService
.createSignalEvent(cmd.signalName)
.applyRestrictions(ensureSupported(cmd.restrictions))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dev.bpmcrafters.processengineapi.adapter.c7.embedded.deploy

import dev.bpmcrafters.processengineapi.MetaInfo
import dev.bpmcrafters.processengineapi.MetaInfoAware
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
import dev.bpmcrafters.processengineapi.deploy.DeployBundleCommand
import dev.bpmcrafters.processengineapi.deploy.DeploymentApi
import dev.bpmcrafters.processengineapi.deploy.DeploymentInformation
Expand All @@ -16,13 +17,14 @@ private val logger = KotlinLogging.logger {}
* Implementation for deployment API using repository service.
*/
class DeploymentApiImpl(
private val repositoryService: RepositoryService
private val repositoryService: RepositoryService,
private val commandExecutor: EngineCommandExecutor,
) : DeploymentApi {

override fun deploy(cmd: DeployBundleCommand): CompletableFuture<DeploymentInformation> {
require(cmd.resources.isNotEmpty()) { "Resources must not be empty, at least one resource must be provided." }
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-003: executing a bundle deployment with ${cmd.resources.size} resources." }
return CompletableFuture.supplyAsync {
return commandExecutor.execute {
repositoryService
.createDeployment()
.apply {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.MetaInfo
import dev.bpmcrafters.processengineapi.MetaInfoAware
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation.applyTenantRestrictions
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
import dev.bpmcrafters.processengineapi.process.*
import io.github.oshai.kotlinlogging.KotlinLogging
import org.camunda.bpm.engine.RepositoryService
Expand All @@ -19,30 +20,32 @@ private val logger = KotlinLogging.logger {}
class StartProcessApiImpl(
private val runtimeService: RuntimeService,
private val repositoryService: RepositoryService,
private val commandExecutor: EngineCommandExecutor,
) : StartProcessApi {

override fun startProcess(cmd: StartProcessCommand): CompletableFuture<ProcessInformation> {
return when (cmd) {
is StartProcessByDefinitionCmd ->
CompletableFuture.supplyAsync {
commandExecutor.execute {
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-004: starting a new process instance by definition ${cmd.definitionKey}." }
ensureSupported(cmd.restrictions)
val payload = cmd.payloadSupplier.get()
val tenantId = cmd.restrictions[CommonRestrictions.TENANT_ID]
if (!tenantId.isNullOrBlank()) {
repositoryService
.createProcessDefinitionQuery()
.processDefinitionKey(cmd.definitionKey)
.tenantIdIn(tenantId)
.active()
.latestVersion()
.singleResult()?.let { processDefinition ->
runtimeService.startProcessInstanceById(
processDefinition.id,
payload[CommonRestrictions.BUSINESS_KEY]?.toString(),
payload,
).toProcessInformation()
}
val processDefinition = requireNotNull(
repositoryService
.createProcessDefinitionQuery()
.processDefinitionKey(cmd.definitionKey)
.tenantIdIn(tenantId)
.active()
.latestVersion()
.singleResult()
)
runtimeService.startProcessInstanceById(
processDefinition.id,
payload[CommonRestrictions.BUSINESS_KEY]?.toString(),
payload,
).toProcessInformation()
} else {
runtimeService.startProcessInstanceByKey(
cmd.definitionKey,
Expand All @@ -53,7 +56,7 @@ class StartProcessApiImpl(
}

is StartProcessByMessageCmd ->
CompletableFuture.supplyAsync {
commandExecutor.execute {
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-005: starting a new process instance by message ${cmd.messageName}." }
val payload = cmd.payloadSupplier.get()
var correlationBuilder = runtimeService
Expand All @@ -69,7 +72,7 @@ class StartProcessApiImpl(
}

is StartProcessByDefinitionAtElementCmd ->
CompletableFuture.supplyAsync {
commandExecutor.execute {
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-006: starting a new process instance by definition ${cmd.definitionKey} at element ${cmd.elementId}" }
val startProcessCommand = StartProcessByDefinitionCmd(
definitionKey = cmd.definitionKey,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared

import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.concurrent.ForkJoinPool

/**
* Controls how engine commands are dispatched.
* By default, commands run asynchronously on [ForkJoinPool.commonPool] —
* mirroring the behavior of a remote engine.
*
* This design choice was made on purpose to keep adapters portable across engine implementations
* (embedded ↔ remote, Camunda 7 ↔ Zeebe).
*
* However, engine calls do not join the caller's transaction — a rollback triggered elsewhere
* will not undo what the engine already executed, risking data inconsistencies between your service and the engine.
*
* Override this bean with any [Executor] to change the dispatch strategy.
* The most common use case for this is transactional coupling via a same-thread executor:
* ```
* @Bean fun engineCommandExecutor() = EngineCommandExecutor { it.run() }
* ```
*/
class EngineCommandExecutor(
private val executor: Executor = ForkJoinPool.commonPool()
) {
Comment on lines +24 to +26
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, would it be sufficient to not have an EngineCommandExecutor, and just inject a "same thread executor" bean where needed and pass it to the supplyAsync calls there? Should be the same effect but with one fewer class.

Copy link
Copy Markdown
Member Author

@emaarco emaarco Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@i-am-not-giving-my-name-to-a-machine - if i understand you right, this should work as well, yes. And was also my first suggestion (to just execute the operation sync, and then wrap the result in a CompletableFuture.completedFuture(...) or as an alternative use a sync executor right away)

However, @zambrovski's take on this to make it customizable. So that the default behaviour is still async, and you need to opt in to sync execution with a "same thread executor". So that you do not become coupled to the embedded engine through implicit behavior of the adapter—but rather the application remains portable.

Thus i'd suggested this solution (which would also open the door to set another pool than the default forkJoinPool of supplyAsync, if a user would for example like to use something like newVirtualThreadPerTaskExecutor for whatever reason)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm with @zambrovski on this, that it should be customizable and the default is what the API implies: async. However, I really like the option for transactional engine commands. I'd merely introduce an Executor bean (that is specifically for the service implementations that you modified). All I'm suggesting is to replace

  @Bean
  @ConditionalOnMissingBean
  fun engineCommandExecutor(): EngineCommandExecutor = EngineCommandExecutor()

with

  @Bean
  @ConditionalOnMissingBean
  fun engineCommandExecutor(): Executor = ForkJoinPool.commonPool()

and delete EngineCommandExecutor. Unless this would replace something in Spring and it would really hurt everyone who uses this lib, because this change should not interfere with anything else of course.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, got it :) not sure about that either, whether there is some kind of default bean on the Executor class.
Therefore i implemented it as a property of another class.

To make it

  • a) more explicit what the bean is about (just relevant to sending commands to the engine)
  • b) prevent issues if a user/spring itself already defined another Executor bean, thats used somewhere else.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, if an indirection is necessary, so be it.

Tbh, Spring beans are not my strong suit, my background is in Jakarta EE and way back stuff, but I'd expect a mechanism in Spring that enables arbitrary beans of the same type that don't interfere with any default beans.

Copy link
Copy Markdown
Member Author

@emaarco emaarco Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use beans with specific qualifiers/names throughout the whole adapter, I expect this should be possible in Spring without bigger problems.

But given that, I don't see an outweighting advantage in this approach, since i force the user to give the bean a specific name - and neither a big disadvantage in defining one class that represents a lot better what the executor is about. Think it shouldn’t be that expressive to maintain in the long run - also considering that it could be somehow shared since all embedded adapters could require it

But i can live with both

fun <T> execute(
supplier: () -> T
): CompletableFuture<T> {
return CompletableFuture.supplyAsync({ supplier() }, executor)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation.CorrelationApiImpl
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation.SignalApiImpl
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.deploy.DeploymentApiImpl
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.process.CachingProcessDefinitionMetaDataResolver
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.process.StartProcessApiImpl
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.completion.C7ServiceTaskCompletionApiImpl
Expand Down Expand Up @@ -120,12 +121,17 @@ abstract class AbstractC7EmbeddedStage<SUBTYPE : AbstractC7EmbeddedStage<SUBTYPE
this.workerId = self().javaClass.simpleName

val subscriptionRepository = InMemSubscriptionRepository()
val commandExecutor = EngineCommandExecutor { it.run() }

startProcessApi = StartProcessApiImpl(
runtimeService = processEngineServices.runtimeService,
repositoryService = processEngineServices.repositoryService,
commandExecutor = commandExecutor,
)
deploymentApi = DeploymentApiImpl(
repositoryService = processEngineServices.repositoryService,
commandExecutor = commandExecutor,
)
deploymentApi = DeploymentApiImpl(processEngineServices.repositoryService)
userTaskCompletionApi = C7UserTaskCompletionApiImpl(processEngineServices.taskService, subscriptionRepository)
serviceTaskCompletionApi = C7ServiceTaskCompletionApiImpl(
workerId, processEngineServices.externalTaskService, subscriptionRepository, LinearMemoryFailureRetrySupplier(3, 3L)
Expand All @@ -152,8 +158,14 @@ abstract class AbstractC7EmbeddedStage<SUBTYPE : AbstractC7EmbeddedStage<SUBTYPE
taskSubscriptionApi, restrictions, null, null
)

signalApi = SignalApiImpl(processEngineServices.runtimeService)
correlationApi = CorrelationApiImpl(processEngineServices.runtimeService)
signalApi = SignalApiImpl(
runtimeService = processEngineServices.runtimeService,
commandExecutor = commandExecutor,
)
correlationApi = CorrelationApiImpl(
runtimeService = processEngineServices.runtimeService,
commandExecutor = commandExecutor,
)

initialize()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation

import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
import dev.bpmcrafters.processengineapi.correlation.CorrelateMessageCmd
import dev.bpmcrafters.processengineapi.correlation.Correlation
import org.camunda.bpm.engine.RuntimeService
import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder
import org.camunda.community.mockito.ProcessExpressions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.InjectMocks
import org.mockito.Mockito.mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.verify
import org.mockito.kotlin.verifyNoMoreInteractions

@ExtendWith(MockitoExtension::class)
class CorrelationApiImplTest {

private val runtimeService: RuntimeService = mock()

@InjectMocks
private lateinit var correlationApi: CorrelationApiImpl
private val correlationApi = CorrelationApiImpl(
runtimeService = runtimeService,
commandExecutor = EngineCommandExecutor { it.run() }
)

private lateinit var correlation: MessageCorrelationBuilder

Expand Down Expand Up @@ -64,6 +62,5 @@ class CorrelationApiImplTest {
verify(correlation).setVariables(mapOf("some" to 1L))
verify(correlation).processInstanceVariableEquals("myCorrelation", "varValue")
verifyNoMoreInteractions(correlation)

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation

import dev.bpmcrafters.processengineapi.Empty
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
import dev.bpmcrafters.processengineapi.correlation.SendSignalCmd
import org.assertj.core.api.Assertions.assertThat
import org.camunda.bpm.engine.RuntimeService
import org.camunda.bpm.engine.runtime.SignalEventReceivedBuilder
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.any
import org.mockito.kotlin.doAnswer
import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.verifyNoMoreInteractions
import org.mockito.kotlin.whenever

@ExtendWith(MockitoExtension::class)
class SignalApiImplTest {

@Mock
private lateinit var runtimeService: RuntimeService

private lateinit var signalApi: SignalApiImpl

@BeforeEach
fun setUp() {
signalApi = SignalApiImpl(
runtimeService = runtimeService,
commandExecutor = EngineCommandExecutor { it.run() }
)
}

@Test
fun `should send signal and return completedFuture`() {
val signalBuilder = mock<SignalEventReceivedBuilder>()
val payload = mapOf("key" to "value")
val cmd = SendSignalCmd(signalName = "mySignal", payloadSupplier = { payload })
whenever(runtimeService.createSignalEvent(any())).thenReturn(signalBuilder)
whenever(signalBuilder.setVariables(any())).thenReturn(signalBuilder)
doAnswer { }.whenever(signalBuilder).send()

val future = signalApi.sendSignal(cmd = cmd).get()

assertThat(future).isEqualTo(Empty)
verify(runtimeService).createSignalEvent("mySignal")
verify(signalBuilder).setVariables(payload)
verify(signalBuilder).send()
verifyNoMoreInteractions(signalBuilder)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.deploy

import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
import dev.bpmcrafters.processengineapi.deploy.DeployBundleCommand
import dev.bpmcrafters.processengineapi.deploy.NamedResource
import org.assertj.core.api.Assertions.assertThat
Expand All @@ -17,7 +18,10 @@ import java.util.*
class DeploymentApiImplTest {

private val repositoryService: RepositoryService = mock()
private val deploymentApiImpl = DeploymentApiImpl(repositoryService)
private val deploymentApiImpl = DeploymentApiImpl(
repositoryService = repositoryService,
commandExecutor = EngineCommandExecutor { it.run() }
)

@Test
fun `empty tenant id was not set`() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.bpmcrafters.processengineapi.adapter.c7.embedded.process

import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.completion.C7ServiceTaskCompletionApiImpl
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.completion.C7UserTaskCompletionApiImpl
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.completion.LinearMemoryFailureRetrySupplier
Expand Down Expand Up @@ -43,6 +44,7 @@ class C7EmbeddedProcessTestHelper(private val processEngine: ProcessEngine) : Pr
override fun getStartProcessApi(): StartProcessApi = StartProcessApiImpl(
runtimeService = processEngine.runtimeService,
repositoryService = processEngine.repositoryService,
commandExecutor = EngineCommandExecutor { it.run() },
)

override fun getTaskSubscriptionApi(): TaskSubscriptionApi = C7TaskSubscriptionApiImpl(
Expand Down
Loading