@@ -6,13 +6,13 @@ import kotlinx.coroutines.launch
6
6
import kotlinx.coroutines.sync.Mutex
7
7
import kotlinx.coroutines.sync.withPermit
8
8
import kotlinx.coroutines.withContext
9
+ import org.radarbase.kotlin.coroutines.launchJoin
9
10
import org.radarbase.output.Application.Companion.format
10
11
import org.radarbase.output.FileStoreFactory
11
12
import org.radarbase.output.accounting.Accountant
12
13
import org.radarbase.output.accounting.AccountantImpl
13
14
import org.radarbase.output.config.RestructureConfig
14
- import org.radarbase.output.source.StorageIndex
15
- import org.radarbase.output.source.StorageNode
15
+ import org.radarbase.output.source.SourceStorageManager
16
16
import org.radarbase.output.util.ResourceContext.Companion.resourceContext
17
17
import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
18
18
import org.radarbase.output.util.Timer
@@ -21,17 +21,17 @@ import org.slf4j.LoggerFactory
21
21
import java.io.Closeable
22
22
import java.io.IOException
23
23
import java.nio.file.Path
24
- import java.nio.file.Paths
25
24
import java.time.Instant
26
25
import java.time.temporal.ChronoUnit
27
26
import java.util.concurrent.atomic.LongAdder
28
27
import kotlin.coroutines.coroutineContext
29
28
30
29
class SourceDataCleaner (
31
30
private val fileStoreFactory : FileStoreFactory ,
31
+ private val sourceStorageManager : SourceStorageManager ,
32
32
) : Closeable {
33
+ private val sourceStorage = sourceStorageManager.sourceStorage
33
34
private val lockManager = fileStoreFactory.remoteLockManager
34
- private val sourceStorage = fileStoreFactory.sourceStorage
35
35
private val excludeTopics: Set <String > = fileStoreFactory.config.topics
36
36
.mapNotNullTo(HashSet ()) { (topic, conf) ->
37
37
topic.takeIf { conf.excludeFromDelete }
@@ -45,11 +45,9 @@ class SourceDataCleaner(
45
45
private val supervisor = SupervisorJob ()
46
46
47
47
@Throws(IOException ::class , InterruptedException ::class )
48
- suspend fun process (storageIndex : StorageIndex , directoryName : String ) {
48
+ suspend fun process () {
49
49
// Get files and directories
50
- val absolutePath = Paths .get(directoryName)
51
-
52
- val paths = topicPaths(storageIndex, absolutePath)
50
+ val paths = topicPaths(sourceStorage.root)
53
51
54
52
logger.info(" {} topics found" , paths.size)
55
53
@@ -58,7 +56,7 @@ class SourceDataCleaner(
58
56
launch {
59
57
try {
60
58
val deleteCount = fileStoreFactory.workerSemaphore.withPermit {
61
- mapTopic(storageIndex, p)
59
+ mapTopic(p)
62
60
}
63
61
if (deleteCount > 0 ) {
64
62
logger.info(" Removed {} files in topic {}" , deleteCount, p.fileName)
@@ -72,7 +70,7 @@ class SourceDataCleaner(
72
70
}
73
71
}
74
72
75
- private suspend fun mapTopic (storageIndex : StorageIndex , topicPath : Path ): Long {
73
+ private suspend fun mapTopic (topicPath : Path ): Long {
76
74
val topic = topicPath.fileName.toString()
77
75
return try {
78
76
lockManager.tryWithLock(topic) {
@@ -86,7 +84,7 @@ class SourceDataCleaner(
86
84
fileStoreFactory,
87
85
)
88
86
}
89
- deleteOldFiles(storageIndex, accountant, extractionCheck, topic, topicPath).toLong()
87
+ deleteOldFiles(accountant, extractionCheck, topic, topicPath).toLong()
90
88
}
91
89
}
92
90
}
@@ -97,15 +95,14 @@ class SourceDataCleaner(
97
95
}
98
96
99
97
private suspend fun deleteOldFiles (
100
- storageIndex : StorageIndex ,
101
98
accountant : Accountant ,
102
99
extractionCheck : ExtractionCheck ,
103
100
topic : String ,
104
101
topicPath : Path ,
105
102
): Int {
106
103
val offsets = accountant.offsets.copyForTopic(topic)
107
104
108
- val paths = sourceStorage .listTopicFiles(storageIndex, topic, topicPath, maxFilesPerTopic) { f ->
105
+ val paths = sourceStorageManager .listTopicFiles(topic, topicPath, maxFilesPerTopic) { f ->
109
106
f.lastModified.isBefore(deleteThreshold) &&
110
107
// ensure that there is a file with a larger offset also
111
108
// processed, so the largest offset is never removed.
@@ -117,8 +114,7 @@ class SourceDataCleaner(
117
114
if (extractionCheck.isExtracted(file)) {
118
115
logger.info(" Removing {}" , file.path)
119
116
Timer .time(" cleaner.delete" ) {
120
- sourceStorage.delete(file.path)
121
- storageIndex.remove(StorageNode .StorageFile (file.path, Instant .MIN ))
117
+ sourceStorageManager.delete(file.path)
122
118
}
123
119
true
124
120
} else {
@@ -131,8 +127,8 @@ class SourceDataCleaner(
131
127
}
132
128
}
133
129
134
- private suspend fun topicPaths (storageIndex : StorageIndex , path : Path ): List <Path > =
135
- sourceStorage .listTopics(storageIndex, path, excludeTopics)
130
+ private suspend fun topicPaths (path : Path ): List <Path > =
131
+ sourceStorageManager .listTopics(path, excludeTopics)
136
132
// different services start on different topics to decrease lock contention
137
133
.shuffled()
138
134
@@ -149,14 +145,14 @@ class SourceDataCleaner(
149
145
null
150
146
}
151
147
152
- private suspend fun runCleaner (factory : FileStoreFactory ) {
153
- SourceDataCleaner (factory).useSuspended { cleaner ->
154
- for ((input, indexManager) in factory.storageIndexManagers) {
155
- indexManager.update()
156
- logger.info(" Cleaning {}" , input)
157
- cleaner.process(indexManager.storageIndex, input.toString())
// Added revision of runCleaner (the '+' lines below): runs one cleaning pass for
// each element that `factory.sourceStorage.launchJoin` hands to its lambda.
// For each such element it constructs a dedicated SourceDataCleaner from the factory
// and that element, calls update() on the element's storageIndexManager, logs the
// storage root, calls process(), and then logs that cleaner's deletedFileCount.
// The deleted-file count is logged inside the `useSuspended` block in this revision,
// whereas the removed revision logged it once after looping over all inputs.
// NOTE(review): `launchJoin` presumably launches one coroutine per element and joins
// them all before `coroutineScope` returns; confirm against org.radarbase.kotlin.coroutines.
// NOTE(review): `useSuspended` presumably closes the cleaner when the block exits,
// including on failure; confirm against SuspendedCloseable.
148
+ private suspend fun runCleaner (factory : FileStoreFactory ) = coroutineScope {
149
+ factory.sourceStorage.launchJoin { sourceStorage ->
150
+ SourceDataCleaner (factory, sourceStorage).useSuspended { cleaner ->
151
+ sourceStorage.storageIndexManager.update()
152
+ logger.info(" Cleaning {}" , sourceStorage.sourceStorage.root)
153
+ cleaner.process()
154
+ logger.info(" Cleaned up {} files" , cleaner.deletedFileCount.format())
158
155
}
159
- logger.info(" Cleaned up {} files" , cleaner.deletedFileCount.format())
160
156
}
161
157
}
162
158
}
0 commit comments