diff --git a/.github/workflows/build-pull-request.yml b/.github/workflows/build-pull-request.yml index 04a79ca8..0b9ec428 100644 --- a/.github/workflows/build-pull-request.yml +++ b/.github/workflows/build-pull-request.yml @@ -13,10 +13,10 @@ jobs: - name: Checkout Repository uses: actions/checkout@v4 - - name: Set up JDK 11 + - name: Set up JDK 17 uses: actions/setup-java@v4 with: - java-version: 11 + java-version: 17 distribution: 'temurin' - name: Detect modified subprojects diff --git a/.gitignore b/.gitignore index 0638f439..5b5bd574 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,4 @@ out/ /.run/publish kafka and run example.run.xml /.kotlin /buildSrc/.kotlin +**/node_modules/ \ No newline at end of file diff --git a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt index 51c36899..2a6569e9 100644 --- a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt +++ b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt @@ -17,7 +17,6 @@ import kotlinx.kover.gradle.plugin.dsl.KoverReportExtension import org.gradle.api.NamedDomainObjectCollection import org.gradle.api.Plugin import org.gradle.api.Project -import org.gradle.api.tasks.testing.Test import org.gradle.api.tasks.wrapper.Wrapper import org.gradle.kotlin.dsl.DependencyHandlerScope import org.gradle.kotlin.dsl.findByType @@ -30,6 +29,7 @@ import org.gradle.plugins.ide.idea.model.IdeaModel import org.jetbrains.kotlin.gradle.dsl.KotlinMultiplatformExtension import org.jetbrains.kotlin.gradle.plugin.KotlinDependencyHandler import org.jetbrains.kotlin.gradle.plugin.KotlinSourceSet +import org.jetbrains.kotlin.gradle.targets.jvm.tasks.KotlinJvmTest import org.jlleitschuh.gradle.ktlint.KtlintExtension import org.jlleitschuh.gradle.ktlint.tasks.KtLintCheckTask import org.jlleitschuh.gradle.ktlint.tasks.KtLintFormatTask @@ -54,7 +54,6 @@ open class Conventions : Plugin { apply(project.plugin("ktlint")) } repositories { - mavenLocal() mavenCentral() maven { url = uri("https://maven.pkg.github.com/flaxoos/flax-gradle-plugins") @@ -120,12 +119,12 @@ open class Conventions : Plugin { tasks.named("build") { dependsOn(tasks.matching { it.name.matches(Regex("detekt(?!.*Baseline).*\\b(Main|Test)\\b\n")) }) } - tasks.withType(Test::class) { + tasks.withType { useJUnitPlatform() } tasks.withType { - gradleVersion = "8.3" + gradleVersion = "8.11" distributionType = Wrapper.DistributionType.BIN } extensions.findByType(KoverReportExtension::class)?.apply { diff --git a/gradle.properties b/gradle.properties index 0ffa05db..b98d72bb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,7 +3,7 @@ github.repository.name=extra-ktor-plugins kotlin.native.cacheKind.linuxX64=none kotlin.native.ignoreDisabledTargets=true gradle.publish.enable.module-metadata=true -version=2.1.3 +version=2.2.1 gpr.user=flaxoos org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=2g kotlin.mpp.applyDefaultHierarchyTemplate=false diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2d3c5a2c..43b97970 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,23 +1,23 @@ [versions] # Core technologies -kotlin = "2.0.20" -java = "11" -kotlinx-serialization = "1.7.3" -kotlinx-io = "0.3.0" -ksp = "1.9.10-1.0.13" +kotlin = "2.1.10" +java = "17" +kotlinx-serialization = "1.8.0" +kotlinx-io = "0.7.0" +ksp = "2.1.10-1.0.31" reactor = "3.7.4" # Web Framework -ktor = "3.0.0" +ktor = "3.1.1" # Android android = "8.1.0" # Database -h2 = "2.2.224" -postgres = "42.6.0" -exposed = "0.44.0" -mongodb = "5.2.0" +h2 = "2.3.232" +postgres = "42.7.5" +exposed = "0.60.0" +mongodb = "5.3.1" # Logging logback = "1.5.12" @@ -25,18 +25,17 @@ kotlin-logging = "5.1.0" logging_capabilities = "0.11.1" # Asynchronous and Concurrency -atomicfu = "0.25.0" -kotlinx-coroutines = "1.9.0" +atomicfu = "0.27.0" +kotlinx-coroutines = "1.10.1" # Testing -#kotest = "6.0.0.M1" -kotest = "5.9.1" -kotest-stable = "5.9.1" +kotest-6 = "6.0.0.M1" +kotest-5 = "5.9.1" kotest-test-containers = "2.0.2" mockk = "1.13.4" mockative = "2.0.1" kmock = "0.3.0-rc08" -testcontainers = "1.19.8" +testcontainers = "1.20.6" redis-testcontainers = "1.6.4" # Code Quality and Coverage @@ -47,7 +46,7 @@ koverBadge = "0.0.6" detekt = "1.23.1" # Date and Time -kotlinx-datetime = "0.4.0" +kotlinx-datetime = "0.6.2" # Functional Programming arrow = "1.2.4" @@ -74,8 +73,8 @@ gradle-release = "3.0.2" nexusPublish = "2.0.0" # Miscellaneous -krontab = "2.2.1" -uuid = "0.8.1" +krontab = "2.7.2" +uuid = "0.8.4" [libraries] # Core libraries @@ -128,11 +127,11 @@ mongodb-driver-kotlin-coroutine = { module = "org.mongodb:mongodb-driver-kotlin- mongodb-bson-kotlinx = { module = "org.mongodb:bson-kotlinx", version.ref = "mongodb" } # Testing libraries -kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" } -kotest-property = { module = "io.kotest:kotest-property", version.ref = "kotest" } -kotest-framework-datatest = { module = "io.kotest:kotest-framework-datatest", version.ref = "kotest-stable" } -kotest-assertions-core = { module = "io.kotest:kotest-assertions-core", version.ref = "kotest-stable" } -kotest-framework-engine = { module = "io.kotest:kotest-framework-engine", version.ref = "kotest" } +kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest-6" } +kotest-property = { module = "io.kotest:kotest-property", version.ref = "kotest-6" } +kotest-framework-datatest = { module = "io.kotest:kotest-framework-datatest", version.ref = "kotest-5" } +kotest-assertions-core = { module = "io.kotest:kotest-assertions-core", version.ref = "kotest-6" } +kotest-framework-engine = { module = "io.kotest:kotest-framework-engine", version.ref = "kotest-6" } mockk = { module = "io.mockk:mockk", version.ref = "mockk" } mockk-agent-jvm = { module = "io.mockk:mockk-agent-jvm", version.ref = "mockk" } mockative = { module = "io.mockative:mockative", version.ref = "mockative" } @@ -170,7 +169,7 @@ kotlin-logging = { module = "io.github.oshai:kotlin-logging", version.ref = "kot loggingCapabilities-gradlePlugin = { module = "dev.jacomet.gradle.plugins:logging-capabilities", version.ref = "logging_capabilities" } # Testing plugins -kotestFrameworkMultiplatform-gradlePlugin = { module = "io.kotest:kotest-framework-multiplatform-plugin-gradle", version.ref = "kotest" } +kotestFrameworkMultiplatform-gradlePlugin = { module = "io.kotest:kotest-framework-multiplatform-plugin-gradle", version.ref = "kotest-6" } # Documentation dokka-gradlePlugin = { module = "org.jetbrains.dokka:dokka-gradle-plugin", version.ref = "dokka" } @@ -221,7 +220,7 @@ dokka = { id = "org.jetbrains.dokka", version.ref = "dokka" } atomicfu = { id = "org.jetbrains.kotlinx.atomicfu", version.ref = "atomicfu" } # Testing -kotest = { id = "io.kotest.multiplatform", version.ref = "kotest" } +kotest = { id = "io.kotest.multiplatform", version.ref = "kotest-6" } # Ksp ksp = { id = "com.google.devtools.ksp", version.ref = "ksp" } diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingConfiguration.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingConfiguration.kt index f5734988..98771924 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingConfiguration.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingConfiguration.kt @@ -51,7 +51,7 @@ public open class TaskSchedulingConfiguration { ) } - public fun addTaskManager(taskManagerConfiguration: TaskManagerConfiguration<*>) { + public fun addTaskManager(taskManagerConfiguration: TaskManagerConfiguration) { taskManagers.add { taskManagerConfiguration.createTaskManager(it) } diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/TaskManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/TaskManager.kt index 455005c8..9f675036 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/TaskManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/TaskManager.kt @@ -5,11 +5,12 @@ import io.github.flaxoos.ktor.server.plugins.taskscheduling.tasks.Task import io.github.oshai.kotlinlogging.KotlinLogging import io.ktor.server.application.Application import io.ktor.utils.io.core.Closeable -import korlibs.time.DateFormat +import korlibs.time.DateFormat.Companion.FORMAT2 import korlibs.time.DateTime +import korlibs.time.TimeFormat.Companion.DEFAULT_FORMAT import korlibs.time.parseLocal -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch import kotlin.jvm.JvmInline private val logger = KotlinLogging.logger { } @@ -22,9 +23,10 @@ public abstract class TaskManager : C task: Task, executionTime: DateTime, ) { - val runs = - task.concurrencyRange().map { concurrencyIndex -> - application.async { + task + .concurrencyRange() + .map { concurrencyIndex -> + application.launch { logger.trace { "${application.host()}: Attempting task execution at ${executionTime.format2()} for ${task.name} - $concurrencyIndex" } @@ -42,10 +44,7 @@ public abstract class TaskManager : C null } } - } - runs.awaitAll().filterNotNull().forEach { - markExecuted(it) - } + }.joinAll() } /** @@ -62,17 +61,14 @@ public abstract class TaskManager : C concurrencyIndex: Int, ): TASK_EXECUTION_TOKEN? - /** - * Mark this task as, provided a key was acquired - */ - public abstract suspend fun markExecuted(key: TASK_EXECUTION_TOKEN) - public companion object { public fun Application.host(): String = "Host ${environment.config.property("ktor.deployment.host").getString()}" - public fun DateTime.format2(): String = format(DateFormat.FORMAT2) + public fun DateTime.format2(): String = format(FORMAT2) + + public fun DateTime.formatTime(): String = time.format(DEFAULT_FORMAT) - public fun String.format2ToDateTime(): DateTime = DateFormat.FORMAT2.parseLocal(this) + public fun String.format2ToDateTime(): DateTime = FORMAT2.parseLocal(this) } } @@ -80,7 +76,7 @@ public abstract class TaskManager : C * Configuration for [TaskManager] */ @TaskSchedulingDsl -public abstract class TaskManagerConfiguration { +public abstract class TaskManagerConfiguration { /** * The name of the task manager, will be used to identify the task manager when assigning tasks to it * if none is provided, it will be considered the default one. only one default task manager is allowed. diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt index f6838e91..8de25efe 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt @@ -14,10 +14,6 @@ public abstract class TaskLockManager : TaskManager : TaskManager : TaskManagerConfiguration() +public abstract class TaskLockManagerConfiguration : TaskManagerConfiguration() diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/DatabaseTaskLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/DatabaseTaskLockManager.kt index 93b407f6..a90eb37e 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/DatabaseTaskLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/DatabaseTaskLockManager.kt @@ -85,5 +85,4 @@ public interface DatabaseTaskLock : TaskLock { } @TaskSchedulingDsl -public abstract class DatabaseTaskLockManagerConfiguration : - TaskLockManagerConfiguration() +public abstract class DatabaseTaskLockManagerConfiguration : TaskLockManagerConfiguration() diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt index 3ab87019..ba918b41 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt @@ -1,25 +1,30 @@ package io.github.flaxoos.ktor.server.plugins.taskscheduling import dev.inmo.krontab.builder.SchedulerBuilder -import io.github.flaxoos.ktor.server.plugins.taskscheduling.managers.TaskManager.Companion.format2 +import io.github.flaxoos.ktor.server.plugins.taskscheduling.managers.TaskManager.Companion.formatTime import io.github.oshai.kotlinlogging.KotlinLogging -import io.kotest.assertions.fail +import io.kotest.assertions.retry +import io.kotest.assertions.withClue import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.scopes.ContainerScope import io.kotest.datatest.withData +import io.kotest.inspectors.forAll import io.kotest.matchers.ints.shouldBeGreaterThan +import io.kotest.matchers.ints.shouldBeLessThanOrEqual +import io.kotest.matchers.shouldBe import io.ktor.server.application.log import io.ktor.server.config.MapApplicationConfig import io.ktor.server.config.mergeWith import io.ktor.server.testing.TestApplication import io.ktor.server.testing.TestApplicationBuilder import korlibs.time.DateTime +import korlibs.time.minutes +import kotlin.math.pow +import kotlin.time.Duration.Companion.milliseconds import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch -import kotlin.math.pow -import kotlin.time.Duration.Companion.milliseconds @Suppress("UNUSED") val logger = KotlinLogging.logger { } @@ -56,46 +61,76 @@ abstract class TaskSchedulingPluginTest : FunSpec() { initial = frequenciesExponentialSeriesInitialMs, n = frequenciesExponentialSeriesN, ) + withData(nameFn = { "Freq: $it ms" }, frequencies) { freqMs -> withData(nameFn = { "Task count = $it" }, taskCounts) { taskCount -> withData(nameFn = { "Concurrency = $it" }, concurrencyValues) { concurrency -> - coroutineScope { - val taskLogsAndApplications = - setupApplicationEngines( - taskSchedulingConfiguration = taskSchedulingConfiguration, - count = engineCount, - freqMs = freqMs.toLong(), - taskCount = taskCount.toShort(), - concurrency = concurrency.toShort(), - kronTaskSchedule = kronTaskSchedule(freqMs), - ).map { it to launch { it.second.start() } } - .also { it.map { engineAndJob -> engineAndJob.second }.joinAll() } - .map { it.first } - - delay((freqMs + executionBufferMs).milliseconds * executions) - taskLogsAndApplications.forEach { launch { it.second.stop() } } - - try { - with(taskLogsAndApplications.map { it.first }.flatten()) { - size shouldBeGreaterThan executions - 2 - with(groupingBy { it }.eachCount()) { - val errors = - this.mapNotNull { - val expectedExecutions = concurrency * taskCount - if (it.value > expectedExecutions) { - "${it.key.format2()} was executed ${it.value} times, expected no more than $expectedExecutions times" - } else { - null + retry(3, 2.minutes) { + coroutineScope { + val taskExecutionsAndApplications = + setupApplicationEngines( + taskSchedulingConfiguration = taskSchedulingConfiguration, + count = engineCount, + freqMs = freqMs.toLong(), + taskCount = taskCount.toShort(), + concurrency = concurrency.toShort(), + kronTaskSchedule = kronTaskSchedule(freqMs), + ).map { executionsAndApp -> + executionsAndApp to + launch { + val app = executionsAndApp.second + app.start() } - } - if (errors.isNotEmpty()) { - fail(errors.joinToString("\n")) + }.also { appsAndJobs -> + // wait for all app engines to start + appsAndJobs.map { appAndJob -> appAndJob.second }.joinAll() + }.map { + // don't need to remember the job + it.first + } + + delay(freqMs.milliseconds * executions) + delay(executionBufferMs.milliseconds) + taskExecutionsAndApplications + .map { (_, app) -> + launch { + app.stop() } + }.joinAll() + + val totalExecutions = taskExecutionsAndApplications.sumOf { it.first.size } + val totalExpectedExecutionsMinusLastRound = taskCount * concurrency * (executions - 1) + totalExecutions shouldBeGreaterThan totalExpectedExecutionsMinusLastRound + + try { + taskExecutionsAndApplications.map { it.first }.flatten().let { records -> + records + .groupBy { it.taskName to it.executionTime } + .toSortedMap { (_, a), (_, b) -> + a.compareTo(b) + }.let { executionsPerTaskAndTime -> + val (_, lastTime) = executionsPerTaskAndTime.lastKey() + executionsPerTaskAndTime.forAll { (pair, executions) -> + val (taskName, executionTime) = pair + withClue( + "\n$taskName - ${executionTime.formatTime()} was executed ${executions.size} times instead of $concurrency: \n\t${ + executions.joinToString("\n\t") + }", + ) { + if (executionTime == lastTime) { + // The last round might miss executions due to the server shutting down + executions.size shouldBeLessThanOrEqual concurrency + } else { + executions.size shouldBe concurrency + } + } + } + } } + } finally { + delay(1000) + clean() } - } finally { - delay(1000) - clean() } } } @@ -103,6 +138,31 @@ abstract class TaskSchedulingPluginTest : FunSpec() { } } + data class ExecutionRecord( + val taskName: String, + val ktorHost: String, + val executionTime: DateTime, + ) { + override fun toString(): String = + "ExecutionRecord(taskName='$taskName', ktorHost='$ktorHost', executionTime=${executionTime.formatTime()})" + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is ExecutionRecord) return false + + if (taskName != other.taskName) return false + if (executionTime != other.executionTime) return false + + return true + } + + override fun hashCode(): Int { + var result = taskName.hashCode() + result = 31 * result + executionTime.hashCode() + return result + } + } + private fun setupApplicationEngines( taskSchedulingConfiguration: TaskSchedulingConfiguration.(TaskFreqMs) -> Unit, count: Int, @@ -110,38 +170,39 @@ abstract class TaskSchedulingPluginTest : FunSpec() { taskCount: Short = 1, concurrency: Short = 1, kronTaskSchedule: SchedulerBuilder.() -> Unit, - ) = (1..count).map { ktorHost -> - val executionRecords = mutableListOf() - val block: TestApplicationBuilder.() -> Unit = { - environment { - config = config.mergeWith(MapApplicationConfig("ktor.deployment.host" to ktorHost.toString())) - } - install(TaskScheduling) { - taskSchedulingConfiguration(TaskFreqMs(freqMs)) - - for (i in 1 until taskCount + 1) { - val taskName = "Test Kron Task: $i" - logger.info { "Adding task: $taskName" } - task { - name = taskName - task = { taskExecutionTime -> - executionRecords.add(taskExecutionTime) - log.info("Host: $ktorHost executing task $taskName at ${taskExecutionTime.format2()}") + ): List, TestApplication>> = + (1..count).map { ktorHost -> + val executionRecords = mutableListOf() + val block: TestApplicationBuilder.() -> Unit = { + environment { + config = config.mergeWith(MapApplicationConfig("ktor.deployment.host" to ktorHost.toString())) + } + install(TaskScheduling) { + taskSchedulingConfiguration(TaskFreqMs(freqMs)) + + for (i in 1 until taskCount + 1) { + val taskName = "Test Kron Task: $i" + logger.info { "Adding task: $taskName" } + task { + name = taskName + task = { taskExecutionTime -> + executionRecords.add(ExecutionRecord(taskName, ktorHost.toString(), taskExecutionTime)) + log.info("Host: $ktorHost executing task $taskName at ${taskExecutionTime.formatTime()}") + } + kronSchedule = kronTaskSchedule + this.concurrency = concurrency.toInt() } - kronSchedule = kronTaskSchedule - this.concurrency = concurrency.toInt() } } } + executionRecords to + TestApplication { + engine { + shutdownGracePeriod = freqMs * 10 + } + block() + } } - executionRecords to - TestApplication { - engine { - shutdownGracePeriod = freqMs * 10 - } - block() - } - } private fun exponentialScheduleGenerator( initial: Short, diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt index 8082a44a..a6b5eb1f 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt @@ -6,22 +6,21 @@ import io.github.flaxoos.ktor.server.plugins.taskscheduling.managers.TaskManager import io.github.flaxoos.ktor.server.plugins.taskscheduling.managers.TaskManagerConfiguration import io.github.flaxoos.ktor.server.plugins.taskscheduling.managers.TaskManagerConfiguration.TaskManagerName.Companion.toTaskManagerName import io.github.flaxoos.ktor.server.plugins.taskscheduling.managers.lock.database.DefaultTaskLockTable.lockedAt -import io.github.flaxoos.ktor.server.plugins.taskscheduling.managers.lock.database.DefaultTaskLockTable.name import io.github.flaxoos.ktor.server.plugins.taskscheduling.tasks.Task import io.ktor.server.application.Application import korlibs.time.DateTime import kotlinx.datetime.Instant import org.jetbrains.exposed.sql.Column import org.jetbrains.exposed.sql.Database -import org.jetbrains.exposed.sql.LiteralOp import org.jetbrains.exposed.sql.SchemaUtils import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq -import org.jetbrains.exposed.sql.SqlExpressionBuilder.neq +import org.jetbrains.exposed.sql.SqlExpressionBuilder.isNull +import org.jetbrains.exposed.sql.SqlExpressionBuilder.less import org.jetbrains.exposed.sql.Table import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.insertIgnore -import org.jetbrains.exposed.sql.kotlin.datetime.KotlinInstantColumnType import org.jetbrains.exposed.sql.kotlin.datetime.timestamp +import org.jetbrains.exposed.sql.or import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction import org.jetbrains.exposed.sql.transactions.transaction import org.jetbrains.exposed.sql.update @@ -59,12 +58,11 @@ public class JdbcLockManager( database, transactionIsolation = Connection.TRANSACTION_READ_COMMITTED, ) { - repetitionAttempts = 0 - debug = true + maxAttempts = 1 taskLockTable.insertIgnore { it[name] = task.name it[concurrencyIndex] = taskConcurrencyIndex - it[lockedAt] = Instant.fromEpochMilliseconds(0) + it[lockedAt] = null } }.insertedCount == 1 @@ -78,17 +76,16 @@ public class JdbcLockManager( db = database, transactionIsolation = Connection.TRANSACTION_READ_COMMITTED, ) { - val taskExecutionInstant = Instant.fromEpochMilliseconds(executionTime.unixMillisLong) taskLockTable.update( where = { selectClause( task, concurrencyIndex, - taskExecutionInstant, + executionTime, ) }, ) { - it[lockedAt] = taskExecutionInstant + it[lockedAt] = executionTime.toInstant() it[taskLockTable.concurrencyIndex] = concurrencyIndex } }.let { @@ -103,16 +100,17 @@ public class JdbcLockManager( } } - override suspend fun releaseLockKey(key: JdbcTaskLock) {} - override fun close() {} private fun selectClause( task: Task, concurrencyIndex: Int, - taskExecutionInstant: Instant, - ) = (taskLockTable.name eq task.name and taskLockTable.concurrencyIndex.eq(concurrencyIndex)) and - lockedAt.neq(LiteralOp(KotlinInstantColumnType(), taskExecutionInstant)) + executionTime: DateTime, + ) = ( + taskLockTable.name eq task.name and + taskLockTable.concurrencyIndex.eq(concurrencyIndex) + ) and + (lockedAt.isNull() or lockedAt.less(executionTime.toInstant())) } public class JdbcTaskLock( @@ -128,19 +126,19 @@ public abstract class ExposedTaskLockTable( ) : Table(tableName) { public abstract val name: Column public abstract val concurrencyIndex: Column - public abstract val lockedAt: Column + public abstract val lockedAt: Column } public object DefaultTaskLockTable : ExposedTaskLockTable("task_locks") { override val name: Column = text("_name") override val concurrencyIndex: Column = integer("concurrency_index") - override val lockedAt: Column = timestamp("locked_at").index() + override val lockedAt: Column = timestamp("locked_at").nullable().index() override val primaryKey: PrimaryKey = PrimaryKey(firstColumn = name, concurrencyIndex, name = "pk_task_locks") } @TaskSchedulingDsl -public class JdbcJobLockManagerConfiguration : DatabaseTaskLockManagerConfiguration() { +public class JdbcJobLockManagerConfiguration : DatabaseTaskLockManagerConfiguration() { public var database: Database by Delegates.notNull() override fun createTaskManager(application: Application): JdbcLockManager = @@ -170,3 +168,5 @@ public fun TaskSchedulingConfiguration.jdbc( }, ) } + +private fun DateTime.toInstant() = Instant.fromEpochMilliseconds(unixMillisLong) diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmTest/kotlin/taskscheduling/JdbcLockManagerTest.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/JdbcLockManagerTest.kt similarity index 100% rename from ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmTest/kotlin/taskscheduling/JdbcLockManagerTest.kt rename to ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/JdbcLockManagerTest.kt diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt index 880ebbdf..0d7c019c 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt @@ -21,6 +21,7 @@ import io.github.flaxoos.ktor.server.plugins.taskscheduling.managers.TaskManager import io.github.flaxoos.ktor.server.plugins.taskscheduling.tasks.Task import io.ktor.server.application.Application import korlibs.time.DateTime +import kotlin.properties.Delegates import kotlinx.coroutines.flow.firstOrNull import org.bson.BsonReader import org.bson.BsonWriter @@ -28,7 +29,6 @@ import org.bson.codecs.Codec import org.bson.codecs.DecoderContext import org.bson.codecs.EncoderContext import org.bson.codecs.configuration.CodecRegistries -import kotlin.properties.Delegates /** * An implementation of [DatabaseTaskLockManager] using MongoDB as the lock store @@ -61,8 +61,8 @@ public class MongoDBLockManager( Filters.and( Filters.eq(MongoDbTaskLock::name.name, task.name), Filters.eq(MongoDbTaskLock::concurrencyIndex.name, concurrencyIndex), + Filters.lt(MongoDbTaskLock::lockedAt.name, executionTime), ), - Filters.ne(MongoDbTaskLock::lockedAt.name, executionTime), ) val updates = Updates.combine( @@ -86,6 +86,7 @@ public class MongoDBLockManager( Indexes.compoundIndex( Indexes.ascending(MongoDbTaskLock::name.name), Indexes.ascending(MongoDbTaskLock::concurrencyIndex.name), + Indexes.ascending(MongoDbTaskLock::lockedAt.name), ), IndexOptions().unique(true), ) @@ -103,6 +104,7 @@ public class MongoDBLockManager( Filters.and( Filters.eq(MongoDbTaskLock::name.name, task.name), Filters.eq(MongoDbTaskLock::concurrencyIndex.name, taskConcurrencyIndex), + Filters.eq(MongoDbTaskLock::lockedAt.name, DateTime.EPOCH), ), ).firstOrNull() ?.let { false } ?: runCatching { @@ -127,8 +129,6 @@ public class MongoDBLockManager( } } - protected override suspend fun releaseLockKey(key: MongoDbTaskLock) {} - override fun close() { client.close() } @@ -143,7 +143,7 @@ public class MongoDBLockManager( .build() } -public data class MongoDbTaskLock( +public class MongoDbTaskLock( override val name: String, override val concurrencyIndex: Int, override var lockedAt: DateTime, @@ -221,7 +221,7 @@ internal val codecRegistry = ) @TaskSchedulingDsl -public class MongoDBJobLockManagerConfiguration : DatabaseTaskLockManagerConfiguration() { +public class MongoDBJobLockManagerConfiguration : DatabaseTaskLockManagerConfiguration() { public var client: MongoClient by Delegates.notNull() public var databaseName: String by Delegates.notNull() diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmTest/kotlin/taskscheduling/MongodbLockManagerTest.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmTest/kotlin/taskscheduling/MongodbLockManagerTest.kt index cf8cde99..7ca5ad52 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmTest/kotlin/taskscheduling/MongodbLockManagerTest.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmTest/kotlin/taskscheduling/MongodbLockManagerTest.kt @@ -24,7 +24,9 @@ class MongodbLockManagerTest : TaskSchedulingPluginTest() { init { context("mongodb lock manager") { - testTaskScheduling { + testTaskScheduling( + executionBufferMs = 1000, + ) { mongoDb { databaseName = "test" client = mongoClient diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt index ae7ed2a2..f92d7515 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt @@ -13,8 +13,8 @@ import io.github.flaxoos.ktor.server.plugins.taskscheduling.tasks.TaskLock import io.github.oshai.kotlinlogging.KotlinLogging import io.ktor.server.application.Application import korlibs.time.DateTime -import kotlinx.coroutines.runBlocking import kotlin.jvm.JvmInline +import kotlinx.coroutines.runBlocking internal val logger = KotlinLogging.logger { } @@ -28,29 +28,30 @@ public class RedisLockManager( private val lockExpirationMs: Long, private val connectionAcquisitionTimeoutMs: Long, ) : TaskLockManager() { + override suspend fun init(tasks: List) {} override suspend fun acquireLockKey( task: Task, executionTime: DateTime, concurrencyIndex: Int, - ): RedisTaskLock? = - connectionPool.withConnection(connectionAcquisitionTimeoutMs) { redisConnection -> - logger.debug { "${application.host()}: ${executionTime.format2()}: Acquiring lock for ${task.name} - $concurrencyIndex" } - val key = task.toRedisLockKey(executionTime, concurrencyIndex) - if (redisConnection.setNx(key.value, "1", lockExpirationMs) != null) { + ): RedisTaskLock? { + logger.debug { "${application.host()}: ${executionTime.format2()}: Acquiring lock for ${task.name} - $concurrencyIndex" } + val key = task.toRedisLockKey(concurrencyIndex, executionTime) + return connectionPool.withConnection(connectionAcquisitionTimeoutMs) { redisConnection -> + if (redisConnection.setNx(key.value, executionTime.format2(), lockExpirationMs) != null) { logger.debug { "${application.host()}: ${executionTime.format2()}: Acquired lock for ${task.name} - $concurrencyIndex" } return@withConnection key + } else { + return@withConnection null } - null } ?: run { logger.debug { "${application.host()}: ${executionTime.format2()}: Failed to acquire lock for ${task.name} - $concurrencyIndex" } null } - - override suspend fun releaseLockKey(key: RedisTaskLock) {} + } override fun close() { runBlocking { @@ -70,18 +71,26 @@ public value class RedisTaskLock internal constructor( public val value: String, ) : TaskLock { public companion object { - private const val DELIMITER = "-" + private const val DELIMITER = "-***-" public fun Task.toRedisLockKey( - executionTime: DateTime, concurrencyIndex: Int, - ): RedisTaskLock = RedisTaskLock("${name.replace(DELIMITER, "_")}-$concurrencyIndex at ${executionTime.format2()}") + executionTime: DateTime, + ): RedisTaskLock = + RedisTaskLock( + "${ + name.replace( + DELIMITER, + "_", + ) + }$DELIMITER$concurrencyIndex$DELIMITER${executionTime.format2()}", + ) } override val name: String - get() = value.split(DELIMITER, limit = 2)[0] + get() = value.split(DELIMITER, limit = 3)[0] override val concurrencyIndex: Int - get() = value.split(DELIMITER, limit = 2)[1].toInt() + get() = value.split(DELIMITER, limit = 3)[1].toInt() } @TaskSchedulingDsl @@ -118,7 +127,7 @@ public class RedisTaskLockManagerConfiguration( * The timeout for trying to get a connection to from the pool */ public var connectionAcquisitionTimeoutMs: Long = 100, -) : TaskLockManagerConfiguration() { +) : TaskLockManagerConfiguration() { override fun createTaskManager(application: Application): RedisLockManager = RedisLockManager( name = name.toTaskManagerName(),