@@ -11,6 +11,7 @@ use spin_core::async_trait;
11
11
use spin_factor_blobstore:: { Error , Container , ContainerManager } ;
12
12
13
13
pub struct BlobStoreS3 {
14
+ builder : object_store:: aws:: AmazonS3Builder ,
14
15
client : async_once_cell:: Lazy <
15
16
aws_sdk_s3:: Client ,
16
17
std:: pin:: Pin < Box < dyn std:: future:: Future < Output = aws_sdk_s3:: Client > + Send > > ,
@@ -69,6 +70,16 @@ impl BlobStoreS3 {
69
70
region : String ,
70
71
auth_options : BlobStoreS3AuthOptions ,
71
72
) -> Result < Self > {
73
+ let builder = match & auth_options {
74
+ BlobStoreS3AuthOptions :: RuntimeConfigValues ( config) =>
75
+ object_store:: aws:: AmazonS3Builder :: new ( )
76
+ . with_region ( & region)
77
+ . with_access_key_id ( & config. access_key )
78
+ . with_secret_access_key ( & config. secret_key )
79
+ . with_token ( config. token . clone ( ) . unwrap_or_default ( ) ) ,
80
+ BlobStoreS3AuthOptions :: Environmental => object_store:: aws:: AmazonS3Builder :: from_env ( ) ,
81
+ } ;
82
+
72
83
let region_clone = region. clone ( ) ;
73
84
let client_fut = Box :: pin ( async move {
74
85
let sdk_config = match auth_options {
@@ -84,15 +95,18 @@ impl BlobStoreS3 {
84
95
aws_sdk_s3:: Client :: new ( & sdk_config)
85
96
} ) ;
86
97
87
- Ok ( Self { client : async_once_cell:: Lazy :: from_future ( client_fut) } )
98
+ Ok ( Self { builder , client : async_once_cell:: Lazy :: from_future ( client_fut) } )
88
99
}
89
100
}
90
101
91
102
#[ async_trait]
92
103
impl ContainerManager for BlobStoreS3 {
93
104
async fn get ( & self , name : & str ) -> Result < Arc < dyn Container > , Error > {
105
+ let store = self . builder . clone ( ) . with_bucket_name ( name) . build ( ) . map_err ( |e| e. to_string ( ) ) ?;
106
+
94
107
Ok ( Arc :: new ( S3Container {
95
108
name : name. to_owned ( ) ,
109
+ store,
96
110
client : self . client . get_unpin ( ) . await . clone ( ) ,
97
111
} ) )
98
112
}
@@ -108,6 +122,7 @@ impl ContainerManager for BlobStoreS3 {
108
122
109
123
struct S3Container {
110
124
name : String ,
125
+ store : object_store:: aws:: AmazonS3 ,
111
126
client : aws_sdk_s3:: Client ,
112
127
}
113
128
@@ -183,14 +198,25 @@ impl Container for S3Container {
183
198
Ok ( Box :: new ( S3IncomingData :: new ( resp) ) )
184
199
}
185
200
186
- async fn connect_stm ( & self , name : & str , stm : tokio:: io:: ReadHalf < tokio:: io:: SimplexStream > , finished_tx : tokio:: sync:: mpsc:: Sender < ( ) > ) -> anyhow:: Result < ( ) > {
187
- let client = self . client . clone ( ) ;
188
- let bucket = self . name . clone ( ) ;
189
- let name = name. to_owned ( ) ;
190
- let byte_stm = to_byte_stream ( stm) ;
201
+ async fn connect_stm ( & self , name : & str , mut stm : tokio:: io:: ReadHalf < tokio:: io:: SimplexStream > , finished_tx : tokio:: sync:: mpsc:: Sender < ( ) > ) -> anyhow:: Result < ( ) > {
202
+ let store = self . store . clone ( ) ;
203
+ let path = object_store:: path:: Path :: from ( name) ;
191
204
192
205
tokio:: spawn ( async move {
193
- client. put_object ( ) . bucket ( & bucket) . key ( name) . body ( byte_stm) . send ( ) . await . unwrap ( ) ;
206
+ use object_store:: ObjectStore ;
207
+ let mupload = store. put_multipart ( & path) . await . unwrap ( ) ;
208
+ let mut writer = object_store:: WriteMultipart :: new ( mupload) ;
209
+ loop {
210
+ use tokio:: io:: AsyncReadExt ;
211
+ let mut buf = vec ! [ 0 ; 5 * 1024 * 1024 ] ;
212
+ let read_amount = stm. read ( & mut buf) . await . unwrap ( ) ;
213
+ if read_amount == 0 {
214
+ break ;
215
+ }
216
+ buf. truncate ( read_amount) ;
217
+ writer. put ( buf. into ( ) ) ;
218
+ }
219
+ writer. finish ( ) . await . unwrap ( ) ;
194
220
finished_tx. send ( ( ) ) . await . expect ( "should sent finish tx" ) ;
195
221
} ) ;
196
222
@@ -203,14 +229,6 @@ impl Container for S3Container {
203
229
}
204
230
}
205
231
206
- fn to_byte_stream ( read : tokio:: io:: ReadHalf < tokio:: io:: SimplexStream > ) -> aws_sdk_s3:: primitives:: ByteStream {
207
- use futures:: StreamExt ;
208
-
209
- let stm = tokio_util:: io:: ReaderStream :: new ( read) . map ( |item| item. map ( |by| http_body:: Frame :: data ( by) ) ) ;
210
- let stm_body = http_body_util:: StreamBody :: new ( stm) ;
211
- aws_sdk_s3:: primitives:: ByteStream :: from_body_1_x ( stm_body)
212
- }
213
-
214
232
struct S3IncomingData {
215
233
get_obj_resp : Option < get_object:: GetObjectOutput > ,
216
234
}
0 commit comments