Skip to content

Commit 02f1ab8

Browse files
committed
feat(maintenance-service): Add a job to deduplicate packages
Due to an issue fixed in fb57726, duplicates of packages were stored in the database. The new mechanism to prevent duplicates is very slow if the database contains a lot of packages (>1,000,000), so add a maintenance job to remove the duplicates. Signed-off-by: Martin Nonnenmacher <[email protected]>
1 parent f909b81 commit 02f1ab8

File tree

2 files changed

+794
-0
lines changed

2 files changed

+794
-0
lines changed
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
/*
2+
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
package org.eclipse.apoapsis.ortserver.services.maintenance.jobs
21+
22+
import java.sql.Connection
23+
24+
import kotlinx.serialization.Serializable
25+
import kotlinx.serialization.SerializationException
26+
import kotlinx.serialization.json.Json
27+
import kotlinx.serialization.json.JsonObject
28+
import kotlinx.serialization.json.decodeFromJsonElement
29+
import kotlinx.serialization.json.encodeToJsonElement
30+
import kotlinx.serialization.json.jsonObject
31+
32+
import org.eclipse.apoapsis.ortserver.dao.blockingQuery
33+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.MappedDeclaredLicensesTable
34+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesAnalyzerRunsTable
35+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesAuthorsTable
36+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesDeclaredLicensesTable
37+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.PackagesTable
38+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.ProcessedDeclaredLicensesMappedDeclaredLicensesTable
39+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.ProcessedDeclaredLicensesTable
40+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable
41+
import org.eclipse.apoapsis.ortserver.dao.tables.runs.analyzer.UnmappedDeclaredLicensesTable
42+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobData
43+
import org.eclipse.apoapsis.ortserver.model.MaintenanceJobStatus
44+
import org.eclipse.apoapsis.ortserver.model.runs.ProcessedDeclaredLicense
45+
import org.eclipse.apoapsis.ortserver.services.maintenance.MaintenanceJob
46+
import org.eclipse.apoapsis.ortserver.services.maintenance.MaintenanceService
47+
48+
import org.jetbrains.exposed.sql.Database
49+
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
50+
import org.jetbrains.exposed.sql.and
51+
import org.jetbrains.exposed.sql.deleteWhere
52+
import org.jetbrains.exposed.sql.selectAll
53+
import org.jetbrains.exposed.sql.update
54+
55+
import org.slf4j.LoggerFactory
56+
57+
private val json = Json.Default
58+
private val logger = LoggerFactory.getLogger(DeduplicatePackagesJob::class.java)
59+
60+
/**
61+
* Progress data for the [DeduplicatePackagesJob].
62+
*/
63+
@Serializable
64+
private data class DeduplicatePackagesJobData(
65+
/** The last processed package ID. */
66+
val lastPackageId: Long,
67+
68+
/** The number of unique deduplicated packages. */
69+
val deduplicatedPackages: Long,
70+
71+
/** The number of removed duplicate packages. */
72+
val removedDuplicates: Long
73+
)
74+
75+
/**
76+
* A maintenance job to deduplicate packages. The algorithm works as follows:
77+
*
78+
* 1. Find all packages that are equal to the package with the provided ID.
79+
* 2. Update all references to the duplicate packages to point to the original package.
80+
* 3. Delete the duplicate packages.
81+
* 4. Repeat until all packages are processed.
82+
*/
83+
class DeduplicatePackagesJob(private val db: Database) : MaintenanceJob() {
84+
override val name = "DeduplicatePackages"
85+
86+
private var curIndex: Long = -1L
87+
private lateinit var curJobData: DeduplicatePackagesJobData
88+
89+
override suspend fun execute(service: MaintenanceService, jobData: MaintenanceJobData) {
90+
curJobData = jobData.data?.let { deserializeJobData(it) } ?: DeduplicatePackagesJobData(-1L, 0, 0)
91+
curIndex = curJobData.lastPackageId
92+
93+
while (nextPackageId() != null) {
94+
deduplicatePackage(service, jobData.id)
95+
}
96+
}
97+
98+
private fun nextPackageId() = db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
99+
PackagesTable.select(PackagesTable.id)
100+
.where { PackagesTable.id greater curIndex }
101+
.orderBy(PackagesTable.id)
102+
.limit(1)
103+
.singleOrNull()
104+
?.let {
105+
curIndex = it[PackagesTable.id].value
106+
curIndex
107+
}
108+
}
109+
110+
/**
111+
* Find all duplicates of the package with the ID [curIndex], update references to point to this package instead,
112+
* and delete the duplicates.
113+
*/
114+
private fun deduplicatePackage(service: MaintenanceService, jobId: Long) {
115+
db.blockingQuery {
116+
logger.info("Deduplicating package with ID $curIndex.")
117+
118+
val equalPackages = findEqualPackages(curIndex)
119+
120+
logger.info(
121+
"Found ${equalPackages.count()} equal packages for package with ID $curIndex: $equalPackages"
122+
)
123+
124+
equalPackages.forEach {
125+
updateReferences(curIndex, it)
126+
deletePackage(it)
127+
}
128+
129+
curJobData = DeduplicatePackagesJobData(
130+
lastPackageId = curIndex,
131+
deduplicatedPackages = curJobData.deduplicatedPackages + 1,
132+
removedDuplicates = curJobData.removedDuplicates + equalPackages.count()
133+
)
134+
135+
logger.info(
136+
"Finished deduplicating package with ID $curIndex. Deduplicated ${curJobData.deduplicatedPackages} " +
137+
"unique packages and removed ${curJobData.removedDuplicates} duplicates so far."
138+
)
139+
}
140+
141+
val remainingPackages = countRemainingPackages()
142+
143+
if (remainingPackages > 0L) {
144+
service.updateJob(jobId, serializeJobData(curJobData))
145+
logger.info("Remaining packages: $remainingPackages")
146+
} else {
147+
service.updateJob(jobId, serializeJobData(curJobData), MaintenanceJobStatus.FINISHED)
148+
logger.info(
149+
"Package deduplication finished. Deduplicated ${curJobData.deduplicatedPackages} unique packages and " +
150+
"removed ${curJobData.removedDuplicates} duplicates."
151+
)
152+
}
153+
}
154+
155+
/**
156+
* Find all packages that are equal to the package with the provided [pkgId]. Equal means that not only the values
157+
* of the columns are equal, but also the references to other tables.
158+
*/
159+
private fun findEqualPackages(pkgId: Long): List<Long> {
160+
val pkg = PackagesTable.selectAll().where { PackagesTable.id eq pkgId }.single()
161+
162+
val authors = PackagesAuthorsTable.getForPackage(pkgId)
163+
val declaredLicenses = PackagesDeclaredLicensesTable.getForPackage(pkgId)
164+
val processedDeclaredLicenses = ProcessedDeclaredLicensesTable.getForPackage(pkgId)
165+
166+
return PackagesTable.selectAll().where {
167+
PackagesTable.id neq pkgId and
168+
(PackagesTable.identifierId eq pkg[PackagesTable.identifierId]) and
169+
(PackagesTable.vcsId eq pkg[PackagesTable.vcsId]) and
170+
(PackagesTable.vcsProcessedId eq pkg[PackagesTable.vcsProcessedId]) and
171+
(PackagesTable.binaryArtifactId eq pkg[PackagesTable.binaryArtifactId]) and
172+
(PackagesTable.sourceArtifactId eq pkg[PackagesTable.sourceArtifactId]) and
173+
(PackagesTable.purl eq pkg[PackagesTable.purl]) and
174+
(PackagesTable.cpe eq pkg[PackagesTable.cpe]) and
175+
(PackagesTable.description eq pkg[PackagesTable.description]) and
176+
(PackagesTable.homepageUrl eq pkg[PackagesTable.homepageUrl]) and
177+
(PackagesTable.isMetadataOnly eq pkg[PackagesTable.isMetadataOnly]) and
178+
(PackagesTable.isModified eq pkg[PackagesTable.isModified])
179+
}.map { it[PackagesTable.id].value }
180+
.filter { PackagesAuthorsTable.getForPackage(it) == authors }
181+
.filter { PackagesDeclaredLicensesTable.getForPackage(it) == declaredLicenses }
182+
.filter { ProcessedDeclaredLicensesTable.getForPackage(it) == processedDeclaredLicenses }
183+
}
184+
185+
/**
186+
* Update all references to the [duplicatePkgId] to point to the [pkgId] instead.
187+
*/
188+
private fun updateReferences(pkgId: Long, duplicatePkgId: Long) {
189+
PackagesAnalyzerRunsTable.update({ PackagesAnalyzerRunsTable.packageId eq duplicatePkgId }) {
190+
it[packageId] = pkgId
191+
}
192+
}
193+
194+
/**
195+
* Delete the package with the provided [pkgId].
196+
*/
197+
private fun deletePackage(pkgId: Long) {
198+
logger.info("Deleting entries from packages_authors for package with ID $pkgId.")
199+
PackagesAuthorsTable.deleteWhere { packageId eq pkgId }
200+
logger.info("Deleting entries from packages_declared_licenses for package with ID $pkgId.")
201+
PackagesDeclaredLicensesTable.deleteWhere { packageId eq pkgId }
202+
203+
logger.info("Deleting processed declared licenses for package with ID $pkgId.")
204+
ProcessedDeclaredLicensesTable.select(ProcessedDeclaredLicensesTable.id)
205+
.where { ProcessedDeclaredLicensesTable.packageId eq pkgId }
206+
.forEach { processedDeclaredLicense ->
207+
val id = processedDeclaredLicense[ProcessedDeclaredLicensesTable.id].value
208+
209+
ProcessedDeclaredLicensesMappedDeclaredLicensesTable.deleteWhere {
210+
processedDeclaredLicenseId eq id
211+
}
212+
213+
ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable.deleteWhere {
214+
processedDeclaredLicenseId eq id
215+
}
216+
}
217+
218+
logger.info("Deleting declared licenses for package with ID $pkgId.")
219+
ProcessedDeclaredLicensesTable.deleteWhere { packageId eq pkgId }
220+
221+
logger.info("Deleting package with ID $pkgId.")
222+
PackagesTable.deleteWhere { id eq pkgId }
223+
}
224+
225+
/**
226+
* Count the number of remaining packages to process.
227+
*/
228+
private fun countRemainingPackages() =
229+
db.blockingQuery(transactionIsolation = Connection.TRANSACTION_SERIALIZABLE) {
230+
PackagesTable.selectAll().where { PackagesTable.id greater curIndex }.count()
231+
}
232+
}
233+
234+
private fun deserializeJobData(data: JsonObject) =
235+
try {
236+
json.decodeFromJsonElement<DeduplicatePackagesJobData>(data)
237+
} catch (e: SerializationException) {
238+
logger.error("Could not deserialize job data, starting from the beginning.", e)
239+
null
240+
}
241+
242+
private fun serializeJobData(data: DeduplicatePackagesJobData) = json.encodeToJsonElement(data).jsonObject
243+
244+
private fun PackagesAuthorsTable.getForPackage(pkgId: Long): Set<Long> =
245+
select(authorId)
246+
.where { packageId eq pkgId }
247+
.orderBy(packageId)
248+
.mapTo(mutableSetOf()) { it[authorId].value }
249+
250+
private fun PackagesDeclaredLicensesTable.getForPackage(pkgId: Long): Set<Long> =
251+
select(declaredLicenseId)
252+
.where { packageId eq pkgId }
253+
.orderBy(packageId)
254+
.mapTo(mutableSetOf()) { it[declaredLicenseId].value }
255+
256+
private fun ProcessedDeclaredLicensesTable.getForPackage(pkgId: Long): Set<ProcessedDeclaredLicense> =
257+
selectAll()
258+
.where { packageId eq pkgId }
259+
.orderBy(packageId)
260+
.mapTo(mutableSetOf()) { processedDeclaredLicense ->
261+
val id = processedDeclaredLicense[id].value
262+
val spdxExpression = processedDeclaredLicense[spdxExpression]
263+
264+
val mappedLicenses =
265+
ProcessedDeclaredLicensesMappedDeclaredLicensesTable.getForProcessedDeclaredLicense(id)
266+
267+
val unmappedLicenses =
268+
ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable.getForProcessedDeclaredLicense(id)
269+
270+
ProcessedDeclaredLicense(spdxExpression, mappedLicenses, unmappedLicenses)
271+
}
272+
273+
private fun ProcessedDeclaredLicensesMappedDeclaredLicensesTable.getForProcessedDeclaredLicense(
274+
processedDeclaredLicense: Long
275+
) = (this innerJoin MappedDeclaredLicensesTable)
276+
.select(MappedDeclaredLicensesTable.declaredLicense, MappedDeclaredLicensesTable.mappedLicense)
277+
.where { processedDeclaredLicenseId eq processedDeclaredLicense }
278+
.associate {
279+
it[MappedDeclaredLicensesTable.declaredLicense] to it[MappedDeclaredLicensesTable.mappedLicense]
280+
}
281+
282+
private fun ProcessedDeclaredLicensesUnmappedDeclaredLicensesTable.getForProcessedDeclaredLicense(
283+
processedDeclaredLicense: Long
284+
) = (this innerJoin UnmappedDeclaredLicensesTable)
285+
.select(UnmappedDeclaredLicensesTable.unmappedLicense)
286+
.where { processedDeclaredLicenseId eq processedDeclaredLicense }
287+
.mapTo(mutableSetOf()) { it[UnmappedDeclaredLicensesTable.unmappedLicense] }

0 commit comments

Comments
 (0)