Skip to content

Commit 74c7dad

Browse files
committed
feat: Add a service to run maintenance jobs
Add an initial implementation of the `MaintenanceService` to run maintenance jobs. This is required to run the package deduplication which will be added in the next commit. This first version is sufficient for the package deduplication but it has several shortcoming that will have to be addressed later: * The service runs on core, on the long term maintenance jobs should run on a separate node. * It only supports jobs which run exactly once, support for running jobs multiple times or on a schedule is missing. * The mechanism to prevent that a job is executed in parallel is based is based on a heuristic and needs to be replaced with a more sophisticated approach. Signed-off-by: Martin Nonnenmacher <[email protected]>
1 parent 00da5c3 commit 74c7dad

File tree

5 files changed

+393
-0
lines changed

5 files changed

+393
-0
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
plugins {
21+
// Apply precompiled plugins.
22+
id("ort-server-kotlin-jvm-conventions")
23+
id("ort-server-publication-conventions")
24+
25+
// Apply third-party plugins.
26+
alias(libs.plugins.kotlinSerialization)
27+
}
28+
29+
group = "org.eclipse.apoapsis.ortserver.services"
30+
31+
dependencies {
32+
api(projects.model)
33+
34+
implementation(projects.dao)
35+
implementation(projects.utils.logging)
36+
37+
implementation(libs.kotlinxSerializationJson)
38+
39+
runtimeOnly(libs.logback)
40+
41+
testImplementation(testFixtures(projects.dao))
42+
testImplementation(projects.utils.test)
43+
44+
testImplementation(libs.kotestAssertionsCore)
45+
testImplementation(libs.kotestRunnerJunit5)
46+
testImplementation(libs.mockk)
47+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
package org.eclipse.apoapsis.ortserver.services.maintenance
21+
22+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobData
23+
import org.eclipse.apoapsis.ortserver.utils.logging.withMdcContext
24+
25+
abstract class MaintenanceJob {
26+
abstract val name: String
27+
28+
var active = false
29+
30+
suspend fun start(service: MaintenanceService, jobData: MaintenanceJobData) {
31+
active = true
32+
33+
try {
34+
withMdcContext("maintenanceJob" to name) {
35+
execute(service, jobData)
36+
}
37+
} finally {
38+
active = false
39+
}
40+
}
41+
42+
abstract suspend fun execute(service: MaintenanceService, jobData: MaintenanceJobData)
43+
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
package org.eclipse.apoapsis.ortserver.services.maintenance
21+
22+
import java.sql.Connection
23+
24+
import kotlin.time.Duration
25+
import kotlin.time.Duration.Companion.minutes
26+
27+
import kotlinx.coroutines.Dispatchers
28+
import kotlinx.coroutines.delay
29+
import kotlinx.coroutines.launch
30+
import kotlinx.coroutines.sync.Mutex
31+
import kotlinx.coroutines.sync.withLock
32+
import kotlinx.coroutines.withContext
33+
import kotlinx.datetime.Clock
34+
import kotlinx.serialization.json.JsonObject
35+
36+
import org.eclipse.apoapsis.ortserver.dao.blockingQuery
37+
import org.eclipse.apoapsis.ortserver.dao.tables.MaintenanceJobDao
38+
import org.eclipse.apoapsis.ortserver.dao.tables.MaintenanceJobsTable
39+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobData
40+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobStatus
41+
42+
import org.jetbrains.exposed.sql.Database
43+
import org.jetbrains.exposed.sql.and
44+
45+
import org.slf4j.LoggerFactory
46+
47+
private val logger = LoggerFactory.getLogger(MaintenanceService::class.java)
48+
49+
/**
50+
* A service to run maintenance jobs. The current implementation has the following limitations:
51+
*
52+
* - The service runs on core, on the long-term maintenance jobs should run on a separate node.
53+
* - It only supports jobs which run exactly once, support for running jobs multiple times or on a schedule is missing.
54+
* - The mechanism to prevent that a job is executed in parallel is based on a heuristic and needs to be
55+
* replaced with a more sophisticated approach.
56+
*/
57+
class MaintenanceService(private val db: Database, private val updateInterval: Duration = 5.minutes) {
58+
private val jobs: MutableList<MaintenanceJob> = mutableListOf()
59+
private val mutex = Mutex()
60+
61+
/**
62+
* Add a [job] to the list of jobs to run. This function should be called before [run].
63+
*/
64+
suspend fun addJob(job: MaintenanceJob) {
65+
mutex.withLock { jobs += job }
66+
}
67+
68+
/**
69+
* Run the previously added jobs. This function is blocking and should be called from a coroutine.
70+
*/
71+
suspend fun run() {
72+
logger.info("Starting maintenance service.")
73+
74+
withContext(Dispatchers.IO) {
75+
createJobs()
76+
77+
var uncompletedJobs = getUncompletedJobs()
78+
79+
while (uncompletedJobs.isNotEmpty()) {
80+
// Check which jobs need to be started.
81+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
82+
jobs.filterNot { it.active }.forEach { job ->
83+
logger.info("Checking if maintenance job '${job.name}' needs to be started.")
84+
val jobData = MaintenanceJobDao.find { MaintenanceJobsTable.name eq job.name }.firstOrNull()
85+
86+
if (jobData == null) {
87+
logger.warn("Could not find job '${job.name}' in the database.")
88+
return@forEach
89+
}
90+
91+
if (jobData.status !in MaintenanceJobStatus.uncompletedStates) {
92+
logger.info("Not starting maintenance job '${job.name}' as it is already completed.")
93+
return@forEach
94+
}
95+
96+
// Only start the job if it was not updated in the last five minutes. This heuristic needs
97+
// to be replaced with a more reliable mechanism to find out if the job is already running.
98+
if (jobData.updatedAt?.let { it > Clock.System.now() - 5.minutes } == true) {
99+
logger.info("Not starting maintenance job '${job.name}' as it is already running.")
100+
return@forEach
101+
}
102+
103+
logger.info("Starting maintenance job '${job.name}'.")
104+
105+
val jobDataModel = jobData.mapToModel()
106+
107+
launch {
108+
job.start(this@MaintenanceService, jobDataModel)
109+
}
110+
}
111+
}
112+
113+
delay(updateInterval)
114+
115+
uncompletedJobs = getUncompletedJobs()
116+
}
117+
118+
logger.info("All maintenance jobs have been completed.")
119+
}
120+
}
121+
122+
/**
123+
* Create entries in the database for all jobs that are not yet present.
124+
*/
125+
private fun createJobs() {
126+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
127+
val existingJobs = MaintenanceJobDao.all()
128+
129+
jobs.forEach { job ->
130+
if (existingJobs.none { it.name == job.name }) {
131+
logger.info("Creating maintenance job '${job.name}'.")
132+
133+
MaintenanceJobDao.new {
134+
name = job.name
135+
status = MaintenanceJobStatus.STARTED
136+
startedAt = Clock.System.now()
137+
}
138+
}
139+
}
140+
}
141+
}
142+
143+
/**
144+
* Get all uncompleted jobs from the database.
145+
*/
146+
private suspend fun getUncompletedJobs(): List<MaintenanceJobData> {
147+
return mutex.withLock {
148+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
149+
val jobNames = jobs.map { it.name }
150+
MaintenanceJobDao.find {
151+
MaintenanceJobsTable.name inList jobNames and
152+
(MaintenanceJobsTable.status inList MaintenanceJobStatus.uncompletedStates)
153+
}.map { it.mapToModel() }
154+
}
155+
}
156+
}
157+
158+
/**
159+
* Update the job with the given [id] with the given [data] and/or [status].
160+
*/
161+
internal fun updateJob(id: Long, data: JsonObject? = null, status: MaintenanceJobStatus? = null) {
162+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
163+
val job = MaintenanceJobDao.findById(id)
164+
165+
if (job == null) {
166+
logger.warn("Could not find job with ID $id.")
167+
return@blockingQuery
168+
}
169+
170+
if (job.status.completed) {
171+
logger.warn("Job '${job.name}' with ID $id is already completed.")
172+
return@blockingQuery
173+
}
174+
175+
val verb = when (status) {
176+
MaintenanceJobStatus.FINISHED -> "Finish"
177+
MaintenanceJobStatus.FAILED -> "Fail"
178+
else -> "Update"
179+
}
180+
181+
logger.info("$verb job '${job.name}' with ID $id.")
182+
183+
job.data = data
184+
job.updatedAt = Clock.System.now()
185+
186+
if (status != null) {
187+
job.status = status
188+
if (status.completed) {
189+
job.finishedAt = Clock.System.now()
190+
}
191+
}
192+
}
193+
}
194+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
package org.eclipse.apoapsis.ortserver.services.maintenance
21+
22+
import io.kotest.core.spec.style.WordSpec
23+
import io.kotest.matchers.collections.containExactlyInAnyOrder
24+
import io.kotest.matchers.should
25+
import io.kotest.matchers.shouldBe
26+
27+
import kotlin.time.Duration.Companion.milliseconds
28+
29+
import kotlinx.serialization.Serializable
30+
import kotlinx.serialization.json.Json
31+
import kotlinx.serialization.json.decodeFromJsonElement
32+
import kotlinx.serialization.json.encodeToJsonElement
33+
import kotlinx.serialization.json.jsonObject
34+
35+
import org.eclipse.apoapsis.ortserver.dao.findSingle
36+
import org.eclipse.apoapsis.ortserver.dao.tables.MaintenanceJobDao
37+
import org.eclipse.apoapsis.ortserver.dao.tables.MaintenanceJobsTable
38+
import org.eclipse.apoapsis.ortserver.dao.test.DatabaseTestExtension
39+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobData
40+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobStatus
41+
42+
import org.jetbrains.exposed.sql.transactions.transaction
43+
44+
class MaintenanceServiceTest : WordSpec({
45+
val dbExtension = extension(DatabaseTestExtension())
46+
47+
"run" should {
48+
"run the supplied jobs" {
49+
val service = MaintenanceService(dbExtension.db, 100.milliseconds)
50+
51+
(1..5).forEach { index ->
52+
service.addJob(object : MaintenanceJob() {
53+
override val name = "TestJob$index"
54+
override suspend fun execute(service: MaintenanceService, jobData: MaintenanceJobData) {
55+
service.updateJob(jobData.id, status = MaintenanceJobStatus.FINISHED)
56+
}
57+
})
58+
}
59+
60+
service.run()
61+
62+
transaction {
63+
val jobData = MaintenanceJobDao.all().map { it.mapToModel() }
64+
jobData.forEach { it.status shouldBe MaintenanceJobStatus.FINISHED }
65+
jobData.map { it.name } should
66+
containExactlyInAnyOrder("TestJob1", "TestJob2", "TestJob3", "TestJob4", "TestJob5")
67+
}
68+
}
69+
}
70+
71+
"updateJob" should {
72+
"update the job data" {
73+
val service = MaintenanceService(dbExtension.db, 100.milliseconds)
74+
val jobId = createJob()
75+
val jobData = JobData(42)
76+
77+
service.updateJob(jobId, data = Json.Default.encodeToJsonElement(jobData).jsonObject)
78+
79+
transaction {
80+
val jobDao = MaintenanceJobDao.findSingle { MaintenanceJobsTable.id eq jobId }
81+
jobDao.data?.let { Json.Default.decodeFromJsonElement<JobData>(it) } shouldBe jobData
82+
}
83+
}
84+
85+
"update the status" {
86+
val service = MaintenanceService(dbExtension.db, 100.milliseconds)
87+
val jobId = createJob()
88+
89+
service.updateJob(jobId, status = MaintenanceJobStatus.FAILED)
90+
91+
transaction {
92+
val jobDao = MaintenanceJobDao.findSingle { MaintenanceJobsTable.id eq jobId }
93+
jobDao.status shouldBe MaintenanceJobStatus.FAILED
94+
}
95+
}
96+
}
97+
})
98+
99+
@Serializable
100+
private data class JobData(val progress: Int)
101+
102+
private fun createJob() = transaction {
103+
MaintenanceJobDao.new {
104+
name = "TestJob"
105+
status = MaintenanceJobStatus.STARTED
106+
}.id.value
107+
}

0 commit comments

Comments
 (0)