Skip to content

Commit ec846ff

Browse files
pkumar-singhPrashant Kumar
andauthored
[fix] When topic is terminated. Client must not retry connecting. Pending messages should be failed (#1128)
### Motivation GoLang Pulsar client library has no support for Topic termination. When a topic is terminated following should happen at client library side. 1. Producers should stop reconnecting. As once topic is terminated, it is permanent. 2. All the pending messages should be failed. ### Modifications If reconnect is failing with TopicTerminated error. Run through the pending messages queue and complete the callback. After that exit the reconnect loop and set producer state as closing. Marking producer state producerClosing will ensure that new messages are immediately failed. Co-authored-by: Prashant Kumar <[email protected]>
1 parent ef0ba67 commit ec846ff

File tree

2 files changed

+77
-0
lines changed

2 files changed

+77
-0
lines changed

pulsar/producer_partition.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,27 @@ func (p *partitionProducer) reconnectToBroker() {
447447
break
448448
}
449449

450+
if strings.Contains(errMsg, "TopicTerminatedError") {
451+
p.log.Info("Topic was terminated, failing pending messages, will not reconnect")
452+
pendingItems := p.pendingQueue.ReadableSlice()
453+
for _, item := range pendingItems {
454+
pi := item.(*pendingItem)
455+
if pi != nil {
456+
pi.Lock()
457+
requests := pi.sendRequests
458+
for _, req := range requests {
459+
sr := req.(*sendRequest)
460+
if sr != nil {
461+
sr.done(nil, newError(TopicTerminated, err.Error()))
462+
}
463+
}
464+
pi.Unlock()
465+
}
466+
}
467+
p.setProducerState(producerClosing)
468+
break
469+
}
470+
450471
if maxRetry > 0 {
451472
maxRetry--
452473
}

pulsar/producer_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,6 +1158,62 @@ func TestFailedSchemaEncode(t *testing.T) {
11581158
wg.Wait()
11591159
}
11601160

1161+
func TestTopicTermination(t *testing.T) {
1162+
client, err := NewClient(ClientOptions{
1163+
URL: serviceURL,
1164+
})
1165+
assert.NoError(t, err)
1166+
defer client.Close()
1167+
1168+
topicName := newTopicName()
1169+
consumer, err := client.Subscribe(ConsumerOptions{
1170+
Topic: topicName,
1171+
SubscriptionName: "send_timeout_sub",
1172+
})
1173+
assert.Nil(t, err)
1174+
defer consumer.Close() // subscribe but do nothing
1175+
1176+
producer, err := client.CreateProducer(ProducerOptions{
1177+
Topic: topicName,
1178+
SendTimeout: 2 * time.Second,
1179+
})
1180+
assert.Nil(t, err)
1181+
defer producer.Close()
1182+
1183+
afterCh := time.After(5 * time.Second)
1184+
terminatedChan := make(chan bool)
1185+
go func() {
1186+
for {
1187+
_, err := producer.Send(context.Background(), &ProducerMessage{
1188+
Payload: make([]byte, 1024),
1189+
})
1190+
if err != nil {
1191+
e := err.(*Error)
1192+
if e.result == TopicTerminated {
1193+
terminatedChan <- true
1194+
} else {
1195+
terminatedChan <- false
1196+
}
1197+
}
1198+
time.Sleep(1 * time.Millisecond)
1199+
}
1200+
}()
1201+
1202+
terminateURL := adminURL + "/admin/v2/persistent/public/default/" + topicName + "/terminate"
1203+
log.Info(terminateURL)
1204+
makeHTTPCall(t, http.MethodPost, terminateURL, "")
1205+
1206+
for {
1207+
select {
1208+
case d := <-terminatedChan:
1209+
assert.Equal(t, d, true)
1210+
return
1211+
case <-afterCh:
1212+
assert.Fail(t, "Time is up. Topic should have been terminated by now")
1213+
}
1214+
}
1215+
}
1216+
11611217
func TestSendTimeout(t *testing.T) {
11621218
quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota"
11631219
quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`

0 commit comments

Comments
 (0)