@@ -495,23 +495,20 @@ def test_encryption_failure(self):
495495 )
496496 producer .send (b"msg-0" )
497497
498- enc_key = None
499498 def verify_encryption_context (context : pulsar .EncryptionContext , failed : bool , batch_size : int ):
500- nonlocal enc_key
501499 keys = context .keys ()
502500 self .assertEqual (len (keys ), 1 )
503501 key = keys [0 ]
504502 self .assertEqual (key .key , "client-rsa.pem" )
505503 self .assertTrue (len (key .value ) > 0 )
506- if enc_key is None :
507- enc_key = key .value
508- else :
509- self .assertEqual (key .value , enc_key )
510504 self .assertEqual (key .metadata , {})
511505 self .assertTrue (len (context .param ()) > 0 )
512506 self .assertEqual (context .algorithm (), "" )
513507 self .assertEqual (context .compression_type (), CompressionType .LZ4 )
514- self .assertEqual (context .uncompressed_message_size (), len (b"msg-0" ))
508+ if batch_size == 1 :
509+ self .assertEqual (context .uncompressed_message_size (), len (b"msg-0" ))
510+ else :
511+ self .assertGreater (context .uncompressed_message_size (), len (b"msg-0" ))
515512 self .assertEqual (context .batch_size (), batch_size )
516513 self .assertEqual (context .is_decryption_failed (), failed )
517514
@@ -543,6 +540,16 @@ def verify_next_message(value: bytes):
543540
544541 producer .send (b"msg-2" )
545542 verify_next_message (b"msg-2" ) # msg-1 is skipped since the crypto failure action is DISCARD
543+ producer .close ()
544+
545+ # send batched messages
546+ producer = client .create_producer (
547+ topic = topic ,
548+ encryption_key = "client-rsa.pem" ,
549+ crypto_key_reader = crypto_key_reader ,
550+ compression_type = CompressionType .LZ4 ,
551+ batching_enabled = True ,
552+ )
546553 producer .send_async (b"msg-3" , None )
547554 producer .send_async (b"msg-4" , None )
548555 producer .flush ()
@@ -553,16 +560,18 @@ def verify_undecrypted_message(msg: pulsar.Message, i: int):
553560 verify_encryption_context (msg .encryption_context (), True , 2 if i >= 3 else - 1 )
554561
555562 # Encrypted messages will be consumed since the crypto failure action is CONSUME
563+ # Only 4 messages can be received because msg-3 and msg-4 are sent in batch and they are delivered
564+ # as a single message when decryption fails.
556565 consumer = client .subscribe (topic , 'another-sub' ,
557566 initial_position = InitialPosition .Earliest ,
558567 crypto_failure_action = pulsar .ConsumerCryptoFailureAction .CONSUME )
559- for i in range (3 ):
568+ for i in range (4 ):
560569 msg = consumer .receive (3000 )
561570 verify_undecrypted_message (msg , i )
562571
563572 reader = client .create_reader (topic , MessageId .earliest ,
564573 crypto_failure_action = pulsar .ConsumerCryptoFailureAction .CONSUME )
565- for i in range (3 ):
574+ for i in range (4 ):
566575 msg = reader .read_next (3000 )
567576 verify_undecrypted_message (msg , i )
568577
0 commit comments