@@ -114,29 +114,39 @@ def cos_source(config):
114114 if 'secret_access_key' not in config :
115115 raise TypeError (
116116 'Cloud object storage source requires an secret access key.' )
117- if 'endpoint' not in config :
118- raise TypeError ('Cloud object storage source requires an endpoint.' )
119117 bucket_name = config ['bucket_name' ]
120118 access_key_id = config ['access_key_id' ]
121- secret_access_key = config ['secret_access_key' ]
122- endpoint = config ['endpoint' ]
119+ secret_access_key = 'RAW(' + config ['secret_access_key' ]+ ')' # add RAW() to deal with special characters in key
120+
121+ split_endpoint = None
123122
124- # Ensure this is a valid, supported endpoint:
125- split_endpoint = _parse_endpoint (endpoint )
123+ if 'endpoint' in config :
124+ endpoint = config ['endpoint' ]
125+
126+ # Ensure this is a valid, supported endpoint:
127+ split_endpoint = _parse_endpoint (endpoint )
126128
127129 # Resolve region:
128130 region = 'us-east'
129131 if 'region' in config :
130132 region = config ['region' ]
131133 else :
132- region = split_endpoint [1 ]
134+ if split_endpoint :
135+ region = split_endpoint [1 ]
133136
134137 # Assemble URI:
135- uri = f'aws2-s3://{ bucket_name } ?accessKey={ access_key_id } ' \
136- f'&secretKey={ secret_access_key } ' \
137- '&overrideEndpoint=true' \
138- f'&uriEndpointOverride={ endpoint } ' \
139- f'®ion={ region } '
138+ if 'endpoint' not in config :
139+
140+ uri = f'aws2-s3://{ bucket_name } ?accessKey={ access_key_id } ' \
141+ f'&secretKey={ secret_access_key } ' \
142+ f'®ion={ region } '
143+ else :
144+
145+ uri = f'aws2-s3://{ bucket_name } ?accessKey={ access_key_id } ' \
146+ f'&secretKey={ secret_access_key } ' \
147+ '&overrideEndpoint=true' \
148+ f'&uriEndpointOverride={ endpoint } ' \
149+ f'®ion={ region } '
140150
141151 # Move after read options:
142152 if 'move_after_read' in config :
@@ -448,26 +458,28 @@ def cos_sink(config):
448458 if 'secret_access_key' not in config :
449459 raise TypeError (
450460 'Cloud object storage sink requires an secret access key.' )
451- if 'endpoint' not in config :
452- raise TypeError ('Cloud object storage sink requires an endpoint.' )
453461 bucket_name = config ['bucket_name' ]
454462 access_key_id = config ['access_key_id' ]
455- secret_access_key = config ['secret_access_key' ]
456- endpoint = config ['endpoint' ]
463+ secret_access_key = 'RAW(' + config ['secret_access_key' ]+ ')' # add RAW() to deal with special characters in key
464+
465+ split_endpoint = None
466+ if 'endpoint' in config :
467+ endpoint = config ['endpoint' ]
468+ # Ensure this is a valid, supported endpoint:
469+ split_endpoint = _parse_endpoint (endpoint )
470+
457471
458472 file_name = "default.txt"
459473 if 'file_name' in config :
460474 file_name = config ['file_name' ]
461475
462- # Ensure this is a valid, supported endpoint:
463- split_endpoint = _parse_endpoint (endpoint )
464-
465476 # Resolve region:
466477 region = 'us-east'
467478 if 'region' in config :
468479 region = config ['region' ]
469480 else :
470- region = split_endpoint [1 ]
481+ if split_endpoint :
482+ region = split_endpoint [1 ]
471483
472484 # If we are uploading a file either directly or by monitoring a directory,
473485 # multi-part must be enabled:
@@ -476,11 +488,16 @@ def cos_sink(config):
476488 config ['upload_type' ] = "multi-part"
477489
478490 # Assemble URI:
479- uri = f'aws2-s3://{ bucket_name } ?accessKey={ access_key_id } ' \
480- f'&secretKey={ secret_access_key } ' \
481- '&overrideEndpoint=true' \
482- f'&uriEndpointOverride={ endpoint } ' \
483- f'®ion={ region } '
491+ if 'endpoint' not in config :
492+ uri = f'aws2-s3://{ bucket_name } ?accessKey={ access_key_id } ' \
493+ f'&secretKey={ secret_access_key } ' \
494+ f'®ion={ region } '
495+ else :
496+ uri = f'aws2-s3://{ bucket_name } ?accessKey={ access_key_id } ' \
497+ f'&secretKey={ secret_access_key } ' \
498+ '&overrideEndpoint=true' \
499+ f'&uriEndpointOverride={ endpoint } ' \
500+ f'®ion={ region } '
484501
485502 # Final result is a list of spec and uri pairs: [(spec, uri)]
486503 spec_list = []
@@ -644,7 +661,6 @@ def construct_sink(config, endpoint):
644661 else :
645662 spec ['uri' ] = from_uri
646663 final_spec_list .append ({'from' : spec })
647- # print(yaml.dump(final_spec_list))
648664 return final_spec_list
649665
650666
0 commit comments