Skip to content

Commit 9440b67

Browse files
committed
Added a StorageIndex for the source storage to reduce LIST calls
Before every restructuring or cleaning operation, the index is updated. The StorageIndex performs a full sync at a configurable interval; otherwise it only updates directories that contain files. A separate interval can be configured to also scan empty directories. The first implementation is an InMemoryStorageIndex only; for very large datasets, a file-based index might be needed.
1 parent 0de47d2 commit 9440b67

25 files changed

+435
-80
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,16 @@ source:
118118
# only actually needed if source type is hdfs
119119
azure:
120120
# azure options
121+
index:
122+
# Interval to fully synchronize the index with the source storage
123+
fullSyncInterval: 3600
124+
# Interval at which to sync empty directories.
125+
# They are also synced during a full sync.
126+
emptyDirectorySyncInterval: 900
121127
```
122128

129+
The index scans the source storage before any operation; subsequent list operations are served from the index only. This is especially relevant for S3 storage, where list operations are billed.
130+
123131
The target is similar, and in addition supports the local file system (`local`).
124132

125133
```yaml

restructure.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ source:
3434
# only actually needed if source type is hdfs
3535
hdfs:
3636
nameNodes: [hdfs-namenode-1, hdfs-namenode-2]
37+
index:
38+
# Interval to fully synchronize the index with the storage
39+
fullSyncInterval: 3600
40+
# Interval at which to sync empty directories.
41+
# They are also synced during a full sync.
42+
emptyDirectorySyncInterval: 900
3743

3844
# Target data resource
3945
# @since: 0.7.0

src/main/java/org/radarbase/output/Application.kt

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ import org.radarbase.output.config.CommandLineArgs
2828
import org.radarbase.output.config.RestructureConfig
2929
import org.radarbase.output.format.RecordConverterFactory
3030
import org.radarbase.output.path.RecordPathFactory
31-
import org.radarbase.output.source.SourceStorage
32-
import org.radarbase.output.source.SourceStorageFactory
31+
import org.radarbase.output.source.*
3332
import org.radarbase.output.target.TargetStorage
3433
import org.radarbase.output.target.TargetStorageFactory
3534
import org.radarbase.output.util.Timer
@@ -39,7 +38,9 @@ import org.radarbase.output.worker.RadarKafkaRestructure
3938
import org.slf4j.LoggerFactory
4039
import redis.clients.jedis.JedisPool
4140
import java.io.IOException
41+
import java.nio.file.Path
4242
import java.text.NumberFormat
43+
import java.time.Duration
4344
import java.time.LocalDateTime
4445
import java.time.format.DateTimeFormatter
4546
import java.util.concurrent.atomic.LongAdder
@@ -78,9 +79,27 @@ class Application(
7879

7980
override val workerSemaphore = Semaphore(config.worker.numThreads * 2)
8081

82+
override val storageIndexManagers: Map<Path, StorageIndexManager>
83+
8184
private val jobs: List<Job>
8285

8386
init {
87+
val indexConfig = config.source.index
88+
val (fullScan, emptyScan) = if (indexConfig == null) {
89+
listOf(3600L, 900L)
90+
} else {
91+
listOf(indexConfig.fullSyncInterval, indexConfig.emptyDirectorySyncInterval)
92+
}.map { Duration.ofSeconds(it) }
93+
94+
storageIndexManagers = config.paths.inputs.associateWith { input ->
95+
MutableStorageIndexManager(
96+
InMemoryStorageIndex(),
97+
sourceStorage,
98+
fullScan,
99+
emptyScan,
100+
input,
101+
)
102+
}
84103
val serviceMutex = Mutex()
85104
jobs = listOfNotNull(
86105
RadarKafkaRestructure.job(config, serviceMutex),

src/main/java/org/radarbase/output/FileStoreFactory.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ import org.radarbase.output.config.RestructureConfig
2626
import org.radarbase.output.format.RecordConverterFactory
2727
import org.radarbase.output.path.RecordPathFactory
2828
import org.radarbase.output.source.SourceStorage
29+
import org.radarbase.output.source.StorageIndexManager
2930
import org.radarbase.output.target.TargetStorage
3031
import org.radarbase.output.worker.FileCacheStore
3132
import java.io.IOException
33+
import java.nio.file.Path
3234

3335
/** Factory for all factory classes and settings. */
3436
interface FileStoreFactory {
@@ -42,6 +44,7 @@ interface FileStoreFactory {
4244
val redisHolder: RedisHolder
4345
val offsetPersistenceFactory: OffsetPersistenceFactory
4446
val workerSemaphore: Semaphore
47+
val storageIndexManagers: Map<Path, StorageIndexManager>
4548

4649
@Throws(IOException::class)
4750
fun newFileCacheStore(accountant: Accountant): FileCacheStore

src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import org.radarbase.output.FileStoreFactory
1111
import org.radarbase.output.accounting.Accountant
1212
import org.radarbase.output.accounting.AccountantImpl
1313
import org.radarbase.output.config.RestructureConfig
14+
import org.radarbase.output.source.StorageIndex
15+
import org.radarbase.output.source.StorageNode
1416
import org.radarbase.output.util.ResourceContext.Companion.resourceContext
1517
import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
1618
import org.radarbase.output.util.Timer
@@ -43,11 +45,11 @@ class SourceDataCleaner(
4345
private val supervisor = SupervisorJob()
4446

4547
@Throws(IOException::class, InterruptedException::class)
46-
suspend fun process(directoryName: String) {
48+
suspend fun process(storageIndex: StorageIndex, directoryName: String) {
4749
// Get files and directories
4850
val absolutePath = Paths.get(directoryName)
4951

50-
val paths = topicPaths(absolutePath)
52+
val paths = topicPaths(storageIndex, absolutePath)
5153

5254
logger.info("{} topics found", paths.size)
5355

@@ -56,7 +58,7 @@ class SourceDataCleaner(
5658
launch {
5759
try {
5860
val deleteCount = fileStoreFactory.workerSemaphore.withPermit {
59-
mapTopic(p)
61+
mapTopic(storageIndex, p)
6062
}
6163
if (deleteCount > 0) {
6264
logger.info("Removed {} files in topic {}", deleteCount, p.fileName)
@@ -70,7 +72,7 @@ class SourceDataCleaner(
7072
}
7173
}
7274

73-
private suspend fun mapTopic(topicPath: Path): Long {
75+
private suspend fun mapTopic(storageIndex: StorageIndex, topicPath: Path): Long {
7476
val topic = topicPath.fileName.toString()
7577
return try {
7678
lockManager.tryWithLock(topic) {
@@ -84,7 +86,7 @@ class SourceDataCleaner(
8486
fileStoreFactory,
8587
)
8688
}
87-
deleteOldFiles(accountant, extractionCheck, topic, topicPath).toLong()
89+
deleteOldFiles(storageIndex, accountant, extractionCheck, topic, topicPath).toLong()
8890
}
8991
}
9092
}
@@ -95,14 +97,15 @@ class SourceDataCleaner(
9597
}
9698

9799
private suspend fun deleteOldFiles(
100+
storageIndex: StorageIndex,
98101
accountant: Accountant,
99102
extractionCheck: ExtractionCheck,
100103
topic: String,
101104
topicPath: Path,
102105
): Int {
103106
val offsets = accountant.offsets.copyForTopic(topic)
104107

105-
val paths = sourceStorage.listTopicFiles(topic, topicPath, maxFilesPerTopic) { f ->
108+
val paths = sourceStorage.listTopicFiles(storageIndex, topic, topicPath, maxFilesPerTopic) { f ->
106109
f.lastModified.isBefore(deleteThreshold) &&
107110
// ensure that there is a file with a larger offset also
108111
// processed, so the largest offset is never removed.
@@ -115,6 +118,7 @@ class SourceDataCleaner(
115118
logger.info("Removing {}", file.path)
116119
Timer.time("cleaner.delete") {
117120
sourceStorage.delete(file.path)
121+
storageIndex.remove(StorageNode.StorageFile(file.path, Instant.MIN))
118122
}
119123
true
120124
} else {
@@ -127,8 +131,8 @@ class SourceDataCleaner(
127131
}
128132
}
129133

130-
private suspend fun topicPaths(path: Path): List<Path> =
131-
sourceStorage.listTopics(path, excludeTopics)
134+
private suspend fun topicPaths(storageIndex: StorageIndex, path: Path): List<Path> =
135+
sourceStorage.listTopics(storageIndex, path, excludeTopics)
132136
// different services start on different topics to decrease lock contention
133137
.shuffled()
134138

@@ -147,9 +151,10 @@ class SourceDataCleaner(
147151

148152
private suspend fun runCleaner(factory: FileStoreFactory) {
149153
SourceDataCleaner(factory).useSuspended { cleaner ->
150-
for (input in factory.config.paths.inputs) {
154+
for ((input, indexManager) in factory.storageIndexManagers) {
155+
indexManager.update()
151156
logger.info("Cleaning {}", input)
152-
cleaner.process(input.toString())
157+
cleaner.process(indexManager.storageIndex, input.toString())
153158
}
154159
logger.info("Cleaned up {} files", cleaner.deletedFileCount.format())
155160
}

src/main/java/org/radarbase/output/config/ResourceConfig.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ data class ResourceConfig(
1111
val hdfs: HdfsConfig? = null,
1212
val local: LocalConfig? = null,
1313
val azure: AzureConfig? = null,
14+
val index: StorageIndexConfig? = null,
1415
) {
1516
@get:JsonIgnore
1617
val sourceType: ResourceType by lazy {
@@ -33,3 +34,4 @@ data class ResourceConfig(
3334
ResourceType.AZURE -> copyOnChange(azure, { it?.withEnv(prefix) }) { copy(azure = it) }
3435
}
3536
}
37+
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.radarbase.output.config
2+
3+
data class StorageIndexConfig(
4+
/** How often to fully sync the storage index, in seconds. */
5+
val fullSyncInterval: Long = 3600L,
6+
/**
7+
* How often to sync empty directories with the storage index, in seconds.
8+
* If this is very large, empty directories will only be scanned during
9+
* full sync.
10+
*/
11+
val emptyDirectorySyncInterval: Long = 900L,
12+
)

src/main/java/org/radarbase/output/source/AzureSourceStorage.kt

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import org.radarbase.output.util.TemporaryDirectory
1111
import org.radarbase.output.util.withoutFirstSegment
1212
import java.nio.file.Path
1313
import java.nio.file.Paths
14+
import java.time.Instant
1415
import kotlin.io.path.createTempFile
1516
import kotlin.io.path.deleteIfExists
1617

@@ -24,22 +25,28 @@ class AzureSourceStorage(
2425

2526
private fun blobClient(path: Path) = blobContainerClient.getBlobClient(path.withoutFirstSegment())
2627

27-
override suspend fun list(path: Path, maxKeys: Int?): List<SimpleFileStatus> =
28+
override suspend fun list(path: Path, startAfter: Path?, maxKeys: Int?): List<StorageNode> =
2829
withContext(Dispatchers.IO) {
2930
var iterable: Iterable<BlobItem> = blobContainerClient.listBlobsByHierarchy("$path/")
31+
if (startAfter != null) {
32+
iterable = iterable.filter { Paths.get(it.name) > startAfter }
33+
}
3034
if (maxKeys != null) {
3135
iterable = iterable.take(maxKeys)
3236
}
3337
iterable.map {
34-
SimpleFileStatus(
35-
Paths.get(it.name),
36-
it.isPrefix ?: false,
37-
it.properties?.lastModified?.toInstant(),
38-
)
38+
if (it.isPrefix == true) {
39+
StorageNode.StorageFile(
40+
Paths.get(it.name),
41+
it.properties?.lastModified?.toInstant() ?: Instant.now(),
42+
)
43+
} else {
44+
StorageNode.StorageDirectory(Paths.get(it.name))
45+
}
3946
}
4047
}
4148

42-
override suspend fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile {
49+
override suspend fun createTopicFile(topic: String, status: StorageNode): TopicFile {
4350
var topicFile = super.createTopicFile(topic, status)
4451

4552
if (readOffsetFromMetadata && topicFile.range.range.to == null) {
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.radarbase.output.source
2+
3+
class DelegatingStorageIndex(
4+
private val sourceStorage: SourceStorage,
5+
) : StorageIndex {
6+
override suspend fun list(dir: StorageNode.StorageDirectory, maxKeys: Int?): List<StorageNode> =
7+
sourceStorage.list(dir.path, maxKeys = maxKeys)
8+
9+
override suspend fun remove(file: StorageNode.StorageFile) = Unit
10+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package org.radarbase.output.source
2+
3+
import org.radarbase.output.source.StorageIndex.Companion.ROOT
4+
import java.nio.file.Path
5+
import java.util.concurrent.ConcurrentHashMap
6+
import java.util.concurrent.ConcurrentMap
7+
8+
class InMemoryStorageIndex : MutableStorageIndex {
9+
private val fileIndex: ConcurrentMap<StorageNode.StorageDirectory, Map<Path, StorageNode>> = ConcurrentHashMap()
10+
private val rootSet = ConcurrentHashMap<Path, StorageNode>()
11+
12+
init {
13+
fileIndex[ROOT] = rootSet
14+
}
15+
16+
override suspend fun list(dir: StorageNode.StorageDirectory, maxKeys: Int?): List<StorageNode> {
17+
val listing = if (dir === ROOT) {
18+
rootSet
19+
} else {
20+
fileIndex[dir] ?: return listOf()
21+
}
22+
23+
return if (maxKeys != null) {
24+
listing.values.take(maxKeys)
25+
} else {
26+
listing.values.toList()
27+
}
28+
}
29+
30+
private fun add(node: StorageNode) {
31+
var currentNode = node
32+
var parent = currentNode.parent()
33+
if (currentNode is StorageNode.StorageDirectory) {
34+
fileIndex.computeIfAbsent(currentNode) {
35+
mapOf()
36+
}
37+
}
38+
while (parent != null) {
39+
fileIndex.compute(parent) { _, map ->
40+
if (map == null) {
41+
mapOf(currentNode.path to currentNode)
42+
} else {
43+
val newMap = map.toMutableMap()
44+
newMap[currentNode.path] = currentNode
45+
newMap
46+
}
47+
}
48+
currentNode = parent
49+
parent = currentNode.parent()
50+
}
51+
rootSet[currentNode.path] = currentNode
52+
}
53+
54+
override suspend fun addAll(parent: StorageNode.StorageDirectory, nodes: List<StorageNode>): Collection<StorageNode> {
55+
add(parent)
56+
nodes.asSequence()
57+
.filterIsInstance<StorageNode.StorageDirectory>()
58+
.forEach { node ->
59+
fileIndex.computeIfAbsent(node) {
60+
mapOf()
61+
}
62+
}
63+
val newMap = fileIndex.compute(parent) { _, map ->
64+
if (map == null) {
65+
buildMap(nodes.size) {
66+
nodes.forEach { put(it.path, it) }
67+
}
68+
} else {
69+
buildMap(nodes.size + map.size) {
70+
putAll(map)
71+
nodes.forEach { put(it.path, it) }
72+
}
73+
}
74+
} ?: mapOf()
75+
76+
return newMap.values
77+
}
78+
79+
override suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List<StorageNode>): Collection<StorageNode> {
80+
add(parent)
81+
val newMap = fileIndex.compute(parent) { _, map ->
82+
if (map == null) {
83+
buildMap(nodes.size) {
84+
nodes.forEach { put(it.path, it) }
85+
}
86+
} else {
87+
buildMap(nodes.size) {
88+
nodes.forEach { put(it.path, it) }
89+
}
90+
}
91+
} ?: mapOf()
92+
93+
nodes.asSequence()
94+
.filterIsInstance<StorageNode.StorageDirectory>()
95+
.filter { it.path !in newMap }
96+
.forEach { removeRecursive(it) }
97+
98+
return newMap.values
99+
}
100+
101+
override suspend fun remove(file: StorageNode.StorageFile) {
102+
val parent = file.parent()
103+
104+
if (parent != null) {
105+
fileIndex.computeIfPresent(parent) { _, map ->
106+
(map - file.path).takeIf { it.isNotEmpty() }
107+
}
108+
} else {
109+
rootSet.remove(file.path)
110+
}
111+
}
112+
113+
private fun removeRecursive(node: StorageNode.StorageDirectory) {
114+
val directoriesToRemove = ArrayDeque<StorageNode.StorageDirectory>()
115+
fileIndex.remove(node)?.values?.filterIsInstanceTo(directoriesToRemove)
116+
while (directoriesToRemove.isNotEmpty()) {
117+
val first = directoriesToRemove.removeFirst()
118+
fileIndex.remove(first)?.values?.filterIsInstanceTo(directoriesToRemove)
119+
}
120+
}
121+
}

0 commit comments

Comments
 (0)