@@ -37,6 +37,7 @@ pub struct Spec {
3737 max_file_size : Option < i64 > ,
3838 sqs_queue_url : Option < String > ,
3939 redis : Option < RedisConfig > ,
40+ force_path_style : Option < bool > ,
4041}
4142
4243struct SqsContext {
@@ -470,7 +471,13 @@ impl SourceFactoryBase for Factory {
470471 spec : Spec ,
471472 _context : Arc < FlowInstanceContext > ,
472473 ) -> Result < Box < dyn SourceExecutor > > {
473- let config = aws_config:: load_defaults ( BehaviorVersion :: latest ( ) ) . await ;
474+ let base_config = aws_config:: load_defaults ( BehaviorVersion :: latest ( ) ) . await ;
475+
476+ let mut s3_config_builder = aws_sdk_s3:: config:: Builder :: from ( & base_config) ;
477+ if let Some ( force_path_style) = spec. force_path_style {
478+ s3_config_builder = s3_config_builder. force_path_style ( force_path_style) ;
479+ }
480+ let s3_config = s3_config_builder. build ( ) ;
474481
475482 let redis_context = if let Some ( redis_config) = & spec. redis {
476483 Some ( Arc :: new (
@@ -480,19 +487,21 @@ impl SourceFactoryBase for Factory {
480487 None
481488 } ;
482489
490+ let sqs_context = spec. sqs_queue_url . map ( |url| {
491+ Arc :: new ( SqsContext {
492+ client : aws_sdk_sqs:: Client :: new ( & base_config) ,
493+ queue_url : url,
494+ } )
495+ } ) ;
496+
483497 Ok ( Box :: new ( Executor {
484- client : Client :: new ( & config ) ,
498+ client : Client :: from_conf ( s3_config ) ,
485499 bucket_name : spec. bucket_name ,
486500 prefix : spec. prefix ,
487501 binary : spec. binary ,
488502 pattern_matcher : PatternMatcher :: new ( spec. included_patterns , spec. excluded_patterns ) ?,
489503 max_file_size : spec. max_file_size ,
490- sqs_context : spec. sqs_queue_url . map ( |url| {
491- Arc :: new ( SqsContext {
492- client : aws_sdk_sqs:: Client :: new ( & config) ,
493- queue_url : url,
494- } )
495- } ) ,
504+ sqs_context,
496505 redis_context,
497506 } ) )
498507 }
0 commit comments