Skip to content

Commit 70433b9

Browse files
committed
feat(maintenance-service): Add a job to deduplicate packages
Due to an issue fixed in fb57726, duplicates of packages were stored in the database. The new mechanism to prevent duplicates is very slow if the database contains a lot of packages (>1,000,000), so add a maintenance job to remove the duplicates. Signed-off-by: Martin Nonnenmacher <[email protected]>
1 parent 0b8b174 commit 70433b9

File tree

2 files changed

+786
-0
lines changed

2 files changed

+786
-0
lines changed
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
package org.eclipse.apoapsis.ortserver.services.maintenance.jobs
21+
22+
import java.sql.Connection
23+
24+
import kotlinx.serialization.Serializable
25+
import kotlinx.serialization.SerializationException
26+
import kotlinx.serialization.json.Json
27+
import kotlinx.serialization.json.JsonObject
28+
import kotlinx.serialization.json.decodeFromJsonElement
29+
import kotlinx.serialization.json.encodeToJsonElement
30+
import kotlinx.serialization.json.jsonObject
31+
32+
import org.eclipse.apoapsis.ortserver.dao.blockingQuery
33+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.MappedDeclaredLicensesTable
34+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesAnalyzerRunsTable
35+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesAuthorsTable
36+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesDeclaredLicensesTable
37+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesTable
38+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.ProcessedDeclaredLicensesMappedDeclaredLicensesTable
39+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.ProcessedDeclaredLicensesTable
40+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable
41+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.UnmappedDeclaredLicensesTable
42+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobData
43+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobStatus
44+
import org.eclipse.apoapsis.ortserver.model.runs.ProcessedDeclaredLicense
45+
import org.eclipse.apoapsis.ortserver.services.maintenance.MaintenanceJob
46+
import org.eclipse.apoapsis.ortserver.services.maintenance.MaintenanceService
47+
48+
import org.jetbrains.exposed.sql.Database
49+
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
50+
import org.jetbrains.exposed.sql.and
51+
import org.jetbrains.exposed.sql.deleteWhere
52+
import org.jetbrains.exposed.sql.selectAll
53+
import org.jetbrains.exposed.sql.update
54+
55+
import org.slf4j.LoggerFactory
56+
57+
/** The logger shared by [DeduplicatePackagesJob] and the helper functions in this file. */
private val logger = LoggerFactory.getLogger(DeduplicatePackagesJob::class.java)

/** The [Json] instance used to convert the progress data of the job to and from JSON. */
private val json = Json.Default
59+
60+
/**
 * The progress of the [DeduplicatePackagesJob]. It is serialized to JSON when the job is updated and read back when
 * the job is executed with existing job data.
 *
 * @property lastPackageId The last processed package ID.
 * @property deduplicatedPackages The number of unique deduplicated packages.
 * @property removedDuplicates The number of removed duplicate packages.
 */
@Serializable
private data class DeduplicatePackagesJobData(
    val lastPackageId: Long,
    val deduplicatedPackages: Long,
    val removedDuplicates: Long
)
74+
75+
/**
 * A maintenance job that removes duplicate rows from the [PackagesTable]. The packages are processed in ascending
 * order of their IDs: For each package, all other packages with equal column values and equal associated authors,
 * declared licenses, and processed declared licenses are looked up, the references to them in the
 * [PackagesAnalyzerRunsTable] are redirected to the current package, and the duplicates are deleted.
 *
 * The progress is reported to the [MaintenanceService] after each package, and execution continues after the
 * `lastPackageId` found in the provided job data, if any.
 */
class DeduplicatePackagesJob(private val db: Database) : MaintenanceJob() {
    override val name = "DeduplicatePackages"

    /** The ID of the package that is currently processed, or the last processed ID between two packages. */
    private var curIndex: Long = -1L

    /** The progress of the job, initialized at the beginning of [execute]. */
    private lateinit var curJobData: DeduplicatePackagesJobData

    /**
     * Deduplicate all packages with an ID greater than the last processed package ID stored in [jobData], or all
     * packages if [jobData] contains no (valid) progress data. The job is always marked as finished in [service]
     * before this function returns normally, even if there was no package left to process.
     */
    override suspend fun execute(service: MaintenanceService, jobData: MaintenanceJobData) {
        curJobData = jobData.data?.let { deserializeJobData(it) } ?: DeduplicatePackagesJobData(-1L, 0, 0)
        curIndex = curJobData.lastPackageId

        var markedAsFinished = false

        while (nextPackageId() != null) {
            markedAsFinished = deduplicatePackage(service, jobData.id)
        }

        // The loop only marks the job as finished after processing the last package. If there was nothing to
        // process at all, or if the last status update still reported remaining packages, finish the job here.
        if (!markedAsFinished) {
            service.updateJob(jobData.id, serializeJobData(curJobData), MaintenanceJobStatus.FINISHED)
            logger.info(
                "Package deduplication finished. Deduplicated ${curJobData.deduplicatedPackages} unique packages " +
                    "and removed ${curJobData.removedDuplicates} duplicates."
            )
        }
    }

    /**
     * Advance [curIndex] to the smallest package ID that is greater than the current value and return it, or return
     * null and leave [curIndex] unchanged if there is no such package.
     */
    private fun nextPackageId() = db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
        PackagesTable.select(PackagesTable.id)
            .where { PackagesTable.id greater curIndex }
            .orderBy(PackagesTable.id)
            .limit(1)
            .singleOrNull()
            ?.let {
                curIndex = it[PackagesTable.id].value
                curIndex
            }
    }

    /**
     * Find all duplicates of the package with the ID [curIndex], update references to point to this package instead,
     * and delete the duplicates. Afterwards, report the progress for the job with the provided [jobId] to the
     * [service]. Return true if the job was marked as finished because no packages remain.
     */
    private fun deduplicatePackage(service: MaintenanceService, jobId: Long): Boolean {
        // Only database work happens inside the transaction block. The progress counters are updated afterwards so
        // that they cannot be applied more than once in case the block is executed again.
        // NOTE(review): Whether blockingQuery re-executes the block on failure is not visible here.
        val removedDuplicates = db.blockingQuery {
            logger.info("Deduplicating package with ID $curIndex.")

            val equalPackages = findEqualPackages(curIndex)

            logger.info(
                "Found ${equalPackages.size} equal packages for package with ID $curIndex: $equalPackages"
            )

            equalPackages.forEach { duplicatePkgId ->
                updateReferences(curIndex, duplicatePkgId)
                deletePackage(duplicatePkgId)
            }

            equalPackages.size
        }

        curJobData = DeduplicatePackagesJobData(
            lastPackageId = curIndex,
            deduplicatedPackages = curJobData.deduplicatedPackages + 1,
            removedDuplicates = curJobData.removedDuplicates + removedDuplicates
        )

        logger.info(
            "Finished deduplicating package with ID $curIndex. Deduplicated ${curJobData.deduplicatedPackages} " +
                "unique packages and removed ${curJobData.removedDuplicates} duplicates so far."
        )

        val remainingPackages = countRemainingPackages()

        return if (remainingPackages > 0L) {
            service.updateJob(jobId, serializeJobData(curJobData))
            logger.info("Remaining packages: $remainingPackages")
            false
        } else {
            service.updateJob(jobId, serializeJobData(curJobData), MaintenanceJobStatus.FINISHED)
            logger.info(
                "Package deduplication finished. Deduplicated ${curJobData.deduplicatedPackages} unique packages " +
                    "and removed ${curJobData.removedDuplicates} duplicates."
            )
            true
        }
    }

    /**
     * Find all packages that are equal to the package with the provided [pkgId]. Equal means that not only the values
     * of the columns are equal, but also the references to other tables. The returned list contains the IDs of the
     * equal packages and never contains [pkgId] itself.
     */
    private fun findEqualPackages(pkgId: Long): List<Long> {
        val pkg = PackagesTable.selectAll().where { PackagesTable.id eq pkgId }.single()

        val authors = PackagesAuthorsTable.getForPackage(pkgId)
        val declaredLicenses = PackagesDeclaredLicensesTable.getForPackage(pkgId)
        val processedDeclaredLicenses = ProcessedDeclaredLicensesTable.getForPackage(pkgId)

        // First select the candidates by comparing the columns of the packages table in the database, then compare
        // the associated entities of each candidate.
        return PackagesTable.selectAll().where {
            PackagesTable.id neq pkgId and
                (PackagesTable.identifierId eq pkg[PackagesTable.identifierId]) and
                (PackagesTable.vcsId eq pkg[PackagesTable.vcsId]) and
                (PackagesTable.vcsProcessedId eq pkg[PackagesTable.vcsProcessedId]) and
                (PackagesTable.binaryArtifactId eq pkg[PackagesTable.binaryArtifactId]) and
                (PackagesTable.sourceArtifactId eq pkg[PackagesTable.sourceArtifactId]) and
                (PackagesTable.purl eq pkg[PackagesTable.purl]) and
                (PackagesTable.cpe eq pkg[PackagesTable.cpe]) and
                (PackagesTable.description eq pkg[PackagesTable.description]) and
                (PackagesTable.homepageUrl eq pkg[PackagesTable.homepageUrl]) and
                (PackagesTable.isMetadataOnly eq pkg[PackagesTable.isMetadataOnly]) and
                (PackagesTable.isModified eq pkg[PackagesTable.isModified])
        }.map { it[PackagesTable.id].value }
            .filter { PackagesAuthorsTable.getForPackage(it) == authors }
            .filter { PackagesDeclaredLicensesTable.getForPackage(it) == declaredLicenses }
            .filter { ProcessedDeclaredLicensesTable.getForPackage(it) == processedDeclaredLicenses }
    }

    /**
     * Update all references to the [duplicatePkgId] to point to the [pkgId] instead.
     */
    private fun updateReferences(pkgId: Long, duplicatePkgId: Long) {
        PackagesAnalyzerRunsTable.update({ PackagesAnalyzerRunsTable.packageId eq duplicatePkgId }) {
            it[packageId] = pkgId
        }
    }

    /**
     * Delete the package with the provided [pkgId]. The rows that reference the package or its processed declared
     * licenses are deleted first, followed by the package itself.
     */
    private fun deletePackage(pkgId: Long) {
        logger.info("Deleting entries from packages_authors for package with ID $pkgId.")
        PackagesAuthorsTable.deleteWhere { packageId eq pkgId }
        logger.info("Deleting entries from packages_declared_licenses for package with ID $pkgId.")
        PackagesDeclaredLicensesTable.deleteWhere { packageId eq pkgId }

        logger.info(
            "Deleting mapped and unmapped declared license associations of the processed declared licenses for " +
                "package with ID $pkgId."
        )
        ProcessedDeclaredLicensesTable.select(ProcessedDeclaredLicensesTable.id)
            .where { ProcessedDeclaredLicensesTable.packageId eq pkgId }
            .forEach { processedDeclaredLicense ->
                val id = processedDeclaredLicense[ProcessedDeclaredLicensesTable.id].value

                ProcessedDeclaredLicensesMappedDeclaredLicensesTable.deleteWhere {
                    processedDeclaredLicenseId eq id
                }

                ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable.deleteWhere {
                    processedDeclaredLicenseId eq id
                }
            }

        logger.info("Deleting processed declared licenses for package with ID $pkgId.")
        ProcessedDeclaredLicensesTable.deleteWhere { packageId eq pkgId }

        logger.info("Deleting package with ID $pkgId.")
        PackagesTable.deleteWhere { id eq pkgId }
    }

    /**
     * Count the number of remaining packages to process, which are the packages with an ID greater than [curIndex].
     */
    private fun countRemainingPackages() =
        db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
            PackagesTable.selectAll().where { PackagesTable.id greater curIndex }.count()
        }
}
225+
226+
/**
 * Decode the provided [data] to the progress data of the job. If [data] cannot be decoded, log the error and return
 * null.
 */
private fun deserializeJobData(data: JsonObject): DeduplicatePackagesJobData? {
    return try {
        json.decodeFromJsonElement<DeduplicatePackagesJobData>(data)
    } catch (e: SerializationException) {
        logger.error("Could not deserialize job data, starting from the beginning.", e)
        null
    }
}
233+
234+
/**
 * Encode the provided progress [data] of the job as a [JsonObject].
 */
private fun serializeJobData(data: DeduplicatePackagesJobData): JsonObject {
    val element = json.encodeToJsonElement(data)
    return element.jsonObject
}
235+
236+
/**
 * Return the IDs of all authors that are associated with the package with the provided [pkgId].
 */
private fun PackagesAuthorsTable.getForPackage(pkgId: Long): Set<Long> =
    // No ordering is requested: All selected rows have the same package ID and the result is a set.
    select(authorId)
        .where { packageId eq pkgId }
        .mapTo(mutableSetOf()) { it[authorId].value }
241+
242+
/**
 * Return the IDs of all declared licenses that are associated with the package with the provided [pkgId].
 */
private fun PackagesDeclaredLicensesTable.getForPackage(pkgId: Long): Set<Long> =
    // No ordering is requested: All selected rows have the same package ID and the result is a set.
    select(declaredLicenseId)
        .where { packageId eq pkgId }
        .mapTo(mutableSetOf()) { it[declaredLicenseId].value }
247+
248+
/**
 * Return the processed declared licenses of the package with the provided [pkgId], including their mapped and
 * unmapped declared licenses. The database IDs of the rows are not part of the result.
 */
private fun ProcessedDeclaredLicensesTable.getForPackage(pkgId: Long): Set<ProcessedDeclaredLicense> =
    // No ordering is requested: All selected rows have the same package ID and the result is a set.
    selectAll()
        .where { packageId eq pkgId }
        .mapTo(mutableSetOf()) { row ->
            // The local names deliberately differ from the column names of the table to avoid shadowing them.
            val processedDeclaredLicenseId = row[id].value
            val expression = row[spdxExpression]

            val mappedLicenses =
                ProcessedDeclaredLicensesMappedDeclaredLicensesTable
                    .getForProcessedDeclaredLicense(processedDeclaredLicenseId)

            val unmappedLicenses =
                ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable
                    .getForProcessedDeclaredLicense(processedDeclaredLicenseId)

            ProcessedDeclaredLicense(expression, mappedLicenses, unmappedLicenses)
        }
264+
265+
/**
 * Return the mapped declared licenses of the processed declared license with the ID [processedDeclaredLicense] as a
 * map from the declared license to the license it was mapped to.
 */
private fun ProcessedDeclaredLicensesMappedDeclaredLicensesTable.getForProcessedDeclaredLicense(
    processedDeclaredLicense: Long
) = innerJoin(MappedDeclaredLicensesTable)
    .select(MappedDeclaredLicensesTable.declaredLicense, MappedDeclaredLicensesTable.mappedLicense)
    .where { processedDeclaredLicenseId eq processedDeclaredLicense }
    .associateBy(
        { row -> row[MappedDeclaredLicensesTable.declaredLicense] },
        { row -> row[MappedDeclaredLicensesTable.mappedLicense] }
    )
273+
274+
/**
 * Return the set of unmapped declared licenses of the processed declared license with the ID
 * [processedDeclaredLicense].
 */
private fun ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable.getForProcessedDeclaredLicense(
    processedDeclaredLicense: Long
) = innerJoin(UnmappedDeclaredLicensesTable)
    .select(UnmappedDeclaredLicensesTable.unmappedLicense)
    .where { processedDeclaredLicenseId eq processedDeclaredLicense }
    .map { row -> row[UnmappedDeclaredLicensesTable.unmappedLicense] }
    .toMutableSet()

0 commit comments

Comments
 (0)