Skip to content

Commit ba1b284

Browse files
Evalirclaude
andauthored
feat(perms): add stream_bundles to BuilderTxCache (#124)
* feat(perms): add stream_bundles to BuilderTxCache Add a stream_bundles() method that automatically paginates through all bundles, yielding individual CachedBundle items as a Stream. Follows the same unfold+flatten pattern used by upstream TxCache for transactions and orders. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * chore: bump version to 0.18.0-rc.11 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 78a8b24 commit ba1b284

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "init4-bin-base"
33

44
description = "Internal utilities for binaries produced by the init4 team"
55
keywords = ["init4", "bin", "base"]
6-
version = "0.18.0-rc.10"
6+
version = "0.18.0-rc.11"
77
edition = "2021"
88
rust-version = "1.85"
99
authors = ["init4", "James Prestwich", "evalir"]
@@ -56,6 +56,7 @@ async-trait = { version = "0.1.80", optional = true }
5656
# AWS
5757
aws-config = { version = "1.1.7", optional = true }
5858
aws-sdk-kms = { version = "1.15.0", optional = true }
59+
futures-util = { version = "0.3", optional = true }
5960
reqwest = { version = "0.12.15", optional = true }
6061
rustls = { version = "0.23.31", optional = true }
6162

@@ -71,7 +72,7 @@ tokio = { version = "1.43.0", features = ["macros"] }
7172
default = ["alloy", "rustls"]
7273
alloy = ["dep:alloy"]
7374
aws = ["alloy", "alloy?/signer-aws", "dep:async-trait", "dep:aws-config", "dep:aws-sdk-kms"]
74-
perms = ["dep:oauth2", "dep:tokio", "dep:reqwest", "dep:signet-tx-cache"]
75+
perms = ["dep:oauth2", "dep:tokio", "dep:reqwest", "dep:signet-tx-cache", "dep:futures-util"]
7576
pylon = ["perms", "alloy/kzg"]
7677
block_watcher = ["dep:tokio"]
7778
rustls = ["dep:rustls", "rustls/aws-lc-rs"]

src/perms/tx_cache.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use crate::perms::oauth::SharedToken;
2+
use futures_util::future::Either;
3+
use futures_util::stream::{self, Stream, StreamExt};
24
use serde::de::DeserializeOwned;
35
use signet_tx_cache::{
46
error::TxCacheError,
@@ -148,6 +150,24 @@ impl BuilderTxCache {
148150
format!("{BUNDLES}/{bundle_id}")
149151
}
150152

153+
/// Stream all bundles from the cache, automatically paginating through
154+
/// all available pages. Yields individual [`CachedBundle`] items.
155+
pub fn stream_bundles(&self) -> impl Stream<Item = Result<CachedBundle>> + Send + '_ {
156+
stream::unfold(Some(None), move |cursor| async move {
157+
let cursor = cursor?;
158+
159+
match self.get_bundles(cursor).await {
160+
Ok(response) => {
161+
let (inner, next_cursor) = response.into_parts();
162+
let bundles = stream::iter(inner.bundles).map(Ok);
163+
Some((Either::Left(bundles), next_cursor.map(Some)))
164+
}
165+
Err(error) => Some((Either::Right(stream::once(async { Err(error) })), None)),
166+
}
167+
})
168+
.flatten()
169+
}
170+
151171
/// Get a bundle from the cache by its UUID. For convenience, this method
152172
/// takes a string reference, which is expected to be a valid UUID.
153173
#[instrument(skip_all)]

0 commit comments

Comments
 (0)