Skip to content

Commit 46fc378

Browse files
Martin LercherMartin Lercher
authored andcommitted
exctracted an interface for process scheduling plus tests
1 parent 66a1c10 commit 46fc378

File tree

4 files changed

+128
-2
lines changed

4 files changed

+128
-2
lines changed

src/JsonRpc/IScheduler.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace JsonRpc
5+
{
6+
public interface IScheduler : IDisposable
7+
{
8+
void Start();
9+
void Add(RequestProcessType type, Func<Task> request);
10+
}
11+
}

src/JsonRpc/InputHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class InputHandler : IInputHandler
2323
private Thread _inputThread;
2424
private readonly IRequestRouter _requestRouter;
2525
private readonly IResponseRouter _responseRouter;
26-
private readonly ProcessScheduler _scheduler;
26+
private readonly IScheduler _scheduler;
2727

2828
public InputHandler(
2929
TextReader input,

src/JsonRpc/ProcessScheduler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace JsonRpc
88
{
9-
public class ProcessScheduler : IDisposable
9+
public class ProcessScheduler : IScheduler
1010
{
1111
private readonly BlockingCollection<(RequestProcessType type, Func<Task> request)> _queue;
1212
private readonly CancellationTokenSource _cancel;
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Xunit;
6+
using FluentAssertions;
7+
using System.Collections.Generic;
8+
9+
namespace JsonRpc.Tests
10+
{
11+
public class ProcessSchedulerTests
12+
{
13+
private const int SLEEPTIME_MS = 50;
14+
15+
class AllRequestProcessTypes : TheoryData
16+
{
17+
public override IEnumerator<object[]> GetEnumerator()
18+
{
19+
var values = (object[])Enum.GetValues(typeof(RequestProcessType));
20+
var qy = from v in values select new object[] { v };
21+
return qy.GetEnumerator();
22+
}
23+
}
24+
25+
[Theory, ClassData(typeof(AllRequestProcessTypes))]
26+
public void ShouldScheduleCompletedTask(RequestProcessType type)
27+
{
28+
using (IScheduler s = new ProcessScheduler())
29+
{
30+
var done = false;
31+
s.Start();
32+
s.Add(type, () => {
33+
done = true;
34+
return Task.CompletedTask;
35+
});
36+
Thread.Sleep(SLEEPTIME_MS);
37+
done.Should().Be(true);
38+
}
39+
}
40+
41+
[Theory, ClassData(typeof(AllRequestProcessTypes))]
42+
public void ShouldScheduleAwaitableTask(RequestProcessType type)
43+
{
44+
using (IScheduler s = new ProcessScheduler())
45+
{
46+
var done = false;
47+
s.Start();
48+
s.Add(RequestProcessType.Serial, async () => {
49+
done = true;
50+
await Task.Yield();
51+
});
52+
Thread.Sleep(SLEEPTIME_MS);
53+
done.Should().Be(true);
54+
}
55+
}
56+
57+
public void ShouldScheduleSerialInOrder()
58+
{
59+
using (IScheduler s = new ProcessScheduler())
60+
{
61+
var done = 0;
62+
var peek = 0;
63+
var peekWasMoreThanOne = 0;
64+
65+
Func<Task> HandlePeek = async () => {
66+
Interlocked.Increment(ref done); // record that I was called
67+
var p = Interlocked.Increment(ref peek);
68+
if (p > 1)
69+
Interlocked.Increment(ref peekWasMoreThanOne);
70+
await Task.Delay(SLEEPTIME_MS); // give a different HandlePeek task a chance to run
71+
Interlocked.Decrement(ref peek);
72+
};
73+
74+
s.Start();
75+
s.Add(RequestProcessType.Serial, HandlePeek);
76+
s.Add(RequestProcessType.Serial, HandlePeek);
77+
78+
Thread.Sleep(SLEEPTIME_MS * 3);
79+
done.Should().Be(2);
80+
peek.Should().Be(0);
81+
peekWasMoreThanOne.Should().Be(0);
82+
}
83+
}
84+
85+
public void ShouldScheduleParallelInParallel()
86+
{
87+
using (IScheduler s = new ProcessScheduler())
88+
{
89+
var done = 0;
90+
var peek = 0;
91+
var peekWasMoreThanOne = 0;
92+
93+
Func<Task> HandlePeek = async () => {
94+
Interlocked.Increment(ref done); // record that I was called
95+
var p = Interlocked.Increment(ref peek);
96+
if (p > 1)
97+
Interlocked.Increment(ref peekWasMoreThanOne);
98+
await Task.Delay(SLEEPTIME_MS); // give a different HandlePeek task a chance to run
99+
Interlocked.Decrement(ref peek);
100+
};
101+
102+
s.Start();
103+
s.Add(RequestProcessType.Parallel, HandlePeek);
104+
s.Add(RequestProcessType.Parallel, HandlePeek);
105+
s.Add(RequestProcessType.Parallel, HandlePeek);
106+
s.Add(RequestProcessType.Parallel, HandlePeek);
107+
108+
Thread.Sleep(SLEEPTIME_MS * 2);
109+
done.Should().Be(4);
110+
peek.Should().Be(0);
111+
peekWasMoreThanOne.Should().BeGreaterThan(0);
112+
}
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)