Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault"/>
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="com.fasterxml.jackson" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,7 @@ private QuorumController(
this.queue = queue;
this.time = time;
this.controllerMetrics = controllerMetrics;
queue.setIdleTimeCallback(controllerMetrics::updateIdleTime);
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.deferredEventQueue = new DeferredEventQueue(logContext);
this.resourceExists = new ConfigResourceExistenceChecker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.kafka.controller.metrics;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.metrics.TimeRatio;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
Expand Down Expand Up @@ -48,6 +50,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
"ControllerEventManager", "EventQueueTimeMs");
private static final MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
"ControllerEventManager", "AvgIdleRatio");
private static final MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private static final MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
Expand All @@ -64,8 +68,10 @@ public class QuorumControllerMetrics implements AutoCloseable {
"KafkaController", "EventQueueOperationsTimedOutCount");
private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
"KafkaController", "NewActiveControllersCount");

private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME = "TimeSinceLastHeartbeatReceivedMs";
private static final String BROKER_ID_TAG = "broker";
private static final MetricConfig METRIC_CONFIG = new MetricConfig();

private final Optional<MetricsRegistry> registry;
private final Time time;
Expand All @@ -75,6 +81,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final TimeRatio avgIdleTimeRatio;

private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
private final AtomicLong operationsStarted = new AtomicLong(0);
Expand Down Expand Up @@ -109,6 +116,7 @@ public Integer value() {
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTimeUpdater = newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.sessionTimeoutMs = sessionTimeoutMs;
this.avgIdleTimeRatio = new TimeRatio(1);
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
@Override
public Long value() {
Expand Down Expand Up @@ -157,6 +165,16 @@ public Long value() {
return newActiveControllers();
}
}));
registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new Gauge<Double>() {
@Override
public Double value() {
return avgIdleTimeRatio.measure(METRIC_CONFIG, time.milliseconds());
}
}));
}

public void updateIdleTime(long idleDurationMs) {
avgIdleTimeRatio.record(METRIC_CONFIG, (double) idleDurationMs, time.milliseconds());
}

public void addTimeSinceLastHeartbeatMetric(int brokerId) {
Expand Down Expand Up @@ -291,7 +309,8 @@ public void close() {
TIMED_OUT_BROKER_HEARTBEAT_COUNT,
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
NEW_ACTIVE_CONTROLLERS_COUNT
NEW_ACTIVE_CONTROLLERS_COUNT,
AVERAGE_IDLE_RATIO
).forEach(r::removeMetric));
removeTimeSinceLastHeartbeatMetrics();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public void testMetricNames() {
Set<String> expected = Set.of(
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
"kafka.controller:type=ControllerEventManager,name=AvgIdleRatio",
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
Expand Down Expand Up @@ -188,6 +189,30 @@ public void testTimeSinceLastHeartbeatReceivedMs() {
}
}

@Test
public void testAvgIdleRatio() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
Gauge<Double> avgIdleRatio = (Gauge<Double>) registry.allMetrics().get(metricName("ControllerEventManager", "AvgIdleRatio"));

// No idle time recorded yet
assertEquals(1.0, avgIdleRatio.value());

metrics.updateIdleTime(10);
time.sleep(40);
metrics.updateIdleTime(20);
assertEquals(0.5, avgIdleRatio.value());

time.sleep(20);
metrics.updateIdleTime(1);
assertEquals(0.05, avgIdleRatio.value());

} finally {
registry.shutdown();
}
}

private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.raft.QuorumState;
import org.apache.kafka.raft.ReplicaKey;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.metrics.TimeRatio;

import java.util.List;
import java.util.OptionalLong;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;

Expand Down Expand Up @@ -278,21 +280,24 @@ private void handleEvents() {
remove(toRun);
continue;
}
if (awaitNs == Long.MAX_VALUE) {
try {

long startIdleNs = time.nanoseconds();
try {
if (awaitNs == Long.MAX_VALUE) {
cond.await();
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for a new event. " +
"Shutting down event queue");
interrupted = true;
}
} else {
try {
} else {
cond.awaitNanos(awaitNs);
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for a deferred event. " +
"Shutting down event queue");
interrupted = true;
}
} catch (InterruptedException e) {

log.warn("Interrupted while waiting for a {} event. " +
"Shutting down event queue", (awaitNs == Long.MAX_VALUE) ? "new" : "deferred");
interrupted = true;
} finally {
if (idleTimeCallback != null) {
long idleNs = Math.max(time.nanoseconds() - startIdleNs, 0);
long idleMs = TimeUnit.NANOSECONDS.toMillis(idleNs);
idleTimeCallback.accept(idleMs);
}
}
} finally {
Expand Down Expand Up @@ -440,6 +445,12 @@ int size() {
*/
private boolean interrupted;

/**
* Optional callback for queue idle time tracking.
*/
private Consumer<Long> idleTimeCallback;


public KafkaEventQueue(
Time time,
LogContext logContext,
Expand Down Expand Up @@ -503,6 +514,10 @@ public void beginShutdown(String source) {
}
}

public void setIdleTimeCallback(Consumer<Long> callback) {
this.idleTimeCallback = callback;
}

@Override
public int size() {
return eventHandler.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.internals;
package org.apache.kafka.server.metrics;

import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.internals;
package org.apache.kafka.server.metrics;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.MockTime;
Expand Down