Skip to content

Commit 830da73

Browse files
committed
Respect Dequeue Async CT
1 parent 7693db0 commit 830da73

File tree

2 files changed

+26
-25
lines changed

2 files changed

+26
-25
lines changed

source/Halibut.Tests/LocalExecutionModeFixture.cs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@ public async Task SimpleLocalExecutionExample()
2929
{
3030
var services = GetDelegateServiceFactory();
3131
var timeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
32-
timeoutsAndLimits = new HalibutTimeoutsAndLimits();
33-
34-
// Use a shared queue factory so client and worker share the same queue
35-
var queueFactory = new PendingRequestQueueFactoryAsync(timeoutsAndLimits, new LogFactory());
3632

3733
var logFactory = new CachingLogFactory(new TestContextLogCreator("", LogLevel.Trace));
3834

@@ -44,22 +40,21 @@ public async Task SimpleLocalExecutionExample()
4440

4541
await using var client = new HalibutRuntimeBuilder()
4642
.WithServerCertificate(Certificates.Octopus)
47-
.WithPendingRequestQueueFactory(RedisFactory())
43+
.WithPendingRequestQueueFactory(RedisFactory(preSharedGuid, disposables, log, logFactory))
4844
.WithHalibutTimeoutsAndLimits(timeoutsAndLimits)
4945
.Build();
5046

5147
await using var worker = new HalibutRuntimeBuilder()
5248
.WithServerCertificate(Certificates.TentaclePolling)
5349
.WithServiceFactory(services)
54-
.WithPendingRequestQueueFactory(RedisFactory())
50+
.WithPendingRequestQueueFactory(RedisFactory(preSharedGuid, disposables, log, logFactory))
5551
.WithHalibutTimeoutsAndLimits(timeoutsAndLimits)
5652
.Build();
5753

5854
// Start worker polling for local://test-worker
5955
using var workerCts = new CancellationTokenSource();
6056
var pollingTask = Task.Run(async () =>
6157
{
62-
//await Task.Delay(TimeSpan.FromSeconds(10));
6358
await worker.PollLocalAsync(new Uri("local://test-worker"), workerCts.Token);
6459
}, workerCts.Token);
6560

@@ -72,23 +67,29 @@ public async Task SimpleLocalExecutionExample()
7267

7368
await workerCts.CancelAsync();
7469

75-
Func<QueueMessageSerializer, IPendingRequestQueueFactory> RedisFactory()
70+
await pollingTask;
71+
}
72+
73+
Func<QueueMessageSerializer, IPendingRequestQueueFactory> RedisFactory(
74+
Guid preSharedGuid,
75+
DisposableCollection disposables,
76+
TestContextLogCreator log,
77+
CachingLogFactory logFactory)
78+
{
79+
return msgSer =>
7680
{
77-
return msgSer =>
78-
{
79-
var redisFacade = RedisFacadeBuilder.CreateRedisFacade(prefix: preSharedGuid);
80-
disposables.AddAsyncDisposable(redisFacade);
81-
var watchForRedisLosingAllItsData = new WatchForRedisLosingAllItsData(redisFacade, log.CreateNewForPrefix("watcher"));
82-
disposables.AddAsyncDisposable(watchForRedisLosingAllItsData);
81+
var redisFacade = RedisFacadeBuilder.CreateRedisFacade(prefix: preSharedGuid);
82+
disposables.AddAsyncDisposable(redisFacade);
83+
var watchForRedisLosingAllItsData = new WatchForRedisLosingAllItsData(redisFacade, log.CreateNewForPrefix("watcher"));
84+
disposables.AddAsyncDisposable(watchForRedisLosingAllItsData);
8385

84-
return new RedisPendingRequestQueueFactory(msgSer,
85-
new InMemoryStoreDataStreamsForDistributedQueues(),
86-
watchForRedisLosingAllItsData,
87-
new HalibutRedisTransport(redisFacade),
88-
new HalibutTimeoutsAndLimitsForTestsBuilder().Build(),
89-
logFactory);
90-
};
91-
}
86+
return new RedisPendingRequestQueueFactory(msgSer,
87+
new InMemoryStoreDataStreamsForDistributedQueues(),
88+
watchForRedisLosingAllItsData,
89+
new HalibutRedisTransport(redisFacade),
90+
new HalibutTimeoutsAndLimitsForTestsBuilder().Build(),
91+
logFactory);
92+
};
9293
}
9394

9495
static DelegateServiceFactory GetDelegateServiceFactory()

source/Halibut/Queue/Redis/RedisPendingRequestQueue.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ async Task<bool> TryClearRequestFromQueue(RedisPendingRequest redisPending)
380380
// It will kill the TCP connection, which will force re-connect (in perhaps a backoff function)
381381
// This could result in connecting to a node that is actually connected to redis. It could also
382382
// cause a cascade of failure from high load.
383-
var pending = await DequeueNextAsync();
383+
var pending = await DequeueNextAsync(cancellationToken);
384384
if (pending == null) return null;
385385

386386
var pendingRequest = pending.Value.Item1;
@@ -491,9 +491,9 @@ public async Task ApplyResponse(ResponseMessage response, Guid requestActivityId
491491
}
492492
}
493493

494-
async Task<(RequestMessage, RequestDataStreamsTransferProgress)?> DequeueNextAsync()
494+
async Task<(RequestMessage, RequestDataStreamsTransferProgress)?> DequeueNextAsync(CancellationToken cancellationToken)
495495
{
496-
await using var cts = new CancelOnDisposeCancellationToken(queueToken);
496+
await using var cts = new CancelOnDisposeCancellationToken(queueToken, cancellationToken);
497497
try
498498
{
499499
hasItemsForEndpoint.Reset();

0 commit comments

Comments
 (0)