@@ -86,11 +86,11 @@ class RadarKafkaRestructure(
86
86
logger.info(" {} topics found" , paths.size)
87
87
88
88
coroutineScope {
89
- paths.map { p ->
89
+ paths.forEach { p ->
90
90
launch {
91
91
try {
92
92
val (fileCount, recordCount) = fileStoreFactory.workerSemaphore.withPermit {
93
- mapTopic(this @coroutineScope, p)
93
+ mapTopic(p)
94
94
}
95
95
processedFileCount.add(fileCount)
96
96
processedRecordsCount.add(recordCount)
@@ -102,8 +102,7 @@ class RadarKafkaRestructure(
102
102
}
103
103
}
104
104
105
- private suspend fun mapTopic (scope : CoroutineScope , topicPath : Path ): ProcessingStatistics {
106
- logger.info(" Mapping topic {}" , topicPath)
105
+ private suspend fun mapTopic (topicPath : Path ): ProcessingStatistics {
107
106
if (isClosed.get()) {
108
107
return ProcessingStatistics (0L , 0L )
109
108
}
@@ -112,9 +111,11 @@ class RadarKafkaRestructure(
112
111
113
112
return try {
114
113
val statistics = lockManager.tryWithLock(topic) {
115
- AccountantImpl (fileStoreFactory, topic).useSuspended { accountant ->
116
- accountant.initialize(scope)
117
- startWorker(topic, topicPath, accountant, accountant.offsets)
114
+ coroutineScope {
115
+ AccountantImpl (fileStoreFactory, topic).useSuspended { accountant ->
116
+ accountant.initialize(this @coroutineScope)
117
+ startWorker(topic, topicPath, accountant, accountant.offsets)
118
+ }
118
119
}
119
120
}
120
121
if (statistics == null ) {
@@ -134,16 +135,13 @@ class RadarKafkaRestructure(
134
135
seenFiles : OffsetRangeSet ): ProcessingStatistics {
135
136
return RestructureWorker (sourceStorage, accountant, fileStoreFactory, isClosed).useSuspended { worker ->
136
137
try {
137
- logger.info(" Collecting paths for topic {}" , topic)
138
138
val topicPaths = TopicFileList (topic, sourceStorage.walker.walkRecords(topic, topicPath)
139
139
.consumeAsFlow()
140
140
.filter { f -> ! seenFiles.contains(f.range)
141
141
&& f.lastModified.durationSince() >= minimumFileAge }
142
142
.take(maxFilesPerTopic)
143
143
.toList())
144
144
145
- logger.info(" Collected {} paths for topic {}" , topicPaths.numberOfFiles, topic)
146
-
147
145
if (topicPaths.numberOfFiles > 0 ) {
148
146
worker.processPaths(topicPaths)
149
147
}
0 commit comments