Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions engine-adapter/adapter-testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@
<artifactId>awaitility</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility-kotlin</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
Expand Down
1 change: 0 additions & 1 deletion engine-adapter/c7-embedded-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
<version>2026.02.1</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import dev.bpmcrafters.processengineapi.MetaInfo
import dev.bpmcrafters.processengineapi.MetaInfoAware
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.correlation.applyTenantRestrictions
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.shared.EngineCommandExecutor
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.metaOf
import dev.bpmcrafters.processengineapi.process.*
import io.github.oshai.kotlinlogging.KotlinLogging
import org.camunda.bpm.engine.RepositoryService
Expand Down Expand Up @@ -104,10 +105,11 @@ class StartProcessApiImpl(

fun ProcessInstance.toProcessInformation() = ProcessInformation(
instanceId = this.id,
meta = mapOf(
meta = metaOf(
CommonRestrictions.PROCESS_DEFINITION_KEY to this.processDefinitionId,
CommonRestrictions.BUSINESS_KEY to this.businessKey,
CommonRestrictions.TENANT_ID to this.tenantId,
"rootProcessInstanceId" to this.rootProcessInstanceId
"rootProcessInstanceId" to this.rootProcessInstanceId,
CommonRestrictions.PROCESS_DEFINITION_ID to this.processDefinitionId,
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@ package dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.pull

import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.ExternalServiceTaskDelivery
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.pull.EmbeddedPullServiceTaskDeliveryMetrics.DropReason.EXPIRED_WHILE_IN_QUEUE
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.pull.EmbeddedPullServiceTaskDeliveryMetrics.DropReason.NO_MATCHING_SUBSCRIPTIONS
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.pull.EmbeddedPullServiceTaskDeliveryMetrics.FetchAndLockSkipReason.NO_SUBSCRIPTIONS
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.pull.EmbeddedPullServiceTaskDeliveryMetrics.FetchAndLockSkipReason.QUEUE_FULL
import dev.bpmcrafters.processengineapi.adapter.c7.embedded.task.delivery.toTaskInformation
import dev.bpmcrafters.processengineapi.impl.task.SubscriptionRepository
import dev.bpmcrafters.processengineapi.impl.task.TaskSubscriptionHandle
import dev.bpmcrafters.processengineapi.impl.task.filterBySubscription
import dev.bpmcrafters.processengineapi.task.TaskInformation
import dev.bpmcrafters.processengineapi.task.TaskInformation.Companion.CREATE
import dev.bpmcrafters.processengineapi.task.TaskType
import io.github.oshai.kotlinlogging.KotlinLogging
import org.camunda.bpm.engine.ExternalTaskService
import org.camunda.bpm.engine.externaltask.ExternalTaskQueryBuilder
import org.camunda.bpm.engine.externaltask.LockedExternalTask
import java.util.concurrent.ExecutorService
import java.time.Duration
import java.time.OffsetDateTime
import java.util.concurrent.Callable
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.atomic.AtomicInteger

private val logger = KotlinLogging.logger {}

Expand All @@ -28,85 +37,160 @@ class EmbeddedPullServiceTaskDelivery(
private val lockDurationInSeconds: Long,
private val retryTimeoutInSeconds: Long,
private val retries: Int,
private val executorService: ExecutorService
private val executor: ThreadPoolExecutor,
private val metrics: EmbeddedPullServiceTaskDeliveryMetrics
) : ExternalServiceTaskDelivery, RefreshableDelivery {

internal val stillLockedTasksGauge = AtomicInteger()

init {
metrics.registerExecutorThreadsUsedGauge(executor::getActiveCount)
metrics.registerExecutorQueueCapacityGauge(executor.queue::remainingCapacity)
metrics.registerStillLockedTasksGauge(stillLockedTasksGauge)
}

/**
* Delivers all tasks found in the external service to corresponding subscriptions.
*/
override fun refresh() {
deliverNewTasks()
cleanUpTerminatedTasks()
}

internal fun deliverNewTasks() {
val subscriptions = subscriptionRepository.getTaskSubscriptions().filter { s -> s.taskType==TaskType.EXTERNAL }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val subscriptions = subscriptionRepository.getTaskSubscriptions().filter { s -> s.taskType==TaskType.EXTERNAL }
val subscriptions = subscriptionRepository.getTaskSubscriptions().filter { s -> s.taskType == TaskType.EXTERNAL }

Found a bunch of weirdly formatted comparisons all across this merge request. Is this this project's formatter setting? If so, I need to update mine.

if (subscriptions.isEmpty()) {
logger.trace { "PROCESS-ENGINE-C7-EMBEDDED-035: Pull external tasks disabled because of no active subscriptions" }
metrics.incrementFetchAndLockTasksSkippedCounter(NO_SUBSCRIPTIONS)
return
}

val tasksToFetch = maxTasks.coerceAtMost(executor.queue.remainingCapacity())
if (tasksToFetch==0) {
logger.trace { "PROCESS-ENGINE-C7-EMBEDDED-044: Task executor queue is full, skipping task fetch" }
metrics.incrementFetchAndLockTasksSkippedCounter(QUEUE_FULL)
return
}

logger.trace { "PROCESS-ENGINE-C7-EMBEDDED-030: pulling $tasksToFetch service tasks for subscriptions: $subscriptions" }
val lockedTasks = externalTaskService
.fetchAndLock(tasksToFetch, workerId)
.forSubscriptions(subscriptions)
.execute()

logger.trace { "PROCESS-ENGINE-C7-EMBEDDED-045: pulled ${lockedTasks.size} service tasks" }
lockedTasks
.groupBy { it.topicName }
.forEach { (topic, tasks) -> metrics.incrementFetchedAndLockedTasksCounter(topic!!, tasks.size) }

val taskActionHandlerCallables = lockedTasks
.asSequence()
.map { lockedTask -> lockedTask to subscriptions.firstOrNull { subscription -> subscription.matches(lockedTask) } }
.filter { (lockedTask, subscription) ->
val keep = subscription!=null
if (!keep) {
metrics.incrementDroppedTasksCounter(lockedTask.topicName!!, NO_MATCHING_SUBSCRIPTIONS)
}
keep
}
.map { (lockedTask, activeSubscription) -> createTaskActionHandlerCallable(lockedTask, activeSubscription!!) }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!! signifies for me still a kind of danger, that a NullPointer could be thrown, even if this is prevented due to the filter above. however, we could remove the !! by rewriting the filter with a mapNotNull from the beginning

val taskActionHandlerCallables = lockedTasks
    .asSequence()
    .mapNotNull { lockedTask ->
        val subscription = subscriptions.firstOrNull { it.matches(lockedTask) }
        if (subscription != null) {
            lockedTask to subscription
        } else {
            metrics.incrementDroppedTasksCounter(lockedTask.topicName!!, NO_MATCHING_SUBSCRIPTIONS)
            null
        }
    }
    .map { (lockedTask, activeSubscription) -> createTaskActionHandlerCallable(lockedTask, activeSubscription) }

.toList()

taskActionHandlerCallables
.map { executor.submit(it) }
.forEach { it.get() }

}

internal fun createTaskActionHandlerCallable(lockedTask: LockedExternalTask, activeSubscription: TaskSubscriptionHandle): Callable<Unit> =
Callable {
// make sure the task has not expired waiting in the queue for the execution
val start = OffsetDateTime.now().toInstant()
val lockExpirationInstant = lockedTask.lockExpirationTime.toInstant()
val timePassedSinceLockAcquisition = Duration.between(lockExpirationInstant.minusSeconds(lockDurationInSeconds), start)
metrics.recordTaskQueueTime(lockedTask.topicName!!, timePassedSinceLockAcquisition)
if (start.isBefore(lockExpirationInstant)) {
try {
if (subscriptionRepository.getActiveSubscriptionForTask(lockedTask.id)==activeSubscription) {
// task is already delivered to the current subscription, nothing to do
logger.trace { "PROCESS-ENGINE-C7-EMBEDDED-041: skipping task ${lockedTask.id} since it is unchanged." }
} else {
// create task information and set up the reason
val taskInformation = lockedTask.toTaskInformation().withReason(CREATE)
subscriptionRepository.activateSubscriptionForTask(lockedTask.id, activeSubscription)
val variables = lockedTask.variables.filterBySubscription(activeSubscription)
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-031: delivering service task ${lockedTask.id}." }
activeSubscription.action.accept(taskInformation, variables)
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-032: successfully delivered service task ${lockedTask.id}." }
}
logger.debug { "PROCESS-ENGINE-C7-REMOTE-032: successfully delivered service task ${lockedTask.id}." }
metrics.incrementCompletedTasksCounter(lockedTask.topicName!!)
} catch (e: Exception) {
val jobRetries: Int = lockedTask.retries?.minus(1) ?: retries
logger.error { "PROCESS-ENGINE-C7-EMBEDDED-033: failing delivering task ${lockedTask.id}: ${e.message}" }
metrics.incrementFailedTasksCounter(lockedTask.topicName!!)
externalTaskService.handleFailure(lockedTask.id, workerId, e.message, jobRetries, retryTimeoutInSeconds * 1000)
subscriptionRepository.deactivateSubscriptionForTask(taskId = lockedTask.id)
logger.error { "PROCESS-ENGINE-C7-EMBEDDED-034: successfully failed delivering task ${lockedTask.id}: ${e.message}" }

val subscriptions = subscriptionRepository.getTaskSubscriptions().filter { s -> s.taskType == TaskType.EXTERNAL }
if (subscriptions.isNotEmpty()) {
logger.trace { "PROCESS-ENGINE-C7-EMBEDDED-030: pulling service tasks for subscriptions: $subscriptions" }
val deliveredTaskIds = subscriptionRepository.getDeliveredTaskIds(TaskType.EXTERNAL).toMutableList()
externalTaskService
.fetchAndLock(maxTasks, workerId)
.forSubscriptions(subscriptions)
.execute()
.parallelStream()
.map { lockedTask ->
subscriptions
.firstOrNull { subscription -> subscription.matches(lockedTask) }
?.let { activeSubscription ->
executorService.submit { // in another thread
try {
val taskInformation = if (deliveredTaskIds.contains(lockedTask.id)
&& subscriptionRepository.getActiveSubscriptionForTask(lockedTask.id) == activeSubscription
) {
// task is already delivered to the current subscription, nothing to do
null
} else {
// create task information and set up the reason
lockedTask.toTaskInformation().withReason(TaskInformation.CREATE)
}
if (taskInformation != null) {
subscriptionRepository.activateSubscriptionForTask(lockedTask.id, activeSubscription)
val variables = lockedTask.variables.filterBySubscription(activeSubscription)
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-031: delivering service task ${lockedTask.id}." }
activeSubscription.action.accept(taskInformation, variables)
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-032: successfully delivered service task ${lockedTask.id}." }
} else {
logger.trace { "PROCESS-ENGINE-C7-EMBEDDED-041: skipping task ${lockedTask.id} since it is unchanged." }
}
// remove from already delivered
deliveredTaskIds.remove(lockedTask.id)
} catch (e: Exception) {
val jobRetries: Int = lockedTask.retries?.minus(1) ?: retries
logger.error { "PROCESS-ENGINE-C7-EMBEDDED-033: failing delivering task ${lockedTask.id}: ${e.message}" }
externalTaskService.handleFailure(lockedTask.id, workerId, e.message, jobRetries, retryTimeoutInSeconds * 1000)
subscriptionRepository.deactivateSubscriptionForTask(taskId = lockedTask.id)
logger.error { "PROCESS-ENGINE-C7-EMBEDDED-034: successfully failed delivering task ${lockedTask.id}: ${e.message}" }
}
}
}
}.forEach { taskExecutionFuture ->
taskExecutionFuture.get()
} finally {
metrics.recordTaskExecutionTime(lockedTask.topicName!!, Duration.between(start, OffsetDateTime.now()))
}
} else {
metrics.incrementDroppedTasksCounter(lockedTask.topicName!!, EXPIRED_WHILE_IN_QUEUE)
}

}

internal fun cleanUpTerminatedTasks() {

// retrieve external tasks locked for configured worker id
val stillLockedTasks = externalTaskService.createExternalTaskQuery()
.workerId(workerId)
.locked()
.list()

val stillLockedTaskIds = stillLockedTasks.map { dto -> dto.id!! }.toSet()
stillLockedTasksGauge.set(stillLockedTaskIds.size)

// now we removed all still existing task ids from the list of already delivered
// the remaining tasks doesn't exist in the engine, lets handle them
deliveredTaskIds.parallelStream().map { taskId ->
executorService.submit {
// deactivate active subscription and handle termination
subscriptionRepository.deactivateSubscriptionForTask(taskId)?.termination?.accept(
TaskInformation(
taskId,
emptyMap()
).withReason(TaskInformation.DELETE)
)
logger.trace { "PROCESS-ENGINE-C7-EMBEDDED-043: deactivating $taskId, task is gone." }
val deliveredTaskIdsMissingInEngine =
subscriptionRepository.getDeliveredTaskIds(TaskType.EXTERNAL)
.toMutableSet()
.apply {
removeAll(stillLockedTaskIds)
}
.toList()
.let {
it.subList(0, it.size.coerceAtMost(executor.queue.remainingCapacity())) // make sure the executor can accept our callable
}
}.forEach { taskTerminationFuture -> taskTerminationFuture.get() }

} else {
logger.trace { "PROCESS-ENGINE-C7-EMBEDDED-035: Pull external tasks disabled because of no active subscriptions" }
// now we removed all still existing task ids from the list of already delivered
// the remaining tasks don't exist in the engine, lets handle them
val taskTerminationHandlerCallables = deliveredTaskIdsMissingInEngine.map { createTaskTerminationHandlerCallable(it) }

taskTerminationHandlerCallables
.map { executor.submit(it) }
.forEach { it.get() }
}

internal fun createTaskTerminationHandlerCallable(taskId: String): Callable<Unit> = Callable {
// deactivate active subscription and handle termination
val taskSubscriptionHandle = subscriptionRepository.deactivateSubscriptionForTask(taskId)
if (taskSubscriptionHandle!=null) {
taskSubscriptionHandle.termination.accept(
TaskInformation(
taskId = taskId,
meta = emptyMap()
).withReason(TaskInformation.DELETE)
)
metrics.incrementTerminatedTasksCounter(taskSubscriptionHandle.taskDescriptionKey ?: "?")
}
}


private fun ExternalTaskQueryBuilder.forSubscriptions(subscriptions: List<TaskSubscriptionHandle>): ExternalTaskQueryBuilder {
subscriptions
.filter { it.taskDescriptionKey != null }
.filter { it.taskDescriptionKey!=null }
.distinctBy { it.taskDescriptionKey }
.forEach { subscription ->
val lockDurationInMilliseconds = getLockDurationForSubscription(subscription)
Expand All @@ -123,21 +207,28 @@ class EmbeddedPullServiceTaskDelivery(
return customLockDuration?.toLong() ?: (lockDurationInSeconds * 1000)
}

private fun TaskSubscriptionHandle.matches(task: LockedExternalTask): Boolean {
return this.taskType == TaskType.EXTERNAL
&& (this.taskDescriptionKey == null || this.taskDescriptionKey == task.topicName)
&& this.restrictions.all {
when (it.key) {
CommonRestrictions.EXECUTION_ID -> it.value == task.executionId
CommonRestrictions.ACTIVITY_ID -> it.value == task.activityId
CommonRestrictions.BUSINESS_KEY -> it.value == task.businessKey
CommonRestrictions.TENANT_ID -> it.value == task.tenantId
CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == task.processInstanceId
CommonRestrictions.PROCESS_DEFINITION_KEY -> it.value == task.processDefinitionKey
CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == task.processDefinitionId
CommonRestrictions.PROCESS_DEFINITION_VERSION_TAG -> it.value == task.processDefinitionVersionTag
else -> false
internal fun TaskSubscriptionHandle.matches(task: LockedExternalTask): Boolean {
return this.taskType==TaskType.EXTERNAL
&& (this.taskDescriptionKey==null || this.taskDescriptionKey==task.topicName)
&& this.restrictions
.minus( // ignore some restrictions which are not relevant for external tasks
"workerLockDurationInMilliseconds"
)
.all {
when (it.key) {
CommonRestrictions.EXECUTION_ID -> it.value==task.executionId
CommonRestrictions.ACTIVITY_ID -> it.value==task.activityId
CommonRestrictions.BUSINESS_KEY -> it.value==task.businessKey
CommonRestrictions.TENANT_ID -> it.value==task.tenantId
CommonRestrictions.PROCESS_INSTANCE_ID -> it.value==task.processInstanceId
CommonRestrictions.PROCESS_DEFINITION_KEY -> it.value==task.processDefinitionKey
CommonRestrictions.PROCESS_DEFINITION_ID -> it.value==task.processDefinitionId
CommonRestrictions.PROCESS_DEFINITION_VERSION_TAG -> it.value==task.processDefinitionVersionTag
else -> {
logger.debug { "PROCESS-ENGINE-C7-EMBEDDED-041: Unknown restriction key: ${it.key}" }
false
}
}
}
}
}
}
Loading