Skip to content

Commit 210af7b

Browse files
committed
Make reading end offset configurable
1 parent 7b65ff6 commit 210af7b

File tree

5 files changed

+46
-18
lines changed

5 files changed

+46
-18
lines changed

restructure.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ source:
1313
bucket: radar
1414
accessToken: minioadmin
1515
secretKey: minioadmin
16+
# If true, try to read the object tag "endOffset" to determine the
17+
# final offset of an input object.
18+
#endOffsetFromTags: false
1619
azure:
1720
endpoint: https://MyBlobStorageAccount.blob.core.windows.net
1821
# when using personal login
@@ -25,6 +28,9 @@ source:
2528
#sasToken: MyLongToken
2629
# if no credentials are supplied, this only works with a publicly writable blob storage
2730
container: MySourceContainer
31+
# If true, try to read the metadata property "endOffset" to determine the
32+
# final offset of an input object.
33+
#endOffsetFromMetadata: false
2834
# only actually needed if source type is hdfs
2935
hdfs:
3036
nameNodes: [hdfs-namenode-1, hdfs-namenode-2]

src/main/java/org/radarbase/output/config/RestructureConfig.kt

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,20 +318,32 @@ data class S3Config(
318318
/** Secret key belonging to access token. */
319319
val secretKey: String,
320320
/** Bucket name. */
321-
val bucket: String) {
322-
fun createS3Client() = MinioClient.Builder()
321+
val bucket: String,
322+
/** If no endOffset is in the filename, read it from object tags. */
323+
val endOffsetFromTags: Boolean = false
324+
) {
325+
fun createS3Client(): MinioClient = MinioClient.Builder()
323326
.endpoint(endpoint)
324327
.credentials(accessToken, secretKey)
325328
.build()
326329
}
327330

328331
data class AzureConfig(
332+
/** URL to reach object store at. */
329333
val endpoint: String,
334+
/** Name of the Azure Blob Storage container. */
330335
val container: String,
336+
/** If no endOffset is in the filename, read it from object metadata. */
337+
val endOffsetFromMetadata: Boolean = false,
338+
/** Azure username. */
331339
val username: String?,
340+
/** Azure password. */
332341
val password: String?,
342+
/** Shared Azure Blob Storage account name. */
333343
val accountName: String?,
344+
/** Shared Azure Blob Storage account key. */
334345
val accountKey: String?,
346+
/** Azure SAS token for a configured service. */
335347
val sasToken: String?
336348
) {
337349
fun createAzureClient(): BlobServiceClient = BlobServiceClientBuilder().apply {

src/main/java/org/radarbase/output/source/AzureSourceStorage.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package org.radarbase.output.source
33
import com.azure.storage.blob.BlobServiceClient
44
import org.apache.avro.file.SeekableFileInput
55
import org.apache.avro.file.SeekableInput
6+
import org.radarbase.output.config.AzureConfig
67
import org.radarbase.output.util.TemporaryDirectory
78
import org.radarbase.output.util.toKey
89
import java.nio.file.Files
@@ -11,10 +12,12 @@ import java.nio.file.Paths
1112

1213
class AzureSourceStorage(
1314
client: BlobServiceClient,
14-
container: String,
15+
config: AzureConfig,
1516
private val tempPath: Path
1617
): SourceStorage {
17-
private val blobContainerClient = client.getBlobContainerClient(container)
18+
private val blobContainerClient = client.getBlobContainerClient(config.container)
19+
private val readOffsetFromMetadata = config.endOffsetFromMetadata
20+
1821
private fun blobClient(path: Path) = blobContainerClient.getBlobClient(path.toKey())
1922

2023
override fun list(path: Path): Sequence<SimpleFileStatus> = blobContainerClient.listBlobsByHierarchy("$path/")
@@ -25,7 +28,7 @@ class AzureSourceStorage(
2528
override fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile {
2629
var topicFile = super.createTopicFile(topic, status)
2730

28-
if (topicFile.range.range.to == null) {
31+
if (readOffsetFromMetadata && topicFile.range.range.to == null) {
2932
try {
3033
val endOffset = blobClient(topicFile.path).properties.metadata["endOffset"]?.toLongOrNull()
3134

@@ -36,7 +39,7 @@ class AzureSourceStorage(
3639
})
3740
}
3841
} catch (ex: Exception) {
39-
// never mind
42+
// skip reading end offset
4043
}
4144
}
4245

src/main/java/org/radarbase/output/source/S3SourceStorage.kt

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package org.radarbase.output.source
33
import io.minio.*
44
import org.apache.avro.file.SeekableFileInput
55
import org.apache.avro.file.SeekableInput
6+
import org.radarbase.output.config.S3Config
67
import org.radarbase.output.util.TemporaryDirectory
78
import org.radarbase.output.util.bucketBuild
89
import org.radarbase.output.util.objectBuild
@@ -14,10 +15,12 @@ import java.nio.file.Paths
1415

1516
class S3SourceStorage(
1617
private val s3Client: MinioClient,
17-
private val bucket: String,
18+
config: S3Config,
1819
private val tempPath: Path
1920
): SourceStorage {
2021
override val walker: SourceStorageWalker = GeneralSourceStorageWalker(this)
22+
private val bucket = config.bucket
23+
private val readEndOffset = config.endOffsetFromTags
2124

2225
override fun list(path: Path): Sequence<SimpleFileStatus> = s3Client.listObjects(
2326
ListObjectsArgs.Builder().bucketBuild(bucket) {
@@ -34,14 +37,18 @@ class S3SourceStorage(
3437
override fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile {
3538
var topicFile = super.createTopicFile(topic, status)
3639

37-
if (topicFile.range.range.to == null) {
38-
val tags = s3Client.getObjectTags(GetObjectTagsArgs.Builder().objectBuild(bucket, status.path))
39-
val endOffset = tags.get()["endOffset"]?.toLongOrNull()
40-
if (endOffset != null) {
41-
topicFile = topicFile.copy(
42-
range = topicFile.range.mapRange {
43-
it.copy(to = endOffset)
44-
})
40+
if (readEndOffset && topicFile.range.range.to == null) {
41+
try {
42+
val tags = s3Client.getObjectTags(GetObjectTagsArgs.Builder().objectBuild(bucket, status.path))
43+
val endOffset = tags.get()["endOffset"]?.toLongOrNull()
44+
if (endOffset != null) {
45+
topicFile = topicFile.copy(
46+
range = topicFile.range.mapRange {
47+
it.copy(to = endOffset)
48+
})
49+
}
50+
} catch (ex: Exception) {
51+
// skip reading end offset
4552
}
4653
}
4754

src/main/java/org/radarbase/output/source/SourceStorageFactory.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class SourceStorageFactory(private val resourceConfig: ResourceConfig, private v
2020
ResourceType.S3 -> {
2121
val s3Config = requireNotNull(resourceConfig.s3)
2222
val minioClient = requireNotNull(s3SourceClient)
23-
S3SourceStorage(minioClient, s3Config.bucket, tempPath)
23+
S3SourceStorage(minioClient, s3Config, tempPath)
2424
}
2525
ResourceType.HDFS -> {
2626
val hdfsConfig = requireNotNull(resourceConfig.hdfs)
@@ -29,8 +29,8 @@ class SourceStorageFactory(private val resourceConfig: ResourceConfig, private v
2929
}
3030
ResourceType.AZURE -> {
3131
val azureClient = requireNotNull(azureSourceClient)
32-
val config = requireNotNull(resourceConfig.azure)
33-
AzureSourceStorage(azureClient, config.container, tempPath)
32+
val azureConfig = requireNotNull(resourceConfig.azure)
33+
AzureSourceStorage(azureClient, azureConfig, tempPath)
3434
}
3535
else -> throw IllegalStateException("Cannot create kafka storage for type ${resourceConfig.sourceType}")
3636
}

0 commit comments

Comments
 (0)