Skip to content

Commit 6fdb010

Browse files
set lag to 0 if no messages consumed (opensearch-project#19393)
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent df3b3ae commit 6fdb010

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6363
- [Flaky Test] Fix flaky test in SecureReactorNetty4HttpServerTransportTests with reproducible seed ([#19327](https://github.com/opensearch-project/OpenSearch/pull/19327))
6464
- Remove unnecessary looping in field data cache clear ([#19116](https://github.com/opensearch-project/OpenSearch/pull/19116))
6565
- [Flaky Test] Fix flaky test IngestFromKinesisIT.testAllActiveIngestion ([#19380](https://github.com/opensearch-project/OpenSearch/pull/19380))
66+
- Fix lag metric for pull-based ingestion when streaming source is empty ([#19393](https://github.com/opensearch-project/OpenSearch/pull/19393))
6667

6768
### Dependencies
6869
- Update to Gradle 9.1.0 ([#19329](https://github.com/opensearch-project/OpenSearch/pull/19329))

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,8 @@ protected void startPoll() {
232232

233233
if (results.isEmpty()) {
234234
// no new records
235+
setLastPolledMessageTimestamp(0);
236+
Thread.sleep(DEFAULT_POLLER_SLEEP_PERIOD_MS);
235237
continue;
236238
}
237239

@@ -267,7 +269,7 @@ private IngestionShardPointer processRecords(
267269
}
268270
totalPolledCount.inc();
269271
blockingQueueContainer.add(result);
270-
lastPolledMessageTimestamp = result.getMessage().getTimestamp() == null ? 0 : result.getMessage().getTimestamp();
272+
setLastPolledMessageTimestamp(result.getMessage().getTimestamp() == null ? 0 : result.getMessage().getTimestamp());
271273
logger.debug(
272274
"Put message {} with pointer {} to the blocking queue",
273275
String.valueOf(result.getMessage().getPayload()),
@@ -403,9 +405,19 @@ public PollingIngestStats getStats() {
403405
* Returns the lag in milliseconds since the last polled message
404406
*/
405407
private long computeLag() {
408+
if (lastPolledMessageTimestamp == 0 || paused) {
409+
return 0;
410+
}
411+
406412
return System.currentTimeMillis() - lastPolledMessageTimestamp;
407413
}
408414

415+
private void setLastPolledMessageTimestamp(long timestamp) {
416+
if (lastPolledMessageTimestamp != timestamp) {
417+
lastPolledMessageTimestamp = timestamp;
418+
}
419+
}
420+
409421
public State getState() {
410422
return this.state;
411423
}

0 commit comments

Comments
 (0)