Skip to content

Commit 92d6728

Browse files
Demogorgon314eolivelli
authored andcommitted
Fix pulsar entry formatter encode zero timestamp record caused exception (#1839)
### Motivation When using pulsar entry formatter and publishing zero timestamp record will cause request timeout, the root cause is we will set the event time when timestamp >= 0, but the event time has a check to ensure time > 0. See: ```java public PulsarMessageBuilder eventTime(long timestamp) { checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp); metadata.setEventTime(timestamp); return this; } ``` ### Modifications * Do not check the event time before setting the event time. * Add test to verify this issue. (cherry picked from commit c7cb1cf)
1 parent decacce commit 92d6728

File tree

3 files changed

+34
-8
lines changed

3 files changed

+34
-8
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private static MessageImpl<ByteBuffer> recordToEntry(Record record) {
154154

155155
// timestamp
156156
if (record.timestamp() >= 0) {
157-
builder.eventTime(record.timestamp());
157+
builder.getMetadataBuilder().setEventTime(record.timestamp());
158158
builder.getMetadataBuilder().setPublishTime(record.timestamp());
159159
} else {
160160
builder.getMetadataBuilder().setPublishTime(System.currentTimeMillis());

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/PulsarMessageBuilder.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,6 @@ public PulsarMessageBuilder properties(Map<String, String> properties) {
9393
return this;
9494
}
9595

96-
public PulsarMessageBuilder eventTime(long timestamp) {
97-
checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp);
98-
metadata.setEventTime(timestamp);
99-
return this;
100-
}
101-
10296
public MessageMetadata getMetadataBuilder() {
10397
return metadata;
10498
}
@@ -113,4 +107,4 @@ public Message<ByteBuffer> getMessage() {
113107
return MessageImpl.create(metadata, content, SCHEMA, null);
114108
}
115109

116-
}
110+
}

tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndPulsarTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,20 @@
2020
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
2121
import java.time.Duration;
2222
import java.util.Arrays;
23+
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.Optional;
27+
import java.util.Properties;
2628
import java.util.TreeMap;
2729
import java.util.concurrent.CompletableFuture;
2830
import java.util.stream.Collectors;
2931
import java.util.stream.IntStream;
3032
import lombok.Cleanup;
33+
import org.apache.kafka.clients.consumer.ConsumerRecord;
3134
import org.apache.kafka.clients.consumer.KafkaConsumer;
3235
import org.apache.kafka.clients.producer.KafkaProducer;
36+
import org.apache.kafka.clients.producer.ProducerRecord;
3337
import org.apache.pulsar.broker.service.Topic;
3438
import org.apache.pulsar.client.api.Consumer;
3539
import org.apache.pulsar.client.api.Producer;
@@ -136,4 +140,32 @@ public void testSkipReplicatedSubscriptionsMarker() throws Exception {
136140
kafkaConsumer.commitSync(Duration.ofSeconds(1));
137141
kafkaConsumer.close();
138142
}
143+
144+
@Test(timeOut = 30000)
145+
public void testPublishZeroTimestampRecord() {
146+
String topic = "test-publish-zero-timestamp-record";
147+
String subscription = "test-group";
148+
Properties properties = newKafkaProducerProperties();
149+
@Cleanup
150+
final KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
151+
producer.send(new ProducerRecord<>(
152+
topic,
153+
null,
154+
0L,
155+
"k1",
156+
"v1",
157+
null)
158+
);
159+
160+
producer.flush();
161+
162+
@Cleanup
163+
final KafkaConsumer<String, String> consumer = newKafkaConsumer(topic, subscription);
164+
consumer.subscribe(Collections.singleton(topic));
165+
List<ConsumerRecord<String, String>> consumerRecords = receiveRecords(consumer, 1);
166+
assertEquals(consumerRecords.size(), 1);
167+
assertEquals(consumerRecords.get(0).key(), "k1");
168+
assertEquals(consumerRecords.get(0).value(), "v1");
169+
assertEquals(consumerRecords.get(0).timestamp(), 0L);
170+
}
139171
}

0 commit comments

Comments
 (0)