Skip to content

Commit f23f315

Browse files
committed
feat: Add a service to run maintenance jobs
Add an initial implementation of the `MaintenanceService` to run maintenance jobs. This is required to run the package deduplication which will be added in the next commit. This first version is sufficient for the package deduplication but it has several shortcoming that will have to be addressed later: * The service runs on core, on the long term maintenance jobs should run on a separate node. * It only supports jobs which run exactly once, support for running jobs multiple times or on a schedule is missing. * The mechanism to prevent that a job is executed in parallel is based on a heuristic and needs to be replaced with a more sophisticated approach. Signed-off-by: Martin Nonnenmacher <[email protected]>
1 parent 64a0519 commit f23f315

File tree

5 files changed

+402
-0
lines changed

5 files changed

+402
-0
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
plugins {
21+
// Apply precompiled plugins.
22+
id("ort-server-kotlin-jvm-conventions")
23+
id("ort-server-publication-conventions")
24+
25+
// Apply third-party plugins.
26+
alias(libs.plugins.kotlinSerialization)
27+
}
28+
29+
group = "org.eclipse.apoapsis.ortserver.services"
30+
31+
dependencies {
32+
api(projects.model)
33+
34+
implementation(projects.dao)
35+
implementation(projects.utils.logging)
36+
37+
implementation(libs.kotlinxSerializationJson)
38+
39+
runtimeOnly(libs.logback)
40+
41+
testImplementation(testFixtures(projects.dao))
42+
testImplementation(projects.utils.test)
43+
44+
testImplementation(libs.kotestAssertionsCore)
45+
testImplementation(libs.kotestRunnerJunit5)
46+
testImplementation(libs.mockk)
47+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
package org.eclipse.apoapsis.ortserver.services.maintenance
21+
22+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobData
23+
import org.eclipse.apoapsis.ortserver.utils.logging.withMdcContext
24+
import java.util.concurrent.atomic.AtomicBoolean
25+
26+
abstract class MaintenanceJob {
27+
abstract val name: String
28+
29+
private var _active = AtomicBoolean(false)
30+
31+
val active get() = _active.get()
32+
33+
suspend fun start(service: MaintenanceService, jobData: MaintenanceJobData) {
34+
_active.set(true)
35+
36+
try {
37+
withMdcContext("maintenanceJob" to name) {
38+
execute(service, jobData)
39+
}
40+
} finally {
41+
_active.set(false)
42+
}
43+
}
44+
45+
abstract suspend fun execute(service: MaintenanceService, jobData: MaintenanceJobData)
46+
}
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
package org.eclipse.apoapsis.ortserver.services.maintenance
21+
22+
import java.sql.Connection
23+
24+
import kotlin.time.Duration
25+
import kotlin.time.Duration.Companion.minutes
26+
27+
import kotlinx.coroutines.Dispatchers
28+
import kotlinx.coroutines.delay
29+
import kotlinx.coroutines.launch
30+
import kotlinx.coroutines.sync.Mutex
31+
import kotlinx.coroutines.sync.withLock
32+
import kotlinx.coroutines.withContext
33+
import kotlinx.datetime.Clock
34+
import kotlinx.serialization.json.JsonObject
35+
36+
import org.eclipse.apoapsis.ortserver.dao.blockingQuery
37+
import org.eclipse.apoapsis.ortserver.dao.tables.MaintenanceJobDao
38+
import org.eclipse.apoapsis.ortserver.dao.tables.MaintenanceJobsTable
39+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobData
40+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobStatus
41+
42+
import org.jetbrains.exposed.sql.Database
43+
import org.jetbrains.exposed.sql.and
44+
45+
import org.slf4j.LoggerFactory
46+
47+
private val logger = LoggerFactory.getLogger(MaintenanceService::class.java)
48+
49+
/**
50+
* A service to run maintenance jobs. The current implementation has the following limitations:
51+
*
52+
* - The service runs on core, on the long-term maintenance jobs should run on a separate node.
53+
* - It only supports jobs which run exactly once, support for running jobs multiple times or on a schedule is missing.
54+
* - The mechanism to prevent that a job is executed in parallel is based on a heuristic and needs to be
55+
* replaced with a more sophisticated approach.
56+
* - No proper error handling is implemented yet. If a job fails, it is not retried and if a job throws and exception,
57+
* the whole job execution stops.
58+
*/
59+
class MaintenanceService(private val db: Database, private val updateInterval: Duration = 5.minutes) {
60+
private val jobs: MutableList<MaintenanceJob> = mutableListOf()
61+
private val mutex = Mutex()
62+
63+
/**
64+
* Add a [job] to the list of jobs to run. This function should be called before [run].
65+
*/
66+
suspend fun addJob(job: MaintenanceJob) {
67+
mutex.withLock { jobs += job }
68+
}
69+
70+
/**
71+
* Run the previously added jobs. This function is blocking and should be called from a coroutine.
72+
*/
73+
suspend fun run() {
74+
logger.info("Starting maintenance service.")
75+
76+
withContext(Dispatchers.IO) {
77+
createJobs()
78+
79+
var uncompletedJobs = getUncompletedJobs()
80+
81+
while (uncompletedJobs.isNotEmpty()) {
82+
// Check which jobs need to be started.
83+
val activeJobs = mutex.withLock { jobs.filterNot { it.active } }
84+
85+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
86+
activeJobs.forEach { job ->
87+
logger.info("Checking if maintenance job '${job.name}' needs to be started.")
88+
val jobData = MaintenanceJobDao.find { MaintenanceJobsTable.name eq job.name }.firstOrNull()
89+
90+
if (jobData == null) {
91+
logger.warn("Could not find job '${job.name}' in the database.")
92+
return@forEach
93+
}
94+
95+
if (jobData.status !in MaintenanceJobStatus.uncompletedStates) {
96+
logger.info("Not starting maintenance job '${job.name}' as it is already completed.")
97+
return@forEach
98+
}
99+
100+
// Only start the job if it was not updated in the last five minutes. This heuristic needs
101+
// to be replaced with a more reliable mechanism to find out if the job is already running.
102+
if (jobData.updatedAt?.let { it > Clock.System.now() - 5.minutes } == true) {
103+
logger.info("Not starting maintenance job '${job.name}' as it is already running.")
104+
return@forEach
105+
}
106+
107+
logger.info("Starting maintenance job '${job.name}'.")
108+
109+
val jobDataModel = jobData.mapToModel()
110+
111+
launch {
112+
job.start(this@MaintenanceService, jobDataModel)
113+
}
114+
}
115+
}
116+
117+
delay(updateInterval)
118+
119+
uncompletedJobs = getUncompletedJobs()
120+
}
121+
122+
logger.info("All maintenance jobs have been completed.")
123+
}
124+
}
125+
126+
/**
127+
* Create entries in the database for all jobs that are not yet present.
128+
*/
129+
private suspend fun createJobs() {
130+
mutex.withLock {
131+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
132+
val existingJobs = MaintenanceJobDao.all()
133+
134+
jobs.forEach { job ->
135+
if (existingJobs.none { it.name == job.name }) {
136+
logger.info("Creating maintenance job '${job.name}'.")
137+
138+
MaintenanceJobDao.new {
139+
name = job.name
140+
status = MaintenanceJobStatus.STARTED
141+
startedAt = Clock.System.now()
142+
}
143+
}
144+
}
145+
}
146+
}
147+
}
148+
149+
/**
150+
* Get all uncompleted jobs from the database.
151+
*/
152+
private suspend fun getUncompletedJobs(): List<MaintenanceJobData> {
153+
return mutex.withLock {
154+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
155+
val jobNames = jobs.map { it.name }
156+
MaintenanceJobDao.find {
157+
MaintenanceJobsTable.name inList jobNames and
158+
(MaintenanceJobsTable.status inList MaintenanceJobStatus.uncompletedStates)
159+
}.map { it.mapToModel() }
160+
}
161+
}
162+
}
163+
164+
/**
165+
* Update the job with the given [id] with the given [data] and/or [status].
166+
*/
167+
internal fun updateJob(id: Long, data: JsonObject? = null, status: MaintenanceJobStatus? = null) {
168+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
169+
val job = MaintenanceJobDao.findById(id)
170+
171+
if (job == null) {
172+
logger.warn("Could not find job with ID $id.")
173+
return@blockingQuery
174+
}
175+
176+
if (job.status.completed) {
177+
logger.warn("Job '${job.name}' with ID $id is already completed.")
178+
return@blockingQuery
179+
}
180+
181+
val verb = when (status) {
182+
MaintenanceJobStatus.FINISHED -> "Finish"
183+
MaintenanceJobStatus.FAILED -> "Fail"
184+
else -> "Update"
185+
}
186+
187+
logger.info("$verb job '${job.name}' with ID $id.")
188+
189+
job.data = data
190+
job.updatedAt = Clock.System.now()
191+
192+
if (status != null) {
193+
job.status = status
194+
if (status.completed) {
195+
job.finishedAt = Clock.System.now()
196+
}
197+
}
198+
}
199+
}
200+
}

0 commit comments

Comments
 (0)