Skip to content

Commit 8cdc5a4

Browse files
committed
Tested restructure on Azure
1 parent b53f822 commit 8cdc5a4

File tree

5 files changed

+55
-13
lines changed

5 files changed

+55
-13
lines changed

restructure.yml

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,49 @@ service:
77
# Source data resource
88
# @since: 0.7.0
99
source:
10-
type: s3 # hdfs or s3
10+
type: s3 # hdfs, azure or s3
1111
s3:
1212
endpoint: http://localhost:9000 # using AWS S3 endpoint is also possible.
1313
bucket: radar
1414
accessToken: minioadmin
1515
secretKey: minioadmin
16+
azure:
17+
endpoint: https://MyBlobStorageAccount.blob.core.windows.net
18+
# when using personal login
19+
#username: User
20+
#password: Password
21+
# when using shared access tokens
22+
#accountName: MyBlobStorageAccount
23+
#accountKey: MyLongToken
24+
# when using a specially made SAS token
25+
#sasToken: MyLongToken
26+
# if no credentials are supplied, this only works with a publicly writable blob storage
27+
container: MySourceContainer
1628
# only actually needed if source type is hdfs
1729
hdfs:
1830
nameNodes: [hdfs-namenode-1, hdfs-namenode-2]
1931

2032
# Target data resource
2133
# @since: 0.7.0
2234
target:
23-
type: s3 # s3 or local
35+
type: s3 # s3, azure or local
2436
s3:
2537
endpoint: http://localhost:9000
2638
bucket: out
2739
accessToken: minioadmin
2840
secretKey: minioadmin
41+
azure:
42+
endpoint: https://MyBlobStorageAccount.blob.core.windows.net
43+
# when using personal login
44+
#username: User
45+
#password: Password
46+
# when using shared access tokens
47+
#accountName: MyBlobStorageAccount
48+
#accountKey: MyLongToken
49+
# when using a specially made SAS token
50+
#sasToken: MyLongToken
51+
# if no credentials are supplied, this only works with a publicly writable blob storage
52+
container: MyTargetContainer
2953
# only actually needed if target type is local
3054
local:
3155
userId: 1000 # write as regular user, use -1 to use current user (default).
@@ -102,7 +126,7 @@ paths:
102126
# Output directory in target
103127
output: /output
104128
# Output path construction factory
105-
factory: org.radarbase.output.path.MonthlyObservationKeyPathFactory
129+
#factory: org.radarbase.output.path.MonthlyObservationKeyPathFactory
106130
# Additional properties
107131
# properties: {}
108132

src/main/java/org/radarbase/output/accounting/OffsetRangeSet.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.radarbase.output.accounting
1818

19+
import com.fasterxml.jackson.annotation.JsonIgnore
1920
import org.radarbase.output.util.FunctionalValue
2021
import org.radarbase.output.util.LockedFunctionalValue
2122
import org.radarbase.output.util.ReadOnlyFunctionalValue
@@ -164,6 +165,7 @@ class OffsetRangeSet {
164165

165166

166167
data class Range(val from: Long, val to: Long?, val lastProcessed: Instant = Instant.now()) {
168+
@JsonIgnore
167169
val size: Long? = to?.let { it - from + 1 }
168170
fun ensureToOffset(): Range = if (to == null) copy(to = from) else this
169171
override fun toString() = "($from - $to, $lastProcessed)"

src/main/java/org/radarbase/output/accounting/OffsetRedisPersistence.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.radarbase.output.accounting
1818

1919
import com.fasterxml.jackson.databind.ObjectReader
2020
import com.fasterxml.jackson.databind.ObjectWriter
21+
import com.fasterxml.jackson.databind.SerializationFeature
2122
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
2223
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
2324
import org.radarbase.output.util.PostponedWriter
@@ -95,8 +96,10 @@ class OffsetRedisPersistence(
9596
val ranges: List<OffsetRangeSet.Range>)
9697

9798
private val logger = LoggerFactory.getLogger(OffsetRedisPersistence::class.java)
98-
private val mapper = jacksonObjectMapper()
99-
.registerModule(JavaTimeModule())
99+
private val mapper = jacksonObjectMapper().apply {
100+
registerModule(JavaTimeModule())
101+
configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
102+
}
100103
val redisOffsetWriter: ObjectWriter = mapper.writerFor(RedisOffsetRangeSet::class.java)
101104
val redisOffsetReader: ObjectReader = mapper.readerFor(RedisOffsetRangeSet::class.java)
102105
}

src/main/java/org/radarbase/output/config/RestructureConfig.kt

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package org.radarbase.output.config
33
import com.azure.core.credential.BasicAuthenticationCredential
44
import com.azure.storage.blob.BlobServiceClient
55
import com.azure.storage.blob.BlobServiceClientBuilder
6+
import com.azure.storage.common.StorageSharedKeyCredential
67
import com.fasterxml.jackson.annotation.JsonIgnore
78
import io.minio.MinioClient
89
import org.apache.hadoop.conf.Configuration
@@ -329,11 +330,23 @@ data class S3Config(
329330
data class AzureConfig(
330331
val endpoint: String,
331332
val container: String,
332-
val username: String,
333-
val password: String
333+
val username: String?,
334+
val password: String?,
335+
val accountName: String?,
336+
val accountKey: String?,
337+
val sasToken: String?
334338
) {
335-
fun createAzureClient(): BlobServiceClient = BlobServiceClientBuilder()
336-
.endpoint(endpoint)
337-
.credential(BasicAuthenticationCredential(username, password))
338-
.buildClient()
339+
fun createAzureClient(): BlobServiceClient = BlobServiceClientBuilder().apply {
340+
endpoint(endpoint)
341+
when {
342+
username != null && password != null -> credential(BasicAuthenticationCredential(username, password))
343+
accountName != null && accountKey != null -> credential(StorageSharedKeyCredential(accountName, accountKey))
344+
sasToken != null -> sasToken(sasToken)
345+
else -> logger.warn("No Azure credentials supplied. Assuming a public blob storage.")
346+
}
347+
}.buildClient()
348+
349+
companion object {
350+
private val logger = LoggerFactory.getLogger(AzureConfig::class.java)
351+
}
339352
}

src/main/java/org/radarbase/output/source/AzureSourceStorage.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class AzureSourceStorage(
1919

2020
override fun list(path: Path): Sequence<SimpleFileStatus> = blobContainerClient.listBlobsByHierarchy("$path/")
2121
.asSequence()
22-
.map { SimpleFileStatus(Paths.get(it.name), it.isPrefix, it.properties.lastModified.toInstant()) }
22+
.map { SimpleFileStatus(Paths.get(it.name), it.isPrefix ?: false, it.properties?.lastModified?.toInstant()) }
2323

2424
override fun delete(path: Path) {
2525
blobContainerClient.getBlobClient(path.toKey())
@@ -38,7 +38,7 @@ class AzureSourceStorage(
3838

3939
blobContainerClient
4040
.getBlobClient(file.path.toKey())
41-
.downloadToFile(fileName.toString())
41+
.downloadToFile(fileName.toString(), true)
4242

4343
return object : SeekableFileInput(fileName.toFile()) {
4444
override fun close() {

0 commit comments

Comments
 (0)