Skip to content

Commit 4202f7e

Browse files
authored
Adds improved batching algorithm for lookups. (#8523)
1 parent 5a222a0 commit 4202f7e

40 files changed

+1098
-559
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
namespace GreenDonut;
2+
3+
/// <summary>
4+
/// <para>
5+
/// Represents a unit of work to be executed by a batch dispatcher.
6+
/// A batch groups together multiple keys or requests that can be
7+
/// resolved in a single execution step.
8+
/// </para>
9+
/// <para>
10+
/// This abstract base class defines the minimal contract required
11+
/// for schedulers and dispatchers to interact with batches without
12+
/// depending on their concrete key or value types.
13+
/// </para>
14+
/// </summary>
15+
public abstract class Batch
16+
{
17+
/// <summary>
18+
/// <para>
19+
/// Gets the number of items currently contained in this batch.
20+
/// </para>
21+
/// <para>
22+
/// This reflects the current size of the batch and can be used
23+
/// to decide whether to dispatch early (e.g., when reaching a
24+
/// maximum size threshold).
25+
/// </para>
26+
/// </summary>
27+
public abstract int Size { get; }
28+
29+
/// <summary>
30+
/// <para>
31+
/// Gets the current status of this batch.
32+
/// </para>
33+
/// <para>
34+
/// The status indicates whether the batch is newly created, has
35+
/// been observed ("touched") by the scheduler in the current
36+
/// turn/epoch, or is ready for dispatch.
37+
/// </para>
38+
/// </summary>
39+
public abstract BatchStatus Status { get; }
40+
41+
/// <summary>
42+
/// Gets a high-resolution timestamp from representing the last time an item was added to this batch.
43+
/// This value is used for recency checks in scheduling decisions.
44+
/// </summary>
45+
public abstract long ModifiedTimestamp { get; }
46+
47+
/// <summary>
48+
/// <para>
49+
/// Marks the batch as "touched" by the scheduler.
50+
/// </para>
51+
/// <para>
52+
/// This is typically called when the scheduler observes the batch
53+
/// during a scheduling turn, indicating that it has been considered
54+
/// for dispatch. A touched batch may be given a short grace period
55+
/// or additional turn to gather more items before being dispatched.
56+
/// </para>
57+
/// </summary>
58+
/// <returns>
59+
/// <c>true</c> if it did not change since it last was touched.
60+
/// </returns>
61+
public abstract bool Touch();
62+
63+
/// <summary>
64+
/// Dispatch this batch.
65+
/// </summary>
66+
public abstract Task DispatchAsync();
67+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
namespace GreenDonut;
2+
3+
/// <summary>
4+
/// Defines the lifecycle states of a batch within the scheduler/dispatcher pipeline.
5+
/// </summary>
6+
public enum BatchStatus
7+
{
8+
/// <summary>
9+
/// <para>
10+
/// The batch has received new work items (keys, requests, etc.)
11+
/// since the last scheduler observation.
12+
/// </para>
13+
/// <para>
14+
/// This state is typically set when the first item is enqueued into
15+
/// an empty batch or when new items are added after the previous
16+
/// scheduler turn, indicating that the batch may be eligible for
17+
/// dispatch in the near future.
18+
/// </para>
19+
/// </summary>
20+
Enqueued = 1,
21+
22+
/// <summary>
23+
/// <para>
24+
/// The batch has been observed ("touched") by the scheduler or dispatcher
25+
/// in the current scheduling turn.
26+
/// </para>
27+
/// <para>
28+
/// This state is used as a hint that the batch has been considered for
29+
/// dispatch but is being given an additional opportunity to accumulate
30+
/// more items before execution. Typically, a touched batch will be
31+
/// dispatched on the next turn if it receives no new items.
32+
/// </para>
33+
/// </summary>
34+
Touched = 2
35+
}
Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
11
namespace GreenDonut;
22

33
/// <summary>
4-
/// The batch scheduler is used by the DataLoader to defer the data fetching
5-
/// work to a batch dispatcher that will execute the batches.
4+
/// The batch scheduler is used by DataLoaders to coordinate efficient batch execution
5+
/// of data fetching operations. The scheduler defers individual data requests into
6+
/// batches that are processed by a batch dispatcher, optimizing throughput while
7+
/// maintaining low latency.
68
/// </summary>
79
public interface IBatchScheduler
810
{
911
/// <summary>
10-
/// Schedules work.
12+
/// <para>
13+
/// Schedules a batch for execution. The batch will be queued and processed
14+
/// by the batch dispatcher using an intelligent coordination strategy that
15+
/// prioritizes batches based on their modification timestamp to ensure
16+
/// optimal batching efficiency.
17+
/// </para>
1118
/// </summary>
12-
/// <param name="dispatch">
13-
/// A delegate that represents the work.
19+
/// <param name="batch">
20+
/// The batch containing one or more data loading keys that should be
21+
/// scheduled for coordinated execution.
1422
/// </param>
15-
void Schedule(Func<ValueTask> dispatch);
23+
void Schedule(Batch batch);
1624
}
Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,30 @@
11
namespace GreenDonut;
22

33
/// <summary>
4-
/// Defines a batch dispatcher that immediately dispatches batch jobs.
4+
/// A simple batch scheduler implementation that immediately dispatches batches
5+
/// without coordination or batching optimization. This scheduler prioritizes
6+
/// low latency over batching efficiency by executing each batch as soon as
7+
/// it is scheduled.
58
/// </summary>
69
public class AutoBatchScheduler : IBatchScheduler
710
{
811
/// <summary>
9-
/// Schedules a new job to the dispatcher that is immediately executed.
12+
/// Schedules a batch for immediate execution. The batch is dispatched
13+
/// asynchronously on a background thread without waiting for additional
14+
/// keys or coordination with other batches.
1015
/// </summary>
11-
/// <param name="dispatch">
12-
/// The job that is being scheduled.
16+
/// <param name="batch">
17+
/// The batch to be immediately dispatched for execution.
1318
/// </param>
14-
public void Schedule(Func<ValueTask> dispatch)
15-
=> BeginDispatch(dispatch);
19+
public void Schedule(Batch batch)
20+
=> BeginDispatch(batch);
1621

17-
private static void BeginDispatch(Func<ValueTask> dispatch)
22+
private static void BeginDispatch(Batch batch)
1823
=> Task.Run(async () =>
1924
{
2025
try
2126
{
22-
await dispatch().ConfigureAwait(false);
27+
await batch.DispatchAsync().ConfigureAwait(false);
2328
}
2429
catch
2530
{
@@ -28,7 +33,7 @@ private static void BeginDispatch(Func<ValueTask> dispatch)
2833
});
2934

3035
/// <summary>
31-
/// Gets the default instance if the <see cref="AutoBatchScheduler"/>.
36+
/// Gets the default shared instance of the <see cref="AutoBatchScheduler"/>.
3237
/// </summary>
3338
public static AutoBatchScheduler Default { get; } = new();
3439
}

src/GreenDonut/src/GreenDonut/Batch.cs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,43 @@
1+
using System.Diagnostics;
2+
13
namespace GreenDonut;
24

3-
internal class Batch<TKey> where TKey : notnull
5+
internal sealed class Batch<TKey> : Batch where TKey : notnull
46
{
7+
private const int Enqueued = 1;
8+
private const int Touched = 2;
9+
510
private readonly List<TKey> _keys = [];
611
private readonly Dictionary<TKey, IPromise> _items = [];
12+
private Func<Batch<TKey>, CancellationToken, ValueTask> _dispatch = null!;
13+
private CancellationToken _ct = CancellationToken.None;
14+
private int _status = Enqueued;
15+
private long _timestamp;
716

817
public bool IsScheduled { get; set; }
918

10-
public int Size => _keys.Count;
11-
1219
public IReadOnlyList<TKey> Keys => _keys;
1320

21+
public override int Size => _keys.Count;
22+
23+
public override BatchStatus Status => (BatchStatus)_status;
24+
25+
public override long ModifiedTimestamp => _timestamp;
26+
27+
public override bool Touch()
28+
{
29+
var previous = Interlocked.Exchange(ref _status, Touched);
30+
return previous == Touched;
31+
}
32+
1433
public Promise<TValue> GetOrCreatePromise<TValue>(TKey key, bool allowCachePropagation)
1534
{
35+
// we mark the batch as enqueued even if we did not really enqueued something.
36+
// as long as there are components interacting with this batch its good to
37+
// keep it in enqueued state.
38+
Interlocked.Exchange(ref _status, Enqueued);
39+
_timestamp = Stopwatch.GetTimestamp();
40+
1641
if (_items.TryGetValue(key, out var value))
1742
{
1843
return (Promise<TValue>)value;
@@ -29,10 +54,23 @@ public Promise<TValue> GetOrCreatePromise<TValue>(TKey key, bool allowCachePropa
2954
public Promise<TValue> GetPromise<TValue>(TKey key)
3055
=> (Promise<TValue>)_items[key];
3156

57+
public override async Task DispatchAsync()
58+
=> await _dispatch(this, _ct);
59+
60+
internal void Initialize(Func<Batch<TKey>, CancellationToken, ValueTask> dispatch, CancellationToken ct)
61+
{
62+
_status = Enqueued;
63+
_dispatch = dispatch;
64+
_ct = ct;
65+
}
66+
3267
internal void ClearUnsafe()
3368
{
3469
_keys.Clear();
3570
_items.Clear();
3671
IsScheduled = false;
72+
_status = Enqueued;
73+
_dispatch = null!;
74+
_ct = CancellationToken.None;
3775
}
3876
}

src/GreenDonut/src/GreenDonut/DataLoaderBase.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ protected internal DataLoaderOptions Options
182182
// we dispatch after everything is enqueued.
183183
if (_currentBatch is { IsScheduled: false })
184184
{
185-
ScheduleBatchUnsafe(_currentBatch, ct);
185+
ScheduleBatchUnsafe(_currentBatch);
186186
}
187187
}
188188

@@ -401,25 +401,25 @@ async ValueTask StartDispatchingAsync()
401401
// we will schedule it before issuing a new batch.
402402
if (!current.IsScheduled)
403403
{
404-
ScheduleBatchUnsafe(current, ct);
404+
ScheduleBatchUnsafe(current);
405405
}
406406
}
407407

408-
var newBatch = _currentBatch = BatchPool<TKey>.Shared.Get();
408+
var newBatch = _currentBatch = RentBatch(ct);
409409
var newPromise = newBatch.GetOrCreatePromise<TValue?>(key, allowCachePropagation);
410410

411411
if (scheduleOnNewBatch)
412412
{
413-
ScheduleBatchUnsafe(newBatch, ct);
413+
ScheduleBatchUnsafe(newBatch);
414414
}
415415

416416
return newPromise;
417417
}
418418

419-
private void ScheduleBatchUnsafe(Batch<TKey> batch, CancellationToken ct)
419+
private void ScheduleBatchUnsafe(Batch<TKey> batch)
420420
{
421421
batch.IsScheduled = true;
422-
_batchScheduler.Schedule(() => DispatchBatchAsync(batch, ct));
422+
_batchScheduler.Schedule(batch);
423423
}
424424

425425
private void SetSingleResult(
@@ -518,4 +518,11 @@ protected static string GetCacheKeyType<TDataLoader>()
518518
/// </returns>
519519
protected static string GetCacheKeyType(Type type)
520520
=> type.FullName ?? type.Name;
521+
522+
private Batch<TKey> RentBatch(CancellationToken ct)
523+
{
524+
var batch = BatchPool<TKey>.Shared.Get();
525+
batch.Initialize(DispatchBatchAsync, ct);
526+
return batch;
527+
}
521528
}

src/GreenDonut/test/GreenDonut.Tests/AutoBatchSchedulerTests.cs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,32 @@ public void DispatchOnEnqueue()
1010
var waitHandle = new AutoResetEvent(false);
1111

1212
// act
13-
AutoBatchScheduler.Default.Schedule(() =>
14-
{
15-
dispatched = true;
16-
waitHandle.Set();
17-
return default;
18-
});
13+
AutoBatchScheduler.Default.Schedule(
14+
new NoopBatch(() =>
15+
{
16+
dispatched = true;
17+
waitHandle.Set();
18+
}));
1919

2020
// assert
2121
waitHandle.WaitOne(TimeSpan.FromSeconds(5));
2222
Assert.True(dispatched);
2323
}
24+
25+
public class NoopBatch(Action action) : Batch
26+
{
27+
public override int Size { get; }
28+
public override BatchStatus Status { get; }
29+
public override long ModifiedTimestamp { get; }
30+
public override bool Touch()
31+
{
32+
throw new NotImplementedException();
33+
}
34+
35+
public override Task DispatchAsync()
36+
{
37+
action();
38+
return Task.CompletedTask;
39+
}
40+
}
2441
}

src/GreenDonut/test/GreenDonut.Tests/BatchDataLoaderTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ protected override Task<IReadOnlyDictionary<string, string>> LoadBatchAsync(
116116

117117
public sealed class InstantDispatcher : IBatchScheduler
118118
{
119-
public void Schedule(Func<ValueTask> dispatch)
120-
=> dispatch();
119+
public void Schedule(Batch batch)
120+
=> batch.DispatchAsync();
121121
}
122122
}

src/GreenDonut/test/GreenDonut.Tests/DelayDispatcher.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ namespace GreenDonut;
22

33
public class DelayDispatcher : IBatchScheduler
44
{
5-
public void Schedule(Func<ValueTask> dispatch)
5+
public void Schedule(Batch batch)
66
=> Task.Run(async () =>
77
{
88
await Task.Delay(150);
9-
await dispatch();
9+
await batch.DispatchAsync();
1010
});
1111
}

src/GreenDonut/test/GreenDonut.Tests/ManualBatchScheduler.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,18 @@ namespace GreenDonut;
44

55
public class ManualBatchScheduler : IBatchScheduler
66
{
7-
private readonly ConcurrentQueue<Func<ValueTask>> _queue = new();
7+
private readonly ConcurrentQueue<Batch> _queue = new();
88

9-
public void Dispatch()
9+
public void Schedule(Batch batch)
1010
{
11-
while (_queue.TryDequeue(out var dispatch))
12-
{
13-
Task.Run(async () => await dispatch());
14-
}
11+
_queue.Enqueue(batch);
1512
}
1613

17-
public void Schedule(Func<ValueTask> dispatch)
14+
public void Dispatch()
1815
{
19-
_queue.Enqueue(dispatch);
16+
while (_queue.TryDequeue(out var batch))
17+
{
18+
Task.Run(async () => await batch.DispatchAsync());
19+
}
2020
}
2121
}

0 commit comments

Comments
 (0)