@@ -95,7 +95,7 @@ def add_partitions_to_transaction(topic_partitions)
9595 force_transactional!
9696
9797 if @transaction_state . uninitialized?
98- raise 'Transaction is uninitialized'
98+ raise Kafka :: InvalidTxnStateError , 'Transaction is uninitialized'
9999 end
100100
101101 # Extract newly created partitions
@@ -138,8 +138,8 @@ def add_partitions_to_transaction(topic_partitions)
138138
139139 def begin_transaction
140140 force_transactional!
141- raise 'Transaction has already started' if @transaction_state . in_transaction?
142- raise 'Transaction is not ready' unless @transaction_state . ready?
141+ raise Kafka :: InvalidTxnStateError , 'Transaction has already started' if @transaction_state . in_transaction?
142+ raise Kafka :: InvalidTxnStateError , 'Transaction is not ready' unless @transaction_state . ready?
143143 @transaction_state . transition_to! ( TransactionStateMachine ::IN_TRANSACTION )
144144
145145 @logger . info "Begin transaction #{ @transactional_id } , Producer ID: #{ @producer_id } (Epoch #{ @producer_epoch } )"
@@ -159,7 +159,7 @@ def commit_transaction
159159 end
160160
161161 unless @transaction_state . in_transaction?
162- raise 'Transaction is not valid to commit'
162+ raise Kafka :: InvalidTxnStateError , 'Transaction is not valid to commit'
163163 end
164164
165165 @transaction_state . transition_to! ( TransactionStateMachine ::COMMITTING_TRANSACTION )
@@ -192,7 +192,8 @@ def abort_transaction
192192 end
193193
194194 unless @transaction_state . in_transaction?
195- raise 'Transaction is not valid to abort'
195+ @logger . warn ( 'Aborting transaction that was never opened on brokers' )
196+ return
196197 end
197198
198199 @transaction_state . transition_to! ( TransactionStateMachine ::ABORTING_TRANSACTION )
@@ -221,7 +222,7 @@ def send_offsets_to_txn(offsets:, group_id:)
221222 force_transactional!
222223
223224 unless @transaction_state . in_transaction?
224- raise 'Transaction is not valid to send offsets'
225+ raise Kafka :: InvalidTxnStateError , 'Transaction is not valid to send offsets'
225226 end
226227
227228 add_response = transaction_coordinator . add_offsets_to_txn (
@@ -250,6 +251,10 @@ def error?
250251 @transaction_state . error?
251252 end
252253
254+ def ready?
255+ @transaction_state . ready?
256+ end
257+
253258 def close
254259 if in_transaction?
255260 @logger . warn ( "Aborting pending transaction ..." )
@@ -264,11 +269,11 @@ def close
264269
265270 def force_transactional!
266271 unless transactional?
267- raise 'Please turn on transactional mode to use transaction'
272+ raise Kafka :: InvalidTxnStateError , 'Please turn on transactional mode to use transaction'
268273 end
269274
270275 if @transactional_id . nil? || @transactional_id . empty?
271- raise 'Please provide a transaction_id to use transactional mode'
276+ raise Kafka :: InvalidTxnStateError , 'Please provide a transaction_id to use transactional mode'
272277 end
273278 end
274279
0 commit comments