Skip to content

Commit 4e67bc6

Browse files
Add list per prefix to fix productions topology snapshots (#3369)
Signed-off-by: Julien Tinguely <[email protected]>
1 parent ad92710 commit 4e67bc6

File tree

4 files changed

+19
-9
lines changed

4 files changed

+19
-9
lines changed

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/PeriodicTopologySnapshotIntegrationTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class PeriodicTopologySnapshotIntegrationTest[T <: BackupDumpConfig] extends Int
2727
BackupDumpConfig.Gcp(GcpBucketConfig.inferForTesting(TopologySnapshotTest), None)
2828

2929
private def listDump(utcDate: String): Seq[Blob] =
30-
bucket.list(startOffset = s"topology_snapshot_$utcDate", endOffset = "")
30+
bucket.listBlobsByPrefix(prefix = s"topology_snapshot_$utcDate")
3131

3232
override def environmentDefinition: SpliceEnvironmentDefinition =
3333
EnvironmentDefinition

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/DataExportTestUtil.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ trait DataExportTestUtil extends TestCommon {
2525
s"$cluster/$namespace/${getFileName(instant)}"
2626
val now = Instant.now
2727
// Query everything within the last 20min and check that we have at least one.
28-
val blobs = bucket.list(name(now.plus(-20, ChronoUnit.MINUTES)), name(now))
28+
val blobs = bucket.listBlobsByOffset(name(now.plus(-20, ChronoUnit.MINUTES)), name(now))
2929
blobs should not be empty
3030
forAll(blobs) { blob =>
3131
val dump = bucket.readStringFromBucket(Paths.get(blob.getName))

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/util/BackupDump.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,7 @@ object BackupDump {
9696
case Some(p) => s"$p/"
9797
case None => ""
9898
}
99-
val blobs = gcpBucket.list(s"$pref$offset", "")
100-
loggerFactory
101-
.getLogger(this.getClass)
102-
.info(
103-
s"all matched blobs found: ${blobs.map(_.getName)}, using offset $offset, and prefix $pref, bucket ${bucketConfig.bucketName}, project: ${bucketConfig.projectId}"
104-
)
99+
val blobs = gcpBucket.listBlobsByPrefix(prefix = s"$pref$offset")
105100
blobs.nonEmpty
106101
case _ =>
107102
throw Status.UNIMPLEMENTED

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/util/GcpBucket.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class GcpBucket(config: GcpBucketConfig, override val loggerFactory: NamedLogger
4141
}
4242

4343
@SuppressWarnings(Array("org.wartremover.warts.Var", "org.wartremover.warts.While"))
44-
def list(startOffset: String, endOffset: String): Seq[Blob] = {
44+
def listBlobsByOffset(startOffset: String, endOffset: String): Seq[Blob] = {
4545
val blobs = Seq.newBuilder[Blob]
4646
var page = storage.list(
4747
config.bucketName,
@@ -56,6 +56,21 @@ class GcpBucket(config: GcpBucketConfig, override val loggerFactory: NamedLogger
5656
blobs.result()
5757
}
5858

59+
@SuppressWarnings(Array("org.wartremover.warts.Var", "org.wartremover.warts.While"))
60+
def listBlobsByPrefix(prefix: String): Seq[Blob] = {
61+
val blobs = Seq.newBuilder[Blob]
62+
var page = storage.list(
63+
config.bucketName,
64+
Storage.BlobListOption.prefix(prefix),
65+
)
66+
blobs ++= page.getValues().asScala
67+
while (page.hasNextPage) {
68+
page = page.getNextPage
69+
blobs ++= page.getValues().asScala
70+
}
71+
blobs.result()
72+
}
73+
5974
def readBytesFromBucket(fileName: String): Array[Byte] = {
6075
val blobId = BlobId.of(config.bucketName, fileName)
6176
val blob = storage.get(blobId)

0 commit comments

Comments
 (0)