File tree Expand file tree Collapse file tree 2 files changed +26
-2
lines changed
src/main/java/org/radarbase/output/source Expand file tree Collapse file tree 2 files changed +26
-2
lines changed Original file line number Diff line number Diff line change @@ -9,7 +9,6 @@ import java.nio.file.Files
99import java.nio.file.Path
1010import java.nio.file.Paths
1111
12-
1312class AzureSourceStorage (
1413 client : BlobServiceClient ,
1514 container : String ,
@@ -22,6 +21,28 @@ class AzureSourceStorage(
2221 .asSequence()
2322 .map { SimpleFileStatus (Paths .get(it.name), it.isPrefix ? : false , it.properties?.lastModified?.toInstant()) }
2423
24+
25+ override fun createTopicFile (topic : String , status : SimpleFileStatus ): TopicFile {
26+ var topicFile = super .createTopicFile(topic, status)
27+
28+ if (topicFile.range.range.to == null ) {
29+ try {
30+ val endOffset = blobClient(topicFile.path).properties.metadata[" endOffset" ]?.toLongOrNull()
31+
32+ if (endOffset != null ) {
33+ topicFile = topicFile.copy(
34+ range = topicFile.range.mapRange {
35+ it.copy(to = endOffset)
36+ })
37+ }
38+ } catch (ex: Exception ) {
39+ // never mind
40+ }
41+ }
42+
43+ return topicFile
44+ }
45+
2546 override fun delete (path : Path ) {
2647 blobClient(path).delete()
2748 }
Original file line number Diff line number Diff line change @@ -37,7 +37,10 @@ class S3SourceStorage(
3737 val tags = s3Client.getObjectTags(GetObjectTagsArgs .Builder ().objectBuild(bucket, status.path))
3838 val endOffset = tags.get()[" endOffset" ]?.toLongOrNull()
3939 if (endOffset != null ) {
40- topicFile = topicFile.copy(range = topicFile.range.mapRange { it.copy(to = endOffset) })
40+ topicFile = topicFile.copy(
41+ range = topicFile.range.mapRange {
42+ it.copy(to = endOffset)
43+ })
4144 }
4245 }
4346
You can’t perform that action at this time.
0 commit comments