
Commit 60630af

Merge pull request #36077: [KafkaIO] Only update size metrics once per batch
2 parents: 48b453a + 520b456

2 files changed: +72 −54 lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java

Lines changed: 12 additions & 7 deletions
@@ -168,18 +168,23 @@ private void setAvg(final double value) {
       AVG.lazySet(this, Double.doubleToRawLongBits(value));
     }

-    private long incrementAndGetNumUpdates() {
-      final long nextNumUpdates = Math.min(MOVING_AVG_WINDOW, numUpdates + 1);
-      numUpdates = nextNumUpdates;
-      return nextNumUpdates;
+    public void update(final double quantity) {
+      final double prevAvg = getAvg(); // volatile load (acquire)
+
+      final long nextNumUpdates = numUpdates + 1; // normal load
+      final double nextAvg = prevAvg + (quantity - prevAvg) / nextNumUpdates;
+
+      numUpdates = Math.min(MOVING_AVG_WINDOW, nextNumUpdates); // normal store
+      setAvg(nextAvg); // ordered store (release)
     }

-    public void update(final double quantity) {
+    public void update(final double sum, final long count) {
       final double prevAvg = getAvg(); // volatile load (acquire)

-      final long nextNumUpdates = incrementAndGetNumUpdates(); // normal load/store
-      final double nextAvg = prevAvg + (quantity - prevAvg) / nextNumUpdates; // normal load/store
+      final long nextNumUpdates = numUpdates + count; // normal load
+      final double nextAvg = prevAvg + (sum / count - prevAvg) * ((double) count / nextNumUpdates);

+      numUpdates = Math.min(MOVING_AVG_WINDOW, nextNumUpdates); // normal store
       setAvg(nextAvg); // ordered store (release)
     }

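For context on the new two-argument update: below the MOVING_AVG_WINDOW cap, folding a batch of `count` records with total `sum` into the running average gives the same mean as `count` single-record updates, but with one store per batch instead of one per record. The following is a minimal single-threaded sketch of that arithmetic; the class name MovingAvgSketch, the WINDOW constant, and the omission of the real class's acquire/release atomics are all simplifications, not Beam's API.

// Single-threaded sketch of the windowed moving average shown in the diff above.
// MovingAvgSketch and WINDOW are illustrative names; the real MovingAvg also
// publishes `avg` through acquire/release atomics, which is omitted here.
final class MovingAvgSketch {
  private static final long WINDOW = 1000L; // assumed cap, stands in for MOVING_AVG_WINDOW
  private long numUpdates = 0L;
  private double avg = 0.0D;

  // Per-record form: fold one value into the running mean.
  void update(double quantity) {
    final long nextNumUpdates = numUpdates + 1;
    avg = avg + (quantity - avg) / nextNumUpdates;
    numUpdates = Math.min(WINDOW, nextNumUpdates);
  }

  // Per-batch form: fold `count` values with total `sum` in one step by
  // weighting the batch mean by count / nextNumUpdates.
  void update(double sum, long count) {
    final long nextNumUpdates = numUpdates + count;
    avg = avg + (sum / count - avg) * ((double) count / nextNumUpdates);
    numUpdates = Math.min(WINDOW, nextNumUpdates);
  }

  double get() {
    return avg;
  }
}

Before the cap is reached, both forms reduce to the exact mean of everything seen so far, (numUpdates * prevAvg + sum) / (numUpdates + count); once numUpdates is pinned at the window size, new data keeps a fixed minimum weight, which is what bounds the effective window.
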
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 60 additions & 47 deletions
@@ -660,57 +660,70 @@ public ProcessContinuation processElement(

       // Visible progress within the consumer polling timeout.
       // Partially or fully claim and process records in this batch.
-      for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-        if (!tracker.tryClaim(rawRecord.offset())) {
-          consumer.seek(topicPartition, rawRecord.offset());
-          consumer.pause(Collections.singleton(topicPartition));
+      long rawSizesSum = 0L;
+      long rawSizesCount = 0L;
+      long rawSizesMin = Long.MAX_VALUE;
+      long rawSizesMax = Long.MIN_VALUE;
+      try {
+        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+          if (!tracker.tryClaim(rawRecord.offset())) {
+            consumer.seek(topicPartition, rawRecord.offset());
+            consumer.pause(Collections.singleton(topicPartition));

-          return ProcessContinuation.stop();
-        }
-        expectedOffset = rawRecord.offset() + 1;
-        try {
-          KafkaRecord<K, V> kafkaRecord =
-              new KafkaRecord<>(
-                  rawRecord.topic(),
-                  rawRecord.partition(),
-                  rawRecord.offset(),
-                  ConsumerSpEL.getRecordTimestamp(rawRecord),
-                  ConsumerSpEL.getRecordTimestampType(rawRecord),
-                  ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
-                  ConsumerSpEL.deserializeKey(keyDeserializerInstance, rawRecord),
-                  ConsumerSpEL.deserializeValue(valueDeserializerInstance, rawRecord));
-          int recordSize =
-              (rawRecord.key() == null ? 0 : rawRecord.key().length)
-                  + (rawRecord.value() == null ? 0 : rawRecord.value().length);
-          avgRecordSize.update(recordSize);
-          rawSizes.update(recordSize);
-          Instant outputTimestamp;
-          // The outputTimestamp and watermark will be computed by timestampPolicy, where the
-          // WatermarkEstimator should be a manual one.
-          if (timestampPolicy != null) {
-            TimestampPolicyContext context =
-                updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
-            outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
-          } else {
-            Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
-            outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
+            return ProcessContinuation.stop();
           }
-          receiver
-              .get(recordTag)
-              .outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
-        } catch (SerializationException e) {
-          // This exception should only occur during the key and value deserialization when
-          // creating the Kafka Record
-          badRecordRouter.route(
-              receiver,
-              rawRecord,
-              null,
-              e,
-              "Failure deserializing Key or Value of Kakfa record reading from Kafka");
-          if (timestampPolicy != null) {
-            updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
+          expectedOffset = rawRecord.offset() + 1;
+          try {
+            KafkaRecord<K, V> kafkaRecord =
+                new KafkaRecord<>(
+                    rawRecord.topic(),
+                    rawRecord.partition(),
+                    rawRecord.offset(),
+                    ConsumerSpEL.getRecordTimestamp(rawRecord),
+                    ConsumerSpEL.getRecordTimestampType(rawRecord),
+                    ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
+                    ConsumerSpEL.deserializeKey(keyDeserializerInstance, rawRecord),
+                    ConsumerSpEL.deserializeValue(valueDeserializerInstance, rawRecord));
+            int recordSize =
+                (rawRecord.key() == null ? 0 : rawRecord.key().length)
+                    + (rawRecord.value() == null ? 0 : rawRecord.value().length);
+            rawSizesSum = rawSizesSum + recordSize;
+            rawSizesCount = rawSizesCount + 1L;
+            rawSizesMin = Math.min(rawSizesMin, recordSize);
+            rawSizesMax = Math.max(rawSizesMax, recordSize);
+            Instant outputTimestamp;
+            // The outputTimestamp and watermark will be computed by timestampPolicy, where the
+            // WatermarkEstimator should be a manual one.
+            if (timestampPolicy != null) {
+              TimestampPolicyContext context =
+                  updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
+              outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
+            } else {
+              Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
+              outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
+            }
+            receiver
+                .get(recordTag)
+                .outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
+          } catch (SerializationException e) {
+            // This exception should only occur during the key and value deserialization when
+            // creating the Kafka Record
+            badRecordRouter.route(
+                receiver,
+                rawRecord,
+                null,
+                e,
+                "Failure deserializing Key or Value of Kakfa record reading from Kafka");
+            if (timestampPolicy != null) {
+              updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
+            }
           }
         }
+      } finally {
+        if (rawSizesCount > 0L) {
+          avgRecordSize.update(rawSizesSum, rawSizesCount);
+          rawSizes.update(rawSizesSum, rawSizesCount, rawSizesMin, rawSizesMax);
+        }
       }

       // Non-visible progress within the consumer polling timeout.
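
Taken together, the ReadFromKafkaDoFn change is an accumulate-then-flush pattern: the per-record avgRecordSize.update / rawSizes.update calls are replaced by plain local accumulators, and the metrics are updated once per poll batch in a finally block, so they are still reported when the loop stops early on an unclaimed offset or throws. Below is a stripped-down sketch of that pattern; the SizeMetric interface and the processBatch helper are hypothetical stand-ins, not Beam APIs.

import java.util.List;

// Illustration only: SizeMetric is a stand-in for the real per-partition
// metric objects (avgRecordSize, rawSizes); it is not a Beam API.
final class BatchedSizeMetricsSketch {
  interface SizeMetric {
    void update(long sum, long count, long min, long max);
  }

  static void processBatch(List<byte[]> payloads, SizeMetric sizeMetric) {
    long sum = 0L;
    long count = 0L;
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
    try {
      for (byte[] payload : payloads) {
        final int size = payload.length;
        // Accumulate locally instead of touching the metric once per record.
        sum += size;
        count++;
        min = Math.min(min, size);
        max = Math.max(max, size);
        // ... claim the offset, deserialize, and emit the record here ...
      }
    } finally {
      // Flush once per batch, even if the loop returned or threw early.
      if (count > 0L) {
        sizeMetric.update(sum, count, min, max);
      }
    }
  }
}

The guard on count > 0 mirrors the diff: with an empty batch there is nothing to fold into the averages, and min/max would still hold their sentinel values.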
