Skip to content

Commit 04252d4

Browse files
Refactoring throttled logging
1 parent df7be39 commit 04252d4

File tree

5 files changed

+232
-243
lines changed

5 files changed

+232
-243
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,9 @@ public List<RestHandler> getRestHandlers(
243243
@Override
244244
public Collection<?> createComponents(PluginServices services) {
245245
var components = new ArrayList<>();
246-
var throttlerManager = new ThrottlerManager(settings, services.threadPool(), services.clusterService());
246+
var throttlerManager = new ThrottlerManager(settings, services.threadPool());
247+
throttlerManager.init(services.clusterService());
248+
247249
var truncator = new Truncator(settings, services.clusterService());
248250
serviceComponents.set(new ServiceComponents(services.threadPool(), throttlerManager, settings, truncator));
249251
threadPoolSetOnce.set(services.threadPool());

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/logging/Throttler.java

Lines changed: 102 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -9,150 +9,185 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12+
import org.elasticsearch.common.Strings;
1213
import org.elasticsearch.core.TimeValue;
1314
import org.elasticsearch.threadpool.Scheduler;
1415
import org.elasticsearch.threadpool.ThreadPool;
1516

1617
import java.io.Closeable;
1718
import java.time.Clock;
18-
import java.time.Duration;
1919
import java.time.Instant;
2020
import java.util.Objects;
2121
import java.util.concurrent.ConcurrentHashMap;
2222
import java.util.concurrent.ConcurrentMap;
2323
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicLong;
2425
import java.util.concurrent.atomic.AtomicReference;
2526
import java.util.function.Consumer;
2627

27-
import static org.elasticsearch.core.Strings.format;
2828
import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_THREAD_POOL_NAME;
2929

3030
/**
31-
* A class that throttles calls to a logger. If a log call is made during the throttle period a counter is incremented.
32-
* If a log call occurs after the throttle period, then the call will proceed, and it will include a message like
33-
* "repeated X times" to indicate how often the message was attempting to be logged.
31+
* A class that throttles calls to a logger. The first unique log message is permitted to emit a message. Any subsequent log messages
32+
* matching a message that has already been emitted will only increment a counter. A thread runs on an interval
33+
* to emit any log messages that have been repeated beyond the initial emitted message. Once the thread emits a repeated
34+
* message the counter is reset. If another message is received matching a previously emitted message by the thread, it will be consider
35+
* the first time a unique message is received and will be logged.
3436
*/
3537
public class Throttler implements Closeable {
3638

3739
private static final Logger classLogger = LogManager.getLogger(Throttler.class);
3840

39-
private final TimeValue resetInterval;
40-
private Duration durationToWait;
41+
private final TimeValue loggingInterval;
4142
private final Clock clock;
4243
private final ConcurrentMap<String, LogExecutor> logExecutors;
4344
private final AtomicReference<Scheduler.Cancellable> cancellableTask = new AtomicReference<>();
4445
private final AtomicBoolean isRunning = new AtomicBoolean(true);
46+
private final ThreadPool threadPool;
4547

4648
/**
47-
* Constructs the throttler and kicks of a scheduled tasks to clear the internal stats.
48-
*
49-
* @param resetInterval the frequency for clearing the internal stats. This protects against an ever growing
50-
* cache
51-
* @param durationToWait the amount of time to wait before logging a message after the threshold
52-
* is reached
49+
* @param loggingInterval the frequency to run a task to emit repeated log messages
5350
* @param threadPool a thread pool for running a scheduled task to clear the internal stats
5451
*/
55-
public Throttler(TimeValue resetInterval, TimeValue durationToWait, ThreadPool threadPool) {
56-
this(resetInterval, durationToWait, Clock.systemUTC(), threadPool, new ConcurrentHashMap<>());
52+
public Throttler(TimeValue loggingInterval, ThreadPool threadPool) {
53+
this(loggingInterval, Clock.systemUTC(), threadPool, new ConcurrentHashMap<>());
54+
}
55+
56+
/**
57+
* @param oldThrottler a previous throttler that is being replaced
58+
* @param loggingInterval the frequency to run a task to emit repeated log messages
59+
*/
60+
public Throttler(Throttler oldThrottler, TimeValue loggingInterval) {
61+
this(loggingInterval, oldThrottler.clock, oldThrottler.threadPool, new ConcurrentHashMap<>(oldThrottler.logExecutors));
5762
}
5863

5964
/**
6065
* This should only be used directly for testing.
6166
*/
62-
Throttler(
63-
TimeValue resetInterval,
64-
TimeValue durationToWait,
65-
Clock clock,
66-
ThreadPool threadPool,
67-
ConcurrentMap<String, LogExecutor> logExecutors
68-
) {
69-
Objects.requireNonNull(durationToWait);
70-
Objects.requireNonNull(threadPool);
71-
72-
this.resetInterval = Objects.requireNonNull(resetInterval);
73-
this.durationToWait = Duration.ofMillis(durationToWait.millis());
67+
Throttler(TimeValue loggingInterval, Clock clock, ThreadPool threadPool, ConcurrentMap<String, LogExecutor> logExecutors) {
68+
this.threadPool = Objects.requireNonNull(threadPool);
69+
this.loggingInterval = Objects.requireNonNull(loggingInterval);
7470
this.clock = Objects.requireNonNull(clock);
7571
this.logExecutors = Objects.requireNonNull(logExecutors);
72+
}
7673

77-
this.cancellableTask.set(startResetTask(threadPool));
74+
public void init() {
75+
cancellableTask.set(startRepeatingLogEmitter());
7876
}
7977

80-
private Scheduler.Cancellable startResetTask(ThreadPool threadPool) {
81-
classLogger.debug(() -> format("Reset task scheduled with interval [%s]", resetInterval));
78+
private Scheduler.Cancellable startRepeatingLogEmitter() {
79+
classLogger.debug(() -> Strings.format("Scheduling repeating log emitter with interval [%s]", loggingInterval));
8280

83-
return threadPool.scheduleWithFixedDelay(logExecutors::clear, resetInterval, threadPool.executor(UTILITY_THREAD_POOL_NAME));
81+
return threadPool.scheduleWithFixedDelay(this::emitRepeatedLogs, loggingInterval, threadPool.executor(UTILITY_THREAD_POOL_NAME));
8482
}
8583

86-
public void setDurationToWait(TimeValue durationToWait) {
87-
this.durationToWait = Duration.ofMillis(durationToWait.millis());
84+
private void emitRepeatedLogs() {
85+
if (isRunning.get() == false) {
86+
return;
87+
}
88+
89+
for (var iter = logExecutors.values().iterator(); iter.hasNext();) {
90+
var executor = iter.next();
91+
iter.remove();
92+
executor.logRepeatedMessages();
93+
}
8894
}
8995

90-
public void execute(String message, Consumer<String> consumer) {
96+
public void execute(String message, Consumer<String> logCallback) {
9197
if (isRunning.get() == false) {
9298
return;
9399
}
94100

95-
LogExecutor logExecutor = logExecutors.compute(message, (key, value) -> {
101+
var logExecutor = logExecutors.compute(message, (key, value) -> {
96102
if (value == null) {
97-
return new LogExecutor(clock, consumer);
103+
return new LogExecutor(clock, logCallback, message);
98104
}
99105

100-
return value.compute(consumer, durationToWait);
106+
return value;
101107
});
102108

103-
// This executes an internal consumer that wraps the passed in one, it will either log the message passed here
104-
// unchanged, do nothing if it is in the throttled period, or log this message + some text saying how many times it was repeated
105-
logExecutor.log(message);
109+
logExecutor.logFirstMessage();
106110
}
107111

108112
@Override
109113
public void close() {
110114
isRunning.set(false);
111-
cancellableTask.get().cancel();
115+
if (cancellableTask.get() != null) {
116+
cancellableTask.get().cancel();
117+
}
112118
logExecutors.clear();
113119
}
114120

115-
private static class LogExecutor {
116-
private final long skippedLogCalls;
117-
private final Instant timeOfLastLogCall;
121+
// default for testing
122+
static class LogExecutor {
123+
// -1 here because we need to determine if we haven't logged the first time
124+
// After the first time we'll set it to 0, then the thread that runs on an interval
125+
// needs to know if there are any repeated message, if it sees 0, it knows there are none
126+
// and skips emitting the message again.
127+
private static final long INITIAL_LOG_COUNTER_VALUE = -1;
128+
129+
private final AtomicLong skippedLogCalls;
130+
private Instant timeOfLastLogCall;
118131
private final Clock clock;
119-
private final Consumer<String> consumer;
132+
private final Consumer<String> throttledConsumer;
133+
private final String originalMessage;
120134

121-
LogExecutor(Clock clock, Consumer<String> throttledConsumer) {
122-
this(clock, 0, throttledConsumer);
135+
LogExecutor(Clock clock, Consumer<String> throttledConsumer, String originalMessage) {
136+
this(clock, INITIAL_LOG_COUNTER_VALUE, throttledConsumer, originalMessage);
123137
}
124138

125-
LogExecutor(Clock clock, long skippedLogCalls, Consumer<String> consumer) {
126-
this.skippedLogCalls = skippedLogCalls;
139+
LogExecutor(Clock clock, long skippedLogCalls, Consumer<String> throttledConsumer, String originalMessage) {
140+
this.skippedLogCalls = new AtomicLong(skippedLogCalls);
127141
this.clock = Objects.requireNonNull(clock);
128142
timeOfLastLogCall = Instant.now(this.clock);
129-
this.consumer = Objects.requireNonNull(consumer);
143+
this.throttledConsumer = Objects.requireNonNull(throttledConsumer);
144+
this.originalMessage = Objects.requireNonNull(originalMessage);
130145
}
131146

132-
void log(String message) {
133-
this.consumer.accept(message);
147+
void logRepeatedMessages() {
148+
long numSkippedLogCalls;
149+
synchronized (skippedLogCalls) {
150+
numSkippedLogCalls = skippedLogCalls.get();
151+
if (hasRepeatedLogsToEmit(numSkippedLogCalls) == false) {
152+
return;
153+
}
154+
}
155+
156+
String enrichedMessage;
157+
if (numSkippedLogCalls == 1) {
158+
enrichedMessage = Strings.format("%s, repeated 1 time, last message at [%s]", originalMessage, timeOfLastLogCall);
159+
} else {
160+
enrichedMessage = Strings.format(
161+
"%s, repeated %s times, last message at [%s]",
162+
originalMessage,
163+
skippedLogCalls,
164+
timeOfLastLogCall
165+
);
166+
}
167+
168+
throttledConsumer.accept(enrichedMessage);
134169
}
135170

136-
LogExecutor compute(Consumer<String> executor, Duration durationToWait) {
137-
if (hasDurationExpired(durationToWait)) {
138-
String messageToAppend = "";
139-
if (this.skippedLogCalls == 1) {
140-
messageToAppend = ", repeated 1 time";
141-
} else if (this.skippedLogCalls > 1) {
142-
messageToAppend = format(", repeated %s times", this.skippedLogCalls);
143-
}
171+
private static boolean hasRepeatedLogsToEmit(long numSkippedLogCalls) {
172+
return numSkippedLogCalls > 0;
173+
}
144174

145-
final String stringToAppend = messageToAppend;
146-
return new LogExecutor(this.clock, 0, (message) -> executor.accept(message.concat(stringToAppend)));
175+
void logFirstMessage() {
176+
long numSkippedLogCalls;
177+
synchronized (skippedLogCalls) {
178+
numSkippedLogCalls = skippedLogCalls.getAndIncrement();
147179
}
148180

149-
// This creates a consumer that won't do anything because the original consumer is being throttled
150-
return new LogExecutor(this.clock, this.skippedLogCalls + 1, (message) -> {});
181+
timeOfLastLogCall = Instant.now(this.clock);
182+
183+
if (hasLoggedOriginalMessage(numSkippedLogCalls) == false) {
184+
this.throttledConsumer.accept(originalMessage);
185+
}
151186
}
152187

153-
private boolean hasDurationExpired(Duration durationToWait) {
154-
Instant now = Instant.now(clock);
155-
return now.isAfter(timeOfLastLogCall.plus(durationToWait));
188+
private static boolean hasLoggedOriginalMessage(long numSkippedLogCalls) {
189+
// a negative value indicates that we haven't yet logged the original message
190+
return numSkippedLogCalls >= 0;
156191
}
157192
}
158193
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/logging/ThrottlerManager.java

Lines changed: 35 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.settings.Setting;
1313
import org.elasticsearch.common.settings.Settings;
1414
import org.elasticsearch.core.TimeValue;
15+
import org.elasticsearch.core.UpdateForV10;
1516
import org.elasticsearch.threadpool.ThreadPool;
1617

1718
import java.io.Closeable;
@@ -24,8 +25,11 @@
2425
public class ThrottlerManager implements Closeable {
2526
private static final TimeValue DEFAULT_STATS_RESET_INTERVAL_TIME = TimeValue.timeValueDays(1);
2627
/**
27-
* A setting specifying the interval for clearing the cached log message stats
28+
* Legacy log throttling setting, kept for BWC compatibility. This setting has no effect in 9.1.0 and later. Do not use.
29+
* TODO remove in 10.0
2830
*/
31+
@UpdateForV10(owner = UpdateForV10.Owner.MACHINE_LEARNING)
32+
@Deprecated
2933
public static final Setting<TimeValue> STATS_RESET_INTERVAL_SETTING = Setting.timeSetting(
3034
"xpack.inference.logging.reset_interval",
3135
DEFAULT_STATS_RESET_INTERVAL_TIME,
@@ -35,48 +39,54 @@ public class ThrottlerManager implements Closeable {
3539

3640
private static final TimeValue DEFAULT_WAIT_DURATION_TIME = TimeValue.timeValueHours(1);
3741
/**
38-
* A setting specifying the amount of time to wait after a log call occurs before allowing another log call.
42+
* Legacy log throttling setting, kept for BWC compatibility. This setting has no effect in 9.1.0 and later. Do not use.
43+
* TODO remove in 10.0
3944
*/
45+
@UpdateForV10(owner = UpdateForV10.Owner.MACHINE_LEARNING)
46+
@Deprecated
4047
public static final Setting<TimeValue> LOGGER_WAIT_DURATION_SETTING = Setting.timeSetting(
4148
"xpack.inference.logging.wait_duration",
4249
DEFAULT_WAIT_DURATION_TIME,
4350
Setting.Property.NodeScope,
4451
Setting.Property.Dynamic
4552
);
4653

47-
private final ThreadPool threadPool;
48-
private Throttler throttler;
49-
private LoggerSettings loggerSettings;
54+
private static final TimeValue DEFAULT_LOG_EMIT_INTERVAL = TimeValue.timeValueHours(1);
55+
/**
56+
* This setting specifies how often a thread will run to emit repeated log messages.
57+
*/
58+
public static final Setting<TimeValue> LOG_EMIT_INTERVAL = Setting.timeSetting(
59+
"xpack.inference.logging.log_interval",
60+
DEFAULT_LOG_EMIT_INTERVAL,
61+
Setting.Property.NodeScope,
62+
Setting.Property.Dynamic
63+
);
5064

51-
public ThrottlerManager(Settings settings, ThreadPool threadPool, ClusterService clusterService) {
52-
Objects.requireNonNull(settings);
53-
Objects.requireNonNull(clusterService);
65+
private volatile TimeValue logInterval;
5466

55-
this.threadPool = Objects.requireNonNull(threadPool);
56-
this.loggerSettings = LoggerSettings.fromSettings(settings);
67+
private Throttler throttler;
5768

58-
throttler = new Throttler(loggerSettings.resetInterval(), loggerSettings.waitDuration(), threadPool);
59-
this.addSettingsUpdateConsumers(clusterService);
60-
}
69+
public ThrottlerManager(Settings settings, ThreadPool threadPool) {
70+
Objects.requireNonNull(settings);
6171

62-
private void addSettingsUpdateConsumers(ClusterService clusterService) {
63-
clusterService.getClusterSettings().addSettingsUpdateConsumer(STATS_RESET_INTERVAL_SETTING, this::setResetInterval);
64-
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOGGER_WAIT_DURATION_SETTING, this::setWaitDuration);
72+
throttler = new Throttler(LOG_EMIT_INTERVAL.get(settings), threadPool);
73+
throttler.init();
6574
}
6675

67-
// default for testing
68-
void setWaitDuration(TimeValue waitDuration) {
69-
loggerSettings = loggerSettings.createWithWaitDuration(waitDuration);
76+
public void init(ClusterService clusterService) {
77+
Objects.requireNonNull(clusterService);
7078

71-
throttler.setDurationToWait(waitDuration);
79+
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOG_EMIT_INTERVAL, this::setLogInterval);
7280
}
7381

7482
// default for testing
75-
void setResetInterval(TimeValue resetInterval) {
76-
loggerSettings = loggerSettings.createWithResetInterval(resetInterval);
83+
void setLogInterval(TimeValue logInterval) {
84+
this.logInterval = logInterval;
7785

78-
throttler.close();
79-
throttler = new Throttler(loggerSettings.resetInterval(), loggerSettings.waitDuration(), threadPool);
86+
var oldThrottler = throttler;
87+
throttler = new Throttler(oldThrottler, this.logInterval);
88+
throttler.init();
89+
oldThrottler.close();
8090
}
8191

8292
// default for testing
@@ -103,25 +113,6 @@ public void close() {
103113
}
104114

105115
public static List<Setting<?>> getSettingsDefinitions() {
106-
return List.of(STATS_RESET_INTERVAL_SETTING, LOGGER_WAIT_DURATION_SETTING);
107-
}
108-
109-
private record LoggerSettings(TimeValue resetInterval, TimeValue waitDuration) {
110-
LoggerSettings {
111-
Objects.requireNonNull(resetInterval);
112-
Objects.requireNonNull(waitDuration);
113-
}
114-
115-
static LoggerSettings fromSettings(Settings settings) {
116-
return new LoggerSettings(STATS_RESET_INTERVAL_SETTING.get(settings), LOGGER_WAIT_DURATION_SETTING.get(settings));
117-
}
118-
119-
LoggerSettings createWithResetInterval(TimeValue resetInterval) {
120-
return new LoggerSettings(resetInterval, waitDuration);
121-
}
122-
123-
LoggerSettings createWithWaitDuration(TimeValue waitDuration) {
124-
return new LoggerSettings(resetInterval, waitDuration);
125-
}
116+
return List.of(LOG_EMIT_INTERVAL, STATS_RESET_INTERVAL_SETTING, LOGGER_WAIT_DURATION_SETTING);
126117
}
127118
}

0 commit comments

Comments
 (0)