Skip to content

Commit 392c450

Browse files
authored
Add Transactionalmessage interface for request routing (#744)
* Add TransactionalMessage interface. Add TransactionMessage interface so the Transport automatically routes messages to Transaction Coordinators if required.
1 parent bedcf4a commit 392c450

File tree

7 files changed

+38
-0
lines changed

7 files changed

+38
-0
lines changed

protocol/addoffsetstotxn/addoffsetstotxn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ type Request struct {
1919

2020
func (r *Request) ApiKey() protocol.ApiKey { return protocol.AddOffsetsToTxn }
2121

22+
func (r *Request) Transaction() string { return r.TransactionalID }
23+
24+
var _ protocol.TransactionalMessage = (*Request)(nil)
25+
2226
type Response struct {
2327
// We need at least one tagged field to indicate that this is a "flexible" message
2428
// type.

protocol/addpartitionstotxn/addpartitionstotxn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ type RequestTopic struct {
2828

2929
func (r *Request) ApiKey() protocol.ApiKey { return protocol.AddPartitionsToTxn }
3030

31+
func (r *Request) Transaction() string { return r.TransactionalID }
32+
33+
var _ protocol.TransactionalMessage = (*Request)(nil)
34+
3135
type Response struct {
3236
// We need at least one tagged field to indicate that this is a "flexible" message
3337
// type.

protocol/endtxn/endtxn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ type Request struct {
1919

2020
func (r *Request) ApiKey() protocol.ApiKey { return protocol.EndTxn }
2121

22+
func (r *Request) Transaction() string { return r.TransactionalID }
23+
24+
var _ protocol.TransactionalMessage = (*Request)(nil)
25+
2226
type Response struct {
2327
// We need at least one tagged field to indicate that this is a "flexible" message
2428
// type.

protocol/initproducerid/initproducerid.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ type Request struct {
1919

2020
func (r *Request) ApiKey() protocol.ApiKey { return protocol.InitProducerId }
2121

22+
func (r *Request) Transaction() string { return r.TransactionalID }
23+
24+
var _ protocol.TransactionalMessage = (*Request)(nil)
25+
2226
type Response struct {
2327
// We need at least one tagged field to indicate that this is a "flexible" message
2428
// type.

protocol/protocol.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,14 @@ type GroupMessage interface {
438438
Group() string
439439
}
440440

441+
// TransactionalMessage is an extension of the Message interface implemented by some
442+
// request types to inform the program that they should be reouted to a transaction
443+
// coordinator.
444+
type TransactionalMessage interface {
445+
// Returns the transactional id configured on the message.
446+
Transaction() string
447+
}
448+
441449
// PreparedMessage is an extension of the Message interface implemented by some
442450
// request types which may need to run some pre-processing on their state before
443451
// being sent.

protocol/txnoffsetcommit/txnoffsetcommit.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ type RequestPartition struct {
4343

4444
func (r *Request) ApiKey() protocol.ApiKey { return protocol.TxnOffsetCommit }
4545

46+
func (r *Request) Group() string { return r.GroupID }
47+
48+
var _ protocol.GroupMessage = (*Request)(nil)
49+
4650
type Response struct {
4751
// We need at least one tagged field to indicate that this is a "flexible" message
4852
// type.

transport.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,16 @@ func (p *connPool) sendRequest(ctx context.Context, req Request, state connPoolS
649649
return reject(err)
650650
}
651651
brokerID = r.(*findcoordinator.Response).NodeID
652+
case protocol.TransactionalMessage:
653+
p := p.sendRequest(ctx, &findcoordinator.Request{
654+
Key: m.Transaction(),
655+
KeyType: int8(CoordinatorKeyTypeTransaction),
656+
}, state)
657+
r, err := p.await(ctx)
658+
if err != nil {
659+
return reject(err)
660+
}
661+
brokerID = r.(*findcoordinator.Response).NodeID
652662
}
653663

654664
var c *conn

0 commit comments

Comments
 (0)