Skip to content

Commit 1392929

Browse files
authored
PIR: support remove defunct brokers (#6568)
Task/Issue URL: https://app.asana.com/1/137249556945/project/488551667048375/task/1211032016934484?focus=true ### Description See attached task description ### Steps to test this PR https://app.asana.com/1/137249556945/task/1211035661205077?focus=true
1 parent 2330570 commit 1392929

File tree

16 files changed

+1248
-133
lines changed

16 files changed

+1248
-133
lines changed

pir/pir-impl/schemas/com.duckduckgo.pir.impl.store.PirDatabase/7.json

Lines changed: 748 additions & 0 deletions
Large diffs are not rendered by default.

pir/pir-impl/src/main/java/com/duckduckgo/pir/impl/brokers/BrokerJsonUpdater.kt

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -92,26 +92,23 @@ class RealBrokerJsonUpdater @Inject constructor(
9292
}
9393

9494
private suspend fun checkUpdatesFromMainConfig(pirMainConfig: PirMainConfig) {
95-
val currentBrokerJson = pirRepository.getAllLocalBrokerJsons()
96-
val newActiveBrokers = pirMainConfig.activeBrokers
97-
val newBrokerJson = pirMainConfig.jsonEtags.current.map {
95+
val existingEtags = pirRepository.getAllLocalBrokerJsons()
96+
val jsonEtagsFromConfig = pirMainConfig.jsonEtags.current.map {
9897
BrokerJson(
9998
fileName = it.key,
10099
etag = it.value,
101100
)
102-
}.associateWith { newActiveBrokers.contains(it.fileName) }
101+
}
103102

104-
val shouldDownloadJson = currentBrokerJson.keys != newBrokerJson.keys // Broker json changed
105-
val shouldUpdateLocalEtags = newBrokerJson.entries != currentBrokerJson.entries // broker state / active state changed
103+
val updatedJsons = jsonEtagsFromConfig.toSet() - existingEtags.toSet()
106104

107-
if (shouldDownloadJson) {
108-
val diff = newBrokerJson.keys - currentBrokerJson.keys
109-
brokerDataDownloader.downloadBrokerData(diff.map { it.fileName })
110-
}
105+
pirRepository.updateBrokerJsons(jsonEtagsFromConfig)
111106

112-
// We update stored json etags once the actual json data have been stored.
113-
if (shouldUpdateLocalEtags) {
114-
pirRepository.updateBrokerJsons(newBrokerJson)
107+
if (updatedJsons.isNotEmpty()) {
108+
logcat { "PIR-update: Downloading updated broker json files: $updatedJsons" }
109+
brokerDataDownloader.downloadBrokerData(updatedJsons.map { it.fileName })
110+
} else {
111+
logcat { "PIR-update: No broker json files to update." }
115112
}
116113
}
117114
}

pir/pir-impl/src/main/java/com/duckduckgo/pir/impl/scheduling/PirJobsRunner.kt

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,7 @@ class RealPirJobsRunner @Inject constructor(
7575
val startTimeInMillis = currentTimeProvider.currentTimeMillis()
7676
emitStartPixel(executionType)
7777

78-
// This should be all brokers since all brokers have scan steps and inactive brokers are removed
79-
val activeBrokers = pirRepository.getAllBrokersForScan().toHashSet()
78+
val activeBrokers = pirRepository.getAllActiveBrokers().toHashSet()
8079

8180
// Multiple profile support
8281
val profileQueries = obtainProfiles()
@@ -88,6 +87,7 @@ class RealPirJobsRunner @Inject constructor(
8887
emitCompletedPixel(executionType, startTimeInMillis)
8988
return@withContext Result.success(Unit)
9089
}
90+
9191
if (activeBrokers.isEmpty()) {
9292
logcat { "PIR-JOB-RUNNER: No active brokers available. Completing run." }
9393
pixelSender.reportScanStats(0)
@@ -98,8 +98,19 @@ class RealPirJobsRunner @Inject constructor(
9898

9999
attemptCreateScanJobs(activeBrokers, profileQueries)
100100
executeScanJobs(context, executionType, activeBrokers)
101-
attemptCreateOptOutJobs(activeBrokers)
102-
executeOptOutJobs(context)
101+
102+
val formOptOutBrokers = pirRepository.getBrokersForOptOut(true).toSet()
103+
val activeFormOptOutBrokers = formOptOutBrokers.intersect(activeBrokers)
104+
105+
if (activeFormOptOutBrokers.isEmpty()) {
106+
logcat { "PIR-JOB-RUNNER: No active parent brokers available for optout. Completing run." }
107+
pixelSender.reportOptOutStats(0)
108+
emitCompletedPixel(executionType, startTimeInMillis)
109+
return@withContext Result.success(Unit)
110+
}
111+
112+
attemptCreateOptOutJobs(activeFormOptOutBrokers)
113+
executeOptOutJobs(context, activeFormOptOutBrokers)
103114

104115
logcat { "PIR-JOB-RUNNER: Completed." }
105116
emitCompletedPixel(executionType, startTimeInMillis)
@@ -137,7 +148,7 @@ class RealPirJobsRunner @Inject constructor(
137148
profileQueries: List<ProfileQuery>,
138149
) {
139150
logcat { "PIR-JOB-RUNNER: Attempting to create new scan jobs" }
140-
// No scan jobs mean that this is the first time PR is being run OR all jobs have been invalidated.
151+
// No scan jobs mean that this is the first time PIR is being run OR all jobs have been invalidated.
141152
val toCreate = mutableListOf<ScanJobRecord>()
142153

143154
profileQueries.filter {
@@ -187,12 +198,12 @@ class RealPirJobsRunner @Inject constructor(
187198
}
188199
}
189200

190-
private suspend fun attemptCreateOptOutJobs(activeBrokers: Set<String>) {
201+
private suspend fun attemptCreateOptOutJobs(activeFormOptOutBrokers: Set<String>) {
191202
val toCreate = mutableListOf<OptOutJobRecord>()
192203
val extractedProfiles = pirRepository.getAllExtractedProfiles()
193204

194205
extractedProfiles.forEach {
195-
if (activeBrokers.contains(it.brokerName) &&
206+
if (activeFormOptOutBrokers.contains(it.brokerName) &&
196207
pirSchedulingRepository.getValidOptOutJobRecord(it.dbId) == null
197208
) {
198209
toCreate.add(
@@ -213,12 +224,13 @@ class RealPirJobsRunner @Inject constructor(
213224
}
214225
}
215226

216-
private suspend fun executeOptOutJobs(context: Context) {
217-
val formOptOutBrokers = pirRepository.getBrokersForOptOut(true)
218-
227+
private suspend fun executeOptOutJobs(
228+
context: Context,
229+
activeFormOptOutBrokers: Set<String>,
230+
) {
219231
eligibleOptOutJobProvider.getAllEligibleOptOutJobs(currentTimeProvider.currentTimeMillis())
220232
.filter {
221-
formOptOutBrokers.contains(it.brokerName)
233+
activeFormOptOutBrokers.contains(it.brokerName)
222234
}.also {
223235
pixelSender.reportOptOutStats(it.size)
224236
if (it.isNotEmpty()) {

pir/pir-impl/src/main/java/com/duckduckgo/pir/impl/service/DbpService.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ interface DbpService {
9797
@field:Json(name = "steps")
9898
val steps: List<String>,
9999
val schedulingConfig: PirJsonBrokerSchedulingConfig,
100+
val removedAt: Long?,
100101
)
101102

102103
data class PirJsonBrokerSchedulingConfig(

pir/pir-impl/src/main/java/com/duckduckgo/pir/impl/store/PirDatabase.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import com.squareup.moshi.Types
4848

4949
@Database(
5050
exportSchema = true,
51-
version = 6,
51+
version = 7,
5252
entities = [
5353
BrokerJsonEtag::class,
5454
Broker::class,

0 commit comments

Comments
 (0)