Skip to content

Commit 51b2da1

Browse files
committed
Rename to queue execution mode
1 parent 830da73 commit 51b2da1

File tree

3 files changed

+16
-17
lines changed

3 files changed

+16
-17
lines changed

source/Halibut.Tests/LocalExecutionModeFixture.cs renamed to source/Halibut.Tests/RPCOverQueueExecutionModeFixture.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121

2222
namespace Halibut.Tests
2323
{
24-
public class LocalExecutionModeFixture : BaseTest
24+
public class RPCOverQueueExecutionModeFixture : BaseTest
2525
{
2626
[RedisTest]
2727
[Test]
28-
public async Task SimpleLocalExecutionExample()
28+
public async Task SimpleRPCOverQueueExecutionExample()
2929
{
3030
var services = GetDelegateServiceFactory();
3131
var timeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
@@ -55,7 +55,7 @@ public async Task SimpleLocalExecutionExample()
5555
using var workerCts = new CancellationTokenSource();
5656
var pollingTask = Task.Run(async () =>
5757
{
58-
await worker.PollLocalAsync(new Uri("local://test-worker"), workerCts.Token);
58+
await worker.PollForRPCOverQueueAsync(new Uri("local://test-worker"), workerCts.Token);
5959
}, workerCts.Token);
6060

6161
// Client creates proxy to local://test-worker and makes request

source/Halibut/HalibutRuntime.cs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace Halibut
2121
{
2222
public class HalibutRuntime : IHalibutRuntime
2323
{
24+
public const string QueueEndpointScheme = "local";
2425
public static readonly string DefaultFriendlyHtmlPageContent = "<html><body><p>Hello!</p></body></html>";
2526
readonly ConcurrentDictionary<Uri, IPendingRequestQueue> queues = new();
2627
readonly IPendingRequestQueueFactory queueFactory;
@@ -199,17 +200,17 @@ public void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken c
199200
pollingClients.Add(new PollingClient(subscription, client, HandleIncomingRequestAsync, log, cancellationToken, pollingReconnectRetryPolicy));
200201
}
201202

202-
public async Task PollLocalAsync(Uri localEndpoint, CancellationToken cancellationToken)
203+
public async Task PollForRPCOverQueueAsync(Uri queueOnlyEndpoint, CancellationToken cancellationToken)
203204
{
204-
if (localEndpoint.Scheme.ToLowerInvariant() != "local")
205+
if (queueOnlyEndpoint.Scheme.ToLowerInvariant() != QueueEndpointScheme)
205206
{
206-
throw new ArgumentException($"Only 'local://' endpoints are supported. Provided: {localEndpoint.Scheme}://", nameof(localEndpoint));
207+
throw new ArgumentException($"Only 'queue://' endpoints are supported. Provided: {queueOnlyEndpoint.Scheme}://", nameof(queueOnlyEndpoint));
207208
}
208209

209-
var queue = GetQueue(localEndpoint);
210-
var log = logs.ForEndpoint(localEndpoint);
210+
var queue = GetQueue(queueOnlyEndpoint);
211+
var log = logs.ForEndpoint(queueOnlyEndpoint);
211212

212-
log.Write(EventType.MessageExchange, $"Starting local polling for endpoint: {localEndpoint}");
213+
log.Write(EventType.MessageExchange, $"Starting queue polling for endpoint: {queueOnlyEndpoint}");
213214

214215
while (!cancellationToken.IsCancellationRequested)
215216
{
@@ -226,7 +227,7 @@ public async Task PollLocalAsync(Uri localEndpoint, CancellationToken cancellati
226227
}
227228
catch (Exception ex)
228229
{
229-
log.WriteException(EventType.Error, $"Error executing local request for {request.RequestMessage.ServiceName}.{request.RequestMessage.MethodName}", ex);
230+
log.WriteException(EventType.Error, $"Error executing queue request for {request.RequestMessage.ServiceName}.{request.RequestMessage.MethodName}", ex);
230231
response = ResponseMessage.FromException(request.RequestMessage, ex);
231232
}
232233

@@ -235,17 +236,17 @@ public async Task PollLocalAsync(Uri localEndpoint, CancellationToken cancellati
235236
}
236237
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
237238
{
238-
log.Write(EventType.MessageExchange, $"Local polling cancelled for endpoint: {localEndpoint}");
239+
log.Write(EventType.MessageExchange, $"Queue polling cancelled for endpoint: {queueOnlyEndpoint}");
239240
break;
240241
}
241242
catch (Exception ex)
242243
{
243-
log.WriteException(EventType.Error, $"Error in local polling loop for endpoint: {localEndpoint}", ex);
244+
log.WriteException(EventType.Error, $"Error in queue polling loop for endpoint: {queueOnlyEndpoint}", ex);
244245
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
245246
}
246247
}
247248

248-
log.Write(EventType.MessageExchange, $"Local polling stopped for endpoint: {localEndpoint}");
249+
log.Write(EventType.MessageExchange, $"Queue polling stopped for endpoint: {queueOnlyEndpoint}");
249250
}
250251

251252
public async Task<ServiceEndPoint> DiscoverAsync(Uri uri, CancellationToken cancellationToken)
@@ -291,9 +292,7 @@ async Task<ResponseMessage> SendOutgoingRequestAsync(RequestMessage request, Met
291292
response = await SendOutgoingHttpsRequestAsync(request, cancellationToken).ConfigureAwait(false);
292293
break;
293294
case "poll":
294-
response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false);
295-
break;
296-
case "local":
295+
case QueueEndpointScheme:
297296
response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false);
298297
break;
299298
default: throw new ArgumentException("Unknown endpoint type: " + endPoint.BaseUri.Scheme);

source/Halibut/IHalibutRuntime.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public interface IHalibutRuntime : IAsyncDisposable, IDisposable
1717
int Listen(IPEndPoint endpoint);
1818
void ListenWebSocket(string endpoint);
1919
void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken cancellationToken);
20-
Task PollLocalAsync(Uri localEndpoint, CancellationToken cancellationToken);
20+
Task PollForRPCOverQueueAsync(Uri queueOnlyEndpoint, CancellationToken cancellationToken);
2121

2222
Task<ServiceEndPoint> DiscoverAsync(Uri uri, CancellationToken cancellationToken);
2323
Task<ServiceEndPoint> DiscoverAsync(ServiceEndPoint endpoint, CancellationToken cancellationToken);

0 commit comments

Comments
 (0)