Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ public List<RestHandler> getRestHandlers(
@Override
public Collection<?> createComponents(PluginServices services) {
var components = new ArrayList<>();
var throttlerManager = new ThrottlerManager(settings, services.threadPool(), services.clusterService());
var throttlerManager = new ThrottlerManager(settings, services.threadPool());
throttlerManager.init(services.clusterService());

var truncator = new Truncator(settings, services.clusterService());
serviceComponents.set(new ServiceComponents(services.threadPool(), throttlerManager, settings, truncator));
threadPoolSetOnce.set(services.threadPool());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,152 +7,222 @@

package org.elasticsearch.xpack.inference.logging;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
Copy link
Contributor Author

@jonathan-buttner jonathan-buttner Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would ignore the previous version of this class and review this like it's a new file. It has changed pretty significantly.

import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_THREAD_POOL_NAME;

/**
* A class that throttles calls to a logger. If a log call is made during the throttle period a counter is incremented.
* If a log call occurs after the throttle period, then the call will proceed, and it will include a message like
* "repeated X times" to indicate how often the message was attempting to be logged.
* A class that throttles calls to a logger. The first unique log message is permitted to emit a message. Any subsequent log messages
* matching a message that has already been emitted will only increment a counter. A thread runs on an interval
* to emit any log messages that have been repeated beyond the initial emitted message. Once the thread emits a repeated
* message the counter is reset. If another message is received matching a previously emitted message by the thread, it will be considered
* the first time a unique message is received and will be logged.
*/
public class Throttler implements Closeable {

private static final Logger classLogger = LogManager.getLogger(Throttler.class);

private final TimeValue resetInterval;
private Duration durationToWait;
private final TimeValue loggingInterval;
private final Clock clock;
private final ConcurrentMap<String, LogExecutor> logExecutors;
private final AtomicReference<Scheduler.Cancellable> cancellableTask = new AtomicReference<>();
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final ThreadPool threadPool;
// This lock governs the ability of the utility thread to get exclusive access to remove entries
// from the map
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

/**
* Constructs the throttler and kicks off a scheduled task to clear the internal stats.
*
* @param resetInterval the frequency for clearing the internal stats. This protects against an ever growing
* cache
* @param durationToWait the amount of time to wait before logging a message after the threshold
* is reached
* @param loggingInterval the frequency to run a task to emit repeated log messages
* @param threadPool a thread pool for running a scheduled task to clear the internal stats
*/
public Throttler(TimeValue resetInterval, TimeValue durationToWait, ThreadPool threadPool) {
this(resetInterval, durationToWait, Clock.systemUTC(), threadPool, new ConcurrentHashMap<>());
public Throttler(TimeValue loggingInterval, ThreadPool threadPool) {
// Delegates with the system UTC clock and a fresh, empty executor map. Nothing is scheduled here:
// the repeating log emitter is only started when init() is called on the constructed instance.
this(loggingInterval, Clock.systemUTC(), threadPool, new ConcurrentHashMap<>());
}

/**
* @param oldThrottler a previous throttler that is being replaced
* @param loggingInterval the frequency to run a task to emit repeated log messages
*/
public Throttler(Throttler oldThrottler, TimeValue loggingInterval) {
// Reuses the old throttler's clock and thread pool. The map is a new ConcurrentHashMap populated from the old
// throttler's entries, so the LogExecutor values (and their counters) are the same instances, not clones.
this(loggingInterval, oldThrottler.clock, oldThrottler.threadPool, new ConcurrentHashMap<>(oldThrottler.logExecutors));
}

/**
* This should only be used directly for testing.
*/
Throttler(
TimeValue resetInterval,
TimeValue durationToWait,
Clock clock,
ThreadPool threadPool,
ConcurrentMap<String, LogExecutor> logExecutors
) {
Objects.requireNonNull(durationToWait);
Objects.requireNonNull(threadPool);

this.resetInterval = Objects.requireNonNull(resetInterval);
this.durationToWait = Duration.ofMillis(durationToWait.millis());
Throttler(TimeValue loggingInterval, Clock clock, ThreadPool threadPool, ConcurrentMap<String, LogExecutor> logExecutors) {
this.threadPool = Objects.requireNonNull(threadPool);
this.loggingInterval = Objects.requireNonNull(loggingInterval);
this.clock = Objects.requireNonNull(clock);
this.logExecutors = Objects.requireNonNull(logExecutors);
}

this.cancellableTask.set(startResetTask(threadPool));
public void init() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The init() methods exist to avoid a very unlikely `this` leak if the object hadn't finished its construction yet.

cancellableTask.set(startRepeatingLogEmitter());
}

private Scheduler.Cancellable startResetTask(ThreadPool threadPool) {
classLogger.debug(() -> format("Reset task scheduled with interval [%s]", resetInterval));
private Scheduler.Cancellable startRepeatingLogEmitter() {
    // Record the configured interval at debug level before the task is registered.
    classLogger.debug(() -> Strings.format("Scheduling repeating log emitter with interval [%s]", loggingInterval));

    // The emitter runs on the plugin's utility pool, re-armed with a fixed delay of loggingInterval.
    final var utilityExecutor = threadPool.executor(UTILITY_THREAD_POOL_NAME);
    return threadPool.scheduleWithFixedDelay(this::emitRepeatedLogs, loggingInterval, utilityExecutor);
}

private void emitRepeatedLogs() {
if (isRunning.get() == false) {
return;
}

final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

return threadPool.scheduleWithFixedDelay(logExecutors::clear, resetInterval, threadPool.executor(UTILITY_THREAD_POOL_NAME));
writeLock.lock();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Locks so that we don't get inconsistencies while logging the repeated messages. Otherwise the counts could be incorrect.

try {
for (var iter = logExecutors.values().iterator(); iter.hasNext();) {
var executor = iter.next();
executor.logRepeatedMessages();
iter.remove();
}
} finally {
writeLock.unlock();
}
}

public void setDurationToWait(TimeValue durationToWait) {
this.durationToWait = Duration.ofMillis(durationToWait.millis());
/**
 * Submits a log call, with an associated throwable, to the throttler.
 * This is a thin wrapper that forwards all arguments unchanged to {@code executeInternal}.
 *
 * @param logger  the logger the message should be written to
 * @param level   the level to log at
 * @param message the message text; it is also used as the key that identifies repeated messages
 * @param t       the throwable to attach to the log entry
 */
public void execute(Logger logger, Level level, String message, Throwable t) {
executeInternal(logger, level, message, t);
}

public void execute(String message, Consumer<String> consumer) {
/**
 * Submits a log call without a throwable to the throttler.
 * This is a thin wrapper that forwards to {@code executeInternal} with a {@code null} throwable.
 *
 * @param logger  the logger the message should be written to
 * @param level   the level to log at
 * @param message the message text; it is also used as the key that identifies repeated messages
 */
public void execute(Logger logger, Level level, String message) {
executeInternal(logger, level, message, null);
}

private void executeInternal(Logger logger, Level level, String message, Throwable throwable) {
if (isRunning.get() == false) {
return;
}

LogExecutor logExecutor = logExecutors.compute(message, (key, value) -> {
if (value == null) {
return new LogExecutor(clock, consumer);
}
final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

return value.compute(consumer, durationToWait);
});
readLock.lock();
try {
var logExecutor = logExecutors.compute(
message,
(key, value) -> Objects.requireNonNullElseGet(value, () -> new LogExecutor(clock, logger, level, message, throwable))
);

// This executes an internal consumer that wraps the passed in one, it will either log the message passed here
// unchanged, do nothing if it is in the throttled period, or log this message + some text saying how many times it was repeated
logExecutor.log(message);
logExecutor.logFirstMessage();
} finally {
readLock.unlock();
}
}

@Override
public void close() {
isRunning.set(false);
cancellableTask.get().cancel();
logExecutors.clear();
if (cancellableTask.get() != null) {
cancellableTask.get().cancel();
}

clearLogExecutors();
}

/**
 * Empties the executor map while holding the write lock, so the map is never cleared
 * while another thread holds the read lock.
 */
private void clearLogExecutors() {
    lock.writeLock().lock();
    try {
        logExecutors.clear();
    } finally {
        // Always release, even if clear() throws.
        lock.writeLock().unlock();
    }
}

private static class LogExecutor {
private final long skippedLogCalls;
private final Instant timeOfLastLogCall;
// -1 here because we need to determine if we haven't logged the first time
// After the first time we'll set it to 0, then the thread that runs on an interval
// needs to know if there are any repeated message, if it sees 0, it knows there are none
// and skips emitting the message again.
private static final long INITIAL_LOG_COUNTER_VALUE = -1;

private final AtomicLong skippedLogCalls = new AtomicLong(INITIAL_LOG_COUNTER_VALUE);
private final AtomicReference<Instant> timeOfLastLogCall;
private final Clock clock;
private final Consumer<String> consumer;
private final Logger throttledLogger;
private final Level level;
private final String originalMessage;
private final Throwable throwable;

LogExecutor(Clock clock, Consumer<String> throttledConsumer) {
this(clock, 0, throttledConsumer);
}

LogExecutor(Clock clock, long skippedLogCalls, Consumer<String> consumer) {
this.skippedLogCalls = skippedLogCalls;
LogExecutor(Clock clock, Logger logger, Level level, String originalMessage, @Nullable Throwable throwable) {
this.clock = Objects.requireNonNull(clock);
timeOfLastLogCall = Instant.now(this.clock);
this.consumer = Objects.requireNonNull(consumer);
timeOfLastLogCall = new AtomicReference<>(Instant.now(this.clock));
this.throttledLogger = Objects.requireNonNull(logger);
this.level = Objects.requireNonNull(level);
this.originalMessage = Objects.requireNonNull(originalMessage);
this.throwable = throwable;
}

void log(String message) {
this.consumer.accept(message);
/**
 * Emits one summary line for this message if it was repeated after the original was logged.
 * The line contains the original message, the number of repeats and the time of the most recent call.
 * Does nothing when the counter shows no repeats. This method does not modify the counter.
 */
void logRepeatedMessages() {
    // Read the counter exactly once: the same value decides whether to emit, picks the singular/plural
    // wording, and is the number printed. Formatting the live AtomicLong instead could print a value
    // that differs from the one the branch was chosen on if the counter is incremented concurrently.
    final long numSkippedLogCalls = skippedLogCalls.get();
    if (hasRepeatedLogsToEmit(numSkippedLogCalls) == false) {
        return;
    }

    final Instant lastCallTime = timeOfLastLogCall.get();

    final String enrichedMessage;
    if (numSkippedLogCalls == 1) {
        enrichedMessage = Strings.format("%s, repeated 1 time, last message at [%s]", originalMessage, lastCallTime);
    } else {
        enrichedMessage = Strings.format(
            "%s, repeated %s times, last message at [%s]",
            originalMessage,
            numSkippedLogCalls,
            lastCallTime
        );
    }

    log(enrichedMessage);
}

LogExecutor compute(Consumer<String> executor, Duration durationToWait) {
if (hasDurationExpired(durationToWait)) {
String messageToAppend = "";
if (this.skippedLogCalls == 1) {
messageToAppend = ", repeated 1 time";
} else if (this.skippedLogCalls > 1) {
messageToAppend = format(", repeated %s times", this.skippedLogCalls);
}

final String stringToAppend = messageToAppend;
return new LogExecutor(this.clock, 0, (message) -> executor.accept(message.concat(stringToAppend)));
private void log(String enrichedMessage) {
LogBuilder builder = throttledLogger.atLevel(level);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally tried constructing the LogBuilder in the manager class but it will fail to log because the LogBuilder log() method must be called on the constructing thread.

if (throwable != null) {
builder = builder.withThrowable(throwable);
}

// This creates a consumer that won't do anything because the original consumer is being throttled
return new LogExecutor(this.clock, this.skippedLogCalls + 1, (message) -> {});
builder.log(enrichedMessage);
}

/**
 * Tells whether a counter value represents at least one repeat of an already-logged message.
 *
 * @param numSkippedLogCalls a counter value read from {@code skippedLogCalls}
 * @return {@code true} when the value is strictly positive; {@code 0} and negative values yield {@code false}
 */
private static boolean hasRepeatedLogsToEmit(long numSkippedLogCalls) {
return numSkippedLogCalls > 0;
}

/**
 * Records one log attempt for this message. Every call refreshes the last-call timestamp and bumps the
 * counter; the original message is written only when the counter's previous value shows it has not been
 * logged yet.
 */
void logFirstMessage() {
    timeOfLastLogCall.set(Instant.now(clock));

    // getAndIncrement hands back the value before this call's increment.
    final long countBeforeThisCall = skippedLogCalls.getAndIncrement();
    if (hasLoggedOriginalMessage(countBeforeThisCall)) {
        // Already emitted once; this call is only counted.
        return;
    }

    log(originalMessage);
}

private boolean hasDurationExpired(Duration durationToWait) {
Instant now = Instant.now(clock);
return now.isAfter(timeOfLastLogCall.plus(durationToWait));
/**
 * Tells whether a counter value indicates that the original message has already been logged.
 *
 * @param numSkippedLogCalls a counter value read from {@code skippedLogCalls}
 * @return {@code true} when the value is zero or positive, {@code false} when it is negative
 */
private static boolean hasLoggedOriginalMessage(long numSkippedLogCalls) {
// a negative value indicates that we haven't yet logged the original message
return numSkippedLogCalls >= 0;
}
}
}
Loading
Loading