Add sdf kafka poll latencies #34275
R: @scwhittle

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
Force-pushed from 6f9eb14 to 31585e7.
Run Java PreCommit
-  rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition());
+  KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
+  rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition(), kafkaMetrics);
+  kafkaMetrics.flushBufferedMetrics();
What about not flushing here and just falling through?
Would it be OK to flush just once, outside the while loop, with a try { } finally { kafkaMetrics.flushBufferedMetrics(); } block?
I didn't do that at first because of the early returns, but the finally block runs before the method actually returns, so that approach works. I have updated the code.
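Java guarantees that a finally block runs when its try block exits through a return statement: the return value is computed first, then finally executes, then control leaves the method. A single flush in finally therefore covers every early-return path of the poll loop. A minimal, self-contained sketch of that ordering (pollLoop and the event strings are hypothetical, not the ReadFromKafkaDoFn code):

```java
import java.util.ArrayList;
import java.util.List;

final class FinallyBeforeReturn {
  // Records the order of events so the control flow is visible.
  static final List<String> events = new ArrayList<>();

  // Hypothetical stand-in for a poll loop that leaves via an early return.
  static String pollLoop(int stopAfter) {
    try {
      int polls = 0;
      while (true) {
        polls++;
        events.add("poll " + polls);
        if (polls >= stopAfter) {
          events.add("return");
          // The return value is computed here, then the finally block runs.
          return "stopped after " + polls;
        }
      }
    } finally {
      // Single flush point: runs on every exit path, including early returns.
      events.add("flush");
    }
  }

  public static void main(String[] args) {
    String result = pollLoop(2);
    System.out.println(events + " -> " + result);
  }
}
```

Running main prints [poll 1, poll 2, return, flush] -> stopped after 2, showing that the "flush" step happens after the return value is produced but before the caller receives it.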
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
Run Java PreCommit

Run Java PreCommit
} else {
Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
Can we instead give the metrics object a longer lifetime? Avoiding an allocation and a flush on every poll seems better for performance.
    KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
    try {
      while (true) {
        // poll and process records
      }
    } finally {
      kafkaMetrics.flushBufferedMetrics();
    }
The container is currently implemented so that once it has been flushed, it is not reused. I'm not sure of the context behind that decision, and I don't see why we can't change it; I agree it would be more efficient, but I'll leave that for a subsequent PR.
I still think it would only be flushed once: it would live for the whole loop and be flushed a single time at the end.
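The two lifetimes discussed above can be compared with a small sketch, assuming a simple buffered metrics object. BufferedLatencyMetrics and MetricsLifetimeDemo are hypothetical stand-ins, not Beam's KafkaMetrics or KafkaSinkMetrics, and a bounded for loop replaces the real while (true) loop so the example terminates. Because the long-lived variant flushes exactly once, at the end, it never needs to reuse a container after flushing.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a buffered metrics object; it only counts
// allocations and flushes and collects what would be published.
final class BufferedLatencyMetrics {
  static int allocations = 0;
  static int flushes = 0;
  static final List<Duration> published = new ArrayList<>();

  private final List<Duration> buffer = new ArrayList<>();

  BufferedLatencyMetrics() {
    allocations++;
  }

  void recordPollLatency(Duration latency) {
    buffer.add(latency);
  }

  // Publishes everything buffered so far, then clears the buffer.
  void flush() {
    flushes++;
    published.addAll(buffer);
    buffer.clear();
  }

  static void reset() {
    allocations = 0;
    flushes = 0;
    published.clear();
  }
}

final class MetricsLifetimeDemo {
  // Reviewer's suggestion: one metrics object for the whole loop, flushed once in finally.
  static void longLived(int polls) {
    BufferedLatencyMetrics metrics = new BufferedLatencyMetrics();
    try {
      for (int i = 0; i < polls; i++) {
        metrics.recordPollLatency(Duration.ofMillis(i)); // placeholder latency
      }
    } finally {
      metrics.flush();
    }
  }

  // Earlier shape: allocate and flush around every poll.
  static void perPoll(int polls) {
    for (int i = 0; i < polls; i++) {
      BufferedLatencyMetrics metrics = new BufferedLatencyMetrics();
      metrics.recordPollLatency(Duration.ofMillis(i)); // placeholder latency
      metrics.flush();
    }
  }

  public static void main(String[] args) {
    BufferedLatencyMetrics.reset();
    perPoll(5);
    System.out.println("per poll: " + BufferedLatencyMetrics.allocations
        + " allocations, " + BufferedLatencyMetrics.flushes + " flushes");
    BufferedLatencyMetrics.reset();
    longLived(5);
    System.out.println("long lived: " + BufferedLatencyMetrics.allocations
        + " allocations, " + BufferedLatencyMetrics.flushes + " flushes");
  }
}
```

With 5 polls, perPoll performs 5 allocations and 5 flushes, while longLived performs 1 of each and still publishes all 5 recorded latencies.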
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
Force-pushed from 5ad81c7 to 4ac4b06.
Force-pushed from 4ac4b06 to a900a43.
Force-pushed from a900a43 to ad9471a.
Force-pushed from ad9471a to e72dc1e.
Run Java PreCommit

Run Java PreCommit
https://github.com/apache/beam/actions/runs/13978267141/job/39140793274?pr=34275 is not related to this PR.
* add kafka sdf metrics
* address comments
* address more comments
* address comments

---------

Co-authored-by: Naireen <naireenhussain@google.com>
Adds SDF poll-latency metrics to the Kafka source for Runner v2.
Should be merged after #34244.
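One common way to capture a poll latency is to time the call with System.nanoTime (a monotonic clock) and record the elapsed Duration in a finally block, so a sample is kept even if the poll throws. This is a sketch under stated assumptions: PollLatencyTimer is hypothetical, a Supplier stands in for the Kafka consumer's poll call so the example has no external dependencies, and it does not reproduce how this PR records or reports its metrics.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

final class PollLatencyTimer {
  // Hypothetical buffer of poll latencies for a single topic partition.
  private final List<Duration> latencies = new ArrayList<>();

  // Times one poll with the monotonic clock and records the elapsed time,
  // even if the poll throws, before handing back the polled records.
  <T> T timedPoll(Supplier<T> poll) {
    long start = System.nanoTime();
    try {
      return poll.get();
    } finally {
      latencies.add(Duration.ofNanos(System.nanoTime() - start));
    }
  }

  // Read-only view of the recorded latencies, e.g. for a later flush.
  List<Duration> latencies() {
    return Collections.unmodifiableList(latencies);
  }

  public static void main(String[] args) {
    PollLatencyTimer timer = new PollLatencyTimer();
    // A Supplier stands in for consumer.poll(...) so the sketch has no Kafka dependency.
    List<String> batch = timer.timedPoll(() -> Arrays.asList("record-1", "record-2"));
    System.out.println(batch.size() + " records, latencies: " + timer.latencies());
  }
}
```

Keeping the samples in a buffer rather than reporting each one immediately mirrors the buffer-then-flush pattern discussed in the review, where a single flush at the end publishes everything collected during the loop.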