Skip to content

Commit 0feb5d3

Browse files
Added extra Batch-Join test.
1 parent 64dbf4a commit 0feb5d3

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed

Open.ChannelExtensions.Tests/BatchTests.cs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using System.Runtime.CompilerServices;
2+
using System.Threading.Channels;
3+
using System.Threading;
24

35
namespace Open.ChannelExtensions.Tests;
46

@@ -407,4 +409,68 @@ public static async IAsyncEnumerable<IList<T>> ReadBatchEnumerableAsyncBakedIn<T
407409
if (item?.Count > 0) yield return item;
408410
}
409411
}
412+
413+
[Fact]
414+
public static async Task AnotherComplexBatchJoinScenario()
415+
{
416+
const int SIZE = 300000;
417+
const long HTTP_BATCH_TIMEOUT = 10000;
418+
const int MAX_PARALLELISM_HTTP = 8;
419+
const int MAX_CACHE_WRITE_PARALLELISM = 8;
420+
421+
using var cts = new CancellationTokenSource(30_000);
422+
var cancellationToken = cts.Token;
423+
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(20) { SingleReader = false, SingleWriter = true });
424+
425+
int remaining = SIZE;
426+
_ = Task.Run(async () =>
427+
{
428+
int n = 50;
429+
430+
while(remaining > 0)
431+
{
432+
await Task.Delay(20);
433+
for (int i = 0; remaining > 0 && i < n; i++)
434+
{
435+
await channel.Writer.WriteAsync(remaining, cancellationToken);
436+
remaining--;
437+
}
438+
439+
n += 15;
440+
}
441+
442+
await Task.Delay(30);
443+
channel.Writer.Complete();
444+
});
445+
446+
int count = 0;
447+
var total = await channel.Reader
448+
.Batch(100)
449+
.WithTimeout(HTTP_BATCH_TIMEOUT)
450+
.PipeAsync(
451+
MAX_PARALLELISM_HTTP,
452+
async x =>
453+
{
454+
await Task.Delay(50);
455+
return x.ConvertAll(x => x * 2);
456+
},
457+
cancellationToken: cancellationToken
458+
)
459+
.Join()
460+
.Batch(10)
461+
.WithTimeout(HTTP_BATCH_TIMEOUT)
462+
.ReadAllConcurrently(
463+
MAX_CACHE_WRITE_PARALLELISM,
464+
x =>
465+
{
466+
Debug.WriteLine("Value: {0}", string.Join(' ', x));
467+
for (int i = 0; i < x.Count; i++)
468+
Interlocked.Increment(ref count);
469+
},
470+
cancellationToken);
471+
472+
remaining.Should().Be(0);
473+
channel.Reader.TryRead(out _).Should().BeFalse();
474+
count.Should().Be(SIZE);
475+
}
410476
}

0 commit comments

Comments
 (0)