Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions Gofer.NET.Tests/GivenARedisTaskQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;

using FluentAssertions;

using Gofer.NET.Utils;

using Xunit;

namespace Gofer.NET.Tests
Expand All @@ -22,8 +25,8 @@ public override string ToString()
return Value;
}
}
private class CustomException : Exception {}

private class CustomException : Exception { }

[Fact]
public async Task ItCapturesArgumentsPassedToEnqueuedDelegate()
Expand All @@ -40,13 +43,16 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate()
Tuple.Create<Expression<Action>, string>(
actionExp,
str);

// Action to expected result
var delgates = new Tuple<Expression<Action>, string>[]
{
// Exception Argument
TC(() => ExceptionFunc(new Exception(), semaphoreFile), new Exception().ToString()),
TC(() => ExceptionFunc(new CustomException(), semaphoreFile), new CustomException().ToString()),

// Cancelation Argument
TC(() => CancellationFunc(default, semaphoreFile), new CancellationToken().ToString()),

// Integer Arguments
TC(() => IntFunc(int.MaxValue, semaphoreFile), int.MaxValue.ToString()),
Expand Down Expand Up @@ -99,29 +105,29 @@ public async Task ItCapturesArgumentsPassedToEnqueuedDelegate()
TC(() => AsyncFunc(semaphoreFile).T(), "async"),
TC(() => AsyncFuncThatReturnsString(semaphoreFile).T(), "async")
};


foreach (var tup in delgates)
{
var actionExpr = tup.Item1;
var expectedString = tup.Item2;

File.Delete(semaphoreFile);
await testFixture.TaskQueue.Enqueue(actionExpr);

await testFixture.TaskQueue.Enqueue(actionExpr);
await testFixture.TaskQueue.ExecuteNext();

File.ReadAllText(semaphoreFile).Should().Be(expectedString);
}

File.Delete(semaphoreFile);
}

[Fact]
public async Task ItEnqueuesAndReceivesDelegatesThatAreRunnable()
{
var testFixture = new TaskQueueTestFixture(nameof(ItEnqueuesAndReceivesDelegatesThatAreRunnable));

testFixture.EnsureSemaphoreDoesntExist();
await testFixture.PushPopExecuteWriteSemaphore();
testFixture.EnsureSemaphore();
Expand All @@ -133,18 +139,18 @@ public async Task ItsTasksAreConsumedOnlyOnceByMultipleConsumers()
// Higher numbers here increase confidence
var numberOfJobs = 16;
var numberOfConsumers = 4;

var sharedTaskQueueName = nameof(ItsTasksAreConsumedOnlyOnceByMultipleConsumers);
var consumers = Enumerable.Range(0, numberOfConsumers)
.Select(_ => new TaskQueueTestFixture(sharedTaskQueueName)).ToList();

var semaphoreFiles = new List<string>();
for(int i=0;i < numberOfJobs;++i)
for (int i = 0; i < numberOfJobs; ++i)
{
var path = Path.GetTempFileName();
File.Delete(path);
semaphoreFiles.Add(path);

var sharedTaskQueue = consumers[0].TaskQueue;
await sharedTaskQueue.Enqueue(() => TaskQueueTestFixture.WriteSemaphore(path));
}
Expand Down Expand Up @@ -175,25 +181,25 @@ public async Task AsyncFunc(string semaphoreFile)
{
// Wait to ensure async waiting is happening.
await Task.Delay(1000);

TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, "async");
}

public async Task<string> AsyncFuncThatReturnsString(string semaphoreFile)
{
// Wait to ensure async waiting is happening.
await Task.Delay(1000);

TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, "async");

return "async";
}

public void NullableTypeFunc(DateTime? dateTime, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime);
}

public void DateTimeFunc(DateTime dateTime, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, dateTime);
Expand All @@ -203,37 +209,37 @@ public void IntFunc(int num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
}

public void NullableIntFunc(int? num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num ?? -1);
}

public void LongFunc(long num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
}

public void FloatFunc(float num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
}

public void BoolFunc(bool num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
}

public void DoubleFunc(double num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
}

public void StringFunc(string num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
}

public void ObjectFunc(object num, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, num);
Expand All @@ -249,21 +255,26 @@ public void ExceptionFunc(Exception exc, string semaphoreFile)
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, exc);
}

public void CancellationFunc(CancellationToken ct, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, ct);
}

public void TypeFunc(Type typeArg, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, typeArg?.ToString() ?? "null");
}

public void ArrayFunc1(string[] nums, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, string.Join(",", nums));
}

public void ArrayFunc2(int[] nums, string semaphoreFile)
{
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, string.Join(",", nums));
}

public void ArrayFunc3(int?[] nums, string semaphoreFile)
{
var str = "";
Expand All @@ -274,7 +285,7 @@ public void ArrayFunc3(int?[] nums, string semaphoreFile)
str += num?.ToString() ?? "null";
first = false;
}

TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, str);
}
}
Expand Down
53 changes: 38 additions & 15 deletions Gofer.NET.Tests/GivenATaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using FluentAssertions;
using Gofer.NET.Utils;

using Xunit;

namespace Gofer.NET.Tests
Expand All @@ -15,11 +16,11 @@ public class GivenATaskClient
public async Task ItContinuesListeningWhenATaskThrowsAnException()
{
var waitTime = 5000;

var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
var taskClient = new TaskClient(taskQueue);
var semaphoreFile = Path.GetTempFileName();

await taskClient.TaskQueue.Enqueue(() => Throw());
await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WriteSemaphore(semaphoreFile));

Expand All @@ -28,37 +29,59 @@ public async Task ItContinuesListeningWhenATaskThrowsAnException()

taskClient.CancelListen();
await task;


TaskQueueTestFixture.EnsureSemaphore(semaphoreFile);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a bug in the test, good catch!

}

[Fact]
public async Task ItStopsOnCancellation()
{
var semaphoreFile = Path.GetTempFileName();

var waitTime = 2000;

var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
var taskClient = new TaskClient(taskQueue);
var cancellation = new CancellationTokenSource();

await taskClient.TaskQueue.Enqueue(() => TaskQueueTestFixture.WaitForCancellationAndWriteSemaphore(semaphoreFile, default));

var task = Task.Run(async () => await taskClient.Listen(cancellation.Token), CancellationToken.None);
await Task.Delay(waitTime, CancellationToken.None);
cancellation.Cancel();
Copy link
Author

@ig-sinicyn ig-sinicyn Jul 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enqueued task (WaitForTaskClientCancellationAndWriteSemaphore) will be canceled at this moment.

await Task.Delay(waitTime, CancellationToken.None);
await task;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a bug is introduced, and the task fails to exit on cancellation will this introduce a hanging test?


TaskQueueTestFixture.EnsureSemaphore(semaphoreFile);
}

[Fact]
public async Task ItDoesNotDelayScheduledTaskPromotionWhenRunningLongTasks()
{
var waitTime = 4000;

var taskQueue = TaskQueueTestFixture.UniqueRedisTaskQueue();
var taskClient = new TaskClient(taskQueue);

var semaphoreFile = Path.GetTempFileName();
File.Delete(semaphoreFile);
File.Exists(semaphoreFile).Should().BeFalse();

await taskClient.TaskQueue.Enqueue(() => Wait(waitTime));

await taskClient.TaskScheduler.AddScheduledTask(
() => TaskQueueTestFixture.WriteSemaphore(semaphoreFile),
TimeSpan.FromMilliseconds(waitTime / 4));

var task = Task.Run(async () => await taskClient.Listen());

await Task.Delay(waitTime / 2);

// Ensure we did not run the scheduled task
File.Exists(semaphoreFile).Should().BeFalse();

var dequeuedScheduledTask = await taskQueue.Dequeue();

File.Exists(semaphoreFile).Should().BeFalse();
dequeuedScheduledTask.Should().NotBeNull();
dequeuedScheduledTask.MethodName.Should().Be(nameof(TaskQueueTestFixture.WriteSemaphore));
Expand All @@ -83,19 +106,19 @@ public async Task ItExecutesImmediateAndScheduledTasksInOrder()
File.Delete(semaphoreFile);
File.Exists(semaphoreFile).Should().BeFalse();

for (var i=0; i<immediateTasks; ++i)
for (var i = 0; i < immediateTasks; ++i)
{
await taskClient.TaskQueue.Enqueue(() =>
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i+1).ToString()));
await taskClient.TaskQueue.Enqueue(() =>
TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (i + 1).ToString()));
}

for (var i=0; i<scheduledTasks; ++i)
for (var i = 0; i < scheduledTasks; ++i)
{
await taskClient.TaskScheduler.AddScheduledTask(
() => TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks+i+1).ToString()),
TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement*i)));
() => TaskQueueTestFixture.WriteSemaphoreValue(semaphoreFile, (immediateTasks + i + 1).ToString()),
TimeSpan.FromMilliseconds(scheduledTasksStart + (scheduledTasksIncrement * i)));
}

var task = Task.Run(async () => await taskClient.Listen());
Thread.Sleep(scheduledTasks * scheduledTasksIncrement + 2000);

Expand Down
Loading