-
Notifications
You must be signed in to change notification settings - Fork 646
Expand file tree
/
Copy pathLuaTimeoutManager.cs
More file actions
338 lines (291 loc) · 12.4 KB
/
LuaTimeoutManager.cs
File metadata and controls
338 lines (291 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using Microsoft.Extensions.Logging;
namespace Garnet.server.Lua
{
/// <summary>
/// Central place to manage timeouts being injected into running Lua scripts.
///
/// We use this because each <see cref="LuaRunner"/> starting its own timer
/// or similar has substantial overhead.
///
/// Timeouts are explicitly best effort, there's no guarantee we'll time something out _exactly_
/// when it runs too long. We will time it out _eventually_ once it has run too long.
/// </summary>
/// <remarks>
/// This is complex functionality.
///
/// Complications are:
/// 1. Timeouts are _rare_, so significant overhead must be avoided
/// - We cannot afford to allocate timers/tasks/etc. per-invocation due to this
/// 2. Scripts can be active on many threads
/// - However, only 1 script will be active PER thread
/// 3. Scripts can complete after we've decided to "time them out", so there's a natural race
///
/// The rough design is:
/// - <see cref="SessionScriptCache"/>s are registered with the <see cref="LuaTimeoutManager"/>
/// - <see cref="SessionScriptCache"/>s now track the active <see cref="LuaRunner"/> (if any)
/// - When a script starts, we get a unique token so we can distinguish the natural race
/// - A dedicated thread is ticking periodically, walking the <see cref="SessionScriptCache"/>s and triggering timeouts
/// </remarks>
internal sealed class LuaTimeoutManager : IDisposable
{
/// <summary>
/// A single <see cref="SessionScriptCache"/> registered with a <see cref="LuaTimeoutManager"/>.
///
/// Holds a packed 64-bit state: the low 32 bits are the cookie identifying the currently
/// running script (0 means "nothing active"), the high 32 bits count how many ticks have
/// been observed since that cookie was set.
///
/// Dispose to remove the registration from its owning <see cref="LuaTimeoutManager"/>.
/// </summary>
internal sealed class Registration : IDisposable
{
    private readonly LuaTimeoutManager owner;

    // Bottom half is cookie, top half is count
    private ulong packedState;

    /// <summary>
    /// The cache that is notified when this registration times out.
    /// </summary>
    internal SessionScriptCache ScriptCache { get; }

    /// <summary>
    /// Cookie currently stored in the low 32 bits of the packed state (0 if inactive).
    /// </summary>
    internal uint Cookie => (uint)Volatile.Read(ref packedState);

    /// <summary>
    /// Number of ticks recorded in the high 32 bits of the packed state.
    /// </summary>
    internal uint Count => (uint)(Volatile.Read(ref packedState) >> 32);

    internal Registration(LuaTimeoutManager owner, SessionScriptCache cache)
    {
        this.owner = owner;
        ScriptCache = cache;
    }

    /// <summary>
    /// Update value of <see cref="Cookie"/> atomically.
    ///
    /// The whole packed state is replaced, so <see cref="Count"/> is reset to 0 as well.
    /// </summary>
    internal void SetCookie(uint cookie)
    {
        // uint widens to ulong with a zeroed top half, i.e. count == 0
        _ = Interlocked.Exchange(ref packedState, cookie);
    }

    /// <summary>
    /// Advance count on current registration.
    ///
    /// Returns true if should be cancelled.
    ///
    /// If true, <paramref name="cookie"/> will be set to the value that identifies
    /// this registration. If false, <paramref name="cookie"/> is left uninitialized
    /// and must not be read.
    /// </summary>
    internal bool AdvanceTimeout(out uint cookie)
    {
        // Advances the top half of packedState by 1
        const ulong IncrBy = 1UL << 32;

        // +1 here because at the point of registration, the current tick is some % of the way
        // complete. So we need to wait an additional one to make sure we don't cancel early.
        const ulong TriggerAfter = (ulong)(TimeoutDivisions + 1) << 32;

        var oldValue = Volatile.Read(ref packedState);
        if ((uint)oldValue == 0)
        {
            // Current registration isn't active (cookie == 0)
            Unsafe.SkipInit(out cookie);
            return false;
        }

        var newValue = oldValue + IncrBy;
        if (Interlocked.CompareExchange(ref packedState, newValue, oldValue) != oldValue)
        {
            // Cookie was set from some other thread, this registration cannot timeout yet
            Unsafe.SkipInit(out cookie);
            return false;
        }

        // The cookie only occupies the low 32 bits, so this comparison is decided by the count
        if (newValue >= TriggerAfter)
        {
            // It's been the requisite number of ticks since the registration, timeout
            cookie = (uint)newValue;
            return true;
        }

        Unsafe.SkipInit(out cookie);
        return false;
    }

    /// <summary>
    /// Removes this registration from the owning <see cref="LuaTimeoutManager"/>.
    /// </summary>
    public void Dispose()
    {
        owner.RemoveRegistration(this);
    }
}
// Rather than track proper timestamps we just count up to some number
// and use that to trigger timeouts.
//
// The requested timeout is split into this many ticks; a registration is
// timed out once it has been ticked TimeoutDivisions + 1 times.
private const int TimeoutDivisions = 10;
// Interval between ticks of the timer thread: timeout / TimeoutDivisions, clamped to at least 1ms.
private readonly TimeSpan frequency;
// Optional logger, may be null (all uses go through ?.).
private readonly ILogger logger;
// Shared thread for all timeouts
//
// Using a Thread instead of some variant of a Timer for promptness,
// because the API fits better, and because going through the thread
// pool has considerable overhead.
private Thread timerThread;
// Created in Start(), cancelled in Dispose() to stop the timer thread.
private CancellationTokenSource timerThreadCts;
// Slots for active registrations; a null entry is a free slot.
//
// Individual slots are updated with Interlocked.CompareExchange, and the array itself
// is swapped for a larger one (also via CompareExchange) when no free slot is found.
private Registration[] registrations;
/// <summary>
/// Create a <see cref="LuaTimeoutManager"/> which times scripts out after roughly <paramref name="timeout"/>.
///
/// The timer thread is not started here, call <see cref="Start"/> for that.
/// </summary>
/// <param name="timeout">How long a script may run before a timeout is requested. Must be positive.</param>
/// <param name="logger">Optional logger, may be null.</param>
/// <exception cref="ArgumentException">If <paramref name="timeout"/> is zero or negative.</exception>
internal LuaTimeoutManager(TimeSpan timeout, ILogger logger = null)
{
    if (timeout <= TimeSpan.Zero)
    {
        // Zero is rejected too, so the message has to say "> 0" rather than ">= 0"
        throw new ArgumentException($"Timeout must be > 0, was {timeout}", nameof(timeout));
    }

    this.logger = logger;

    frequency = TimeSpan.FromTicks(timeout.Ticks / TimeoutDivisions); // Should get us to +/- 10% of the desired timeout, which is fine
    if (frequency < TimeSpan.FromMilliseconds(1))
    {
        // Below 1ms, it doesn't really make sense to try and be precise of timeouts - too much jitter
        frequency = TimeSpan.FromMilliseconds(1);
    }

    int initialRegistrationsSize;
#if DEBUG
    // In Debug, force growth of registrations frequently
    initialRegistrationsSize = 1;
#else
    // In Release, make a decent guess at the max for perf reasons
    initialRegistrationsSize = Environment.ProcessorCount;
#endif
    registrations = new Registration[initialRegistrationsSize];

    logger?.LogInformation("Created LuaTimeoutManager with space for {initialRegistrationSize} timeout registrations", initialRegistrationsSize);
}
/// <summary>
/// Stops the timer thread, if <see cref="Start"/> was called, and releases its cancellation source.
/// </summary>
public void Dispose()
{
    // Order matters: signal the thread, wait for it to exit, and only then
    // tear down the token source it is waiting on.
    if (timerThreadCts is { } toCancel)
    {
        toCancel.Cancel();
    }

    if (timerThread is { } toJoin)
    {
        toJoin.Join();
    }

    if (timerThreadCts is { } toDispose)
    {
        toDispose.Dispose();
    }
}
/// <summary>
/// Spin up the background thread that drives this <see cref="LuaTimeoutManager"/>.
///
/// The thread wakes once per tick interval and ticks every registration,
/// until cancellation is requested.
/// </summary>
internal void Start()
{
    timerThreadCts = new CancellationTokenSource();

    timerThread = new Thread(RunTimerLoop)
    {
        Name = nameof(LuaTimeoutManager),
        IsBackground = true,
    };
    timerThread.Start();

    // Body of the timer thread
    void RunTimerLoop()
    {
        var cancellation = timerThreadCts.Token;

        while (true)
        {
            if (cancellation.IsCancellationRequested)
            {
                break;
            }

            // WaitOne only returns true if the handle was signalled, which means we were cancelled
            var cancelled = cancellation.WaitHandle.WaitOne(frequency);
            if (cancelled)
            {
                break;
            }

            // We will slip a bit here, but timeouts are a best effort thing anyway
            TickTimeouts();
        }
    }
}
/// <summary>
/// Register the given <see cref="SessionScriptCache"/> for a timeout notification.
///
/// A runner can only be registered a single time, by a single thread.
///
/// Returns a <see cref="Registration"/> that uniquely identifies this registration.
///
/// Dispose the <see cref="Registration"/> to remove the cache from timeout notifications.
/// </summary>
/// <remarks>
/// Lock free: the new registration is placed into a free (null) slot with a compare-exchange,
/// or, if there is no free slot, into a new array of twice the size that is then published
/// with a compare-exchange on the registrations field. Either path retries if another thread
/// replaced the registrations array in the meantime.
/// </remarks>
internal Registration RegisterForTimeout(SessionScriptCache cache)
{
var ret = new Registration(this, cache);
// Set once ret has been written into a slot of some array we've seen
var potentiallyInserted = false;
var curRegistrations = Volatile.Read(ref registrations);
tryAgain:
if (potentiallyInserted)
{
// If we're trying again, it's because registrations grew
//
// The thread that grew it may have copied ret over from the old array,
// in which case there is nothing left to do.
for (var i = 0; i < curRegistrations.Length; i++)
{
// CompareExchange(…, null, null) never changes the slot, it is only used as an atomic read
if (Interlocked.CompareExchange(ref curRegistrations[i], null, null) == ret)
{
return ret;
}
}
}
// Scan for an open slot
for (var i = 0; i < curRegistrations.Length; i++)
{
// Claims the slot only if it is currently null
if (Interlocked.CompareExchange(ref curRegistrations[i], ret, null) == null)
{
potentiallyInserted = true;
goto checkUnmodified;
}
}
// Fell through, grow registrations and retry
var newSize = curRegistrations.Length * 2;
var newRegistrations = new Registration[newSize];
// Copy over whatever is currently in each slot of the old array
for (var i = 0; i < curRegistrations.Length; i++)
{
newRegistrations[i] = Interlocked.CompareExchange(ref curRegistrations[i], null, null);
}
// First index past the copied entries, which is free in the new array
newRegistrations[curRegistrations.Length] = ret;
Registration[] updatedRegistrations;
// Publish the new array, but only if nobody else replaced registrations since we read it
if ((updatedRegistrations = Interlocked.CompareExchange(ref registrations, newRegistrations, curRegistrations)) == curRegistrations)
{
// This thread won, so we know we successfully inserted the registration
logger?.LogInformation("Grew LuaTimeoutManager registration space to {newSize}", newSize);
return ret;
}
else
{
// So other thread won, just update our reference and try again
curRegistrations = updatedRegistrations;
goto tryAgain;
}
// Only reachable via goto, after ret was written into a slot of curRegistrations
//
// Other threads might update registrations, so check that before returning
checkUnmodified:
// Exchanging curRegistrations for itself leaves the field unchanged, this is an atomic read of the current array
if ((updatedRegistrations = Interlocked.CompareExchange(ref registrations, curRegistrations, curRegistrations)) != curRegistrations)
{
// Another thread grew registrations, retry
curRegistrations = updatedRegistrations;
goto tryAgain;
}
return ret;
}
/// <summary>
/// Remove a previously created registration
/// </summary>
/// <remarks>
/// Clears the slot holding <paramref name="toRemove"/> with a compare-exchange, then re-checks
/// the registrations field. If the array was replaced in the meantime, the removal is repeated
/// against the new array, since the registration may have been copied into it.
/// </remarks>
private void RemoveRegistration(Registration toRemove)
{
// Only used by the Debug.Assert below
var removedAtLeastOnce = false;
var curRegistrations = Volatile.Read(ref registrations);
tryAgain:
for (var i = 0; i < curRegistrations.Length; i++)
{
// Nulls the slot only if it currently holds toRemove
if (Interlocked.CompareExchange(ref curRegistrations[i], null, toRemove) == toRemove)
{
removedAtLeastOnce = true;
goto checkUnmodified;
}
}
// Scanned all the way through and _didn't_ find our registration
//
// This implies we're retrying due to modification, and the thread doing that update
// saw our previous removal. That's fine.
Debug.Assert(removedAtLeastOnce, "We should have seen at least one removal succeed");
return;
checkUnmodified:
Registration[] updatedRegistrations;
// CompareExchange(…, null, null) is used as an atomic read of the current array
if ((updatedRegistrations = Interlocked.CompareExchange(ref registrations, null, null)) != curRegistrations)
{
// Some other thread modified registrations, we need to try again
curRegistrations = updatedRegistrations;
goto tryAgain;
}
}
/// <summary>
/// Invoked periodically to check timeouts on active registrations.
///
/// Advances the tick count of every registration in the current registrations array,
/// and requests a timeout on the owning <see cref="SessionScriptCache"/> for each one
/// that has been active for the requisite number of ticks.
/// </summary>
private void TickTimeouts()
{
    var curRegistrations = Volatile.Read(ref registrations);

    for (var i = 0; i < curRegistrations.Length; i++)
    {
        // Slots are written from other threads via Interlocked, so read them with a fence too
        var reg = Volatile.Read(ref curRegistrations[i]);
        if (reg == null)
        {
            // Free slot
            continue;
        }

        if (reg.AdvanceTimeout(out var cookie))
        {
            reg.ScriptCache.RequestTimeout(cookie);
        }

        // Deliberately keep walking after a timeout: every registration must be advanced
        // on every tick, otherwise registrations in later slots fall behind (or never
        // time out at all while an earlier one keeps reporting a timeout).
    }
}
}
}