@@ -173,13 +173,13 @@ def do_POST(self):
173173 return
174174 body = self .rfile .read (content_length )
175175 if self .server .cloudevents :
176- event = from_http (self .headers , body )
176+ event = from_http (self .headers , body )
177177 record = json .loads (body )['Records' ][0 ]
178178 assert_equal (event ['specversion' ], '1.0' )
179179 assert_equal (event ['id' ], record ['responseElements' ]['x-amz-request-id' ] + '.' + record ['responseElements' ]['x-amz-id-2' ])
180180 assert_equal (event ['source' ], 'ceph:s3.' + record ['awsRegion' ] + '.' + record ['s3' ]['bucket' ]['name' ])
181181 assert_equal (event ['type' ], 'com.amazonaws.' + record ['eventName' ])
182- assert_equal (event ['datacontenttype' ], 'application/json' )
182+ assert_equal (event ['datacontenttype' ], 'application/json' )
183183 assert_equal (event ['subject' ], record ['s3' ]['object' ]['key' ])
184184 assert_equal (parser .parse (event ['time' ]), parser .parse (record ['eventTime' ]))
185185 log .info ('HTTP Server received event: %s' , str (body ))
@@ -238,7 +238,7 @@ def append(self, event):
238238 self .acquire_lock ()
239239 self .events .append (event )
240240 self .lock .release ()
241-
241+
242242 def verify_s3_events (self , keys , exact_match = False , deletions = False , expected_sizes = {}, etags = []):
243243 """verify stored s3 records agains a list of keys"""
244244 self .acquire_lock ()
@@ -419,8 +419,8 @@ def __init__(self, topic, security_type):
419419 port = 9093
420420 while remaining_retries > 0 :
421421 try :
422- self .consumer = KafkaConsumer (topic ,
423- bootstrap_servers = kafka_server + ':' + str (port ),
422+ self .consumer = KafkaConsumer (topic ,
423+ bootstrap_servers = kafka_server + ':' + str (port ),
424424 security_protocol = security_type ,
425425 consumer_timeout_ms = 16000 ,
426426 auto_offset_reset = 'earliest' )
@@ -505,7 +505,7 @@ def connection():
505505
506506 conn = S3Connection (aws_access_key_id = vstart_access_key ,
507507 aws_secret_access_key = vstart_secret_key ,
508- is_secure = False , port = port_no , host = hostname ,
508+ is_secure = False , port = port_no , host = hostname ,
509509 calling_format = 'boto.s3.connection.OrdinaryCallingFormat' )
510510
511511 return conn
@@ -519,7 +519,7 @@ def connection2():
519519
520520 conn = S3Connection (aws_access_key_id = vstart_access_key ,
521521 aws_secret_access_key = vstart_secret_key ,
522- is_secure = False , port = port_no , host = hostname ,
522+ is_secure = False , port = port_no , host = hostname ,
523523 calling_format = 'boto.s3.connection.OrdinaryCallingFormat' )
524524
525525 return conn
@@ -543,7 +543,7 @@ def another_user(user=None, tenant=None, account=None):
543543
544544 conn = S3Connection (aws_access_key_id = access_key ,
545545 aws_secret_access_key = secret_key ,
546- is_secure = False , port = get_config_port (), host = get_config_host (),
546+ is_secure = False , port = get_config_port (), host = get_config_host (),
547547 calling_format = 'boto.s3.connection.OrdinaryCallingFormat' )
548548 return conn , arn
549549
@@ -663,13 +663,13 @@ def connect_random_user(tenant=''):
663663@attr ('basic_test' )
664664def test_ps_s3_topic_on_master ():
665665 """ test s3 topics set/get/delete on master """
666-
666+
667667 tenant = 'kaboom'
668668 conn = connect_random_user (tenant )
669-
669+
670670 # make sure there are no leftover topics
671671 delete_all_topics (conn , tenant , get_config_cluster ())
672-
672+
673673 zonegroup = get_config_zonegroup ()
674674 bucket_name = gen_bucket_name ()
675675 topic_name = bucket_name + TOPIC_SUFFIX
@@ -729,13 +729,13 @@ def test_ps_s3_topic_on_master():
729729@attr ('basic_test' )
730730def test_ps_s3_topic_admin_on_master ():
731731 """ test s3 topics set/get/delete on master """
732-
732+
733733 tenant = 'kaboom'
734734 conn = connect_random_user (tenant )
735-
735+
736736 # make sure there are no leftover topics
737737 delete_all_topics (conn , tenant , get_config_cluster ())
738-
738+
739739 zonegroup = get_config_zonegroup ()
740740 bucket_name = gen_bucket_name ()
741741 topic_name = bucket_name + TOPIC_SUFFIX
@@ -1038,7 +1038,7 @@ def test_ps_s3_notification_filter_on_master():
10381038 """ test s3 notification filter on master """
10391039
10401040 hostname = get_ip ()
1041-
1041+
10421042 conn = connection ()
10431043 ps_zone = conn
10441044
@@ -1057,7 +1057,7 @@ def test_ps_s3_notification_filter_on_master():
10571057 # create s3 topic
10581058 endpoint_address = 'amqp://' + hostname
10591059 endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
1060-
1060+
10611061 topic_conf = PSTopicS3 (conn , topic_name , zonegroup , endpoint_args = endpoint_args )
10621062 topic_arn = topic_conf .set_config ()
10631063
@@ -1923,7 +1923,7 @@ def create_thread(bucket, obj_prefix, i, content):
19231923
19241924def ps_s3_creation_triggers_on_master (external_endpoint_address = None , ca_location = None , verify_ssl = 'true' ):
19251925 """ test object creation s3 notifications in using put/copy/post on master"""
1926-
1926+
19271927 if not external_endpoint_address :
19281928 hostname = get_ip ()
19291929 proc = init_rabbitmq ()
@@ -2327,7 +2327,7 @@ def metadata_filter(endpoint_type, conn):
23272327 # create bucket
23282328 bucket_name = gen_bucket_name ()
23292329 bucket = conn .create_bucket (bucket_name )
2330- topic_name = bucket_name + TOPIC_SUFFIX
2330+ topic_name = bucket_name + TOPIC_SUFFIX
23312331
23322332 # start endpoint receiver
23332333 host = get_ip ()
@@ -2389,7 +2389,7 @@ def metadata_filter(endpoint_type, conn):
23892389 key_name = 'copy_of_foo'
23902390 bucket .copy_key (key_name , bucket .name , key .name )
23912391 expected_keys .append (key_name )
2392-
2392+
23932393 # create another objects in the bucket using COPY
23942394 # but override the metadata value
23952395 key_name = 'another_copy_of_foo'
@@ -2511,7 +2511,7 @@ def test_ps_s3_metadata_on_master():
25112511 # create objects in the bucket using COPY
25122512 key_name = 'copy_of_foo'
25132513 bucket .copy_key (key_name , bucket .name , key .name )
2514-
2514+
25152515 # create objects in the bucket using multi-part upload
25162516 fp = tempfile .NamedTemporaryFile (mode = 'w+b' )
25172517 chunk_size = 1024 * 1024 * 5 # 5MB
@@ -2725,7 +2725,7 @@ def test_ps_s3_versioning_on_master():
27252725 if version not in versions :
27262726 print ('version mismatch: ' + version + ' not in: ' + str (versions ))
27272727 # TODO: copy_key() does not return the version of the copied object
2728- #assert False
2728+ #assert False
27292729 else :
27302730 print ('version ok: ' + version + ' in: ' + str (versions ))
27312731
@@ -2814,7 +2814,7 @@ def test_ps_s3_versioned_deletion_on_master():
28142814 size = event ['s3' ]['object' ]['size' ]
28152815 if version not in versions :
28162816 print ('version mismatch: ' + version + ' not in: ' + str (versions ))
2817- assert False
2817+ assert False
28182818 else :
28192819 print ('version ok: ' + version + ' in: ' + str (versions ))
28202820 if event ['eventName' ] == 'ObjectRemoved:Delete' :
@@ -3406,7 +3406,7 @@ def test_ps_s3_notification_kafka_idle_behaviour():
34063406 # name is constant for manual testing
34073407 topic_name = bucket_name + '_topic'
34083408 # create consumer on the topic
3409-
3409+
34103410 task , receiver = create_kafka_receiver_thread (topic_name + '_1' )
34113411 task .start ()
34123412
@@ -4077,7 +4077,7 @@ def test_ps_s3_topic_update():
40774077 amqp_task .start ()
40784078 #topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
40794079 topic_conf = PSTopicS3 (conn , topic_name , zonegroup , endpoint_args = 'amqp-exchange=' + exchange + '&amqp-ack-level=none' )
4080-
4080+
40814081 topic_arn = topic_conf .set_config ()
40824082 #result, status = topic_conf.set_config()
40834083 #assert_equal(status/100, 2)
@@ -4341,7 +4341,7 @@ def test_ps_s3_multiple_topics_notification():
43414341 keys = list (bucket .list ())
43424342 # TODO: use exact match
43434343 verify_s3_records_by_elements (records , keys , exact_match = False )
4344- receiver .verify_s3_events (keys , exact_match = False )
4344+ receiver .verify_s3_events (keys , exact_match = False )
43454345 result , _ = sub_conf2 .get_events ()
43464346 parsed_result = json .loads (result )
43474347 for record in parsed_result ['Records' ]:
@@ -4479,7 +4479,7 @@ def test_ps_s3_topic_no_permissions():
44794479 zonegroup = 'default'
44804480 bucket_name = gen_bucket_name ()
44814481 topic_name = bucket_name + TOPIC_SUFFIX
4482-
4482+
44834483 # create s3 topic without policy
44844484 endpoint_address = 'amqp://127.0.0.1:7001'
44854485 endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=amqp.direct&amqp-ack-level=none'
@@ -4525,7 +4525,7 @@ def test_ps_s3_topic_no_permissions():
45254525 s3_notification_conf2 = PSNotificationS3 (conn2 , bucket_name , topic_conf_list )
45264526 _ , status = s3_notification_conf2 .set_config ()
45274527 assert_equal (status , 200 )
4528-
4528+
45294529 try :
45304530 # 2nd user tries to delete the topic
45314531 status = topic_conf2 .del_config (topic_arn = topic_arn )
@@ -4579,11 +4579,11 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F
45794579 endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&use-ssl=true&ca-location=' + KAFKA_DIR + '/y-ca.crt'
45804580
45814581 topic_conf = PSTopicS3 (conn , topic_name , zonegroup , endpoint_args = endpoint_args )
4582-
4582+
45834583 # create consumer on the topic
45844584 task , receiver = create_kafka_receiver_thread (topic_name )
45854585 task .start ()
4586-
4586+
45874587 topic_arn = topic_conf .set_config ()
45884588 # create s3 notification
45894589 notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -5133,7 +5133,7 @@ def test_ps_s3_data_path_v2_mixed_migration():
51335133 tenants_list .append ('' )
51345134 # make sure there are no leftover topics
51355135 delete_all_topics (conn , '' , get_config_cluster ())
5136-
5136+
51375137 # make sure that we start at v2
51385138 zonegroup_modify_feature (enable = True , feature_name = zonegroup_feature_notification_v2 )
51395139
0 commit comments