Skip to content

Commit 6ab93d5

Browse files
fix: return error when the client transaction coordinator is nil to p… (#1444)
* fix: return error when the client transaction coordinator is nil to prevent panic * test: add testcase to ensure error is actually returned
1 parent dff07bd commit 6ab93d5

File tree

2 files changed

+33
-11
lines changed

2 files changed

+33
-11
lines changed

pulsar/client_impl.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"sync"
2424
"time"
2525

26+
"errors"
27+
2628
"github.com/apache/pulsar-client-go/pulsar/auth"
2729
"github.com/apache/pulsar-client-go/pulsar/internal"
2830
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -39,6 +41,8 @@ const (
3941
minConnMaxIdleTime = 60 * time.Second
4042
)
4143

44+
var ErrClientTransactionsNotEnabled = errors.New("transactions are not enabled with the client")
45+
4246
type client struct {
4347
cnxPool internal.ConnectionPool
4448
rpcClient internal.RPCClient
@@ -196,6 +200,10 @@ func newClient(options ClientOptions) (Client, error) {
196200
}
197201

198202
func (c *client) NewTransaction(timeout time.Duration) (Transaction, error) {
203+
if c.tcClient == nil {
204+
return nil, ErrClientTransactionsNotEnabled
205+
}
206+
199207
id, err := c.tcClient.newTransaction(timeout)
200208
if err != nil {
201209
return nil, err

pulsar/transaction_test.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestTxn_TCClient(t *testing.T) {
3535
//1. Prepare: create PulsarClient and init transaction coordinator client.
3636
topic := newTopicName()
3737
sub := "my-sub"
38-
tc, client := createTcClient(t)
38+
tc, client := createClientWithTC(t)
3939
//2. Prepare: create Topic and Subscription.
4040
consumer, err := client.Subscribe(ConsumerOptions{
4141
Topic: topic,
@@ -100,7 +100,7 @@ func TestTxn_TCClient(t *testing.T) {
100100

101101
// Test abort and commit txn
102102
func TestTxn_ImplCommitOrAbort(t *testing.T) {
103-
tc, _ := createTcClient(t)
103+
tc, _ := createClientWithTC(t)
104104
//1. Open a transaction and then commit it.
105105
//The operations of committing txn1 should success at the first time and fail at the second time.
106106
txn1 := createTxn(tc, t)
@@ -133,7 +133,7 @@ func TestTxn_ImplCommitOrAbort(t *testing.T) {
133133

134134
// Test the internal API including the registerSendOrAckOp and endSendOrAckOp.
135135
func TestTxn_RegisterOpAndEndOp(t *testing.T) {
136-
tc, _ := createTcClient(t)
136+
tc, _ := createClientWithTC(t)
137137
//1. Register 4 operation but only end 3 operations, the transaction can not be committed or aborted.
138138
res := registerOpAndEndOp(t, tc, 4, 3, nil, true)
139139
assert.Equal(t, res.(*Error).Result(), TimeoutError)
@@ -158,7 +158,7 @@ func TestTxn_RegisterTopic(t *testing.T) {
158158
//1. Prepare: create PulsarClient and init transaction coordinator client.
159159
topic := newTopicName()
160160
sub := "my-sub"
161-
tc, client := createTcClient(t)
161+
tc, client := createClientWithTC(t)
162162
//2. Prepare: create Topic and Subscription.
163163
_, err := client.Subscribe(ConsumerOptions{
164164
Topic: topic,
@@ -205,8 +205,22 @@ func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction {
205205
return newTransaction(*id, tc, txnTimeout)
206206
}
207207

208-
// createTcClient Create a transaction coordinator client to send request
209-
func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
208+
func TestTxn_NoTransactionCoordinator(t *testing.T) {
209+
clientWithNoTC, err := NewClient(ClientOptions{
210+
URL: webServiceURLTLS,
211+
TLSTrustCertsFilePath: caCertsPath,
212+
Authentication: NewAuthenticationTLS(tlsClientCertPath, tlsClientKeyPath),
213+
EnableTransaction: false,
214+
})
215+
require.Nil(t, err, "Failed to create client.")
216+
217+
tx, err := clientWithNoTC.NewTransaction(txnTimeout)
218+
require.Nil(t, tx, "Did not fail creating a new transaction, transaction should be nil")
219+
require.ErrorIs(t, err, ErrClientTransactionsNotEnabled)
220+
}
221+
222+
// createClientWithTC creates a new client with a transaction coordinator client to send request
223+
func createClientWithTC(t *testing.T) (*transactionCoordinatorClient, *client) {
210224
c, err := NewClient(ClientOptions{
211225
URL: webServiceURLTLS,
212226
TLSTrustCertsFilePath: caCertsPath,
@@ -236,7 +250,7 @@ func TestTxn_ConsumeAndProduce(t *testing.T) {
236250
// Step 1: Prepare - Create PulsarClient and initialize the transaction coordinator client.
237251
topic := newTopicName()
238252
sub := "my-sub"
239-
_, client := createTcClient(t)
253+
_, client := createClientWithTC(t)
240254
// Step 2: Prepare - Create Topic and Subscription.
241255
consumer, err := client.Subscribe(ConsumerOptions{
242256
Topic: topic,
@@ -330,7 +344,7 @@ func TestTxn_AckAndSend(t *testing.T) {
330344
sourceTopic := newTopicName()
331345
sinkTopic := newTopicName()
332346
sub := "my-sub"
333-
_, client := createTcClient(t)
347+
_, client := createClientWithTC(t)
334348

335349
// Prepare: Create source and sink topics and subscriptions.
336350
sourceConsumer, _ := client.Subscribe(ConsumerOptions{
@@ -399,7 +413,7 @@ func TestTxn_TransactionAbort(t *testing.T) {
399413
// Prepare: Create PulsarClient and initialize the transaction coordinator client.
400414
topic := newTopicName()
401415
sub := "my-sub"
402-
_, client := createTcClient(t)
416+
_, client := createClientWithTC(t)
403417

404418
// Prepare: Create Topic and Subscription.
405419
consumer, _ := client.Subscribe(ConsumerOptions{
@@ -456,7 +470,7 @@ func TestTxn_AckChunkMessage(t *testing.T) {
456470
sub := "my-sub"
457471

458472
// Prepare: Create PulsarClient and initialize the transaction coordinator client.
459-
_, client := createTcClient(t)
473+
_, client := createClientWithTC(t)
460474

461475
// Create transaction and register the send operation.
462476
txn, err := client.NewTransaction(txnTimeout)
@@ -548,7 +562,7 @@ func TestTxn_ConnReconnect(t *testing.T) {
548562
defer cancel()
549563

550564
topic := newTopicName()
551-
_, cli := createTcClient(t)
565+
_, cli := createClientWithTC(t)
552566

553567
txn, err := cli.NewTransaction(5 * time.Minute)
554568
assert.NoError(t, err)

0 commit comments

Comments
 (0)