Skip to content

Commit d9fd82d

Browse files
committed
feat: Add a service to run maintenance jobs
Add an initial implementation of the `MaintenanceService` to run maintenance jobs. This is required to run the package deduplication which will be added in the next commit. This first version is sufficient for the package deduplication but it has several shortcoming that will have to be addressed later: * The service runs on core, on the long term maintenance jobs should run on a separate node. * It only supports jobs which run exactly once, support for running jobs multiple times or on a schedule is missing. * The mechanism to prevent that a job is executed in parallel is based is based on a heuristic and needs to be replaced with a more sophisticated approach. Signed-off-by: Martin Nonnenmacher <[email protected]>
1 parent 00da5c3 commit d9fd82d

File tree

5 files changed

+373
-0
lines changed

5 files changed

+373
-0
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
plugins {
21+
// Apply precompiled plugins.
22+
id("ort-server-kotlin-jvm-conventions")
23+
id("ort-server-publication-conventions")
24+
25+
// Apply third-party plugins.
26+
alias(libs.plugins.kotlinSerialization)
27+
}
28+
29+
group = "org.eclipse.apoapsis.ortserver.services"
30+
31+
dependencies {
32+
api(projects.model)
33+
34+
implementation(projects.dao)
35+
implementation(projects.utils.logging)
36+
37+
implementation(libs.kotlinxSerializationJson)
38+
39+
runtimeOnly(libs.logback)
40+
41+
testImplementation(testFixtures(projects.dao))
42+
testImplementation(projects.utils.test)
43+
44+
testImplementation(libs.kotestAssertionsCore)
45+
testImplementation(libs.kotestRunnerJunit5)
46+
testImplementation(libs.mockk)
47+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
package org.eclipse.apoapsis.ortserver.services.maintenance
21+
22+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobData
23+
import org.eclipse.apoapsis.ortserver.utils.logging.withMdcContext
24+
25+
abstract class MaintenanceJob {
26+
abstract val name: String
27+
28+
var active = false
29+
30+
suspend fun start(service: MaintenanceService, jobData: MaintenanceJobData) {
31+
active = true
32+
33+
try {
34+
withMdcContext("maintenanceJob" to name) {
35+
execute(service, jobData)
36+
}
37+
} finally {
38+
active = false
39+
}
40+
}
41+
42+
abstract suspend fun execute(service: MaintenanceService, jobData: MaintenanceJobData)
43+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
package org.eclipse.apoapsis.ortserver.services.maintenance
21+
22+
import java.sql.Connection
23+
24+
import kotlin.time.Duration
25+
import kotlin.time.Duration.Companion.minutes
26+
27+
import kotlinx.coroutines.Dispatchers
28+
import kotlinx.coroutines.delay
29+
import kotlinx.coroutines.launch
30+
import kotlinx.coroutines.sync.Mutex
31+
import kotlinx.coroutines.sync.withLock
32+
import kotlinx.coroutines.withContext
33+
import kotlinx.datetime.Clock
34+
import kotlinx.serialization.json.JsonObject
35+
36+
import org.eclipse.apoapsis.ortserver.dao.blockingQuery
37+
import org.eclipse.apoapsis.ortserver.dao.tables.MaintenanceJobDao
38+
import org.eclipse.apoapsis.ortserver.dao.tables.MaintenanceJobsTable
39+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobData
40+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobStatus
41+
42+
import org.jetbrains.exposed.sql.Database
43+
import org.jetbrains.exposed.sql.and
44+
45+
import org.slf4j.LoggerFactory
46+
47+
private val logger = LoggerFactory.getLogger(MaintenanceService::class.java)
48+
49+
class MaintenanceService(private val db: Database, private val updateInterval: Duration = 5.minutes) {
50+
private val jobs: MutableList<MaintenanceJob> = mutableListOf()
51+
private val mutex = Mutex()
52+
53+
suspend fun addJob(job: MaintenanceJob) {
54+
mutex.withLock { jobs += job }
55+
}
56+
57+
suspend fun run() {
58+
logger.info("Starting maintenance service.")
59+
60+
withContext(Dispatchers.IO) {
61+
createJobs()
62+
63+
var uncompletedJobs = getUncompletedJobs()
64+
65+
while (uncompletedJobs.isNotEmpty()) {
66+
// Check which jobs need to be started.
67+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
68+
jobs.filterNot { it.active }.forEach { job ->
69+
logger.info("Checking if maintenance job '${job.name}' needs to be started.")
70+
val jobData = MaintenanceJobDao.find { MaintenanceJobsTable.name eq job.name }.firstOrNull()
71+
72+
if (jobData == null) {
73+
logger.warn("Could not find job '${job.name}' in the database.")
74+
return@forEach
75+
}
76+
77+
if (jobData.status !in MaintenanceJobStatus.uncompletedStates) {
78+
logger.info("Not starting maintenance job '${job.name}' as it is already completed.")
79+
return@forEach
80+
}
81+
82+
// Only start the job if it was not updated in the last five minutes. This heuristic needs
83+
// to be replaced with a more reliable mechanism to find out if the job is already running.
84+
if (jobData.updatedAt?.let { it > Clock.System.now() - 5.minutes } == true) {
85+
logger.info("Not starting maintenance job '${job.name}' as it is already running.")
86+
return@forEach
87+
}
88+
89+
logger.info("Starting maintenance job '${job.name}'.")
90+
91+
val jobDataModel = jobData.mapToModel()
92+
93+
launch {
94+
job.start(this@MaintenanceService, jobDataModel)
95+
}
96+
}
97+
}
98+
99+
delay(updateInterval)
100+
101+
uncompletedJobs = getUncompletedJobs()
102+
}
103+
104+
logger.info("All maintenance jobs have been completed.")
105+
}
106+
}
107+
108+
/**
109+
* Create entries in the database for all jobs that are not yet present.
110+
*/
111+
private fun createJobs() {
112+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
113+
val existingJobs = MaintenanceJobDao.all()
114+
115+
jobs.forEach { job ->
116+
if (existingJobs.none { it.name == job.name }) {
117+
logger.info("Creating maintenance job '${job.name}'.")
118+
119+
MaintenanceJobDao.new {
120+
name = job.name
121+
status = MaintenanceJobStatus.STARTED
122+
startedAt = Clock.System.now()
123+
}
124+
}
125+
}
126+
}
127+
}
128+
129+
private suspend fun getUncompletedJobs(): List<MaintenanceJobData> {
130+
return mutex.withLock {
131+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
132+
val jobNames = jobs.map { it.name }
133+
MaintenanceJobDao.find {
134+
MaintenanceJobsTable.name inList jobNames and
135+
(MaintenanceJobsTable.status inList MaintenanceJobStatus.uncompletedStates)
136+
}.map { it.mapToModel() }
137+
}
138+
}
139+
}
140+
141+
internal fun updateJob(id: Long, data: JsonObject? = null, status: MaintenanceJobStatus? = null) {
142+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
143+
val job = MaintenanceJobDao.findById(id)
144+
145+
if (job == null) {
146+
logger.warn("Could not find job with ID $id.")
147+
return@blockingQuery
148+
}
149+
150+
if (job.status.completed) {
151+
logger.warn("Job '${job.name}' with ID $id is already completed.")
152+
return@blockingQuery
153+
}
154+
155+
val verb = when (status) {
156+
MaintenanceJobStatus.FINISHED -> "Finish"
157+
MaintenanceJobStatus.FAILED -> "Fail"
158+
else -> "Update"
159+
}
160+
161+
logger.info("$verb job '${job.name}' with ID $id.")
162+
163+
job.data = data
164+
job.updatedAt = Clock.System.now()
165+
166+
if (status != null) {
167+
job.status = status
168+
if (status.completed) {
169+
job.finishedAt = Clock.System.now()
170+
}
171+
}
172+
}
173+
}
174+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
package org.eclipse.apoapsis.ortserver.services.maintenance
21+
22+
import io.kotest.core.spec.style.WordSpec
23+
import io.kotest.matchers.collections.containExactlyInAnyOrder
24+
import io.kotest.matchers.should
25+
import io.kotest.matchers.shouldBe
26+
27+
import kotlin.time.Duration.Companion.milliseconds
28+
29+
import kotlinx.serialization.Serializable
30+
import kotlinx.serialization.json.Json
31+
import kotlinx.serialization.json.decodeFromJsonElement
32+
import kotlinx.serialization.json.encodeToJsonElement
33+
import kotlinx.serialization.json.jsonObject
34+
35+
import org.eclipse.apoapsis.ortserver.dao.findSingle
36+
import org.eclipse.apoapsis.ortserver.dao.tables.MaintenanceJobDao
37+
import org.eclipse.apoapsis.ortserver.dao.tables.MaintenanceJobsTable
38+
import org.eclipse.apoapsis.ortserver.dao.test.DatabaseTestExtension
39+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobData
40+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobStatus
41+
42+
import org.jetbrains.exposed.sql.transactions.transaction
43+
44+
class MaintenanceServiceTest : WordSpec({
45+
val dbExtension = extension(DatabaseTestExtension())
46+
47+
"run" should {
48+
"run the supplied jobs" {
49+
val service = MaintenanceService(dbExtension.db, 100.milliseconds)
50+
51+
(1..5).forEach { index ->
52+
service.addJob(object : MaintenanceJob() {
53+
override val name = "TestJob$index"
54+
override suspend fun execute(service: MaintenanceService, jobData: MaintenanceJobData) {
55+
service.updateJob(jobData.id, status = MaintenanceJobStatus.FINISHED)
56+
}
57+
})
58+
}
59+
60+
service.run()
61+
62+
transaction {
63+
val jobData = MaintenanceJobDao.all().map { it.mapToModel() }
64+
jobData.forEach { it.status shouldBe MaintenanceJobStatus.FINISHED }
65+
jobData.map { it.name } should
66+
containExactlyInAnyOrder("TestJob1", "TestJob2", "TestJob3", "TestJob4", "TestJob5")
67+
}
68+
}
69+
}
70+
71+
"updateJob" should {
72+
"update the job data" {
73+
val service = MaintenanceService(dbExtension.db, 100.milliseconds)
74+
val jobId = createJob()
75+
val jobData = JobData(42)
76+
77+
service.updateJob(jobId, data = Json.Default.encodeToJsonElement(jobData).jsonObject)
78+
79+
transaction {
80+
val jobDao = MaintenanceJobDao.findSingle { MaintenanceJobsTable.id eq jobId }
81+
jobDao.data?.let { Json.Default.decodeFromJsonElement<JobData>(it) } shouldBe jobData
82+
}
83+
}
84+
85+
"update the status" {
86+
val service = MaintenanceService(dbExtension.db, 100.milliseconds)
87+
val jobId = createJob()
88+
89+
service.updateJob(jobId, status = MaintenanceJobStatus.FAILED)
90+
91+
transaction {
92+
val jobDao = MaintenanceJobDao.findSingle { MaintenanceJobsTable.id eq jobId }
93+
jobDao.status shouldBe MaintenanceJobStatus.FAILED
94+
}
95+
}
96+
}
97+
})
98+
99+
@Serializable
100+
private data class JobData(val progress: Int)
101+
102+
private fun createJob() = transaction {
103+
MaintenanceJobDao.new {
104+
name = "TestJob"
105+
status = MaintenanceJobStatus.STARTED
106+
}.id.value
107+
}

settings.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ include(":secrets:vault")
4444
include(":services:authorization")
4545
include(":services:hierarchy")
4646
include(":services:infrastructure")
47+
include(":services:maintenance")
4748
include(":services:report-storage")
4849
include(":services:secret")
4950
include(":storage:database")
@@ -75,6 +76,7 @@ project(":secrets:spi").name = "secrets-spi"
7576
project(":services:authorization").name = "authorization-service"
7677
project(":services:hierarchy").name = "hierarchy-service"
7778
project(":services:infrastructure").name = "infrastructure-service"
79+
project(":services:maintenance").name = "maintenance-service"
7880
project(":services:report-storage").name = "report-storage-service"
7981
project(":services:secret").name = "secret-service"
8082
project(":storage:spi").name = "storage-spi"

0 commit comments

Comments
 (0)