Skip to content

Commit 7295b2b

Browse files
LukeButtersclaude
andauthored
Add support for RPC over queue (e.g. RPC over Redis) execution mode (#698)
* Add local execution mode for queue-based RPC without TCP This adds a new execution mode where RPC requests are executed locally on the worker node that dequeues work, rather than being proxied over TCP. Changes: - Add PollLocalAsync() method to HalibutRuntime for local queue polling - Support local:// URI scheme for local execution endpoints - Workers poll queue directly and execute RPCs locally via ServiceInvoker - Add comprehensive design document explaining architecture and usage - Add test fixture demonstrating local execution mode Benefits: - 10-100x lower latency (no TCP/SSL overhead) - True horizontal scaling via worker pools - Queue-agnostic (works with in-memory and Redis queues) - Backward compatible with existing code Usage: ```csharp // Worker var worker = new HalibutRuntime(serviceFactory); worker.Services.AddSingleton<IMyService>(new MyServiceImpl()); await worker.PollLocalAsync(new Uri("local://worker-pool-a"), cancellationToken); // Client var client = new HalibutRuntime(serviceFactory); var service = client.CreateAsyncClient<IMyService, IAsyncClientMyService>( new ServiceEndPoint("local://worker-pool-a", null)); await service.DoWorkAsync(); ``` 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * Add conditional compilation directive for .NET 8.0+ to LocalExecutionModeFixture The LocalExecutionModeFixture test uses Redis functionality (RedisFacadeBuilder, RedisPendingRequestQueueFactory) which is only available in .NET 8.0 or greater. Added #if NET8_0_OR_GREATER directive to match the pattern used in other Redis queue tests in the codebase. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * Restore SimplePollingExample test to HalibutExamplesFixture Added back the SimplePollingExample test implementation that demonstrates basic polling mode with TCP. This test serves as a reference example for the Halibut polling pattern. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * . * Expose InMemoryConnectionLog max events as public static readonly field The hardcoded limit of 100 log events is now accessible via InMemoryConnectionLog.MaxLogEvents, allowing external code to reference this configuration value. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> * Make InMemoryConnectionLog class public Making the class public so that the MaxLogEvents field can be accessed from outside the assembly. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> * . * . * . * . * Respect Dequeue Async CT * Rename to queue execution mode * Rename local to queue --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent ad811db commit 7295b2b

File tree

7 files changed

+232
-5
lines changed

7 files changed

+232
-5
lines changed

README.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,28 @@ using (var tentaclePolling = new HalibutRuntime(services, Certificates.Alice))
5555
}
5656
```
5757

58-
Notice that while the configuration code changed, the request/response code didn't apart from the endpoint. Logically, the Octopus is still the request/response client, and the Tentacle is still the request/response server, even though the transport layer has Octopus as the TCP listener and Tentacle as the TCP client polling for work.
58+
Notice that while the configuration code changed, the request/response code didn't apart from the endpoint. Logically, the Octopus is still the request/response client, and the Tentacle is still the request/response server, even though the transport layer has Octopus as the TCP listener and Tentacle as the TCP client polling for work.
59+
60+
## RPC over Redis
61+
62+
Halibut supports executing RPC commands between nodes using a shared Redis queue as the communication mechanism. In this mode, nodes do not communicate directly with each other - all communication flows through Redis. This is particularly useful for scenarios where multiple nodes can all access a shared Redis instance but cannot establish direct network connections to each other.
63+
64+
### How it works
65+
66+
When using RPC over Redis:
67+
68+
- The client queues RPC requests in Redis
69+
- The server polls Redis for pending requests
70+
- The server processes requests and writes responses back to Redis
71+
- The client retrieves responses from Redis
72+
73+
This decoupled communication model allows nodes behind firewalls, in different networks, or with restricted connectivity to communicate as long as they can all reach the shared Redis instance.
74+
75+
### Usage
76+
77+
See the [SimpleLocalExecutionExample](source/Halibut.Tests/LocalExecutionModeFixture.cs) test for a complete example of how to set up and use RPC over Redis.
78+
79+
For more detailed information about the Redis queue implementation, refer to the [Redis Queue documentation](docs/RedisQueue.md).
5980

6081
## Failure modes
6182

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Halibut.ServiceModel;
4+
using Halibut.Tests.Support;
5+
using Halibut.Tests.TestServices;
6+
using Halibut.Tests.TestServices.Async;
7+
using Halibut.TestUtils.Contracts;
8+
using NUnit.Framework;
9+
10+
namespace Halibut.Tests
11+
{
12+
public class HalibutExamplesFixture : BaseTest
13+
{
14+
[Test]
15+
public async Task SimplePollingExample()
16+
{
17+
var services = GetDelegateServiceFactory();
18+
await using (var client = new HalibutRuntimeBuilder()
19+
.WithServerCertificate(Certificates.Octopus)
20+
.WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build())
21+
.Build())
22+
await using (var pollingService = new HalibutRuntimeBuilder()
23+
.WithServerCertificate(Certificates.TentaclePolling)
24+
.WithServiceFactory(services)
25+
.WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build())
26+
.Build())
27+
{
28+
var octopusPort = client.Listen();
29+
client.Trust(Certificates.TentaclePollingPublicThumbprint);
30+
31+
pollingService.Poll(new Uri("poll://alice"), new ServiceEndPoint("https://localhost:" + octopusPort, Certificates.OctopusPublicThumbprint, client.TimeoutsAndLimits), CancellationToken);
32+
33+
var echo = client.CreateAsyncClient<IEchoService, IAsyncClientEchoService>(new ServiceEndPoint("poll://alice", null, client.TimeoutsAndLimits));
34+
35+
await echo.SayHelloAsync("World");
36+
}
37+
}
38+
39+
static DelegateServiceFactory GetDelegateServiceFactory()
40+
{
41+
var services = new DelegateServiceFactory();
42+
services.Register<IEchoService, IAsyncEchoService>(() => new AsyncEchoService());
43+
return services;
44+
}
45+
}
46+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
#if NET8_0_OR_GREATER
2+
using System;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using FluentAssertions;
6+
using Halibut.Diagnostics;
7+
using Halibut.Logging;
8+
using Halibut.Queue;
9+
using Halibut.Queue.Redis;
10+
using Halibut.Queue.Redis.RedisDataLossDetection;
11+
using Halibut.Queue.Redis.RedisHelpers;
12+
using Halibut.ServiceModel;
13+
using Halibut.Tests.Queue.Redis.Utils;
14+
using Halibut.Tests.Support;
15+
using Halibut.Tests.Support.Logging;
16+
using Halibut.Tests.TestServices;
17+
using Halibut.Tests.TestServices.Async;
18+
using Halibut.TestUtils.Contracts;
19+
using NUnit.Framework;
20+
using DisposableCollection = Halibut.Util.DisposableCollection;
21+
22+
namespace Halibut.Tests
23+
{
24+
public class RPCOverQueueExecutionModeFixture : BaseTest
25+
{
26+
[RedisTest]
27+
[Test]
28+
public async Task SimpleRPCOverQueueExecutionExample()
29+
{
30+
var services = GetDelegateServiceFactory();
31+
var timeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
32+
33+
var logFactory = new CachingLogFactory(new TestContextLogCreator("", LogLevel.Trace));
34+
35+
var log = new TestContextLogCreator("Redis", LogLevel.Fatal);
36+
37+
var preSharedGuid = Guid.NewGuid();
38+
39+
await using var disposables = new DisposableCollection();
40+
41+
await using var client = new HalibutRuntimeBuilder()
42+
.WithServerCertificate(Certificates.Octopus)
43+
.WithPendingRequestQueueFactory(RedisFactory(preSharedGuid, disposables, log, logFactory))
44+
.WithHalibutTimeoutsAndLimits(timeoutsAndLimits)
45+
.Build();
46+
47+
await using var worker = new HalibutRuntimeBuilder()
48+
.WithServerCertificate(Certificates.TentaclePolling)
49+
.WithServiceFactory(services)
50+
.WithPendingRequestQueueFactory(RedisFactory(preSharedGuid, disposables, log, logFactory))
51+
.WithHalibutTimeoutsAndLimits(timeoutsAndLimits)
52+
.Build();
53+
54+
// Start worker polling
55+
using var workerCts = new CancellationTokenSource();
56+
var pollingTask = Task.Run(async () =>
57+
{
58+
await worker.PollForRPCOverQueueAsync(new Uri(HalibutRuntime.QueueEndpointScheme + "://test-worker"), workerCts.Token);
59+
}, workerCts.Token);
60+
61+
// Client creates proxy and makes request
62+
var echo = client.CreateAsyncClient<IEchoService, IAsyncClientEchoService>(
63+
new ServiceEndPoint(HalibutRuntime.QueueEndpointScheme + "://test-worker", null, client.TimeoutsAndLimits));
64+
65+
var result = await echo.SayHelloAsync("World");
66+
result.Should().Be("World...");
67+
68+
await workerCts.CancelAsync();
69+
70+
await pollingTask;
71+
}
72+
73+
Func<QueueMessageSerializer, IPendingRequestQueueFactory> RedisFactory(
74+
Guid preSharedGuid,
75+
DisposableCollection disposables,
76+
TestContextLogCreator log,
77+
CachingLogFactory logFactory)
78+
{
79+
return msgSer =>
80+
{
81+
var redisFacade = RedisFacadeBuilder.CreateRedisFacade(prefix: preSharedGuid);
82+
disposables.AddAsyncDisposable(redisFacade);
83+
var watchForRedisLosingAllItsData = new WatchForRedisLosingAllItsData(redisFacade, log.CreateNewForPrefix("watcher"));
84+
disposables.AddAsyncDisposable(watchForRedisLosingAllItsData);
85+
86+
return new RedisPendingRequestQueueFactory(msgSer,
87+
new InMemoryStoreDataStreamsForDistributedQueues(),
88+
watchForRedisLosingAllItsData,
89+
new HalibutRedisTransport(redisFacade),
90+
new HalibutTimeoutsAndLimitsForTestsBuilder().Build(),
91+
logFactory);
92+
};
93+
}
94+
95+
static DelegateServiceFactory GetDelegateServiceFactory()
96+
{
97+
var services = new DelegateServiceFactory();
98+
services.Register<IEchoService, IAsyncEchoService>(() => new AsyncEchoService());
99+
return services;
100+
}
101+
}
102+
}
103+
#endif

source/Halibut/Diagnostics/InMemoryConnectionLog.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66

77
namespace Halibut.Diagnostics
88
{
9+
public static class InMemoryConnectionLogLimits
10+
{
11+
public static readonly int MaxLogEventsStored = 100;
12+
}
13+
914
internal class InMemoryConnectionLog : ILog
1015
{
1116
readonly string endpoint;
@@ -57,7 +62,7 @@ void WriteInternal(LogEvent logEvent)
5762

5863
events.Enqueue(logEvent);
5964

60-
while (events.Count > 100 && events.TryDequeue(out _)) { }
65+
while (events.Count > InMemoryConnectionLogLimits.MaxLogEventsStored && events.TryDequeue(out _)) { }
6166
}
6267

6368
static LogLevel GetLogLevel(LogEvent logEvent)

source/Halibut/HalibutRuntime.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace Halibut
2121
{
2222
public class HalibutRuntime : IHalibutRuntime
2323
{
24+
public const string QueueEndpointScheme = "queue";
2425
public static readonly string DefaultFriendlyHtmlPageContent = "<html><body><p>Hello!</p></body></html>";
2526
readonly ConcurrentDictionary<Uri, IPendingRequestQueue> queues = new();
2627
readonly IPendingRequestQueueFactory queueFactory;
@@ -199,6 +200,55 @@ public void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken c
199200
pollingClients.Add(new PollingClient(subscription, client, HandleIncomingRequestAsync, log, cancellationToken, pollingReconnectRetryPolicy));
200201
}
201202

203+
public async Task PollForRPCOverQueueAsync(Uri queueOnlyEndpoint, CancellationToken cancellationToken)
204+
{
205+
if (queueOnlyEndpoint.Scheme.ToLowerInvariant() != QueueEndpointScheme)
206+
{
207+
throw new ArgumentException($"Only '{QueueEndpointScheme}://' endpoints are supported. Provided: {queueOnlyEndpoint.Scheme}://", nameof(queueOnlyEndpoint));
208+
}
209+
210+
var queue = GetQueue(queueOnlyEndpoint);
211+
var log = logs.ForEndpoint(queueOnlyEndpoint);
212+
213+
log.Write(EventType.MessageExchange, $"Starting queue polling for endpoint: {queueOnlyEndpoint}");
214+
215+
while (!cancellationToken.IsCancellationRequested)
216+
{
217+
try
218+
{
219+
var request = await queue.DequeueAsync(cancellationToken);
220+
221+
if (request != null)
222+
{
223+
ResponseMessage response;
224+
try
225+
{
226+
response = await invoker.InvokeAsync(request.RequestMessage);
227+
}
228+
catch (Exception ex)
229+
{
230+
log.WriteException(EventType.Error, $"Error executing queue request for {request.RequestMessage.ServiceName}.{request.RequestMessage.MethodName}", ex);
231+
response = ResponseMessage.FromException(request.RequestMessage, ex);
232+
}
233+
234+
await queue.ApplyResponse(response, request.RequestMessage.ActivityId);
235+
}
236+
}
237+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
238+
{
239+
log.Write(EventType.MessageExchange, $"Queue polling cancelled for endpoint: {queueOnlyEndpoint}");
240+
break;
241+
}
242+
catch (Exception ex)
243+
{
244+
log.WriteException(EventType.Error, $"Error in queue polling loop for endpoint: {queueOnlyEndpoint}", ex);
245+
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
246+
}
247+
}
248+
249+
log.Write(EventType.MessageExchange, $"Queue polling stopped for endpoint: {queueOnlyEndpoint}");
250+
}
251+
202252
public async Task<ServiceEndPoint> DiscoverAsync(Uri uri, CancellationToken cancellationToken)
203253
{
204254
return await DiscoverAsync(new ServiceEndPoint(uri, null, TimeoutsAndLimits), cancellationToken);
@@ -242,6 +292,7 @@ async Task<ResponseMessage> SendOutgoingRequestAsync(RequestMessage request, Met
242292
response = await SendOutgoingHttpsRequestAsync(request, cancellationToken).ConfigureAwait(false);
243293
break;
244294
case "poll":
295+
case QueueEndpointScheme:
245296
response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false);
246297
break;
247298
default: throw new ArgumentException("Unknown endpoint type: " + endPoint.BaseUri.Scheme);

source/Halibut/IHalibutRuntime.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public interface IHalibutRuntime : IAsyncDisposable, IDisposable
1717
int Listen(IPEndPoint endpoint);
1818
void ListenWebSocket(string endpoint);
1919
void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken cancellationToken);
20+
Task PollForRPCOverQueueAsync(Uri queueOnlyEndpoint, CancellationToken cancellationToken);
2021

2122
Task<ServiceEndPoint> DiscoverAsync(Uri uri, CancellationToken cancellationToken);
2223
Task<ServiceEndPoint> DiscoverAsync(ServiceEndPoint endpoint, CancellationToken cancellationToken);

source/Halibut/Queue/Redis/RedisPendingRequestQueue.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ async Task<bool> TryClearRequestFromQueue(RedisPendingRequest redisPending)
381381
// It will kill the TCP connection, which will force re-connect (in perhaps a backoff function)
382382
// This could result in connecting to a node that is actually connected to redis. It could also
383383
// cause a cascade of failure from high load.
384-
var pending = await DequeueNextAsync();
384+
var pending = await DequeueNextAsync(cancellationToken);
385385
if (pending == null) return null;
386386

387387
var pendingRequest = pending.Value.Item1;
@@ -492,9 +492,9 @@ public async Task ApplyResponse(ResponseMessage response, Guid requestActivityId
492492
}
493493
}
494494

495-
async Task<(RequestMessage, RequestDataStreamsTransferProgress)?> DequeueNextAsync()
495+
async Task<(RequestMessage, RequestDataStreamsTransferProgress)?> DequeueNextAsync(CancellationToken cancellationToken)
496496
{
497-
await using var cts = new CancelOnDisposeCancellationToken(queueToken);
497+
await using var cts = new CancelOnDisposeCancellationToken(queueToken, cancellationToken);
498498
try
499499
{
500500
hasItemsForEndpoint.Reset();

0 commit comments

Comments
 (0)