Skip to content

Commit 3fa6bc0

Browse files
committed
Renaming and reorganising
Signed-off-by: itowlson <[email protected]>
1 parent 871ebcd commit 3fa6bc0

File tree

13 files changed

+335
-535
lines changed

13 files changed

+335
-535
lines changed

crates/blobstore-azure/src/lib.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@ mod store;
33
use serde::Deserialize;
44
use spin_factor_blobstore::runtime_config::spin::MakeBlobStore;
55
use store::{
6-
BlobStoreAzureBlob,
7-
// KeyValueAzureCosmos, KeyValueAzureCosmosAuthOptions, KeyValueAzureCosmosRuntimeConfigOptions,
6+
AzureContainerManager,
7+
auth::{AzureBlobAuthOptions, AzureKeyAuth},
88
};
99

1010
/// A builder for a blob store that uses Azure Blob Storage as the backend.
1111
#[derive(Default)]
12-
pub struct AzureBlobStore {
12+
pub struct AzureBlobStoreBuilder {
1313
_priv: (),
1414
}
1515

16-
impl AzureBlobStore {
17-
/// Creates a new `AzureBlobStore`.
16+
impl AzureBlobStoreBuilder {
17+
/// Creates a new `AzureBlobStoreBuilder`.
1818
pub fn new() -> Self {
1919
Self::default()
2020
}
@@ -29,30 +29,23 @@ pub struct AzureBlobStoreRuntimeConfig {
2929
account: String,
3030
}
3131

32-
impl MakeBlobStore for AzureBlobStore {
32+
impl MakeBlobStore for AzureBlobStoreBuilder {
3333
const RUNTIME_CONFIG_TYPE: &'static str = "azure_blob";
3434

3535
type RuntimeConfig = AzureBlobStoreRuntimeConfig;
3636

37-
type ContainerManager = BlobStoreAzureBlob;
37+
type ContainerManager = AzureContainerManager;
3838

3939
fn make_store(
4040
&self,
4141
runtime_config: Self::RuntimeConfig,
4242
) -> anyhow::Result<Self::ContainerManager> {
4343
let auth = match &runtime_config.key {
44-
Some(key) => store::BlobStoreAzureAuthOptions::RuntimeConfigValues(store::BlobStoreAzureRuntimeConfigOptions::new(runtime_config.account.clone(), key.clone())),
45-
None => store::BlobStoreAzureAuthOptions::Environmental,
44+
Some(key) => AzureBlobAuthOptions::AccountKey(AzureKeyAuth::new(runtime_config.account.clone(), key.clone())),
45+
None => AzureBlobAuthOptions::Environmental,
4646
};
47-
4847

49-
// let account = &runtime_config.account;
50-
// let key = &runtime_config.key;
51-
52-
// let credentials = azure_storage::prelude::StorageCredentials::access_key(account, key);
53-
// let client = azure_storage_blobs::prelude::ClientBuilder::new(account, credentials);
54-
55-
let blob_store = BlobStoreAzureBlob::new(auth)?;
48+
let blob_store = AzureContainerManager::new(auth)?;
5649
Ok(blob_store)
5750
}
5851
}

crates/blobstore-azure/src/store.rs

Lines changed: 26 additions & 228 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,31 @@
11
use std::sync::Arc;
2-
use tokio::sync::Mutex;
32

43
use anyhow::Result;
5-
// use azure_data_cosmos::{
6-
// prelude::{AuthorizationToken, CollectionClient, CosmosClient, Query},
7-
// CosmosEntity,
8-
// };
94
use azure_storage_blobs::prelude::{BlobServiceClient, ContainerClient};
10-
// use futures::StreamExt;
11-
// use serde::{Deserialize, Serialize};
125
use spin_core::async_trait;
136
use spin_factor_blobstore::{Error, Container, ContainerManager};
147

15-
pub struct BlobStoreAzureBlob {
16-
client: BlobServiceClient,
17-
// client: CollectionClient,
18-
}
8+
pub mod auth;
9+
mod incoming_data;
10+
mod object_names;
1911

20-
/// Azure Cosmos Key / Value runtime config literal options for authentication
21-
#[derive(Clone, Debug)]
22-
pub struct BlobStoreAzureRuntimeConfigOptions {
23-
account: String,
24-
key: String,
25-
}
12+
use auth::AzureBlobAuthOptions;
13+
use incoming_data::AzureIncomingData;
14+
use object_names::AzureObjectNames;
2615

27-
impl BlobStoreAzureRuntimeConfigOptions {
28-
pub fn new(account: String, key: String) -> Self {
29-
Self { account, key }
30-
}
31-
}
32-
33-
/// Azure Cosmos Key / Value enumeration for the possible authentication options
34-
#[derive(Clone, Debug)]
35-
pub enum BlobStoreAzureAuthOptions {
36-
/// Runtime Config values indicates the account and key have been specified directly
37-
RuntimeConfigValues(BlobStoreAzureRuntimeConfigOptions),
38-
/// Environmental indicates that the environment variables of the process should be used to
39-
/// create the StorageCredentials for the storage client. For now this uses old school credentials:
40-
///
41-
/// STORAGE_ACCOUNT
42-
/// STORAGE_ACCESS_KEY
43-
///
44-
/// TODO: Thorsten pls make this proper with *hand waving* managed identity and stuff!
45-
Environmental,
16+
pub struct AzureContainerManager {
17+
client: BlobServiceClient,
4618
}
4719

48-
impl BlobStoreAzureBlob {
20+
impl AzureContainerManager {
4921
pub fn new(
50-
// account: String,
51-
// container: String,
52-
auth_options: BlobStoreAzureAuthOptions,
22+
auth_options: AzureBlobAuthOptions,
5323
) -> Result<Self> {
5424
let (account, credentials) = match auth_options {
55-
BlobStoreAzureAuthOptions::RuntimeConfigValues(config) => {
25+
AzureBlobAuthOptions::AccountKey(config) => {
5626
(config.account.clone(), azure_storage::StorageCredentials::access_key(&config.account, config.key.clone()))
5727
},
58-
BlobStoreAzureAuthOptions::Environmental => {
28+
AzureBlobAuthOptions::Environmental => {
5929
let account = std::env::var("STORAGE_ACCOUNT").expect("missing STORAGE_ACCOUNT");
6030
let access_key = std::env::var("STORAGE_ACCESS_KEY").expect("missing STORAGE_ACCOUNT_KEY");
6131
(account.clone(), azure_storage::StorageCredentials::access_key(account, access_key))
@@ -68,10 +38,10 @@ impl BlobStoreAzureBlob {
6838
}
6939

7040
#[async_trait]
71-
impl ContainerManager for BlobStoreAzureBlob {
41+
impl ContainerManager for AzureContainerManager {
7242
async fn get(&self, name: &str) -> Result<Arc<dyn Container>, Error> {
73-
Ok(Arc::new(AzureBlobContainer {
74-
_name: name.to_owned(),
43+
Ok(Arc::new(AzureContainer {
44+
_label: name.to_owned(),
7545
client: self.client.container_client(name),
7646
}))
7747
}
@@ -85,16 +55,16 @@ impl ContainerManager for BlobStoreAzureBlob {
8555
}
8656
}
8757

88-
struct AzureBlobContainer {
89-
_name: String,
58+
struct AzureContainer {
59+
_label: String,
9060
client: ContainerClient,
9161
}
9262

9363
/// Azure doesn't provide us with a container creation time
9464
const DUMMY_CREATED_AT: u64 = 0;
9565

9666
#[async_trait]
97-
impl Container for AzureBlobContainer {
67+
impl Container for AzureContainer {
9868
async fn exists(&self) -> anyhow::Result<bool> {
9969
Ok(self.client.exists().await?)
10070
}
@@ -150,32 +120,24 @@ impl Container for AzureBlobContainer {
150120
azure_core::request_options::Range::Range(start..(end + 1))
151121
};
152122
let client = self.client.blob_client(name);
153-
Ok(Box::new(AzureBlobIncomingData::new(client, range)))
123+
Ok(Box::new(AzureIncomingData::new(client, range)))
154124
}
155125

156126
async fn connect_stm(&self, name: &str, mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, finished_tx: tokio::sync::mpsc::Sender<()>) -> anyhow::Result<()> {
157127
use tokio::io::AsyncReadExt;
158128

159-
// It seems like we can't construct a SeekableStream over a SimplexStream, which
160-
// feels unfortunate. I am not sure that the outgoing-value interface gives
161-
// us a way to construct a len-able stream, because we don't know until finish
162-
// time how much the guest is going to write to it. (We might be able to do resettable...
163-
// but len-able...) So for now we read it into a buffer and then zoosh that up in
164-
// one go.
165-
//
166-
// We can kind of work around this by doing a series of Put Block calls followed by
167-
// a Put Block List. So we need to buffer only each block. But that still requires
168-
// care as you are limited to 50_000 committed / 100_000 uncommitted blocks.
169-
170-
const APPROX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
129+
// Azure limits us to 50k blocks per blob. At 2MB/block that allows 100GB, which will be
130+
// enough for most use cases. If users need flexibility for larger blobs, we could make
131+
// the block size configurable via the runtime config ("size hint" or something).
132+
const BLOCK_SIZE: usize = 2 * 1024 * 1024;
171133

172134
let client = self.client.blob_client(name);
173135

174136
tokio::spawn(async move {
175137
let mut blocks = vec![];
176138

177139
'put_blocks: loop {
178-
let mut bytes = Vec::with_capacity(APPROX_BLOCK_SIZE); // 2MB buffer x 50k blocks per blob = 100GB. WHICH SHOULD BE ENOUGH FOR ANYONE.
140+
let mut bytes = Vec::with_capacity(BLOCK_SIZE);
179141
loop {
180142
let read = stm.read_buf(&mut bytes).await.unwrap();
181143
let len = bytes.len();
@@ -188,7 +150,7 @@ impl Container for AzureBlobContainer {
188150
blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(block_id));
189151
break 'put_blocks;
190152
}
191-
if len >= APPROX_BLOCK_SIZE {
153+
if len >= BLOCK_SIZE {
192154
let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
193155
let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
194156
client.put_block(block_id.clone(), bytes).await.unwrap();
@@ -211,171 +173,7 @@ impl Container for AzureBlobContainer {
211173

212174
async fn list_objects(&self) -> anyhow::Result<Box<dyn spin_factor_blobstore::ObjectNames>> {
213175
let stm = self.client.list_blobs().into_stream();
214-
Ok(Box::new(AzureBlobBlobsList::new(stm)))
176+
Ok(Box::new(AzureObjectNames::new(stm)))
215177
}
216178
}
217179

218-
struct AzureBlobIncomingData {
219-
// The Mutex is used to make it Send
220-
stm: Mutex<Option<
221-
azure_core::Pageable<
222-
azure_storage_blobs::blob::operations::GetBlobResponse,
223-
azure_core::error::Error
224-
>
225-
>>,
226-
client: azure_storage_blobs::prelude::BlobClient,
227-
}
228-
229-
impl AzureBlobIncomingData {
230-
fn new(client: azure_storage_blobs::prelude::BlobClient, range: azure_core::request_options::Range) -> Self {
231-
let stm = client.get().range(range).into_stream();
232-
Self {
233-
stm: Mutex::new(Some(stm)),
234-
client,
235-
}
236-
}
237-
238-
fn consume_async_impl(&mut self) -> wasmtime_wasi::pipe::AsyncReadStream {
239-
use futures::TryStreamExt;
240-
use tokio_util::compat::FuturesAsyncReadCompatExt;
241-
let stm = self.consume_as_stream();
242-
let ar = stm.into_async_read();
243-
let arr = ar.compat();
244-
wasmtime_wasi::pipe::AsyncReadStream::new(arr)
245-
}
246-
247-
fn consume_as_stream(&mut self) -> impl futures::stream::Stream<Item = Result<Vec<u8>, std::io::Error>> {
248-
use futures::StreamExt;
249-
let opt_stm = self.stm.get_mut();
250-
let stm = opt_stm.take().unwrap();
251-
let byte_stm = stm.flat_map(|chunk| streamify_chunk(chunk.unwrap().data));
252-
byte_stm
253-
}
254-
}
255-
256-
fn streamify_chunk(chunk: azure_core::ResponseBody) -> impl futures::stream::Stream<Item = Result<Vec<u8>, std::io::Error>> {
257-
use futures::StreamExt;
258-
chunk.map(|c| Ok(c.unwrap().to_vec()))
259-
}
260-
261-
262-
struct AzureBlobBlobsList {
263-
// The Mutex is used to make it Send
264-
stm: Mutex<
265-
azure_core::Pageable<
266-
azure_storage_blobs::container::operations::ListBlobsResponse,
267-
azure_core::error::Error
268-
>
269-
>,
270-
read_but_not_yet_returned: Vec<String>,
271-
end_stm_after_read_but_not_yet_returned: bool,
272-
}
273-
274-
impl AzureBlobBlobsList {
275-
fn new(stm: azure_core::Pageable<
276-
azure_storage_blobs::container::operations::ListBlobsResponse,
277-
azure_core::error::Error
278-
>) -> Self {
279-
Self {
280-
stm: Mutex::new(stm),
281-
read_but_not_yet_returned: Default::default(),
282-
end_stm_after_read_but_not_yet_returned: false,
283-
}
284-
}
285-
286-
async fn read_impl(&mut self, len: u64) -> anyhow::Result<(Vec<String>,bool)> {
287-
use futures::StreamExt;
288-
289-
let len: usize = len.try_into().unwrap();
290-
291-
// If we have names outstanding, send that first. (We are allowed to send less than len,
292-
// and so sending all pending stuff before paging, rather than trying to manage a mix of
293-
// pending stuff with newly retrieved chunks, simplifies the code.)
294-
if !self.read_but_not_yet_returned.is_empty() {
295-
if self.read_but_not_yet_returned.len() <= len {
296-
// We are allowed to send all pending names
297-
let to_return = self.read_but_not_yet_returned.drain(..).collect();
298-
return Ok((to_return, self.end_stm_after_read_but_not_yet_returned));
299-
} else {
300-
// Send as much as we can. The rest remains in the pending buffer to send,
301-
// so this does not represent end of stream.
302-
let to_return = self.read_but_not_yet_returned.drain(0..len).collect();
303-
return Ok((to_return, false));
304-
}
305-
}
306-
307-
// Get one chunk and send as much as we can of it. Again, we don't need to try to
308-
// pack the full length here - we can send chunk by chunk.
309-
310-
let Some(chunk) = self.stm.get_mut().next().await else {
311-
return Ok((vec![], false));
312-
};
313-
let chunk = chunk.unwrap();
314-
315-
// TODO: do we need to prefix these with a prefix from somewhere or do they include it?
316-
let mut names: Vec<_> = chunk.blobs.blobs().map(|blob| blob.name.clone()).collect();
317-
let at_end = chunk.next_marker.is_none();
318-
319-
if names.len() <= len {
320-
// We can send them all!
321-
return Ok((names, at_end));
322-
} else {
323-
// We have more names than we can send in this response. Send what we can and
324-
// stash the rest.
325-
let to_return: Vec<_> = names.drain(0..len).collect();
326-
self.read_but_not_yet_returned = names;
327-
self.end_stm_after_read_but_not_yet_returned = at_end;
328-
return Ok((to_return, false));
329-
}
330-
}
331-
}
332-
333-
#[async_trait]
334-
impl spin_factor_blobstore::IncomingData for AzureBlobIncomingData {
335-
async fn consume_sync(&mut self) -> anyhow::Result<Vec<u8>> {
336-
use futures::StreamExt;
337-
let mut data = vec![];
338-
let Some(pageable) = self.stm.get_mut() else {
339-
anyhow::bail!("oh no");
340-
};
341-
342-
loop {
343-
let Some(chunk) = pageable.next().await else {
344-
break;
345-
};
346-
let chunk = chunk.unwrap();
347-
let by = chunk.data.collect().await.unwrap();
348-
data.extend(by.to_vec());
349-
}
350-
351-
Ok(data)
352-
}
353-
354-
fn consume_async(&mut self) -> wasmtime_wasi::pipe::AsyncReadStream { // Box<dyn futures::stream::Stream<Item = Result<Vec<u8>, std::io::Error>>> {
355-
self.consume_async_impl()
356-
}
357-
358-
async fn size(&mut self) -> anyhow::Result<u64> {
359-
// TODO: in theory this should be infallible once we have the IncomingData
360-
// object. But in practice if we use the Pageable for that we don't get it until
361-
// we do the first read. So that would force us to either pre-fetch the
362-
// first chunk or to issue a properties request *just in case* size() was
363-
// called. So I'm making it fallible for now.
364-
Ok(self.client.get_properties().await?.blob.properties.content_length)
365-
}
366-
}
367-
368-
#[async_trait]
369-
impl spin_factor_blobstore::ObjectNames for AzureBlobBlobsList {
370-
async fn read(&mut self, len: u64) -> anyhow::Result<(Vec<String>, bool)> {
371-
self.read_impl(len).await // Separate function because rust-analyser gives better intellisense when async_trait isn't in the picture!
372-
}
373-
374-
async fn skip(&mut self, num: u64) -> anyhow::Result<(u64, bool)> {
375-
// TODO: there is a question (raised as an issue on the repo) about the required behaviour
376-
// here. For now I assume that skipping fewer than `num` is allowed as long as we are
377-
// honest about it. Because it is easier that is why.
378-
let (skipped, at_end) = self.read_impl(num).await?;
379-
Ok((skipped.len().try_into().unwrap(), at_end))
380-
}
381-
}

0 commit comments

Comments
 (0)