Skip to content

Commit d5eb28f

Browse files
committed
Create AsyncBatchSample.cs
1 parent 47ae88a commit d5eb28f

File tree

1 file changed

+253
-0
lines changed

1 file changed

+253
-0
lines changed

Misc/AsyncBatchSample.cs

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using System.Timers;
6+
using ExcelDna.Integration;
7+
using Timer = System.Timers.Timer;
8+
using System.Net.Http;
9+
10+
namespace GeneralTestsCS
11+
{
12+
13+
public static class AysyncBatchExample
14+
{
15+
// 1. Create an instance on the AsyncBatchUtil, passing in some parameters and the btah running function.
16+
static readonly AsyncBatchUtil BatchRunner = new AsyncBatchUtil(1000, TimeSpan.FromMilliseconds(250), RunBatch);
17+
18+
// This function will be called for each batch, on a ThreadPool thread.
19+
// Each AsyncCall contains the function name and arguments passed from the function.
20+
// The List<object> returned by the Task must contain the results, corresponding to the calls list.
21+
static async Task<List<object>> RunBatch(List<AsyncBatchUtil.AsyncCall> calls)
22+
{
23+
var batchStart = DateTime.Now;
24+
// Simulate things taking a while...
25+
await Task.Delay(TimeSpan.FromSeconds(10));
26+
27+
using (var httpClient = new HttpClient())
28+
{
29+
var page = await httpClient.GetStringAsync("http://www.google.com");
30+
}
31+
32+
// Now build up the list of results...
33+
var results = new List<object>();
34+
int i = 0;
35+
foreach (var call in calls)
36+
{
37+
// As an example just an informative string
38+
var result = string.Format("{0} - {1} : {2}/{3} @ {4:HH:mm:ss.fff}", call.FunctionName, call.Arguments[0], i++, calls.Count, batchStart);
39+
results.Add(result);
40+
}
41+
42+
return results;
43+
}
44+
45+
public static object SlowFunction(string code, int value)
46+
{
47+
return BatchRunner.Run("SlowFunction", code, value);
48+
}
49+
}
50+
51+
52+
// This is the main helper class for supporting batched async calls
53+
// To use:
54+
// 1. Create an instance of AsyncBatchUtil, passing in a Func<List<AsyncCall>, Task<List<object>>> to run each batch.
55+
// 2. Call from inside a batched function as:
56+
// public static object SlowFunction(string code, int value)
57+
// {
58+
// return BatchRunner.Run("SlowFunction", code, value);
59+
// }
60+
public class AsyncBatchUtil
61+
{
62+
// Represents a single function call in a batch
63+
public class AsyncCall
64+
{
65+
internal TaskCompletionSource<object> TaskCompletionSource;
66+
public string FunctionName { get; private set; }
67+
public object[] Arguments { get; private set; }
68+
69+
public AsyncCall(TaskCompletionSource<object> taskCompletion, string functionName, object[] args)
70+
{
71+
TaskCompletionSource = taskCompletion;
72+
FunctionName = functionName;
73+
Arguments = args;
74+
}
75+
}
76+
77+
// Not a hard limit
78+
readonly int _maxBatchSize;
79+
readonly Func<List<AsyncCall>, Task<List<object>>> _batchRunner;
80+
81+
readonly object _lock = new object();
82+
readonly Timer _batchTimer; // Timer events will fire from a ThreadPool thread
83+
List<AsyncCall> _currentBatch;
84+
85+
public AsyncBatchUtil(int maxBatchSize, TimeSpan batchTimeout, Func<List<AsyncCall>, Task<List<object>>> batchRunner)
86+
{
87+
if (maxBatchSize < 1)
88+
{
89+
throw new ArgumentOutOfRangeException("maxBatchSize", "Max batch size must be positive");
90+
}
91+
if (batchRunner == null)
92+
{
93+
// Check early - otherwise the NullReferenceException would happen in a threadpool callback.
94+
throw new ArgumentNullException("batchRunner");
95+
}
96+
97+
_maxBatchSize = maxBatchSize;
98+
_batchRunner = batchRunner;
99+
100+
_currentBatch = new List<AsyncCall>();
101+
102+
_batchTimer = new Timer(batchTimeout.TotalMilliseconds);
103+
_batchTimer.AutoReset = false;
104+
_batchTimer.Elapsed += TimerElapsed;
105+
// Timer is not Enabled (Started) by default
106+
}
107+
108+
// Will only run on the main thread
109+
public object Run(string functionName, params object[] args)
110+
{
111+
return ExcelAsyncUtil.Observe(functionName, args, delegate
112+
{
113+
var tcs = new TaskCompletionSource<object>();
114+
EnqueueAsyncCall(tcs, functionName, args);
115+
return new TaskExcelObservable(tcs.Task);
116+
});
117+
}
118+
119+
// Will only run on the main thread
120+
void EnqueueAsyncCall(TaskCompletionSource<object> taskCompletion, string functionName, object[] args)
121+
{
122+
lock (_lock)
123+
{
124+
_currentBatch.Add(new AsyncCall(taskCompletion, functionName, args));
125+
126+
// Check if the batch size has been reached, schedule it to be run
127+
if (_currentBatch.Count >= _maxBatchSize)
128+
{
129+
// This won't run the batch immediately, but will ensure that the current batch (containing this call) will run soon.
130+
ThreadPool.QueueUserWorkItem(state => RunBatch((List<AsyncCall>)state), _currentBatch);
131+
_currentBatch = new List<AsyncCall>();
132+
_batchTimer.Stop();
133+
}
134+
else
135+
{
136+
// We don't know if the batch containing the current call will run,
137+
// so ensure that a timer is started.
138+
if (!_batchTimer.Enabled)
139+
{
140+
_batchTimer.Start();
141+
}
142+
}
143+
}
144+
}
145+
146+
// Will run on a ThreadPool thread
147+
void TimerElapsed(object sender, ElapsedEventArgs e)
148+
{
149+
List<AsyncCall> batch;
150+
lock (_lock)
151+
{
152+
batch = _currentBatch;
153+
_currentBatch = new List<AsyncCall>();
154+
}
155+
RunBatch(batch);
156+
}
157+
158+
159+
// Will always run on a ThreadPool thread
160+
// Might be re-entered...
161+
// batch is allowed to be empty
162+
async void RunBatch(List<AsyncCall> batch)
163+
{
164+
// Maybe due to Timer re-entrancy we got an empty batch...?
165+
if (batch.Count == 0)
166+
{
167+
// No problem - just return
168+
return;
169+
}
170+
171+
try
172+
{
173+
var resultList = await _batchRunner(batch);
174+
if (resultList.Count != batch.Count)
175+
{
176+
throw new InvalidOperationException(string.Format("Batch result size incorrect. Batch Count: {0}, Result Count: {1}", batch.Count, resultList.Count));
177+
}
178+
179+
for (int i = 0; i < resultList.Count; i++)
180+
{
181+
batch[i].TaskCompletionSource.SetResult(resultList[i]);
182+
}
183+
}
184+
catch (Exception ex)
185+
{
186+
foreach (var call in batch)
187+
{
188+
call.TaskCompletionSource.SetException(ex);
189+
}
190+
}
191+
}
192+
193+
// Helper class to turn a task into an IExcelObservable that either returns the task result and completes, or pushes an Exception
194+
class TaskExcelObservable : IExcelObservable
195+
{
196+
readonly Task<object> _task;
197+
198+
public TaskExcelObservable(Task<object> task)
199+
{
200+
_task = task;
201+
}
202+
203+
public IDisposable Subscribe(IExcelObserver observer)
204+
{
205+
switch (_task.Status)
206+
{
207+
case TaskStatus.RanToCompletion:
208+
observer.OnNext(_task.Result);
209+
observer.OnCompleted();
210+
break;
211+
case TaskStatus.Faulted:
212+
observer.OnError(_task.Exception.InnerException);
213+
break;
214+
case TaskStatus.Canceled:
215+
observer.OnError(new TaskCanceledException(_task));
216+
break;
217+
default:
218+
var task = _task;
219+
// OK - the Task has not completed synchronously
220+
// And handle the Task completion
221+
task.ContinueWith(t =>
222+
{
223+
switch (t.Status)
224+
{
225+
case TaskStatus.RanToCompletion:
226+
observer.OnNext(t.Result);
227+
observer.OnCompleted();
228+
break;
229+
case TaskStatus.Faulted:
230+
observer.OnError(t.Exception.InnerException);
231+
break;
232+
case TaskStatus.Canceled:
233+
observer.OnError(new TaskCanceledException(t));
234+
break;
235+
}
236+
});
237+
break;
238+
}
239+
240+
return DefaultDisposable.Instance;
241+
}
242+
243+
// Helper class to make an empty IDisposable
244+
sealed class DefaultDisposable : IDisposable
245+
{
246+
public static readonly DefaultDisposable Instance = new DefaultDisposable();
247+
// Prevent external instantiation
248+
DefaultDisposable() { }
249+
public void Dispose() { }
250+
}
251+
}
252+
}
253+
}

0 commit comments

Comments
 (0)