@@ -20,6 +20,7 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -1145,6 +1146,8 @@ protected void close(Timer timer, CloseOptions.GroupMembershipOperation membersh
log.warn("Close timed out with {} pending requests to coordinator, terminating client connections",
client.pendingRequestCount(coordinator));
}

Utils.closeQuietly(sensors, "coordinator metrics");
}
}

@@ -1330,97 +1333,76 @@ boolean generationUnchanged() {
}
}

protected final Meter createMeter(Metrics metrics, String groupName, String baseName, String descriptiveName) {
return new Meter(new WindowedCount(),
metrics.metricName(baseName + "-rate", groupName,
String.format("The number of %s per second", descriptiveName)),
metrics.metricName(baseName + "-total", groupName,
String.format("The total number of %s", descriptiveName)));
}

/**
* Visible for testing.
*/
protected BaseHeartbeatThread heartbeatThread() {
return heartbeatThread;
}

private class GroupCoordinatorMetrics {
public final String metricGrpName;

private class GroupCoordinatorMetrics extends AbstractConsumerMetricsManager {
public final Sensor heartbeatSensor;
public final Sensor joinSensor;
public final Sensor syncSensor;
public final Sensor successfulRebalanceSensor;
public final Sensor failedRebalanceSensor;

@SuppressWarnings({"this-escape"})
public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
super(metrics, metricGrpPrefix + "-coordinator-metrics");

this.heartbeatSensor = metrics.sensor("heartbeat-latency");
this.heartbeatSensor.add(metrics.metricName("heartbeat-response-time-max",
this.metricGrpName,
this.heartbeatSensor = sensor("heartbeat-latency");
this.heartbeatSensor.add(metricName("heartbeat-response-time-max",
"The max time taken to receive a response to a heartbeat request"), new Max());
this.heartbeatSensor.add(createMeter(metrics, metricGrpName, "heartbeat", "heartbeats"));
this.heartbeatSensor.add(createMeter("heartbeat", "heartbeats"));

this.joinSensor = metrics.sensor("join-latency");
this.joinSensor.add(metrics.metricName("join-time-avg",
this.metricGrpName,
this.joinSensor = sensor("join-latency");
this.joinSensor.add(metricName("join-time-avg",
"The average time taken for a group rejoin"), new Avg());
this.joinSensor.add(metrics.metricName("join-time-max",
this.metricGrpName,
this.joinSensor.add(metricName("join-time-max",
"The max time taken for a group rejoin"), new Max());
this.joinSensor.add(createMeter(metrics, metricGrpName, "join", "group joins"));
this.joinSensor.add(createMeter("join", "group joins"));

this.syncSensor = metrics.sensor("sync-latency");
this.syncSensor.add(metrics.metricName("sync-time-avg",
this.metricGrpName,
this.syncSensor = sensor("sync-latency");
this.syncSensor.add(metricName("sync-time-avg",
"The average time taken for a group sync"), new Avg());
this.syncSensor.add(metrics.metricName("sync-time-max",
this.metricGrpName,
this.syncSensor.add(metricName("sync-time-max",
"The max time taken for a group sync"), new Max());
this.syncSensor.add(createMeter(metrics, metricGrpName, "sync", "group syncs"));
this.syncSensor.add(createMeter("sync", "group syncs"));

this.successfulRebalanceSensor = metrics.sensor("rebalance-latency");
this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-avg",
this.metricGrpName,
this.successfulRebalanceSensor = sensor("rebalance-latency");
this.successfulRebalanceSensor.add(metricName("rebalance-latency-avg",
"The average time taken for a group to complete a successful rebalance, which may be composed of " +
"several failed re-trials until it succeeded"), new Avg());
this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-max",
this.metricGrpName,
this.successfulRebalanceSensor.add(metricName("rebalance-latency-max",
"The max time taken for a group to complete a successful rebalance, which may be composed of " +
"several failed re-trials until it succeeded"), new Max());
this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-total",
this.metricGrpName,
this.successfulRebalanceSensor.add(metricName("rebalance-latency-total",
"The total number of milliseconds this consumer has spent in successful rebalances since creation"),
new CumulativeSum());
this.successfulRebalanceSensor.add(
metrics.metricName("rebalance-total",
this.metricGrpName,
metricName("rebalance-total",
"The total number of successful rebalance events, each event is composed of " +
"several failed re-trials until it succeeded"),
new CumulativeCount()
);
this.successfulRebalanceSensor.add(
metrics.metricName(
metricName(
"rebalance-rate-per-hour",
this.metricGrpName,
"The number of successful rebalance events per hour, each event is composed of " +
"several failed re-trials until it succeeded"),
new Rate(TimeUnit.HOURS, new WindowedCount())
);

this.failedRebalanceSensor = metrics.sensor("failed-rebalance");
this.failedRebalanceSensor = sensor("failed-rebalance");
this.failedRebalanceSensor.add(
metrics.metricName("failed-rebalance-total",
this.metricGrpName,
metricName("failed-rebalance-total",
"The total number of failed rebalance events"),
new CumulativeCount()
);
this.failedRebalanceSensor.add(
metrics.metricName(
metricName(
"failed-rebalance-rate-per-hour",
this.metricGrpName,
"The number of failed rebalance events per hour"),
new Rate(TimeUnit.HOURS, new WindowedCount())
);
@@ -1432,8 +1414,7 @@ public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
else
return TimeUnit.SECONDS.convert(now - lastRebalanceEndMs, TimeUnit.MILLISECONDS);
};
metrics.addMetric(metrics.metricName("last-rebalance-seconds-ago",
this.metricGrpName,
addMetric(metricName("last-rebalance-seconds-ago",
"The number of seconds since the last successful rebalance event"),
lastRebalance);

@@ -1444,11 +1425,18 @@ public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
else
return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
};
metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago",
this.metricGrpName,
addMetric(metricName("last-heartbeat-seconds-ago",
"The number of seconds since the last coordinator heartbeat was sent"),
lastHeartbeat);
}

protected final Meter createMeter(String baseName, String descriptiveName) {
return new Meter(new WindowedCount(),
metricName(baseName + "-rate",
String.format("The number of %s per second", descriptiveName)),
metricName(baseName + "-total",
String.format("The total number of %s", descriptiveName)));
}
}

private class HeartbeatThread extends BaseHeartbeatThread {
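That is the extent of the coordinator-side change: GroupCoordinatorMetrics stops carrying its own group name and Metrics handle and instead extends the new AbstractConsumerMetricsManager, going through its sensor(), metricName() and addMetric() helpers, while close() now tears everything down with Utils.closeQuietly(sensors, "coordinator metrics"). The base class itself is not shown in these hunks, so the following is only a sketch inferred from those call sites; the package comes from the new import, but every member name and the close() behaviour here are assumptions rather than the committed implementation.

package org.apache.kafka.clients.consumer.internals.metrics;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction of the base class referenced above; names are inferred from usage.
public abstract class AbstractConsumerMetricsManager implements Closeable {

    private final Metrics metrics;
    private final String metricGroupName;
    private final List<Sensor> sensors = new ArrayList<>();
    private final List<MetricName> metricNames = new ArrayList<>();

    protected AbstractConsumerMetricsManager(Metrics metrics, String metricGroupName) {
        this.metrics = metrics;
        this.metricGroupName = metricGroupName;
    }

    // Creates a sensor in the shared registry and remembers it so close() can deregister it.
    protected Sensor sensor(String name) {
        Sensor sensor = metrics.sensor(name);
        sensors.add(sensor);
        return sensor;
    }

    // Builds a MetricName in this manager's group, so subclasses no longer pass the group name around.
    protected MetricName metricName(String name, String description) {
        return metrics.metricName(name, metricGroupName, description);
    }

    // Registers a standalone gauge-style metric and tracks it for removal on close().
    protected void addMetric(MetricName name, Measurable measurable) {
        metrics.addMetric(name, measurable);
        metricNames.add(name);
    }

    // Removes everything this manager registered; this is what the new
    // Utils.closeQuietly(...) call in the coordinator's close() relies on.
    @Override
    public void close() {
        sensors.forEach(s -> metrics.removeSensor(s.name()));
        metricNames.forEach(metrics::removeMetric);
    }
}

However the real class is laid out, the effect visible in the diff is the same: the subclass only names its sensors and metrics, and deregistration happens in one place when the coordinator closes.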
@@ -287,6 +287,8 @@ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() {
private final ApplicationEventHandler applicationEventHandler;
private final Time time;
private final AtomicReference<Optional<ConsumerGroupMetadata>> groupMetadata = new AtomicReference<>(Optional.empty());
private final FetchMetricsManager fetchMetricsManager;
private final RebalanceCallbackMetricsManager rebalanceCallbackMetricsManager;
private final AsyncConsumerMetrics asyncConsumerMetrics;
private final KafkaConsumerMetrics kafkaConsumerMetrics;
private Logger log;
@@ -416,7 +418,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);

FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
this.fetchMetricsManager = createFetchMetricsManager(metrics);
FetchConfig fetchConfig = new FetchConfig(config);
this.isolationLevel = fetchConfig.isolationLevel;

@@ -475,11 +477,12 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
requestManagersSupplier,
asyncConsumerMetrics
);
this.rebalanceCallbackMetricsManager = new RebalanceCallbackMetricsManager(metrics);
this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
logContext,
subscriptions,
time,
new RebalanceCallbackMetricsManager(metrics)
rebalanceCallbackMetricsManager
);
this.streamsRebalanceListenerInvoker = streamsRebalanceData.map(s ->
new StreamsRebalanceListenerInvoker(logContext, s));
@@ -518,6 +521,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
String clientId,
Deserializers<K, V> deserializers,
FetchBuffer fetchBuffer,
FetchMetricsManager fetchMetricsManager,
RebalanceCallbackMetricsManager rebalanceCallbackMetricsManager,
FetchCollector<K, V> fetchCollector,
ConsumerInterceptors<K, V> interceptors,
Time time,
@@ -537,6 +542,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
this.subscriptions = subscriptions;
this.clientId = clientId;
this.fetchBuffer = fetchBuffer;
this.fetchMetricsManager = fetchMetricsManager;
this.rebalanceCallbackMetricsManager = rebalanceCallbackMetricsManager;
this.fetchCollector = fetchCollector;
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
this.interceptors = Objects.requireNonNull(interceptors);
@@ -591,7 +598,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
this.clientTelemetryReporter = Optional.empty();

ConsumerMetrics metricsRegistry = new ConsumerMetrics();
FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
this.fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
this.fetchCollector = new FetchCollector<>(logContext,
metadata,
subscriptions,
@@ -616,11 +623,12 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
time,
asyncConsumerMetrics
);
this.rebalanceCallbackMetricsManager = new RebalanceCallbackMetricsManager(metrics);
this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
logContext,
subscriptions,
time,
new RebalanceCallbackMetricsManager(metrics)
rebalanceCallbackMetricsManager
);
ApiVersions apiVersions = new ApiVersions();
Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> new NetworkClientDelegate(
Expand Down Expand Up @@ -1463,6 +1471,8 @@ private void close(Duration timeout, CloseOptions.GroupMembershipOperation membe
closeQuietly(interceptors, "consumer interceptors", firstException);
closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
closeQuietly(asyncConsumerMetrics, "async consumer metrics", firstException);
closeQuietly(fetchMetricsManager, "consumer fetch metrics", firstException);
closeQuietly(rebalanceCallbackMetricsManager, "consumer rebalance callback metrics", firstException);
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);
clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, "async consumer telemetry reporter", firstException));
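The AsyncKafkaConsumer hunks above promote fetchMetricsManager and rebalanceCallbackMetricsManager from constructor locals to fields (and thread them through the test-oriented constructor) so that close() can hand them to closeQuietly() before the Metrics registry itself is shut down; the ClassicKafkaConsumer hunks below do the same for its FetchMetricsManager. For reference, the two Utils.closeQuietly overloads in play behave roughly as sketched here; this is a from-memory paraphrase of the helper in org.apache.kafka.common.utils.Utils, not a copy, and the logging detail is approximate.

import java.util.concurrent.atomic.AtomicReference;

// Paraphrased sketch of the close helpers used above; see Utils in kafka-clients for the real code.
public final class CloseQuietlySketch {

    // Two-argument form: close the resource and swallow any failure after noting it.
    public static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Throwable t) {
                System.err.printf("Failed to close %s: %s%n", name, t); // the real helper logs via SLF4J
            }
        }
    }

    // Three-argument form: additionally record the first failure so the caller can rethrow it
    // once every resource has been given a chance to close.
    public static void closeQuietly(AutoCloseable closeable, String name, AtomicReference<Throwable> firstException) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Throwable t) {
                firstException.compareAndSet(null, t);
                System.err.printf("Failed to close %s: %s%n", name, t); // the real helper logs via SLF4J
            }
        }
    }

    private CloseQuietlySketch() {
    }
}

Presumably the ordering in close() is deliberate: the metrics managers deregister their sensors first, and only afterwards is the Metrics registry itself closed.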
@@ -125,6 +125,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
private final Optional<String> groupId;
private final ConsumerCoordinator coordinator;
private final Deserializers<K, V> deserializers;
private final FetchMetricsManager fetchMetricsManager;
private final Fetcher<K, V> fetcher;
private final OffsetFetcher offsetFetcher;
private final TopicMetadataFetcher topicMetadataFetcher;
@@ -191,7 +192,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
this.metadata.bootstrap(addresses);

FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
this.fetchMetricsManager = createFetchMetricsManager(metrics);
FetchConfig fetchConfig = new FetchConfig(config);
this.isolationLevel = fetchConfig.isolationLevel;

@@ -362,7 +363,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
boolean checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);

ConsumerMetrics metricsRegistry = new ConsumerMetrics();
FetchMetricsManager metricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
this.fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
ApiVersions apiVersions = new ApiVersions();
FetchConfig fetchConfig = new FetchConfig(
minBytes,
@@ -381,7 +382,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
subscriptions,
fetchConfig,
deserializers,
metricsManager,
fetchMetricsManager,
time,
apiVersions
);
@@ -1179,6 +1180,7 @@ private void close(Duration timeout, CloseOptions.GroupMembershipOperation membe

closeQuietly(interceptors, "consumer interceptors", firstException);
closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
closeQuietly(fetchMetricsManager, "kafka fetch metrics", firstException);
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(client, "consumer network client", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);