@@ -24,6 +24,7 @@ use aws_sdk_s3::Error as AwsSdkError;
24
24
use aws_sdk_s3:: RetryConfig ;
25
25
use aws_sdk_s3:: { Client , Credentials , Endpoint , Region } ;
26
26
use aws_smithy_async:: rt:: sleep:: default_async_sleep;
27
+ use base64:: encode;
27
28
use bytes:: Bytes ;
28
29
use clap:: builder:: ArgPredicate ;
29
30
@@ -35,6 +36,7 @@ use datafusion::datasource::object_store::ObjectStoreRegistry;
35
36
use datafusion:: execution:: runtime_env:: { RuntimeConfig , RuntimeEnv } ;
36
37
use futures:: StreamExt ;
37
38
use http:: Uri ;
39
+ use md5:: { Digest , Md5 } ;
38
40
use object_store:: aws:: AmazonS3Builder ;
39
41
use object_store:: limit:: LimitStore ;
40
42
use relative_path:: RelativePath ;
@@ -105,6 +107,15 @@ pub struct S3Config {
105
107
default_value_if( "demo" , ArgPredicate :: IsPresent , DEFAULT_S3_BUCKET )
106
108
) ]
107
109
pub s3_bucket_name : String ,
110
+
111
+ /// Set client to send content_md5 header on every put request
112
+ #[ arg(
113
+ long,
114
+ env = "P_S3_SET_CONTENT_MD5" ,
115
+ value_name = "bool" ,
116
+ default_value = "false"
117
+ ) ]
118
+ pub content_md5 : bool ,
108
119
}
109
120
110
121
impl ObjectStorageProvider for S3Config {
@@ -153,6 +164,7 @@ impl ObjectStorageProvider for S3Config {
153
164
Arc :: new ( S3 {
154
165
client,
155
166
bucket : self . s3_bucket_name . clone ( ) ,
167
+ set_content_md5 : self . content_md5 ,
156
168
} )
157
169
}
158
170
@@ -164,6 +176,7 @@ impl ObjectStorageProvider for S3Config {
164
176
pub struct S3 {
165
177
client : aws_sdk_s3:: Client ,
166
178
bucket : String ,
179
+ set_content_md5 : bool ,
167
180
}
168
181
169
182
impl S3 {
@@ -233,16 +246,24 @@ impl S3 {
233
246
Ok ( logstreams)
234
247
}
235
248
236
- async fn _upload_file ( & self , key : & str , path : & Path ) -> Result < ( ) , AwsSdkError > {
237
- let body = ByteStream :: from_path ( path) . await . unwrap ( ) ;
249
+ async fn _upload_file (
250
+ & self ,
251
+ key : & str ,
252
+ path : & Path ,
253
+ md5 : Option < String > ,
254
+ ) -> Result < ( ) , AwsSdkError > {
255
+ let body = ByteStream :: from_path ( & path) . await . unwrap ( ) ;
256
+
238
257
let resp = self
239
258
. client
240
259
. put_object ( )
241
260
. bucket ( & self . bucket )
242
261
. key ( key)
243
262
. body ( body)
263
+ . set_content_md5 ( md5)
244
264
. send ( )
245
265
. await ?;
266
+
246
267
log:: trace!( "{:?}" , resp) ;
247
268
248
269
Ok ( ( ) )
@@ -260,12 +281,18 @@ impl ObjectStorage for S3 {
260
281
path : & RelativePath ,
261
282
resource : Bytes ,
262
283
) -> Result < ( ) , ObjectStorageError > {
263
- let _resp = self
264
- . client
284
+ let hash = self . set_content_md5 . then ( || {
285
+ let mut hash = Md5 :: new ( ) ;
286
+ hash. update ( & resource) ;
287
+ encode ( hash. finalize ( ) )
288
+ } ) ;
289
+
290
+ self . client
265
291
. put_object ( )
266
292
. bucket ( & self . bucket )
267
293
. key ( path. as_str ( ) )
268
294
. body ( resource. into ( ) )
295
+ . set_content_md5 ( hash)
269
296
. send ( )
270
297
. await
271
298
. map_err ( |err| ObjectStorageError :: ConnectionError ( Box :: new ( err) ) ) ?;
@@ -296,7 +323,16 @@ impl ObjectStorage for S3 {
296
323
}
297
324
298
325
async fn upload_file ( & self , key : & str , path : & Path ) -> Result < ( ) , ObjectStorageError > {
299
- self . _upload_file ( key, path) . await ?;
326
+ let hash = if self . set_content_md5 {
327
+ let mut file = std:: fs:: File :: open ( path) ?;
328
+ let mut digest = Md5 :: new ( ) ;
329
+ std:: io:: copy ( & mut file, & mut digest) ?;
330
+ Some ( encode ( digest. finalize ( ) ) )
331
+ } else {
332
+ None
333
+ } ;
334
+
335
+ self . _upload_file ( key, path, hash) . await ?;
300
336
301
337
Ok ( ( ) )
302
338
}
0 commit comments