Skip to content

Commit 6bd4054

Browse files
authored
[Storage] Fix flaky etag tests. (Azure#23656)
* fix these tests... * update comment. * in case retries kick in in the setup. * name stress jobs. * Revert "name stress jobs." This reverts commit 9c3bb8f.
1 parent f5265a3 commit 6bd4054

File tree

5 files changed

+93
-36
lines changed

5 files changed

+93
-36
lines changed

sdk/storage/azure-storage-blob-cryptography/src/test/java/com/azure/storage/blob/specialized/cryptography/EncyptedBlockBlobAPITest.groovy

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.azure.core.http.HttpPipelinePosition
88
import com.azure.core.http.HttpResponse
99
import com.azure.core.http.policy.HttpPipelinePolicy
1010
import com.azure.identity.DefaultAzureCredentialBuilder
11+
import com.azure.storage.blob.BlobClientBuilder
1112
import com.azure.storage.blob.BlobContainerClient
1213
import com.azure.storage.blob.BlobServiceClientBuilder
1314
import com.azure.storage.blob.BlobUrlParts
@@ -51,6 +52,7 @@ import java.nio.file.Files
5152
import java.nio.file.OpenOption
5253
import java.nio.file.StandardOpenOption
5354
import java.time.Duration
55+
import java.util.concurrent.atomic.AtomicInteger
5456

5557
class EncyptedBlockBlobAPITest extends APISpec {
5658

@@ -1241,9 +1243,29 @@ class EncyptedBlockBlobAPITest extends APISpec {
12411243
def outFile = new File(namer.getResourcePrefix())
12421244
Files.deleteIfExists(file.toPath())
12431245

1246+
def counter = new AtomicInteger()
1247+
12441248
expect:
1245-
def bac = getEncryptedClientBuilder(fakeKey, null, env.primaryAccount.credential,
1249+
def bacUploading = getEncryptedClientBuilder(fakeKey, null, env.primaryAccount.credential,
1250+
ebc.getBlobUrl().toString())
1251+
.buildEncryptedBlobAsyncClient()
1252+
1253+
def bacDownloading = getEncryptedClientBuilder(fakeKey, null, env.primaryAccount.credential,
12461254
ebc.getBlobUrl().toString())
1255+
.addPolicy({ context, next ->
1256+
return next.process()
1257+
.flatMap({ r ->
1258+
if (counter.incrementAndGet() == 1) {
1259+
/*
1260+
* When the download begins trigger an upload to overwrite the downloading blob
1261+
* so that the download is able to get an ETag before it is changed.
1262+
*/
1263+
return bacUploading.upload(data.defaultFlux, null, true)
1264+
.thenReturn(r)
1265+
}
1266+
return Mono.just(r)
1267+
})
1268+
})
12471269
.buildEncryptedBlobAsyncClient()
12481270

12491271
/*
@@ -1261,12 +1283,7 @@ class EncyptedBlockBlobAPITest extends APISpec {
12611283
*/
12621284
Hooks.onErrorDropped({ ignored -> /* do nothing with it */ })
12631285

1264-
/*
1265-
* When the download begins trigger an upload to overwrite the downloading blob after waiting 500 milliseconds
1266-
* so that the download is able to get an ETag before it is changed.
1267-
*/
1268-
StepVerifier.create(bac.downloadToFileWithResponse(outFile.toPath().toString(), null, options, null, null, false)
1269-
.doOnSubscribe({ bac.upload(data.defaultFlux, null, true).delaySubscription(Duration.ofMillis(500)).subscribe() }))
1286+
StepVerifier.create(bacDownloading.downloadToFileWithResponse(outFile.toPath().toString(), null, options, null, null, false))
12701287
.verifyErrorSatisfies({
12711288
/*
12721289
* If an operation is running on multiple threads and multiple return an exception Reactor will combine

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/APISpec.groovy

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
package com.azure.storage.blob
55

6-
import com.azure.core.credential.TokenRequestContext
76
import com.azure.core.http.HttpHeaders
87
import com.azure.core.http.HttpMethod
98
import com.azure.core.http.HttpPipelineCallContext
@@ -17,7 +16,9 @@ import com.azure.core.test.TestMode
1716
import com.azure.core.util.CoreUtils
1817
import com.azure.core.util.FluxUtil
1918
import com.azure.identity.EnvironmentCredentialBuilder
19+
import com.azure.storage.blob.models.BlobErrorCode
2020
import com.azure.storage.blob.models.BlobProperties
21+
import com.azure.storage.blob.models.BlobStorageException
2122
import com.azure.storage.blob.models.CopyStatusType
2223
import com.azure.storage.blob.models.LeaseStateType
2324
import com.azure.storage.blob.models.ListBlobContainersOptions
@@ -32,7 +33,6 @@ import com.azure.storage.common.implementation.Constants
3233
import com.azure.storage.common.policy.RequestRetryOptions
3334
import com.azure.storage.common.test.shared.StorageSpec
3435
import com.azure.storage.common.test.shared.TestAccount
35-
import com.azure.storage.common.test.shared.policy.MockDownloadHttpResponse
3636
import reactor.core.publisher.Flux
3737
import reactor.core.publisher.Mono
3838
import spock.lang.Timeout
@@ -106,7 +106,7 @@ class APISpec extends StorageSpec {
106106
containerName = generateContainerName()
107107
cc = primaryBlobServiceClient.getBlobContainerClient(containerName)
108108
ccAsync = primaryBlobServiceAsyncClient.getBlobContainerAsyncClient(containerName)
109-
cc.create()
109+
ignoreErrors({ cc.create() }, BlobErrorCode.CONTAINER_ALREADY_EXISTS)
110110
}
111111

112112
def cleanup() {
@@ -125,7 +125,7 @@ class APISpec extends StorageSpec {
125125
createLeaseClient(containerClient).breakLeaseWithResponse(new BlobBreakLeaseOptions().setBreakPeriod(Duration.ofSeconds(0)), null, null)
126126
}
127127

128-
containerClient.delete()
128+
ignoreErrors({ containerClient.delete() }, BlobErrorCode.CONTAINER_NOT_FOUND)
129129
}
130130
}
131131
}
@@ -672,6 +672,16 @@ class APISpec extends StorageSpec {
672672
}
673673
}
674674

675+
def ignoreErrors(Closure closure, BlobErrorCode... errors) {
676+
try {
677+
closure.call()
678+
} catch (BlobStorageException ex) {
679+
if (!errors.contains(ex.errorCode)) {
680+
throw ex
681+
}
682+
}
683+
}
684+
675685
/**
676686
* Injects one retry-able IOException failure per url.
677687
*/

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,14 @@ import com.azure.storage.blob.specialized.BlobClientBase
4444
import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder
4545
import com.azure.storage.common.Utility
4646
import com.azure.storage.common.implementation.Constants
47-
import com.azure.storage.common.test.shared.TestHttpClientType
4847
import com.azure.storage.common.test.shared.extensions.LiveOnly
4948
import com.azure.storage.common.test.shared.extensions.PlaybackOnly
5049
import com.azure.storage.common.test.shared.extensions.RequiredServiceVersion
5150
import com.azure.storage.common.test.shared.policy.MockFailureResponsePolicy
5251
import com.azure.storage.common.test.shared.policy.MockRetryRangeResponsePolicy
5352
import reactor.core.Exceptions
5453
import reactor.core.publisher.Hooks
54+
import reactor.core.publisher.Mono
5555
import reactor.test.StepVerifier
5656
import spock.lang.IgnoreIf
5757
import spock.lang.Unroll
@@ -66,6 +66,7 @@ import java.nio.file.StandardOpenOption
6666
import java.security.MessageDigest
6767
import java.time.Duration
6868
import java.time.OffsetDateTime
69+
import java.util.concurrent.atomic.AtomicInteger
6970

7071
class BlobAPITest extends APISpec {
7172
BlobClient bc
@@ -1142,11 +1143,32 @@ class BlobAPITest extends APISpec {
11421143
bc.uploadFromFile(file.toPath().toString(), true)
11431144
def outFile = new File(namer.getResourcePrefix())
11441145
Files.deleteIfExists(file.toPath())
1146+
def counter = new AtomicInteger()
11451147

11461148
expect:
1147-
def bac = instrument(new BlobClientBuilder()
1148-
.pipeline(bc.getHttpPipeline())
1149-
.endpoint(bc.getBlobUrl()))
1149+
def bacUploading = instrument(new BlobClientBuilder()
1150+
.endpoint(bc.getBlobUrl())
1151+
.credential(env.primaryAccount.credential))
1152+
.buildAsyncClient()
1153+
.getBlockBlobAsyncClient()
1154+
1155+
def bacDownloading = instrument(new BlobClientBuilder()
1156+
.addPolicy({ context, next ->
1157+
return next.process()
1158+
.flatMap({ r ->
1159+
if (counter.incrementAndGet() == 1) {
1160+
/*
1161+
* When the download begins trigger an upload to overwrite the downloading blob
1162+
* so that the download is able to get an ETag before it is changed.
1163+
*/
1164+
return bacUploading.upload(data.defaultFlux, data.defaultDataSize, true)
1165+
.thenReturn(r)
1166+
}
1167+
return Mono.just(r)
1168+
})
1169+
})
1170+
.endpoint(bc.getBlobUrl())
1171+
.credential(env.primaryAccount.credential))
11501172
.buildAsyncClient()
11511173
.getBlockBlobAsyncClient()
11521174

@@ -1165,12 +1187,7 @@ class BlobAPITest extends APISpec {
11651187
*/
11661188
Hooks.onErrorDropped({ ignored -> /* do nothing with it */ })
11671189

1168-
/*
1169-
* When the download begins trigger an upload to overwrite the downloading blob after waiting 500 milliseconds
1170-
* so that the download is able to get an ETag before it is changed.
1171-
*/
1172-
StepVerifier.create(bac.downloadToFileWithResponse(outFile.toPath().toString(), null, options, null, null, false)
1173-
.doOnSubscribe({ bac.upload(data.defaultFlux, data.defaultDataSize, true).delaySubscription(Duration.ofMillis(500)).subscribe() }))
1190+
StepVerifier.create(bacDownloading.downloadToFileWithResponse(outFile.toPath().toString(), null, options, null, null, false))
11741191
.verifyErrorSatisfies({
11751192
/*
11761193
* If an operation is running on multiple threads and multiple return an exception Reactor will combine

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,10 @@ class BlockBlobAPITest extends APISpec {
6464
blobName = generateBlobName()
6565
blobClient = cc.getBlobClient(blobName)
6666
blockBlobClient = blobClient.getBlockBlobClient()
67-
blockBlobClient.upload(data.defaultInputStream, data.defaultDataSize)
67+
blockBlobClient.upload(data.defaultInputStream, data.defaultDataSize, true)
6868
blobAsyncClient = ccAsync.getBlobAsyncClient(generateBlobName())
6969
blockBlobAsyncClient = blobAsyncClient.getBlockBlobAsyncClient()
70-
blockBlobAsyncClient.upload(data.defaultFlux, data.defaultDataSize).block()
70+
blockBlobAsyncClient.upload(data.defaultFlux, data.defaultDataSize, true).block()
7171
}
7272

7373
def "Stage block"() {

sdk/storage/azure-storage-file-datalake/src/test/java/com/azure/storage/file/datalake/FileAPITest.groovy

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,10 @@ import com.azure.storage.blob.models.BlobStorageException
1111
import com.azure.storage.common.ParallelTransferOptions
1212
import com.azure.storage.common.ProgressReceiver
1313
import com.azure.storage.common.implementation.Constants
14-
import com.azure.storage.common.test.shared.TestHttpClientType
1514
import com.azure.storage.common.test.shared.extensions.LiveOnly
1615
import com.azure.storage.common.test.shared.extensions.RequiredServiceVersion
1716
import com.azure.storage.common.test.shared.policy.MockFailureResponsePolicy
1817
import com.azure.storage.common.test.shared.policy.MockRetryRangeResponsePolicy
19-
import com.azure.storage.file.datalake.models.DownloadRetryOptions
2018
import com.azure.storage.file.datalake.models.AccessTier
2119
import com.azure.storage.file.datalake.models.DataLakeRequestConditions
2220
import com.azure.storage.file.datalake.models.DataLakeStorageException
@@ -46,10 +44,9 @@ import com.azure.storage.file.datalake.options.FileScheduleDeletionOptions
4644
import reactor.core.Exceptions
4745
import reactor.core.publisher.Flux
4846
import reactor.core.publisher.Hooks
47+
import reactor.core.publisher.Mono
4948
import reactor.test.StepVerifier
50-
import spock.lang.Ignore
5149
import spock.lang.IgnoreIf
52-
import spock.lang.Requires
5350
import spock.lang.Retry
5451
import spock.lang.Unroll
5552

@@ -63,6 +60,7 @@ import java.security.MessageDigest
6360
import java.time.Duration
6461
import java.time.OffsetDateTime
6562
import java.time.temporal.ChronoUnit
63+
import java.util.concurrent.atomic.AtomicInteger
6664
import java.util.function.Consumer
6765

6866
class FileAPITest extends APISpec {
@@ -1502,18 +1500,38 @@ class FileAPITest extends APISpec {
15021500
}
15031501

15041502
@LiveOnly
1505-
@Ignore("failing in ci")
15061503
def "Download file etag lock"() {
15071504
setup:
15081505
def file = getRandomFile(Constants.MB)
15091506
fc.uploadFromFile(file.toPath().toString(), true)
15101507
def outFile = new File(namer.getResourcePrefix())
15111508
Files.deleteIfExists(file.toPath())
1509+
def counter = new AtomicInteger()
15121510

15131511
expect:
1514-
def fac = new DataLakePathClientBuilder()
1515-
.pipeline(fc.getHttpPipeline())
1512+
1513+
def facUploading = instrument(new DataLakePathClientBuilder()
1514+
.endpoint(fc.getPathUrl())
1515+
.credential(env.dataLakeAccount.credential))
1516+
.buildFileAsyncClient()
1517+
1518+
def facDownloading = instrument(new DataLakePathClientBuilder()
1519+
.addPolicy({ context, next ->
1520+
return next.process()
1521+
.flatMap({ r ->
1522+
if (counter.incrementAndGet() == 1) {
1523+
/*
1524+
* When the download begins trigger an upload to overwrite the downloading blob
1525+
* so that the download is able to get an ETag before it is changed.
1526+
*/
1527+
return facUploading.upload(data.defaultFlux, null, true)
1528+
.thenReturn(r)
1529+
}
1530+
return Mono.just(r)
1531+
})
1532+
})
15161533
.endpoint(fc.getPathUrl())
1534+
.credential(env.dataLakeAccount.credential))
15171535
.buildFileAsyncClient()
15181536

15191537
/*
@@ -1531,12 +1549,7 @@ class FileAPITest extends APISpec {
15311549
*/
15321550
Hooks.onErrorDropped({ ignored -> /* do nothing with it */ })
15331551

1534-
/*
1535-
* When the download begins trigger an upload to overwrite the downloading blob after waiting 500 milliseconds
1536-
* so that the download is able to get an ETag before it is changed.
1537-
*/
1538-
StepVerifier.create(fac.readToFileWithResponse(outFile.toPath().toString(), null, options, null, null, false, null)
1539-
.doOnSubscribe({ fac.upload(data.defaultFlux, null, true).delaySubscription(Duration.ofMillis(500)).subscribe() }))
1552+
StepVerifier.create(facDownloading.readToFileWithResponse(outFile.toPath().toString(), null, options, null, null, false, null))
15401553
.verifyErrorSatisfies({
15411554
/*
15421555
* If an operation is running on multiple threads and multiple return an exception Reactor will combine

0 commit comments

Comments
 (0)