We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 0a7d80f commit 4f9d560Copy full SHA for 4f9d560
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -494,11 +494,10 @@ synchronized long approxBacklogInBytes() {
494
}
495
496
synchronized long backlogMessageCount() {
497
- if (latestOffset < 0 || nextOffset < 0) {
+ if (latestOffset < 0 || nextOffset < 0 || latestOffset < nextOffset) {
498
return UnboundedReader.BACKLOG_UNKNOWN;
499
500
- double remaining = latestOffset - nextOffset;
501
- return Math.max(0, (long) Math.ceil(remaining));
+ return latestOffset - nextOffset;
502
503
504
synchronized TimestampPolicyContext mkTimestampPolicyContext() {
0 commit comments