1919#
2020
2121import optparse
22- from proton import Message , Url
22+ from proton import Disposition , Message , Url
2323from proton .reactor import Container
2424from proton .handlers import MessagingHandler , TransactionHandler
2525
@@ -28,9 +28,10 @@ class TxSend(MessagingHandler, TransactionHandler):
2828 def __init__ (self , url , messages , batch_size ):
2929 super (TxSend , self ).__init__ ()
3030 self .url = Url (url )
31+ self .msg_id = 0
32+ self .accepted = 0
3133 self .current_batch = 0
3234 self .committed = 0
33- self .confirmed = 0
3435 self .total = messages
3536 self .batch_size = batch_size
3637
@@ -49,28 +50,50 @@ def on_sendable(self, event):
4950 self .send ()
5051
5152 def send (self ):
52- while self .transaction and self .sender .credit and ( self .committed + self . current_batch ) < self .total :
53- seq = self .committed + self . current_batch + 1
54- msg = Message (id = seq , body = {'sequence' : seq })
53+ while self .transaction and self .sender .credit and self .current_batch < self .batch_size :
54+ self .msg_id += 1
55+ msg = Message (id = self . msg_id , body = {'sequence' : self . msg_id })
5556 self .transaction .send (self .sender , msg )
5657 self .current_batch += 1
57- if self .current_batch == self .batch_size :
58- self .transaction .commit ()
59- self .transaction = None
6058
61- def on_accepted (self , event ):
62- if event .sender == self .sender :
63- self .confirmed += 1
59+ def on_delivery_updated (self , event ):
60+ """Transactional deliveries are updated via transactional state updates"""
61+ delivery = event .delivery
62+ disposition = delivery .remote
63+ # Is this a transactioned delivery update?
64+ if disposition .type == Disposition .TRANSACTIONAL_STATE :
65+ tid = disposition .id
66+ outcome = disposition .outcome_type
67+ if outcome == Disposition .ACCEPTED :
68+ self .accepted += 1
69+ if self .accepted == self .batch_size :
70+ self .transaction .commit ()
71+ self .transaction = None
72+ else :
73+ print (f"delivery { delivery .tag } not accepted - rollback transaction { tid } " )
74+ if self .transaction and self .transaction .id == tid :
75+ self .transaction .abort ()
76+ self .transaction = None
77+ elif event .sender == self .sender :
78+ print (f"delivery { delivery .tag } - Unexpected non-transactional update - aborting transaction" )
79+ if self .transaction :
80+ self .transaction .abort ()
81+ self .transaction = None
6482
6583 def on_transaction_committed (self , event ):
6684 self .committed += self .current_batch
6785 if self .committed == self .total :
68- print ("all messages committed" )
86+ print (f" { self . committed } messages committed" )
6987 event .connection .close ()
7088 else :
89+ self .accepted = 0
7190 self .current_batch = 0
7291 self .container .declare_transaction (self .conn , handler = self )
7392
93+ def on_transaction_aborted (self , event ):
94+ print (f"{ self .committed } committed, transaction aborted - closing" )
95+ event .connection .close ()
96+
7497 def on_disconnected (self , event ):
7598 self .current_batch = 0
7699
0 commit comments