diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 3fd7b5347d86..3fc078e83d7e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -321,9 +321,7 @@ public MetricUpdates getUpdates() {
     // Add any metricKey labels to the monitoringInfoLabels.
     if (!metricName.getLabels().isEmpty()) {
-      for (Map.Entry<String, String> entry : metricName.getLabels().entrySet()) {
-        builder.setLabel(entry.getKey(), entry.getValue());
-      }
+      builder.setLabels(metricName.getLabels());
     }
     return builder;
   }
 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
index 5de273a11434..7b1b3f698935 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
@@ -182,12 +182,12 @@ public static double decodeDoubleCounter(ByteString payload) {
     }
   }
 
-  /** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  /** Encodes to {@link MonitoringInfoConstants.TypeUrns#HISTOGRAM}. */
   public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
     return inputHistogram.toProto().toByteString();
   }
 
-  /** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
+  /** Decodes to {@link MonitoringInfoConstants.TypeUrns#HISTOGRAM}. */
   public static HistogramData decodeInt64Histogram(ByteString payload) {
     try {
       return new HistogramData(HistogramValue.parseFrom(payload));
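Note on the two runners-core changes above: MetricsContainerImpl swaps the per-entry setLabel loop for a single bulk setLabels call, and the MonitoringInfoEncodings javadoc now points the int64-histogram codecs at the HISTOGRAM type URN they actually implement; the payload format (HistogramValue proto bytes) is unchanged. A minimal round-trip sketch, assuming Beam's HistogramData.linear(start, width, numBuckets) bucket factory and record(double...) — the bucket layout and sample values are illustrative, not from the PR:

    // Histogram payload round-trip through the codecs above (illustrative values).
    HistogramData histogram = HistogramData.linear(0, 10, 10); // 10 buckets of width 10 over [0, 100)
    histogram.record(5.0, 42.0, 77.0);
    ByteString payload = MonitoringInfoEncodings.encodeInt64Histogram(histogram);
    HistogramData decoded = MonitoringInfoEncodings.decodeInt64Histogram(payload);
    // decoded carries the same bucket counts; payload is the serialized HistogramValue proto.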
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 5e16447241e7..3ab6c4f502e7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -447,123 +447,126 @@ public ProcessContinuation processElement(
     long skippedRecords = 0L;
     final Stopwatch sw = Stopwatch.createStarted();
-    while (true) {
-      // Fetch the record size accumulator.
-      final MovingAvg avgRecordSize = avgRecordSizeCache.getUnchecked(kafkaSourceDescriptor);
-      rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition());
-      // When there are no records available for the current TopicPartition, self-checkpoint
-      // and move to process the next element.
-      if (rawRecords.isEmpty()) {
-        if (!topicPartitionExists(
-            kafkaSourceDescriptor.getTopicPartition(),
-            consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) {
-          return ProcessContinuation.stop();
-        }
-        if (timestampPolicy != null) {
-          updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
-        }
-        return ProcessContinuation.resume();
-      }
-      for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-        // If the Kafka consumer returns a record with an offset that is already processed
-        // the record can be safely skipped. This is needed because there is a possibility
-        // that the seek() above fails to move the offset to the desired position. In which
-        // case poll() would return records that are already cnsumed.
-        if (rawRecord.offset() < startOffset) {
-          // If the start offset is not reached even after skipping the records for 10 seconds
-          // then the processing is stopped with a backoff to give the Kakfa server some time
-          // catch up.
-          if (sw.elapsed().getSeconds() > 10L) {
-            LOG.error(
-                "The expected offset ({}) was not reached even after"
-                    + " skipping consumed records for 10 seconds. The offset we could"
-                    + " reach was {}. The processing of this bundle will be attempted"
-                    + " at a later time.",
-                expectedOffset,
-                rawRecord.offset());
-            return ProcessContinuation.resume()
-                .withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
-          }
-          skippedRecords++;
-          continue;
-        }
-        if (skippedRecords > 0L) {
-          LOG.warn(
-              "{} records were skipped due to seek returning an"
-                  + " earlier position than requested position of {}",
-              skippedRecords,
-              expectedOffset);
-          skippedRecords = 0L;
-        }
-        if (!tracker.tryClaim(rawRecord.offset())) {
-          return ProcessContinuation.stop();
-        }
-        try {
-          KafkaRecord<K, V> kafkaRecord =
-              new KafkaRecord<>(
-                  rawRecord.topic(),
-                  rawRecord.partition(),
-                  rawRecord.offset(),
-                  ConsumerSpEL.getRecordTimestamp(rawRecord),
-                  ConsumerSpEL.getRecordTimestampType(rawRecord),
-                  ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
-                  ConsumerSpEL.deserializeKey(keyDeserializerInstance, rawRecord),
-                  ConsumerSpEL.deserializeValue(valueDeserializerInstance, rawRecord));
-          int recordSize =
-              (rawRecord.key() == null ? 0 : rawRecord.key().length)
-                  + (rawRecord.value() == null ? 0 : rawRecord.value().length);
-          avgRecordSize.update(recordSize);
-          rawSizes.update(recordSize);
-          expectedOffset = rawRecord.offset() + 1;
-          Instant outputTimestamp;
-          // The outputTimestamp and watermark will be computed by timestampPolicy, where the
-          // WatermarkEstimator should be a manual one.
-          if (timestampPolicy != null) {
-            TimestampPolicyContext context =
-                updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
-            outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
-          } else {
-            Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
-            outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
+    KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
+    try {
+      while (true) {
+        // Fetch the record size accumulator.
+        final MovingAvg avgRecordSize = avgRecordSizeCache.getUnchecked(kafkaSourceDescriptor);
+        rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition(), kafkaMetrics);
+        // When there are no records available for the current TopicPartition, self-checkpoint
+        // and move to process the next element.
+        if (rawRecords.isEmpty()) {
+          if (!topicPartitionExists(
+              kafkaSourceDescriptor.getTopicPartition(),
+              consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) {
+            return ProcessContinuation.stop();
           }
           if (timestampPolicy != null) {
             updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
           }
+          return ProcessContinuation.resume();
+        }
+        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+          // If the Kafka consumer returns a record with an offset that is already processed,
+          // the record can be safely skipped. This is needed because there is a possibility
+          // that the seek() above fails to move the offset to the desired position, in which
+          // case poll() would return records that are already consumed.
+          if (rawRecord.offset() < startOffset) {
+            // If the start offset is not reached even after skipping the records for 10 seconds,
+            // then the processing is stopped with a backoff to give the Kafka server some time
+            // to catch up.
+            if (sw.elapsed().getSeconds() > 10L) {
+              LOG.error(
+                  "The expected offset ({}) was not reached even after"
+                      + " skipping consumed records for 10 seconds. The offset we could"
+                      + " reach was {}. The processing of this bundle will be attempted"
+                      + " at a later time.",
+                  expectedOffset,
+                  rawRecord.offset());
+              return ProcessContinuation.resume()
+                  .withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
+            }
+            skippedRecords++;
+            continue;
+          }
+          if (skippedRecords > 0L) {
+            LOG.warn(
+                "{} records were skipped due to seek returning an"
+                    + " earlier position than requested position of {}",
+                skippedRecords,
+                expectedOffset);
+            skippedRecords = 0L;
+          }
+          if (!tracker.tryClaim(rawRecord.offset())) {
+            return ProcessContinuation.stop();
+          }
+          try {
+            KafkaRecord<K, V> kafkaRecord =
+                new KafkaRecord<>(
+                    rawRecord.topic(),
+                    rawRecord.partition(),
+                    rawRecord.offset(),
+                    ConsumerSpEL.getRecordTimestamp(rawRecord),
+                    ConsumerSpEL.getRecordTimestampType(rawRecord),
+                    ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
+                    ConsumerSpEL.deserializeKey(keyDeserializerInstance, rawRecord),
+                    ConsumerSpEL.deserializeValue(valueDeserializerInstance, rawRecord));
+            int recordSize =
+                (rawRecord.key() == null ? 0 : rawRecord.key().length)
+                    + (rawRecord.value() == null ? 0 : rawRecord.value().length);
+            avgRecordSize.update(recordSize);
+            rawSizes.update(recordSize);
+            expectedOffset = rawRecord.offset() + 1;
+            Instant outputTimestamp;
+            // The outputTimestamp and watermark will be computed by timestampPolicy, where the
+            // WatermarkEstimator should be a manual one.
+            if (timestampPolicy != null) {
+              TimestampPolicyContext context =
+                  updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
+              outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
+            } else {
+              Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
+              outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
+            }
+            receiver
+                .get(recordTag)
+                .outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
+          } catch (SerializationException e) {
+            // This exception should only occur during the key and value deserialization when
+            // creating the Kafka Record.
+            badRecordRouter.route(
+                receiver,
+                rawRecord,
+                null,
+                e,
+                "Failure deserializing Key or Value of Kafka record reading from Kafka");
+            if (timestampPolicy != null) {
+              updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
+            }
+          }
         }
-      }
-      backlogBytes.set(
-          (long)
-              (BigDecimal.valueOf(
-                      Preconditions.checkStateNotNull(
-                          offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
-                  .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
-                  .doubleValue()
-                  * avgRecordSize.get()));
-      KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
-      kafkaResults.updateBacklogBytes(
-          kafkaSourceDescriptor.getTopic(),
-          kafkaSourceDescriptor.getPartition(),
-          (long)
-              (BigDecimal.valueOf(
-                      Preconditions.checkStateNotNull(
-                          offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
-                  .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
-                  .doubleValue()
-                  * avgRecordSize.get()));
-      kafkaResults.flushBufferedMetrics();
+        backlogBytes.set(
+            (long)
+                (BigDecimal.valueOf(
+                        Preconditions.checkStateNotNull(
+                            offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
+                    .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
+                    .doubleValue()
+                    * avgRecordSize.get()));
+        kafkaMetrics.updateBacklogBytes(
+            kafkaSourceDescriptor.getTopic(),
+            kafkaSourceDescriptor.getPartition(),
+            (long)
+                (BigDecimal.valueOf(
+                        Preconditions.checkStateNotNull(
+                            offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
+                    .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
+                    .doubleValue()
+                    * avgRecordSize.get()));
+      }
+    } finally {
+      kafkaMetrics.flushBufferedMetrics();
     }
   }
 }
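Two behavioral points in the hunk above. First, the bundle-level KafkaMetrics object is now created once, threaded into poll(), and flushed in a finally block, so buffered metrics are reported on every exit path (stop, resume, or an exception) rather than only at the bottom of the loop. Second, the backlog figure is the same arithmetic as before: (estimated end offset - next offset to read) * average record size. A worked sketch with made-up inputs (the offsets and sizes here are illustrative, not from the PR):

    // Backlog estimate as computed above, with illustrative inputs.
    long estimatedEndOffset = 1_500_000L; // offsetEstimatorCache.get(...).estimate()
    long nextOffset = 1_400_000L;         // expectedOffset: next offset this reader will claim
    double avgRecordSize = 250.0;         // bytes, from the MovingAvg accumulator
    long backlogBytes =
        (long)
            (BigDecimal.valueOf(estimatedEndOffset)
                .subtract(BigDecimal.valueOf(nextOffset), MathContext.DECIMAL128)
                .doubleValue()
                * avgRecordSize);
    // 100_000 outstanding records at ~250 bytes each => backlogBytes == 25_000_000.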
@@ -577,13 +580,16 @@ private boolean topicPartitionExists(
 
   // see https://github.com/apache/beam/issues/25962
   private ConsumerRecords<byte[], byte[]> poll(
-      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition, KafkaMetrics kafkaMetrics) {
     final Stopwatch sw = Stopwatch.createStarted();
     long previousPosition = -1;
-    java.time.Duration elapsed = java.time.Duration.ZERO;
     java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout);
+    java.time.Duration elapsed = java.time.Duration.ZERO;
     while (true) {
       final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(timeout.minus(elapsed));
+      elapsed = sw.elapsed();
+      kafkaMetrics.updateSuccessfulRpcMetrics(
+          topicPartition.topic(), java.time.Duration.ofMillis(elapsed.toMillis()));
       if (!rawRecords.isEmpty()) {
         // return as we have found some entries
         return rawRecords;
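The hunk above reads the stopwatch immediately after each consumer.poll(...) returns and reports that value through updateSuccessfulRpcMetrics. Note that the stopwatch spans the whole poll(...) helper, so on retries the reported duration is cumulative since the helper was entered; only the first iteration approximates a single RPC's latency. Condensed, with names taken from the diff:

    // Per-poll latency reporting as wired above (fragment; consumer, timeout,
    // topicPartition, and kafkaMetrics come from the enclosing helper).
    final Stopwatch sw = Stopwatch.createStarted();
    java.time.Duration elapsed = java.time.Duration.ZERO;
    while (true) {
      final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(timeout.minus(elapsed));
      elapsed = sw.elapsed(); // cumulative time spent polling in this helper so far
      kafkaMetrics.updateSuccessfulRpcMetrics(
          topicPartition.topic(), java.time.Duration.ofMillis(elapsed.toMillis()));
      // ... empty-result, progress, and timeout checks follow ...
    }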
@@ -592,7 +598,6 @@ private ConsumerRecords<byte[], byte[]> poll(
         // there was no progress on the offset/position, which indicates end of stream
         return rawRecords;
       }
-      elapsed = sw.elapsed();
       if (elapsed.toMillis() >= timeout.toMillis()) {
         // timeout is over
         LOG.warn(
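The removal here is the counterpart of the move above: elapsed used to be refreshed only at the bottom of the loop, after the empty and progress checks, whereas now a single reading taken right after the poll call drives both the latency metric and this timeout check. A budget walk-through with illustrative timings, assuming consumerPollingTimeout = 2 seconds:

    // Polling budget across retries (illustrative timings, not from the PR).
    java.time.Duration timeout = java.time.Duration.ofSeconds(2);
    java.time.Duration elapsed = java.time.Duration.ofMillis(800); // after a first empty poll
    java.time.Duration nextBudget = timeout.minus(elapsed);        // 1200 ms left for the second poll
    // If the second poll is also empty and elapsed reaches >= 2000 ms, the loop
    // logs the warning above and returns the empty batch.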