Skip to content

Commit 2fd70d9

Browse files
[ML] Refactoring inference API throttled logging (#124923)
* Refactoring throttled logging * Renaming setting * Resetting log counter when attempt to log for repeated messages * Clean up * Refactoring locking * Comments * Working tests * Addressing logbuilder issue * Fixing tests
1 parent f04761c commit 2fd70d9

File tree

5 files changed

+450
-282
lines changed

5 files changed

+450
-282
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,9 @@ public List<RestHandler> getRestHandlers(
248248
@Override
249249
public Collection<?> createComponents(PluginServices services) {
250250
var components = new ArrayList<>();
251-
var throttlerManager = new ThrottlerManager(settings, services.threadPool(), services.clusterService());
251+
var throttlerManager = new ThrottlerManager(settings, services.threadPool());
252+
throttlerManager.init(services.clusterService());
253+
252254
var truncator = new Truncator(settings, services.clusterService());
253255
serviceComponents.set(new ServiceComponents(services.threadPool(), throttlerManager, settings, truncator));
254256
threadPoolSetOnce.set(services.threadPool());

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/logging/Throttler.java

Lines changed: 145 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -7,152 +7,222 @@
77

88
package org.elasticsearch.xpack.inference.logging;
99

10+
import org.apache.logging.log4j.Level;
11+
import org.apache.logging.log4j.LogBuilder;
1012
import org.apache.logging.log4j.LogManager;
1113
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.common.Strings;
15+
import org.elasticsearch.core.Nullable;
1216
import org.elasticsearch.core.TimeValue;
1317
import org.elasticsearch.threadpool.Scheduler;
1418
import org.elasticsearch.threadpool.ThreadPool;
1519

1620
import java.io.Closeable;
1721
import java.time.Clock;
18-
import java.time.Duration;
1922
import java.time.Instant;
2023
import java.util.Objects;
2124
import java.util.concurrent.ConcurrentHashMap;
2225
import java.util.concurrent.ConcurrentMap;
2326
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicLong;
2428
import java.util.concurrent.atomic.AtomicReference;
25-
import java.util.function.Consumer;
29+
import java.util.concurrent.locks.ReentrantReadWriteLock;
2630

27-
import static org.elasticsearch.core.Strings.format;
2831
import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_THREAD_POOL_NAME;
2932

3033
/**
31-
* A class that throttles calls to a logger. If a log call is made during the throttle period a counter is incremented.
32-
* If a log call occurs after the throttle period, then the call will proceed, and it will include a message like
33-
* "repeated X times" to indicate how often the message was attempting to be logged.
34+
* A class that throttles calls to a logger. The first unique log message is permitted to emit a message. Any subsequent log messages
35+
* matching a message that has already been emitted will only increment a counter. A thread runs on an interval
36+
* to emit any log messages that have been repeated beyond the initial emitted message. Once the thread emits a repeated
37+
* message the counter is reset. If another message is received matching a previously emitted message by the thread, it will be consider
38+
* the first time a unique message is received and will be logged.
3439
*/
3540
public class Throttler implements Closeable {
3641

3742
private static final Logger classLogger = LogManager.getLogger(Throttler.class);
3843

39-
private final TimeValue resetInterval;
40-
private Duration durationToWait;
44+
private final TimeValue loggingInterval;
4145
private final Clock clock;
4246
private final ConcurrentMap<String, LogExecutor> logExecutors;
4347
private final AtomicReference<Scheduler.Cancellable> cancellableTask = new AtomicReference<>();
4448
private final AtomicBoolean isRunning = new AtomicBoolean(true);
49+
private final ThreadPool threadPool;
50+
// This lock governs the ability of the utility thread to get exclusive access to remove entries
51+
// from the map
52+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
4553

4654
/**
47-
* Constructs the throttler and kicks of a scheduled tasks to clear the internal stats.
48-
*
49-
* @param resetInterval the frequency for clearing the internal stats. This protects against an ever growing
50-
* cache
51-
* @param durationToWait the amount of time to wait before logging a message after the threshold
52-
* is reached
55+
* @param loggingInterval the frequency to run a task to emit repeated log messages
5356
* @param threadPool a thread pool for running a scheduled task to clear the internal stats
5457
*/
55-
public Throttler(TimeValue resetInterval, TimeValue durationToWait, ThreadPool threadPool) {
56-
this(resetInterval, durationToWait, Clock.systemUTC(), threadPool, new ConcurrentHashMap<>());
58+
public Throttler(TimeValue loggingInterval, ThreadPool threadPool) {
59+
this(loggingInterval, Clock.systemUTC(), threadPool, new ConcurrentHashMap<>());
60+
}
61+
62+
/**
63+
* @param oldThrottler a previous throttler that is being replaced
64+
* @param loggingInterval the frequency to run a task to emit repeated log messages
65+
*/
66+
public Throttler(Throttler oldThrottler, TimeValue loggingInterval) {
67+
this(loggingInterval, oldThrottler.clock, oldThrottler.threadPool, new ConcurrentHashMap<>(oldThrottler.logExecutors));
5768
}
5869

5970
/**
6071
* This should only be used directly for testing.
6172
*/
62-
Throttler(
63-
TimeValue resetInterval,
64-
TimeValue durationToWait,
65-
Clock clock,
66-
ThreadPool threadPool,
67-
ConcurrentMap<String, LogExecutor> logExecutors
68-
) {
69-
Objects.requireNonNull(durationToWait);
70-
Objects.requireNonNull(threadPool);
71-
72-
this.resetInterval = Objects.requireNonNull(resetInterval);
73-
this.durationToWait = Duration.ofMillis(durationToWait.millis());
73+
Throttler(TimeValue loggingInterval, Clock clock, ThreadPool threadPool, ConcurrentMap<String, LogExecutor> logExecutors) {
74+
this.threadPool = Objects.requireNonNull(threadPool);
75+
this.loggingInterval = Objects.requireNonNull(loggingInterval);
7476
this.clock = Objects.requireNonNull(clock);
7577
this.logExecutors = Objects.requireNonNull(logExecutors);
78+
}
7679

77-
this.cancellableTask.set(startResetTask(threadPool));
80+
public void init() {
81+
cancellableTask.set(startRepeatingLogEmitter());
7882
}
7983

80-
private Scheduler.Cancellable startResetTask(ThreadPool threadPool) {
81-
classLogger.debug(() -> format("Reset task scheduled with interval [%s]", resetInterval));
84+
private Scheduler.Cancellable startRepeatingLogEmitter() {
85+
classLogger.debug(() -> Strings.format("Scheduling repeating log emitter with interval [%s]", loggingInterval));
86+
87+
return threadPool.scheduleWithFixedDelay(this::emitRepeatedLogs, loggingInterval, threadPool.executor(UTILITY_THREAD_POOL_NAME));
88+
}
89+
90+
private void emitRepeatedLogs() {
91+
if (isRunning.get() == false) {
92+
return;
93+
}
94+
95+
final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
8296

83-
return threadPool.scheduleWithFixedDelay(logExecutors::clear, resetInterval, threadPool.executor(UTILITY_THREAD_POOL_NAME));
97+
writeLock.lock();
98+
try {
99+
for (var iter = logExecutors.values().iterator(); iter.hasNext();) {
100+
var executor = iter.next();
101+
executor.logRepeatedMessages();
102+
iter.remove();
103+
}
104+
} finally {
105+
writeLock.unlock();
106+
}
84107
}
85108

86-
public void setDurationToWait(TimeValue durationToWait) {
87-
this.durationToWait = Duration.ofMillis(durationToWait.millis());
109+
public void execute(Logger logger, Level level, String message, Throwable t) {
110+
executeInternal(logger, level, message, t);
88111
}
89112

90-
public void execute(String message, Consumer<String> consumer) {
113+
public void execute(Logger logger, Level level, String message) {
114+
executeInternal(logger, level, message, null);
115+
}
116+
117+
private void executeInternal(Logger logger, Level level, String message, Throwable throwable) {
91118
if (isRunning.get() == false) {
92119
return;
93120
}
94121

95-
LogExecutor logExecutor = logExecutors.compute(message, (key, value) -> {
96-
if (value == null) {
97-
return new LogExecutor(clock, consumer);
98-
}
122+
final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
99123

100-
return value.compute(consumer, durationToWait);
101-
});
124+
readLock.lock();
125+
try {
126+
var logExecutor = logExecutors.compute(
127+
message,
128+
(key, value) -> Objects.requireNonNullElseGet(value, () -> new LogExecutor(clock, logger, level, message, throwable))
129+
);
102130

103-
// This executes an internal consumer that wraps the passed in one, it will either log the message passed here
104-
// unchanged, do nothing if it is in the throttled period, or log this message + some text saying how many times it was repeated
105-
logExecutor.log(message);
131+
logExecutor.logFirstMessage();
132+
} finally {
133+
readLock.unlock();
134+
}
106135
}
107136

108137
@Override
109138
public void close() {
110139
isRunning.set(false);
111-
cancellableTask.get().cancel();
112-
logExecutors.clear();
140+
if (cancellableTask.get() != null) {
141+
cancellableTask.get().cancel();
142+
}
143+
144+
clearLogExecutors();
145+
}
146+
147+
private void clearLogExecutors() {
148+
final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
149+
writeLock.lock();
150+
try {
151+
logExecutors.clear();
152+
} finally {
153+
writeLock.unlock();
154+
}
113155
}
114156

115157
private static class LogExecutor {
116-
private final long skippedLogCalls;
117-
private final Instant timeOfLastLogCall;
158+
// -1 here because we need to determine if we haven't logged the first time
159+
// After the first time we'll set it to 0, then the thread that runs on an interval
160+
// needs to know if there are any repeated message, if it sees 0, it knows there are none
161+
// and skips emitting the message again.
162+
private static final long INITIAL_LOG_COUNTER_VALUE = -1;
163+
164+
private final AtomicLong skippedLogCalls = new AtomicLong(INITIAL_LOG_COUNTER_VALUE);
165+
private final AtomicReference<Instant> timeOfLastLogCall;
118166
private final Clock clock;
119-
private final Consumer<String> consumer;
167+
private final Logger throttledLogger;
168+
private final Level level;
169+
private final String originalMessage;
170+
private final Throwable throwable;
120171

121-
LogExecutor(Clock clock, Consumer<String> throttledConsumer) {
122-
this(clock, 0, throttledConsumer);
123-
}
124-
125-
LogExecutor(Clock clock, long skippedLogCalls, Consumer<String> consumer) {
126-
this.skippedLogCalls = skippedLogCalls;
172+
LogExecutor(Clock clock, Logger logger, Level level, String originalMessage, @Nullable Throwable throwable) {
127173
this.clock = Objects.requireNonNull(clock);
128-
timeOfLastLogCall = Instant.now(this.clock);
129-
this.consumer = Objects.requireNonNull(consumer);
174+
timeOfLastLogCall = new AtomicReference<>(Instant.now(this.clock));
175+
this.throttledLogger = Objects.requireNonNull(logger);
176+
this.level = Objects.requireNonNull(level);
177+
this.originalMessage = Objects.requireNonNull(originalMessage);
178+
this.throwable = throwable;
130179
}
131180

132-
void log(String message) {
133-
this.consumer.accept(message);
181+
void logRepeatedMessages() {
182+
var numSkippedLogCalls = skippedLogCalls.get();
183+
if (hasRepeatedLogsToEmit(numSkippedLogCalls) == false) {
184+
return;
185+
}
186+
187+
String enrichedMessage;
188+
if (numSkippedLogCalls == 1) {
189+
enrichedMessage = Strings.format("%s, repeated 1 time, last message at [%s]", originalMessage, timeOfLastLogCall.get());
190+
} else {
191+
enrichedMessage = Strings.format(
192+
"%s, repeated %s times, last message at [%s]",
193+
originalMessage,
194+
skippedLogCalls,
195+
timeOfLastLogCall.get()
196+
);
197+
}
198+
199+
log(enrichedMessage);
134200
}
135201

136-
LogExecutor compute(Consumer<String> executor, Duration durationToWait) {
137-
if (hasDurationExpired(durationToWait)) {
138-
String messageToAppend = "";
139-
if (this.skippedLogCalls == 1) {
140-
messageToAppend = ", repeated 1 time";
141-
} else if (this.skippedLogCalls > 1) {
142-
messageToAppend = format(", repeated %s times", this.skippedLogCalls);
143-
}
144-
145-
final String stringToAppend = messageToAppend;
146-
return new LogExecutor(this.clock, 0, (message) -> executor.accept(message.concat(stringToAppend)));
202+
private void log(String enrichedMessage) {
203+
LogBuilder builder = throttledLogger.atLevel(level);
204+
if (throwable != null) {
205+
builder = builder.withThrowable(throwable);
147206
}
148207

149-
// This creates a consumer that won't do anything because the original consumer is being throttled
150-
return new LogExecutor(this.clock, this.skippedLogCalls + 1, (message) -> {});
208+
builder.log(enrichedMessage);
209+
}
210+
211+
private static boolean hasRepeatedLogsToEmit(long numSkippedLogCalls) {
212+
return numSkippedLogCalls > 0;
213+
}
214+
215+
void logFirstMessage() {
216+
timeOfLastLogCall.set(Instant.now(this.clock));
217+
218+
if (hasLoggedOriginalMessage(skippedLogCalls.getAndIncrement()) == false) {
219+
log(originalMessage);
220+
}
151221
}
152222

153-
private boolean hasDurationExpired(Duration durationToWait) {
154-
Instant now = Instant.now(clock);
155-
return now.isAfter(timeOfLastLogCall.plus(durationToWait));
223+
private static boolean hasLoggedOriginalMessage(long numSkippedLogCalls) {
224+
// a negative value indicates that we haven't yet logged the original message
225+
return numSkippedLogCalls >= 0;
156226
}
157227
}
158228
}

0 commit comments

Comments
 (0)