Skip to content

Commit 1f5715b

Browse files
committed
Further testing, fixing and investigations
1 parent 47630c5 commit 1f5715b

36 files changed

+369
-166
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using System.Reactive.Concurrency;
2+
3+
namespace System.Reactive.Unity.Concurrency {
4+
/// <summary>
5+
/// Represents an object that schedules units of work on the CLR thread pool. (This one will never fall back to the NewThreadScheduler unlike the ThreadPoolScheduler)
6+
/// </summary>
7+
/// <seealso cref="Instance">Singleton instance of this type exposed through this static property.</seealso>
8+
public sealed class ThreadPoolOnlyScheduler : LocalScheduler, ISchedulerPeriodic {
9+
private static readonly Lazy<ThreadPoolOnlyScheduler> _lazyDefault = new Lazy<ThreadPoolOnlyScheduler>(static () => new ThreadPoolOnlyScheduler());
10+
private readonly ThreadPoolScheduler _innerScheduler;
11+
12+
public ThreadPoolOnlyScheduler() {
13+
_innerScheduler = ThreadPoolScheduler.Instance;
14+
}
15+
16+
/// <summary>
17+
/// Gets the singleton instance of the thread pool only scheduler. (This one will never fall back to the NewThreadScheduler unlike the ThreadPoolScheduler)
18+
/// </summary>
19+
public static ThreadPoolOnlyScheduler Instance => _lazyDefault.Value;
20+
21+
/// <summary>
22+
/// Schedules an action to be executed.
23+
/// </summary>
24+
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
25+
/// <param name="state">State passed to the action to be executed.</param>
26+
/// <param name="action">Action to be executed.</param>
27+
/// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
28+
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
29+
public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) {
30+
return _innerScheduler.Schedule(state, action);
31+
}
32+
33+
/// <summary>
34+
/// Schedules an action to be executed after dueTime.
35+
/// </summary>
36+
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
37+
/// <param name="state">State passed to the action to be executed.</param>
38+
/// <param name="action">Action to be executed.</param>
39+
/// <param name="dueTime">Relative time after which to execute the action.</param>
40+
/// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
41+
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
42+
public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) {
43+
return _innerScheduler.Schedule(state, dueTime, action);
44+
}
45+
46+
/// <summary>
47+
/// Schedules a periodic piece of work.
48+
/// </summary>
49+
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
50+
/// <param name="state">Initial state passed to the action upon the first iteration.</param>
51+
/// <param name="period">Period for running the work periodically.</param>
52+
/// <param name="action">Action to be executed, potentially updating the state.</param>
53+
/// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
54+
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
55+
/// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than zero.</exception>
56+
public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action) {
57+
return _innerScheduler.SchedulePeriodic(state, period, action);
58+
}
59+
60+
public override IStopwatch StartStopwatch() => _innerScheduler.StartStopwatch();
61+
}
62+
}

Assets/Src/Scripts/Unity/Concurrency/ThreadPoolOnlyScheduler.cs.meta

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Assets/Src/Scripts/Unity/Concurrency/UnityMainThreadScheduler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ public UnityMainThreadScheduler() {
1717
_scheduleAction = new Action<object>(Schedule);
1818
}
1919

20-
/// Gets the singleton instance of the Windows Runtime thread pool scheduler.
20+
/// <summary>
21+
/// Gets the singleton instance of the Unity main thread scheduler.
2122
/// </summary>
2223
public static UnityMainThreadScheduler Instance => _lazyDefault.Value;
2324

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System.Reactive.Concurrency;
2+
using System.Reactive.PlatformServices;
3+
using System.Reactive.Unity.Concurrency;
4+
5+
namespace System.Reactive.Unity.InternalUtil {
6+
sealed class UnityEnlightenmentProvider : CurrentPlatformEnlightenmentProvider {
7+
public override T GetService<T>(object[] args) {
8+
if (typeof(T) == typeof(IScheduler) && args != null && ((string)args[0]) == "ThreadPool")
9+
return (T)(object)ThreadPoolOnlyScheduler.Instance;
10+
return base.GetService<T>(args);
11+
}
12+
}
13+
}

Assets/Src/Scripts/Unity/InternalUtil/UnityEnlightenmentProvider.cs.meta

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Assets/Src/Scripts/Unity/ReactiveUnity.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
using UnityEngine;
44
using System.Reactive.Concurrency;
55
using System.Reactive.Unity.Concurrency;
6+
using System.Reactive.PlatformServices;
67

78
namespace System.Reactive.Unity {
89
public static class ReactiveUnity {
910
public static void SetupPatches() {
1011
InitSchedulerDefaults();
12+
InitCurrentPlatformEnlightenmentProvider();
1113
AddUnityEqualityComparer<Vector2>();
1214
AddUnityEqualityComparer<Vector3>();
1315
AddUnityEqualityComparer<Vector4>();
@@ -28,11 +30,15 @@ private static void InitSchedulerDefaults() {
2830
#if WEB_GL
2931
SchedulerDefaults.AsyncConversions = UnityMainThreadScheduler.Instance;
3032
#else
31-
// SchedulerDefaults.AsyncConversions = ThreadPoolScheduler.Instance;
32-
SchedulerDefaults.AsyncConversions = DefaultScheduler.Instance;
33+
// SchedulerDefaults.AsyncConversions = DefaultScheduler.Instance;
34+
SchedulerDefaults.AsyncConversions = ThreadPoolOnlyScheduler.Instance;
3335
#endif
3436
}
3537

38+
private static void InitCurrentPlatformEnlightenmentProvider() {
39+
PlatformEnlightenmentProvider.Current = new UnityEnlightenmentProvider();
40+
}
41+
3642
private static void AddUnityEqualityComparer<T>() {
3743
var comparer = UnityEqualityComparer.GetDefault<T>();
3844
ReactiveProperty<T>.defaultEqualityComparer = comparer;

Assets/Tests/Microsoft.Reactive.Testing/ReactiveTest.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ public class ReactiveTest
3030
/// </summary>
3131
public const long Disposed = 1000;
3232

33+
[OneTimeSetUp]
34+
public void SetupOnce() {
35+
System.Threading.ThreadPool.GetAvailableThreads(out var availableWorker, out var availableIoCompletion);
36+
System.Threading.ThreadPool.GetMaxThreads(out var maxWorker, out var maxIoCompletion);
37+
UnityEngine.Debug.LogWarningFormat("Worker: {0} / {1} ; IoCompletion: {2} / {3}", availableWorker, maxWorker, availableIoCompletion, maxIoCompletion);
38+
}
39+
3340
/// <summary>
3441
/// Factory method for an OnNext notification record at a given time with a given value.
3542
/// </summary>

Assets/Tests/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,9 +272,9 @@ public void EventLoop_ScheduleActionDue() {
272272
Assert.True(sw.ElapsedMilliseconds > 180, "due " + sw.ElapsedMilliseconds);
273273

274274
static void Aot() {
275-
CurrentThreadScheduler.Instance.Schedule<(Func<(BasicProducer<int>, SingleAssignmentDisposable, IObserver<int>), IDisposable>, (BasicProducer<int>, SingleAssignmentDisposable, IObserver<int>))>(
275+
_ = CurrentThreadScheduler.Instance.Schedule<(Func<(BasicProducer<int>, SingleAssignmentDisposable, IObserver<int>), IDisposable>, (BasicProducer<int>, SingleAssignmentDisposable, IObserver<int>))>(
276276
(null, (null, null, null)), TimeSpan.Zero, null);
277-
Aot();
277+
_ = nameof(Aot);
278278
}
279279
}
280280

@@ -303,9 +303,9 @@ public void EventLoop_ScheduleActionDueNested() {
303303
Assert.True(sw.ElapsedMilliseconds > 380, "due " + sw.ElapsedMilliseconds);
304304

305305
static void Aot() {
306-
CurrentThreadScheduler.Instance.Schedule<(Action<(Producer<int, Where<int>.Predicate._>, Where<int>.Predicate._)>, (Producer<int, Where<int>.Predicate._>, Where<int>.Predicate._))>(
306+
_ = CurrentThreadScheduler.Instance.Schedule<(Action<(Producer<int, Where<int>.Predicate._>, Where<int>.Predicate._)>, (Producer<int, Where<int>.Predicate._>, Where<int>.Predicate._))>(
307307
(null, (null, null)), TimeSpan.Zero, null);
308-
Aot();
308+
_ = nameof(Aot);
309309
}
310310
}
311311

Assets/Tests/Tests.System.Reactive/Tests/Linq/Observable/BufferTest.cs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -836,8 +836,8 @@ public void BufferWithCount_Disposed()
836836
);
837837

838838
static void Aot() {
839-
new TestScheduler().ScheduleAbsolute<((Buffer<int>.Ferry._, int), Action<(Buffer<int>.Ferry._, int)>)>(((null, default), null), 0L, null);
840-
Aot();
839+
_ = new TestScheduler().ScheduleAbsolute<((Buffer<int>.Ferry._, int), Action<(Buffer<int>.Ferry._, int)>)>(((null, default), null), 0L, null);
840+
_ = nameof(Aot);
841841
}
842842
}
843843

@@ -1176,12 +1176,6 @@ public void BufferWithTime_Basic_Same_Periodic_Error()
11761176
new TimerRun(200, 480) { 300, 400 }
11771177
);
11781178
#endif
1179-
1180-
static void Aot() {
1181-
// TOOD invisible class!
1182-
// new TestScheduler().ScheduleAbsolute<((Action<Action<TimeSpan>>, SingleAssignmentDisposable, Scheduler.InvokeRec2State<Action<Action<TimeSpan>>>), Action<(Action<Action<TimeSpan>>, SingleAssignmentDisposable, Scheduler.InvokeRec2State<Action<Action<TimeSpan>>>)>)>(((null, null, null), null), 0L, null);
1183-
Aot();
1184-
}
11851179
}
11861180

11871181
[Test]
@@ -1333,8 +1327,8 @@ public void BufferWithTime_TickWhileOnCompleted()
13331327
});
13341328

13351329
static void Aot() {
1336-
new TestScheduler().ScheduleAbsolute<((Window<int>.Ferry._, Subject<int>), Action<(Window<int>.Ferry._, Subject<int>)>)>(((null, null), null), 0L, null);
1337-
Aot();
1330+
_ = new TestScheduler().ScheduleAbsolute<((Window<int>.Ferry._, Subject<int>), Action<(Window<int>.Ferry._, Subject<int>)>)>(((null, null), null), 0L, null);
1331+
_ = nameof(Aot);
13381332
}
13391333
}
13401334

Assets/Tests/Tests.System.Reactive/Tests/Linq/Observable/DelaySubscriptionTest.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,22 @@
1111
using ReactiveTests.Dummies;
1212
using NUnit.Framework;
1313
using UnityEngine.TestTools;
14+
using UniRx.Tests;
15+
using System.Reactive.Unity;
1416

1517
namespace ReactiveTests.Tests
1618
{
1719
public class DelaySubscriptionTest : ReactiveTest
1820
{
21+
[SetUp]
22+
public void Init() {
23+
TestUtil.SetSchedulerForImport();
24+
}
25+
26+
[TearDown]
27+
public void Dispose() {
28+
ReactiveUnity.SetupPatches();
29+
}
1930

2031
[Test]
2132
public void DelaySubscription_ArgumentChecking()

0 commit comments

Comments
 (0)