Skip to content

Commit 1f9e828

Browse files
[Issue 1399] add eventTime in reconsumeLaterWithCustomProperties func (#1400)
Fixes #1399 Master Issue: #1399 ### Motivation `ReconsumeLater` doesnot adds the eventTime in new rebuilded consumer Message before pushing to Retry or Dlq topic ### Modifications ```go // line 639 (consumer_impl.go) consumerMsg := ConsumerMessage{ Consumer: c, Message: &message{ payLoad: msg.Payload(), properties: props, msgID: msgID, eventTime: msg.EventTime(), //<-- eventTime is added }, } if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries { c.dlq.Chan() <- consumerMsg } else { c.rlq.Chan() <- RetryMessage{ consumerMsg: consumerMsg, producerMsg: ProducerMessage{ Payload: msg.Payload(), Key: msg.Key(), OrderingKey: msg.OrderingKey(), Properties: props, DeliverAfter: delay, EventTime: msg.EventTime(), //<--eventTime is added }, } } ```
1 parent 5a320e8 commit 1f9e828

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

pulsar/consumer_impl.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert
642642
payLoad: msg.Payload(),
643643
properties: props,
644644
msgID: msgID,
645+
eventTime: msg.EventTime(),
645646
},
646647
}
647648
if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
@@ -655,6 +656,7 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert
655656
OrderingKey: msg.OrderingKey(),
656657
Properties: props,
657658
DeliverAfter: delay,
659+
EventTime: msg.EventTime(),
658660
},
659661
}
660662
}

pulsar/consumer_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2047,6 +2047,75 @@ func TestRLQWithCustomProperties(t *testing.T) {
20472047
assert.Nil(t, checkMsg)
20482048
}
20492049

2050+
// Test function to test Retry Logic with Custom Properties and Event Time
2051+
func TestRLQWithCustomPropertiesEventTime(t *testing.T) {
2052+
topic := newTopicName()
2053+
testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
2054+
makeHTTPCall(t, http.MethodPut, testURL, "3")
2055+
2056+
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
2057+
maxRedeliveries := 2
2058+
ctx := context.Background()
2059+
2060+
client, err := NewClient(ClientOptions{URL: lookupURL})
2061+
assert.Nil(t, err)
2062+
defer client.Close()
2063+
2064+
// 1. Create producer and send a message with custom event time
2065+
producer, err := client.CreateProducer(ProducerOptions{Topic: topic})
2066+
assert.Nil(t, err)
2067+
defer producer.Close()
2068+
2069+
expectedEventTime := timeFromUnixTimestampMillis(uint64(1565161612000)) // Custom event time
2070+
_, err = producer.Send(ctx, &ProducerMessage{
2071+
Payload: []byte("MESSAGE_WITH_EVENT_TIME"),
2072+
EventTime: expectedEventTime,
2073+
})
2074+
assert.Nil(t, err)
2075+
2076+
// 2. Create consumer on the Retry Topic
2077+
rlqConsumer, err := client.Subscribe(ConsumerOptions{
2078+
Topic: topic,
2079+
SubscriptionName: subName,
2080+
Type: Shared,
2081+
SubscriptionInitialPosition: SubscriptionPositionEarliest,
2082+
DLQ: &DLQPolicy{
2083+
MaxDeliveries: uint32(maxRedeliveries),
2084+
},
2085+
RetryEnable: true,
2086+
NackRedeliveryDelay: 1 * time.Second,
2087+
})
2088+
assert.Nil(t, err)
2089+
defer rlqConsumer.Close()
2090+
2091+
// 3. Receive the original message and verify event time
2092+
msg, err := rlqConsumer.Receive(ctx)
2093+
assert.Nil(t, err)
2094+
assert.Equal(t, expectedEventTime.Unix(), msg.EventTime().Unix(),
2095+
"Original message should have the expected event time")
2096+
2097+
// 4. ReconsumeLater with custom properties and verify event time is preserved
2098+
customProps := map[string]string{
2099+
"custom-key-1": "custom-value-1",
2100+
}
2101+
rlqConsumer.ReconsumeLaterWithCustomProperties(msg, customProps, 1*time.Second)
2102+
2103+
// 5. Receive the reconsumed message and verify event time is preserved
2104+
retryMsg, err := rlqConsumer.Receive(ctx)
2105+
assert.Nil(t, err)
2106+
assert.Equal(t, expectedEventTime.Unix(), retryMsg.EventTime().Unix(),
2107+
"Reconsumed message should preserve the original event time")
2108+
2109+
// 6. Verify custom properties are also preserved
2110+
msgProps := retryMsg.Properties()
2111+
value, ok := msgProps["custom-key-1"]
2112+
assert.True(t, ok, "Custom property should be present")
2113+
assert.Equal(t, "custom-value-1", value, "Custom property value should match")
2114+
2115+
// 7. Clean up - ack the message to avoid further redeliveries
2116+
rlqConsumer.Ack(retryMsg)
2117+
}
2118+
20502119
func TestAckWithResponse(t *testing.T) {
20512120
now := time.Now().Unix()
20522121
topic01 := fmt.Sprintf("persistent://public/default/topic-%d-01", now)

0 commit comments

Comments
 (0)