|
7 | 7 |
|
8 | 8 | package org.elasticsearch.xpack.inference.logging; |
9 | 9 |
|
| 10 | +import org.apache.logging.log4j.Level; |
| 11 | +import org.apache.logging.log4j.LogBuilder; |
10 | 12 | import org.apache.logging.log4j.LogManager; |
11 | 13 | import org.apache.logging.log4j.Logger; |
| 14 | +import org.elasticsearch.common.Strings; |
| 15 | +import org.elasticsearch.core.Nullable; |
12 | 16 | import org.elasticsearch.core.TimeValue; |
13 | 17 | import org.elasticsearch.threadpool.Scheduler; |
14 | 18 | import org.elasticsearch.threadpool.ThreadPool; |
15 | 19 |
|
16 | 20 | import java.io.Closeable; |
17 | 21 | import java.time.Clock; |
18 | | -import java.time.Duration; |
19 | 22 | import java.time.Instant; |
20 | 23 | import java.util.Objects; |
21 | 24 | import java.util.concurrent.ConcurrentHashMap; |
22 | 25 | import java.util.concurrent.ConcurrentMap; |
23 | 26 | import java.util.concurrent.atomic.AtomicBoolean; |
| 27 | +import java.util.concurrent.atomic.AtomicLong; |
24 | 28 | import java.util.concurrent.atomic.AtomicReference; |
25 | | -import java.util.function.Consumer; |
| 29 | +import java.util.concurrent.locks.ReentrantReadWriteLock; |
26 | 30 |
|
27 | | -import static org.elasticsearch.core.Strings.format; |
28 | 31 | import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_THREAD_POOL_NAME; |
29 | 32 |
|
30 | 33 | /** |
31 | | - * A class that throttles calls to a logger. If a log call is made during the throttle period a counter is incremented. |
32 | | - * If a log call occurs after the throttle period, then the call will proceed, and it will include a message like |
33 | | - * "repeated X times" to indicate how often the message was attempting to be logged. |
| 34 | + * A class that throttles calls to a logger. The first unique log message is permitted to emit a message. Any subsequent log messages |
| 35 | + * matching a message that has already been emitted will only increment a counter. A thread runs on an interval |
| 36 | + * to emit any log messages that have been repeated beyond the initial emitted message. Once the thread emits a repeated |
| 37 | + * message the counter is reset. If another message is received matching a previously emitted message by the thread, it will be consider |
| 38 | + * the first time a unique message is received and will be logged. |
34 | 39 | */ |
35 | 40 | public class Throttler implements Closeable { |
36 | 41 |
|
37 | 42 | private static final Logger classLogger = LogManager.getLogger(Throttler.class); |
38 | 43 |
|
39 | | - private final TimeValue resetInterval; |
40 | | - private Duration durationToWait; |
| 44 | + private final TimeValue loggingInterval; |
41 | 45 | private final Clock clock; |
42 | 46 | private final ConcurrentMap<String, LogExecutor> logExecutors; |
43 | 47 | private final AtomicReference<Scheduler.Cancellable> cancellableTask = new AtomicReference<>(); |
44 | 48 | private final AtomicBoolean isRunning = new AtomicBoolean(true); |
| 49 | + private final ThreadPool threadPool; |
| 50 | + // This lock governs the ability of the utility thread to get exclusive access to remove entries |
| 51 | + // from the map |
| 52 | + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
45 | 53 |
|
46 | 54 | /** |
47 | | - * Constructs the throttler and kicks of a scheduled tasks to clear the internal stats. |
48 | | - * |
49 | | - * @param resetInterval the frequency for clearing the internal stats. This protects against an ever growing |
50 | | - * cache |
51 | | - * @param durationToWait the amount of time to wait before logging a message after the threshold |
52 | | - * is reached |
| 55 | + * @param loggingInterval the frequency to run a task to emit repeated log messages |
53 | 56 | * @param threadPool a thread pool for running a scheduled task to clear the internal stats |
54 | 57 | */ |
55 | | - public Throttler(TimeValue resetInterval, TimeValue durationToWait, ThreadPool threadPool) { |
56 | | - this(resetInterval, durationToWait, Clock.systemUTC(), threadPool, new ConcurrentHashMap<>()); |
| 58 | + public Throttler(TimeValue loggingInterval, ThreadPool threadPool) { |
| 59 | + this(loggingInterval, Clock.systemUTC(), threadPool, new ConcurrentHashMap<>()); |
| 60 | + } |
| 61 | + |
| 62 | + /** |
| 63 | + * @param oldThrottler a previous throttler that is being replaced |
| 64 | + * @param loggingInterval the frequency to run a task to emit repeated log messages |
| 65 | + */ |
| 66 | + public Throttler(Throttler oldThrottler, TimeValue loggingInterval) { |
| 67 | + this(loggingInterval, oldThrottler.clock, oldThrottler.threadPool, new ConcurrentHashMap<>(oldThrottler.logExecutors)); |
57 | 68 | } |
58 | 69 |
|
59 | 70 | /** |
60 | 71 | * This should only be used directly for testing. |
61 | 72 | */ |
62 | | - Throttler( |
63 | | - TimeValue resetInterval, |
64 | | - TimeValue durationToWait, |
65 | | - Clock clock, |
66 | | - ThreadPool threadPool, |
67 | | - ConcurrentMap<String, LogExecutor> logExecutors |
68 | | - ) { |
69 | | - Objects.requireNonNull(durationToWait); |
70 | | - Objects.requireNonNull(threadPool); |
71 | | - |
72 | | - this.resetInterval = Objects.requireNonNull(resetInterval); |
73 | | - this.durationToWait = Duration.ofMillis(durationToWait.millis()); |
| 73 | + Throttler(TimeValue loggingInterval, Clock clock, ThreadPool threadPool, ConcurrentMap<String, LogExecutor> logExecutors) { |
| 74 | + this.threadPool = Objects.requireNonNull(threadPool); |
| 75 | + this.loggingInterval = Objects.requireNonNull(loggingInterval); |
74 | 76 | this.clock = Objects.requireNonNull(clock); |
75 | 77 | this.logExecutors = Objects.requireNonNull(logExecutors); |
| 78 | + } |
76 | 79 |
|
77 | | - this.cancellableTask.set(startResetTask(threadPool)); |
| 80 | + public void init() { |
| 81 | + cancellableTask.set(startRepeatingLogEmitter()); |
78 | 82 | } |
79 | 83 |
|
80 | | - private Scheduler.Cancellable startResetTask(ThreadPool threadPool) { |
81 | | - classLogger.debug(() -> format("Reset task scheduled with interval [%s]", resetInterval)); |
| 84 | + private Scheduler.Cancellable startRepeatingLogEmitter() { |
| 85 | + classLogger.debug(() -> Strings.format("Scheduling repeating log emitter with interval [%s]", loggingInterval)); |
| 86 | + |
| 87 | + return threadPool.scheduleWithFixedDelay(this::emitRepeatedLogs, loggingInterval, threadPool.executor(UTILITY_THREAD_POOL_NAME)); |
| 88 | + } |
| 89 | + |
| 90 | + private void emitRepeatedLogs() { |
| 91 | + if (isRunning.get() == false) { |
| 92 | + return; |
| 93 | + } |
| 94 | + |
| 95 | + final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); |
82 | 96 |
|
83 | | - return threadPool.scheduleWithFixedDelay(logExecutors::clear, resetInterval, threadPool.executor(UTILITY_THREAD_POOL_NAME)); |
| 97 | + writeLock.lock(); |
| 98 | + try { |
| 99 | + for (var iter = logExecutors.values().iterator(); iter.hasNext();) { |
| 100 | + var executor = iter.next(); |
| 101 | + executor.logRepeatedMessages(); |
| 102 | + iter.remove(); |
| 103 | + } |
| 104 | + } finally { |
| 105 | + writeLock.unlock(); |
| 106 | + } |
84 | 107 | } |
85 | 108 |
|
86 | | - public void setDurationToWait(TimeValue durationToWait) { |
87 | | - this.durationToWait = Duration.ofMillis(durationToWait.millis()); |
| 109 | + public void execute(Logger logger, Level level, String message, Throwable t) { |
| 110 | + executeInternal(logger, level, message, t); |
88 | 111 | } |
89 | 112 |
|
90 | | - public void execute(String message, Consumer<String> consumer) { |
| 113 | + public void execute(Logger logger, Level level, String message) { |
| 114 | + executeInternal(logger, level, message, null); |
| 115 | + } |
| 116 | + |
| 117 | + private void executeInternal(Logger logger, Level level, String message, Throwable throwable) { |
91 | 118 | if (isRunning.get() == false) { |
92 | 119 | return; |
93 | 120 | } |
94 | 121 |
|
95 | | - LogExecutor logExecutor = logExecutors.compute(message, (key, value) -> { |
96 | | - if (value == null) { |
97 | | - return new LogExecutor(clock, consumer); |
98 | | - } |
| 122 | + final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); |
99 | 123 |
|
100 | | - return value.compute(consumer, durationToWait); |
101 | | - }); |
| 124 | + readLock.lock(); |
| 125 | + try { |
| 126 | + var logExecutor = logExecutors.compute( |
| 127 | + message, |
| 128 | + (key, value) -> Objects.requireNonNullElseGet(value, () -> new LogExecutor(clock, logger, level, message, throwable)) |
| 129 | + ); |
102 | 130 |
|
103 | | - // This executes an internal consumer that wraps the passed in one, it will either log the message passed here |
104 | | - // unchanged, do nothing if it is in the throttled period, or log this message + some text saying how many times it was repeated |
105 | | - logExecutor.log(message); |
| 131 | + logExecutor.logFirstMessage(); |
| 132 | + } finally { |
| 133 | + readLock.unlock(); |
| 134 | + } |
106 | 135 | } |
107 | 136 |
|
108 | 137 | @Override |
109 | 138 | public void close() { |
110 | 139 | isRunning.set(false); |
111 | | - cancellableTask.get().cancel(); |
112 | | - logExecutors.clear(); |
| 140 | + if (cancellableTask.get() != null) { |
| 141 | + cancellableTask.get().cancel(); |
| 142 | + } |
| 143 | + |
| 144 | + clearLogExecutors(); |
| 145 | + } |
| 146 | + |
| 147 | + private void clearLogExecutors() { |
| 148 | + final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); |
| 149 | + writeLock.lock(); |
| 150 | + try { |
| 151 | + logExecutors.clear(); |
| 152 | + } finally { |
| 153 | + writeLock.unlock(); |
| 154 | + } |
113 | 155 | } |
114 | 156 |
|
115 | 157 | private static class LogExecutor { |
116 | | - private final long skippedLogCalls; |
117 | | - private final Instant timeOfLastLogCall; |
| 158 | + // -1 here because we need to determine if we haven't logged the first time |
| 159 | + // After the first time we'll set it to 0, then the thread that runs on an interval |
| 160 | + // needs to know if there are any repeated message, if it sees 0, it knows there are none |
| 161 | + // and skips emitting the message again. |
| 162 | + private static final long INITIAL_LOG_COUNTER_VALUE = -1; |
| 163 | + |
| 164 | + private final AtomicLong skippedLogCalls = new AtomicLong(INITIAL_LOG_COUNTER_VALUE); |
| 165 | + private final AtomicReference<Instant> timeOfLastLogCall; |
118 | 166 | private final Clock clock; |
119 | | - private final Consumer<String> consumer; |
| 167 | + private final Logger throttledLogger; |
| 168 | + private final Level level; |
| 169 | + private final String originalMessage; |
| 170 | + private final Throwable throwable; |
120 | 171 |
|
121 | | - LogExecutor(Clock clock, Consumer<String> throttledConsumer) { |
122 | | - this(clock, 0, throttledConsumer); |
123 | | - } |
124 | | - |
125 | | - LogExecutor(Clock clock, long skippedLogCalls, Consumer<String> consumer) { |
126 | | - this.skippedLogCalls = skippedLogCalls; |
| 172 | + LogExecutor(Clock clock, Logger logger, Level level, String originalMessage, @Nullable Throwable throwable) { |
127 | 173 | this.clock = Objects.requireNonNull(clock); |
128 | | - timeOfLastLogCall = Instant.now(this.clock); |
129 | | - this.consumer = Objects.requireNonNull(consumer); |
| 174 | + timeOfLastLogCall = new AtomicReference<>(Instant.now(this.clock)); |
| 175 | + this.throttledLogger = Objects.requireNonNull(logger); |
| 176 | + this.level = Objects.requireNonNull(level); |
| 177 | + this.originalMessage = Objects.requireNonNull(originalMessage); |
| 178 | + this.throwable = throwable; |
130 | 179 | } |
131 | 180 |
|
132 | | - void log(String message) { |
133 | | - this.consumer.accept(message); |
| 181 | + void logRepeatedMessages() { |
| 182 | + var numSkippedLogCalls = skippedLogCalls.get(); |
| 183 | + if (hasRepeatedLogsToEmit(numSkippedLogCalls) == false) { |
| 184 | + return; |
| 185 | + } |
| 186 | + |
| 187 | + String enrichedMessage; |
| 188 | + if (numSkippedLogCalls == 1) { |
| 189 | + enrichedMessage = Strings.format("%s, repeated 1 time, last message at [%s]", originalMessage, timeOfLastLogCall.get()); |
| 190 | + } else { |
| 191 | + enrichedMessage = Strings.format( |
| 192 | + "%s, repeated %s times, last message at [%s]", |
| 193 | + originalMessage, |
| 194 | + skippedLogCalls, |
| 195 | + timeOfLastLogCall.get() |
| 196 | + ); |
| 197 | + } |
| 198 | + |
| 199 | + log(enrichedMessage); |
134 | 200 | } |
135 | 201 |
|
136 | | - LogExecutor compute(Consumer<String> executor, Duration durationToWait) { |
137 | | - if (hasDurationExpired(durationToWait)) { |
138 | | - String messageToAppend = ""; |
139 | | - if (this.skippedLogCalls == 1) { |
140 | | - messageToAppend = ", repeated 1 time"; |
141 | | - } else if (this.skippedLogCalls > 1) { |
142 | | - messageToAppend = format(", repeated %s times", this.skippedLogCalls); |
143 | | - } |
144 | | - |
145 | | - final String stringToAppend = messageToAppend; |
146 | | - return new LogExecutor(this.clock, 0, (message) -> executor.accept(message.concat(stringToAppend))); |
| 202 | + private void log(String enrichedMessage) { |
| 203 | + LogBuilder builder = throttledLogger.atLevel(level); |
| 204 | + if (throwable != null) { |
| 205 | + builder = builder.withThrowable(throwable); |
147 | 206 | } |
148 | 207 |
|
149 | | - // This creates a consumer that won't do anything because the original consumer is being throttled |
150 | | - return new LogExecutor(this.clock, this.skippedLogCalls + 1, (message) -> {}); |
| 208 | + builder.log(enrichedMessage); |
| 209 | + } |
| 210 | + |
| 211 | + private static boolean hasRepeatedLogsToEmit(long numSkippedLogCalls) { |
| 212 | + return numSkippedLogCalls > 0; |
| 213 | + } |
| 214 | + |
| 215 | + void logFirstMessage() { |
| 216 | + timeOfLastLogCall.set(Instant.now(this.clock)); |
| 217 | + |
| 218 | + if (hasLoggedOriginalMessage(skippedLogCalls.getAndIncrement()) == false) { |
| 219 | + log(originalMessage); |
| 220 | + } |
151 | 221 | } |
152 | 222 |
|
153 | | - private boolean hasDurationExpired(Duration durationToWait) { |
154 | | - Instant now = Instant.now(clock); |
155 | | - return now.isAfter(timeOfLastLogCall.plus(durationToWait)); |
| 223 | + private static boolean hasLoggedOriginalMessage(long numSkippedLogCalls) { |
| 224 | + // a negative value indicates that we haven't yet logged the original message |
| 225 | + return numSkippedLogCalls >= 0; |
156 | 226 | } |
157 | 227 | } |
158 | 228 | } |
0 commit comments