Skip to content

Commit d282a35

Browse files
committed
Use specific exception class for txn mgr errors
The use of generic RuntimeError exceptions requires brittle error-handling in dependant code.
1 parent fb7b9ea commit d282a35

File tree

2 files changed

+17
-17
lines changed

2 files changed

+17
-17
lines changed

lib/kafka/transaction_manager.rb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def add_partitions_to_transaction(topic_partitions)
9595
force_transactional!
9696

9797
if @transaction_state.uninitialized?
98-
raise 'Transaction is uninitialized'
98+
raise Kafka::InvalidTxnStateError, 'Transaction is uninitialized'
9999
end
100100

101101
# Extract newly created partitions
@@ -138,8 +138,8 @@ def add_partitions_to_transaction(topic_partitions)
138138

139139
def begin_transaction
140140
force_transactional!
141-
raise 'Transaction has already started' if @transaction_state.in_transaction?
142-
raise 'Transaction is not ready' unless @transaction_state.ready?
141+
raise Kafka::InvalidTxnStateError, 'Transaction has already started' if @transaction_state.in_transaction?
142+
raise Kafka::InvalidTxnStateError, 'Transaction is not ready' unless @transaction_state.ready?
143143
@transaction_state.transition_to!(TransactionStateMachine::IN_TRANSACTION)
144144

145145
@logger.info "Begin transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})"
@@ -159,7 +159,7 @@ def commit_transaction
159159
end
160160

161161
unless @transaction_state.in_transaction?
162-
raise 'Transaction is not valid to commit'
162+
raise Kafka::InvalidTxnStateError, 'Transaction is not valid to commit'
163163
end
164164

165165
@transaction_state.transition_to!(TransactionStateMachine::COMMITTING_TRANSACTION)
@@ -192,7 +192,7 @@ def abort_transaction
192192
end
193193

194194
unless @transaction_state.in_transaction?
195-
raise 'Transaction is not valid to abort'
195+
raise Kafka::InvalidTxnStateError, 'Transaction is not valid to abort'
196196
end
197197

198198
@transaction_state.transition_to!(TransactionStateMachine::ABORTING_TRANSACTION)
@@ -221,7 +221,7 @@ def send_offsets_to_txn(offsets:, group_id:)
221221
force_transactional!
222222

223223
unless @transaction_state.in_transaction?
224-
raise 'Transaction is not valid to send offsets'
224+
raise Kafka::InvalidTxnStateError, 'Transaction is not valid to send offsets'
225225
end
226226

227227
add_response = transaction_coordinator.add_offsets_to_txn(
@@ -264,11 +264,11 @@ def close
264264

265265
def force_transactional!
266266
unless transactional?
267-
raise 'Please turn on transactional mode to use transaction'
267+
raise Kafka::InvalidTxnStateError, 'Please turn on transactional mode to use transaction'
268268
end
269269

270270
if @transactional_id.nil? || @transactional_id.empty?
271-
raise 'Please provide a transaction_id to use transactional mode'
271+
raise Kafka::InvalidTxnStateError, 'Please provide a transaction_id to use transactional mode'
272272
end
273273
end
274274

spec/transaction_manager_spec.rb

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@
210210
manager.add_partitions_to_transaction(
211211
'hello' => [1, 2, 3]
212212
)
213-
end.to raise_error(/please turn on transactional mode/i)
213+
end.to raise_error(Kafka::InvalidTxnStateError, /please turn on transactional mode/i)
214214
end
215215
end
216216
end
@@ -256,7 +256,7 @@
256256
it 'raises exception' do
257257
expect do
258258
manager.init_transactions
259-
end.to raise_error(/please turn on transactional mode/i)
259+
end.to raise_error(Kafka::InvalidTxnStateError, /please turn on transactional mode/i)
260260
end
261261

262262
it 'changes state to error' do
@@ -291,7 +291,7 @@
291291
it 'raises exception' do
292292
expect do
293293
manager.begin_transaction
294-
end.to raise_error(/transaction has already started/i)
294+
end.to raise_error(Kafka::InvalidTxnStateError, /transaction has already started/i)
295295
end
296296

297297
it 'changes state to error' do
@@ -306,7 +306,7 @@
306306
it 'raises exception' do
307307
expect do
308308
manager.begin_transaction
309-
end.to raise_error(/transaction is not ready/i)
309+
end.to raise_error(Kafka::InvalidTxnStateError, /transaction is not ready/i)
310310
end
311311

312312
it 'changes state to error' do
@@ -325,7 +325,7 @@
325325
it 'raises exception' do
326326
expect do
327327
manager.begin_transaction
328-
end.to raise_error(/please turn on transactional mode/i)
328+
end.to raise_error(Kafka::InvalidTxnStateError, /please turn on transactional mode/i)
329329
end
330330

331331
it 'changes state to error' do
@@ -404,7 +404,7 @@
404404
it 'raises exception' do
405405
expect do
406406
manager.abort_transaction
407-
end.to raise_error(/transaction is not valid to abort/i)
407+
end.to raise_error(Kafka::InvalidTxnStateError, /transaction is not valid to abort/i)
408408
end
409409

410410
it 'changes state to error' do
@@ -423,7 +423,7 @@
423423
it 'raises exception' do
424424
expect do
425425
manager.abort_transaction
426-
end.to raise_error(/please turn on transactional mode/i)
426+
end.to raise_error(Kafka::InvalidTxnStateError, /please turn on transactional mode/i)
427427
end
428428

429429
it 'changes state to error' do
@@ -502,7 +502,7 @@
502502
it 'raises exception' do
503503
expect do
504504
manager.commit_transaction
505-
end.to raise_error(/transaction is not valid to commit/i)
505+
end.to raise_error(Kafka::InvalidTxnStateError, /transaction is not valid to commit/i)
506506
end
507507

508508
it 'changes state to error' do
@@ -521,7 +521,7 @@
521521
it 'raises exception' do
522522
expect do
523523
manager.commit_transaction
524-
end.to raise_error(/please turn on transactional mode/i)
524+
end.to raise_error(Kafka::InvalidTxnStateError, /please turn on transactional mode/i)
525525
end
526526

527527
it 'changes state to error' do

0 commit comments

Comments
 (0)