Skip to content

Commit 9b16029

Browse files
Improve async with custom tick scheduler factory and exception printing
1 parent 423fe01 commit 9b16029

File tree

10 files changed

+186
-61
lines changed

10 files changed

+186
-61
lines changed

api/AltV.Net.Async/AltVAsync.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ internal class AltVAsync
1010
private readonly TickScheduler scheduler;
1111
private readonly Thread mainThread;
1212

13-
public AltVAsync()
13+
public AltVAsync(ITickSchedulerFactory tickSchedulerFactory)
1414
{
1515
mainThread = Thread.CurrentThread;
1616
mainThread.Name = "main";
17-
scheduler = new TickScheduler(mainThread);
17+
scheduler = tickSchedulerFactory.Create(mainThread);
1818
taskFactory = new TaskFactory(
1919
CancellationToken.None, TaskCreationOptions.DenyChildAttach,
2020
TaskContinuationOptions.None, scheduler);

api/AltV.Net.Async/AsyncEventHandler.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@ public async Task CallAsync(Func<TEvent, Task> func)
1919
await Task.WhenAll(tasks);
2020
}
2121

22+
public async void CallAsyncWithoutTask(Func<TEvent, Task> callback)
23+
{
24+
var events = GetEvents();
25+
foreach (var value in events)
26+
{
27+
await ExecuteEventAsync(value, callback);
28+
}
29+
}
30+
2231
public static async Task ExecuteEventAsync(TEvent subscription, Func<TEvent, Task> callback)
2332
{
2433
try
@@ -27,7 +36,7 @@ public static async Task ExecuteEventAsync(TEvent subscription, Func<TEvent, Tas
2736
}
2837
catch (Exception e)
2938
{
30-
Alt.Log($"Execution of {typeof(TEvent)} threw an error: {e}");
39+
AltAsync.Log($"Execution of {typeof(TEvent)} threw an error: {e}");
3140
}
3241
}
3342

@@ -39,7 +48,7 @@ public static async void ExecuteEventAsyncWithoutTask(TEvent subscription, Func<
3948
}
4049
catch (Exception e)
4150
{
42-
Alt.Log($"Execution of {typeof(TEvent)} threw an error: {e}");
51+
AltAsync.Log($"Execution of {typeof(TEvent)} threw an error: {e}");
4352
}
4453
}
4554
}

api/AltV.Net.Async/AsyncModule.cs

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Collections.Generic;
23
using System.Threading.Tasks;
34
using AltV.Net.Async.Events;
@@ -48,10 +49,10 @@ internal readonly AsyncEventHandler<PlayerChangeVehicleSeatAsyncDelegate>
4849

4950
internal readonly AsyncEventHandler<ConsoleCommandAsyncDelegate> ConsoleCommandAsyncDelegateHandlers =
5051
new AsyncEventHandler<ConsoleCommandAsyncDelegate>();
51-
52+
5253
internal readonly AsyncEventHandler<MetaDataChangeAsyncDelegate> MetaDataChangeAsyncDelegateHandlers =
5354
new AsyncEventHandler<MetaDataChangeAsyncDelegate>();
54-
55+
5556
internal readonly AsyncEventHandler<MetaDataChangeAsyncDelegate> SyncedMetaDataChangeAsyncDelegateHandlers =
5657
new AsyncEventHandler<MetaDataChangeAsyncDelegate>();
5758

@@ -79,14 +80,15 @@ public override void OnCheckPointEvent(ICheckpoint checkpoint, IEntity entity, b
7980
{
8081
base.OnCheckPointEvent(checkpoint, entity, state);
8182
if (!CheckpointAsyncEventHandler.HasEvents()) return;
82-
Task.Run(() => CheckpointAsyncEventHandler.CallAsync(@delegate => @delegate(checkpoint, entity, state)));
83+
Task.Run(() =>
84+
CheckpointAsyncEventHandler.CallAsyncWithoutTask(@delegate => @delegate(checkpoint, entity, state)));
8385
}
8486

8587
public override void OnPlayerConnectEvent(IPlayer player, string reason)
8688
{
8789
base.OnPlayerConnectEvent(player, reason);
8890
if (!PlayerConnectAsyncEventHandler.HasEvents()) return;
89-
Task.Run(() => PlayerConnectAsyncEventHandler.CallAsync(@delegate => @delegate(player, reason)));
91+
Task.Run(() => PlayerConnectAsyncEventHandler.CallAsyncWithoutTask(@delegate => @delegate(player, reason)));
9092
}
9193

9294
public override void OnPlayerDamageEvent(IPlayer player, IEntity attacker, uint weapon, ushort damage)
@@ -96,7 +98,7 @@ public override void OnPlayerDamageEvent(IPlayer player, IEntity attacker, uint
9698
var oldHealth = player.Health;
9799
var oldArmor = player.Armor;
98100
Task.Run(() =>
99-
PlayerDamageAsyncEventHandler.CallAsync(@delegate =>
101+
PlayerDamageAsyncEventHandler.CallAsyncWithoutTask(@delegate =>
100102
@delegate(player, attacker, oldHealth, oldArmor, weapon, damage)));
101103
}
102104

@@ -105,7 +107,7 @@ public override void OnPlayerDeathEvent(IPlayer player, IEntity killer, uint wea
105107
base.OnPlayerDeathEvent(player, killer, weapon);
106108
if (!PlayerDeadAsyncEventHandler.HasEvents()) return;
107109
Task.Run(() =>
108-
PlayerDeadAsyncEventHandler.CallAsync(@delegate => @delegate(player, killer, weapon)));
110+
PlayerDeadAsyncEventHandler.CallAsyncWithoutTask(@delegate => @delegate(player, killer, weapon)));
109111
}
110112

111113
public override void OnPlayerChangeVehicleSeatEvent(IVehicle vehicle, IPlayer player, byte oldSeat,
@@ -114,7 +116,7 @@ public override void OnPlayerChangeVehicleSeatEvent(IVehicle vehicle, IPlayer pl
114116
base.OnPlayerChangeVehicleSeatEvent(vehicle, player, oldSeat, newSeat);
115117
if (!PlayerChangeVehicleSeatAsyncEventHandler.HasEvents()) return;
116118
Task.Run(() =>
117-
PlayerChangeVehicleSeatAsyncEventHandler.CallAsync(@delegate =>
119+
PlayerChangeVehicleSeatAsyncEventHandler.CallAsyncWithoutTask(@delegate =>
118120
@delegate(vehicle, player, oldSeat, newSeat)));
119121
}
120122

@@ -123,7 +125,7 @@ public override void OnPlayerEnterVehicleEvent(IVehicle vehicle, IPlayer player,
123125
base.OnPlayerEnterVehicleEvent(vehicle, player, seat);
124126
if (!PlayerEnterVehicleAsyncEventHandler.HasEvents()) return;
125127
Task.Run(() =>
126-
PlayerEnterVehicleAsyncEventHandler.CallAsync(@delegate =>
128+
PlayerEnterVehicleAsyncEventHandler.CallAsyncWithoutTask(@delegate =>
127129
@delegate(vehicle, player, seat)));
128130
}
129131

@@ -132,7 +134,7 @@ public override void OnPlayerLeaveVehicleEvent(IVehicle vehicle, IPlayer player,
132134
base.OnPlayerLeaveVehicleEvent(vehicle, player, seat);
133135
if (!PlayerLeaveVehicleAsyncEventHandler.HasEvents()) return;
134136
Task.Run(() =>
135-
PlayerLeaveVehicleAsyncEventHandler.CallAsync(@delegate =>
137+
PlayerLeaveVehicleAsyncEventHandler.CallAsyncWithoutTask(@delegate =>
136138
@delegate(vehicle, player, seat)));
137139
}
138140

@@ -155,7 +157,7 @@ public override void OnPlayerRemoveEvent(IPlayer player)
155157
base.OnPlayerRemoveEvent(player);
156158
if (!PlayerRemoveAsyncEventHandler.HasEvents()) return;
157159
Task.Run(() =>
158-
PlayerRemoveAsyncEventHandler.CallAsync(@delegate =>
160+
PlayerRemoveAsyncEventHandler.CallAsyncWithoutTask(@delegate =>
159161
@delegate(player)));
160162
}
161163

@@ -164,7 +166,7 @@ public override void OnVehicleRemoveEvent(IVehicle vehicle)
164166
base.OnVehicleRemoveEvent(vehicle);
165167
if (!VehicleRemoveAsyncEventHandler.HasEvents()) return;
166168
Task.Run(() =>
167-
VehicleRemoveAsyncEventHandler.CallAsync(@delegate =>
169+
VehicleRemoveAsyncEventHandler.CallAsyncWithoutTask(@delegate =>
168170
@delegate(vehicle)));
169171
}
170172

@@ -190,14 +192,25 @@ public override void OnClientEventEvent(IPlayer player, string name, ref MValueA
190192
}
191193
}
192194

193-
Task.Run(() =>
195+
Task.Run(async () =>
194196
{
195197
foreach (var eventHandler in eventHandlers)
196198
{
197199
var invokeValues = eventHandler.CalculateInvokeValues(objects, player);
198200
if (invokeValues != null)
199201
{
200-
eventHandler.InvokeNoResult(invokeValues);
202+
try
203+
{
204+
var task = eventHandler.InvokeTaskOrNull(invokeValues);
205+
if (task != null)
206+
{
207+
await task;
208+
}
209+
}
210+
catch (Exception e)
211+
{
212+
AltAsync.Log($"Execution of ${name} threw an error: {e}");
213+
}
201214
}
202215
else
203216
{
@@ -284,14 +297,25 @@ public override void OnServerEventEvent(string name, ref MValueArray args, MValu
284297
}
285298
}
286299

287-
Task.Run(() =>
300+
Task.Run(async () =>
288301
{
289302
foreach (var eventHandler in eventHandlers)
290303
{
291304
var invokeValues = eventHandler.CalculateInvokeValues(objects);
292305
if (invokeValues != null)
293306
{
294-
eventHandler.InvokeNoResult(invokeValues);
307+
try
308+
{
309+
var task = eventHandler.InvokeTaskOrNull(invokeValues);
310+
if (task != null)
311+
{
312+
await task;
313+
}
314+
}
315+
catch (Exception e)
316+
{
317+
AltAsync.Log($"Execution of ${name} threw an error: {e}");
318+
}
295319
}
296320
else
297321
{
@@ -335,7 +359,7 @@ public override void OnConsoleCommandEvent(string name, string[] args)
335359
base.OnConsoleCommandEvent(name, args);
336360
if (!ConsoleCommandAsyncDelegateHandlers.HasEvents()) return;
337361
Task.Run(() =>
338-
ConsoleCommandAsyncDelegateHandlers.CallAsync(@delegate =>
362+
ConsoleCommandAsyncDelegateHandlers.CallAsyncWithoutTask(@delegate =>
339363
@delegate(name, args)));
340364
}
341365

@@ -344,7 +368,7 @@ public override void OnMetaDataChangeEvent(IEntity entity, string key, object va
344368
base.OnMetaDataChangeEvent(entity, key, value);
345369
if (!MetaDataChangeAsyncDelegateHandlers.HasEvents()) return;
346370
Task.Run(() =>
347-
MetaDataChangeAsyncDelegateHandlers.CallAsync(@delegate =>
371+
MetaDataChangeAsyncDelegateHandlers.CallAsyncWithoutTask(@delegate =>
348372
@delegate(entity, key, value)));
349373
}
350374

@@ -353,7 +377,7 @@ public override void OnSyncedMetaDataChangeEvent(IEntity entity, string key, obj
353377
base.OnSyncedMetaDataChangeEvent(entity, key, value);
354378
if (!SyncedMetaDataChangeAsyncDelegateHandlers.HasEvents()) return;
355379
Task.Run(() =>
356-
SyncedMetaDataChangeAsyncDelegateHandlers.CallAsync(@delegate =>
380+
SyncedMetaDataChangeAsyncDelegateHandlers.CallAsyncWithoutTask(@delegate =>
357381
@delegate(entity, key, value)));
358382
}
359383

api/AltV.Net.Async/AsyncResource.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,16 @@ namespace AltV.Net.Async
55
{
66
public abstract class AsyncResource : Resource
77
{
8-
private readonly AltVAsync altVAsync = new AltVAsync();
8+
private readonly AltVAsync altVAsync;
9+
10+
public AsyncResource() : this(new DefaultTickSchedulerFactory())
11+
{
12+
}
13+
14+
public AsyncResource(ITickSchedulerFactory tickSchedulerFactory)
15+
{
16+
altVAsync = new AltVAsync(tickSchedulerFactory);
17+
}
918

1019
public override void OnTick()
1120
{
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using System.Collections.Generic;
2+
using System.Threading;
3+
using System.Threading.Channels;
4+
using System.Threading.Tasks;
5+
6+
namespace AltV.Net.Async
7+
{
8+
internal class ChannelTickScheduler : TickScheduler
9+
{
10+
private readonly Thread mainThread;
11+
12+
public override int MaximumConcurrencyLevel { get; } = 1;
13+
14+
private readonly Channel<Task> tasks = Channel.CreateUnbounded<Task>(new UnboundedChannelOptions
15+
{SingleReader = true});
16+
17+
private Task currentTask;
18+
19+
private readonly ChannelReader<Task> reader;
20+
21+
private readonly ChannelWriter<Task> writer;
22+
23+
public ChannelTickScheduler(Thread mainThread)
24+
{
25+
this.mainThread = mainThread;
26+
reader = tasks.Reader;
27+
writer = tasks.Writer;
28+
}
29+
30+
protected override IEnumerable<Task> GetScheduledTasks() => null;
31+
32+
protected override void QueueTask(Task task) => writer.WriteAsync(task);
33+
34+
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) =>
35+
Thread.CurrentThread == mainThread && TryExecuteTask(task);
36+
37+
public override void Tick()
38+
{
39+
if (reader.TryRead(out currentTask))
40+
{
41+
TryExecuteTask(currentTask);
42+
}
43+
}
44+
}
45+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System.Threading;
2+
3+
namespace AltV.Net.Async
4+
{
5+
public class DefaultTickSchedulerFactory : ITickSchedulerFactory
6+
{
7+
public TickScheduler Create(Thread mainThread)
8+
{
9+
return new ChannelTickScheduler(mainThread);
10+
}
11+
}
12+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using System.Threading;
2+
3+
namespace AltV.Net.Async
4+
{
5+
public interface ITickSchedulerFactory
6+
{
7+
TickScheduler Create(Thread mainThread);
8+
}
9+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
using System.Collections.Concurrent;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace AltV.Net.Async
7+
{
8+
internal class QueueTickScheduler : TickScheduler
9+
{
10+
private readonly Thread mainThread;
11+
12+
public override int MaximumConcurrencyLevel { get; } = 1;
13+
14+
private readonly ConcurrentQueue<Task> tasks = new ConcurrentQueue<Task>();
15+
16+
private Task currentTask;
17+
18+
private int runs;
19+
20+
public QueueTickScheduler(Thread mainThread)
21+
{
22+
this.mainThread = mainThread;
23+
}
24+
25+
protected override IEnumerable<Task> GetScheduledTasks() => null;
26+
27+
protected override void QueueTask(Task task) => tasks.Enqueue(task);
28+
29+
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) =>
30+
Thread.CurrentThread == mainThread && TryExecuteTask(task);
31+
32+
public override void Tick()
33+
{
34+
runs = tasks.Count;
35+
36+
while (runs-- > 0 && tasks.TryDequeue(out currentTask))
37+
{
38+
TryExecuteTask(currentTask);
39+
}
40+
}
41+
}
42+
}

0 commit comments

Comments
 (0)