Skip to content

Commit 62da3c5

Browse files
committed
Write blobs
Signed-off-by: itowlson <[email protected]>
1 parent 4e5e0cd commit 62da3c5

File tree

10 files changed

+465
-28
lines changed

10 files changed

+465
-28
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ sha2 = "0.10"
147147
tempfile = "3"
148148
test-environment = { git = "https://github.com/fermyon/conformance-tests", rev = "ecd22a45bcc5c775a56c67689a89aa4005866ac0" }
149149
thiserror = "1"
150-
tokio = "1"
150+
tokio = "1.40"
151151
toml = "0.8"
152152
tracing = { version = "0.1", features = ["log"] }
153153
tracing-opentelemetry = { version = "0.28", default-features = false, features = ["metrics"] }

crates/blobstore-azure/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ spin-factor-blobstore = { path = "../factor-blobstore" }
2020
tokio = { workspace = true }
2121
tokio-stream = "0.1.16"
2222
tokio-util = "0.7.12"
23+
uuid = { version = "1.0", features = ["v4"] }
2324
wasmtime-wasi = { workspace = true }
2425

2526
[lints]

crates/blobstore-azure/src/store.rs

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,62 @@ impl Container for AzureBlobContainer {
153153
Ok(Box::new(AzureBlobIncomingData::new(client, range)))
154154
}
155155

156+
async fn connect_stm(&self, name: &str, mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, finished_tx: tokio::sync::mpsc::Sender<()>) -> anyhow::Result<()> {
157+
use tokio::io::AsyncReadExt;
158+
159+
// It seems like we can't construct a SeekableStream over a SimplexStream, which
160+
// feels unfortunate. I am not sure that the outgoing-value interface gives
161+
// us a way to construct a len-able stream, because we don't know until finish
162+
// time how much the guest is going to write to it. (We might be able to do resettable...
163+
// but len-able...) So for now we read it into a buffer and then zoosh that up in
164+
// one go.
165+
//
166+
// We can kind of work around this by doing a series of Put Block calls followed by
167+
// a Put Block List. So we need to buffer only each block. But that still requires
168+
// care as you are limited to 50_000 committed / 100_000 uncommitted blocks.
169+
170+
const APPROX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
171+
172+
let client = self.client.blob_client(name);
173+
174+
tokio::spawn(async move {
175+
let mut blocks = vec![];
176+
177+
'put_blocks: loop {
178+
let mut bytes = Vec::with_capacity(APPROX_BLOCK_SIZE); // 2MB buffer x 50k blocks per blob = 100GB. WHICH SHOULD BE ENOUGH FOR ANYONE.
179+
loop {
180+
let read = stm.read_buf(&mut bytes).await.unwrap();
181+
let len = bytes.len();
182+
183+
if read == 0 {
184+
// end of stream - send the last block and go
185+
let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
186+
let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
187+
client.put_block(block_id.clone(), bytes).await.unwrap();
188+
blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(block_id));
189+
break 'put_blocks;
190+
}
191+
if len >= APPROX_BLOCK_SIZE {
192+
let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
193+
let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
194+
client.put_block(block_id.clone(), bytes).await.unwrap();
195+
blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(block_id));
196+
break;
197+
}
198+
}
199+
}
200+
201+
let block_list = azure_storage_blobs::blob::BlockList {
202+
blocks
203+
};
204+
client.put_block_list(block_list).await.unwrap();
205+
206+
finished_tx.send(()).await.expect("should sent finish tx");
207+
});
208+
209+
Ok(())
210+
}
211+
156212
async fn list_objects(&self) -> anyhow::Result<Box<dyn spin_factor_blobstore::ObjectNames>> {
157213
let stm = self.client.list_blobs().into_stream();
158214
Ok(Box::new(AzureBlobBlobsList::new(stm)))
@@ -179,16 +235,13 @@ impl AzureBlobIncomingData {
179235
}
180236
}
181237

182-
fn consume_async_impl(&mut self) -> wasmtime_wasi::pipe::AsyncReadStream { // Box<dyn futures::stream::Stream<Item = Result<Vec<u8>, std::io::Error>>> {
238+
fn consume_async_impl(&mut self) -> wasmtime_wasi::pipe::AsyncReadStream {
183239
use futures::TryStreamExt;
184240
use tokio_util::compat::FuturesAsyncReadCompatExt;
185241
let stm = self.consume_as_stream();
186242
let ar = stm.into_async_read();
187243
let arr = ar.compat();
188244
wasmtime_wasi::pipe::AsyncReadStream::new(arr)
189-
// Box::new(stm)
190-
// let async_read = stm.into_async_read();
191-
// todo!()
192245
}
193246

194247
fn consume_as_stream(&mut self) -> impl futures::stream::Stream<Item = Result<Vec<u8>, std::io::Error>> {

crates/blobstore-memory/src/lib.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@ impl BlobStoreInMemory {
4848

4949
/// The serialized runtime configuration for the in memory blob store.
5050
#[derive(Deserialize, Serialize)]
51-
pub struct MemoryBlobStoreRuntimeConfig {
52-
ignored: Option<String>,
53-
}
51+
pub struct MemoryBlobStoreRuntimeConfig {}
5452

5553
#[async_trait]
5654
impl spin_factor_blobstore::ContainerManager for BlobStoreInMemory {
@@ -142,6 +140,21 @@ impl spin_factor_blobstore::Container for InMemoryContainer {
142140

143141
Ok(Box::new(InMemoryBlobContent { data }))
144142
}
143+
144+
async fn connect_stm(&self, name: &str, mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, finished_tx: tokio::sync::mpsc::Sender<()>) -> anyhow::Result<()> {
145+
use tokio::io::AsyncReadExt;
146+
let name = name.to_owned();
147+
let blobs = self.blobs.clone();
148+
tokio::spawn(async move {
149+
let mut bytes = Vec::with_capacity(1024);
150+
stm.read_to_end(&mut bytes).await.unwrap();
151+
let mut lock = blobs.write().await;
152+
lock.insert(name, bytes);
153+
finished_tx.send(()).await.expect("should sent finish tx");
154+
});
155+
Ok(())
156+
}
157+
145158
async fn list_objects(&self) -> anyhow::Result<Box<dyn spin_factor_blobstore::ObjectNames>> {
146159
let blobs = self.read().await;
147160
let names = blobs.keys().map(|k| k.to_string()).collect();
@@ -181,21 +194,21 @@ impl spin_factor_blobstore::ObjectNames for InMemoryBlobNames {
181194
async fn read(&mut self, len: u64) -> anyhow::Result<(Vec<String>, bool)> {
182195
let len = len.try_into().unwrap();
183196
if len > self.names.len() {
184-
Ok((self.names.drain(..).collect(), false))
197+
Ok((self.names.drain(..).collect(), true))
185198
} else {
186199
let taken = self.names.drain(0..len).collect();
187-
Ok((taken, !self.names.is_empty()))
200+
Ok((taken, self.names.is_empty()))
188201
}
189202
}
190203

191204
async fn skip(&mut self, num: u64) -> anyhow::Result<(u64,bool)> {
192205
let len = num.try_into().unwrap();
193-
let (count, more) = if len > self.names.len() {
194-
(self.names.drain(..).len(), false)
206+
let (count, at_end) = if len > self.names.len() {
207+
(self.names.drain(..).len(), true)
195208
} else {
196209
let taken = self.names.drain(0..len).len();
197-
(taken, !self.names.is_empty())
210+
(taken, self.names.is_empty())
198211
};
199-
Ok((count.try_into().unwrap(), more))
212+
Ok((count.try_into().unwrap(), at_end))
200213
}
201214
}

crates/factor-blobstore/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition = { workspace = true }
66

77
[dependencies]
88
anyhow = { workspace = true }
9+
bytes = { workspace = true }
910
futures = { workspace = true }
1011
lru = "0.12"
1112
serde = { workspace = true }
@@ -15,7 +16,7 @@ spin-factors = { path = "../factors" }
1516
spin-locked-app = { path = "../locked-app" }
1617
spin-resource-table = { path = "../table" }
1718
spin-world = { path = "../world" }
18-
tokio = { workspace = true, features = ["macros", "sync", "rt"] }
19+
tokio = { workspace = true, features = ["macros", "sync", "rt", "io-util"] }
1920
toml = { workspace = true }
2021
tracing = { workspace = true }
2122
wasmtime-wasi = { workspace = true }
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod write_stream;

0 commit comments

Comments
 (0)