-
Notifications
You must be signed in to change notification settings - Fork 48
Add support for RPC over queue (e.g. RPC over Redis) execution mode #698
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
75b3543
941c327
88fb6a0
2ab6dda
5a584c0
d923006
12c1a29
77160c7
4e7bf15
7693db0
830da73
51b2da1
1403f35
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| using System; | ||
| using System.Threading.Tasks; | ||
| using Halibut.ServiceModel; | ||
| using Halibut.Tests.Support; | ||
| using Halibut.Tests.TestServices; | ||
| using Halibut.Tests.TestServices.Async; | ||
| using Halibut.TestUtils.Contracts; | ||
| using NUnit.Framework; | ||
|
|
||
| namespace Halibut.Tests | ||
| { | ||
| public class HalibutExamplesFixture : BaseTest | ||
| { | ||
| [Test] | ||
| public async Task SimplePollingExample() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added this since we lack simple examples of how to use halibit. |
||
| { | ||
| var services = GetDelegateServiceFactory(); | ||
| await using (var client = new HalibutRuntimeBuilder() | ||
| .WithServerCertificate(Certificates.Octopus) | ||
| .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build()) | ||
| .Build()) | ||
| await using (var pollingService = new HalibutRuntimeBuilder() | ||
| .WithServerCertificate(Certificates.TentaclePolling) | ||
| .WithServiceFactory(services) | ||
| .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build()) | ||
| .Build()) | ||
| { | ||
| var octopusPort = client.Listen(); | ||
| client.Trust(Certificates.TentaclePollingPublicThumbprint); | ||
|
|
||
| pollingService.Poll(new Uri("poll://alice"), new ServiceEndPoint("https://localhost:" + octopusPort, Certificates.OctopusPublicThumbprint, client.TimeoutsAndLimits), CancellationToken); | ||
|
|
||
| var echo = client.CreateAsyncClient<IEchoService, IAsyncClientEchoService>(new ServiceEndPoint("poll://alice", null, client.TimeoutsAndLimits)); | ||
|
|
||
| await echo.SayHelloAsync("World"); | ||
| } | ||
| } | ||
|
|
||
| static DelegateServiceFactory GetDelegateServiceFactory() | ||
| { | ||
| var services = new DelegateServiceFactory(); | ||
| services.Register<IEchoService, IAsyncEchoService>(() => new AsyncEchoService()); | ||
| return services; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| #if NET8_0_OR_GREATER | ||
| using System; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using FluentAssertions; | ||
| using Halibut.Diagnostics; | ||
| using Halibut.Logging; | ||
| using Halibut.Queue; | ||
| using Halibut.Queue.Redis; | ||
| using Halibut.Queue.Redis.RedisDataLossDetection; | ||
| using Halibut.Queue.Redis.RedisHelpers; | ||
| using Halibut.ServiceModel; | ||
| using Halibut.Tests.Queue.Redis.Utils; | ||
| using Halibut.Tests.Support; | ||
| using Halibut.Tests.Support.Logging; | ||
| using Halibut.Tests.TestServices; | ||
| using Halibut.Tests.TestServices.Async; | ||
| using Halibut.TestUtils.Contracts; | ||
| using NUnit.Framework; | ||
| using DisposableCollection = Halibut.Util.DisposableCollection; | ||
|
|
||
| namespace Halibut.Tests | ||
| { | ||
| public class RPCOverQueueExecutionModeFixture : BaseTest | ||
| { | ||
| [RedisTest] | ||
| [Test] | ||
| public async Task SimpleRPCOverQueueExecutionExample() | ||
| { | ||
| var services = GetDelegateServiceFactory(); | ||
| var timeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); | ||
|
|
||
| var logFactory = new CachingLogFactory(new TestContextLogCreator("", LogLevel.Trace)); | ||
|
|
||
| var log = new TestContextLogCreator("Redis", LogLevel.Fatal); | ||
|
|
||
| var preSharedGuid = Guid.NewGuid(); | ||
|
|
||
| await using var disposables = new DisposableCollection(); | ||
|
|
||
| await using var client = new HalibutRuntimeBuilder() | ||
| .WithServerCertificate(Certificates.Octopus) | ||
| .WithPendingRequestQueueFactory(RedisFactory(preSharedGuid, disposables, log, logFactory)) | ||
| .WithHalibutTimeoutsAndLimits(timeoutsAndLimits) | ||
| .Build(); | ||
|
|
||
| await using var worker = new HalibutRuntimeBuilder() | ||
| .WithServerCertificate(Certificates.TentaclePolling) | ||
| .WithServiceFactory(services) | ||
| .WithPendingRequestQueueFactory(RedisFactory(preSharedGuid, disposables, log, logFactory)) | ||
| .WithHalibutTimeoutsAndLimits(timeoutsAndLimits) | ||
| .Build(); | ||
|
|
||
| // Start worker polling | ||
| using var workerCts = new CancellationTokenSource(); | ||
| var pollingTask = Task.Run(async () => | ||
| { | ||
| await worker.PollForRPCOverQueueAsync(new Uri(HalibutRuntime.QueueEndpointScheme + "://test-worker"), workerCts.Token); | ||
| }, workerCts.Token); | ||
|
|
||
| // Client creates proxy and makes request | ||
| var echo = client.CreateAsyncClient<IEchoService, IAsyncClientEchoService>( | ||
| new ServiceEndPoint(HalibutRuntime.QueueEndpointScheme + "://test-worker", null, client.TimeoutsAndLimits)); | ||
|
|
||
| var result = await echo.SayHelloAsync("World"); | ||
| result.Should().Be("World..."); | ||
|
|
||
| await workerCts.CancelAsync(); | ||
|
|
||
| await pollingTask; | ||
| } | ||
|
|
||
| Func<QueueMessageSerializer, IPendingRequestQueueFactory> RedisFactory( | ||
| Guid preSharedGuid, | ||
| DisposableCollection disposables, | ||
| TestContextLogCreator log, | ||
| CachingLogFactory logFactory) | ||
| { | ||
| return msgSer => | ||
| { | ||
| var redisFacade = RedisFacadeBuilder.CreateRedisFacade(prefix: preSharedGuid); | ||
| disposables.AddAsyncDisposable(redisFacade); | ||
| var watchForRedisLosingAllItsData = new WatchForRedisLosingAllItsData(redisFacade, log.CreateNewForPrefix("watcher")); | ||
| disposables.AddAsyncDisposable(watchForRedisLosingAllItsData); | ||
|
|
||
| return new RedisPendingRequestQueueFactory(msgSer, | ||
| new InMemoryStoreDataStreamsForDistributedQueues(), | ||
| watchForRedisLosingAllItsData, | ||
| new HalibutRedisTransport(redisFacade), | ||
| new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), | ||
| logFactory); | ||
| }; | ||
| } | ||
|
|
||
| static DelegateServiceFactory GetDelegateServiceFactory() | ||
| { | ||
| var services = new DelegateServiceFactory(); | ||
| services.Register<IEchoService, IAsyncEchoService>(() => new AsyncEchoService()); | ||
| return services; | ||
| } | ||
| } | ||
| } | ||
| #endif |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,11 @@ | |
|
|
||
| namespace Halibut.Diagnostics | ||
| { | ||
| public static class InMemoryConnectionLogLimits | ||
| { | ||
| public static readonly int MaxLogEventsStored = 100; | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will be used in Octopus to limit returned logs in multi node setups. |
||
| } | ||
|
|
||
| internal class InMemoryConnectionLog : ILog | ||
| { | ||
| readonly string endpoint; | ||
|
|
@@ -57,7 +62,7 @@ void WriteInternal(LogEvent logEvent) | |
|
|
||
| events.Enqueue(logEvent); | ||
|
|
||
| while (events.Count > 100 && events.TryDequeue(out _)) { } | ||
| while (events.Count > InMemoryConnectionLogLimits.MaxLogEventsStored && events.TryDequeue(out _)) { } | ||
| } | ||
|
|
||
| static LogLevel GetLogLevel(LogEvent logEvent) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ namespace Halibut | |
| { | ||
| public class HalibutRuntime : IHalibutRuntime | ||
| { | ||
| public const string QueueEndpointScheme = "queue"; | ||
| public static readonly string DefaultFriendlyHtmlPageContent = "<html><body><p>Hello!</p></body></html>"; | ||
| readonly ConcurrentDictionary<Uri, IPendingRequestQueue> queues = new(); | ||
| readonly IPendingRequestQueueFactory queueFactory; | ||
|
|
@@ -199,6 +200,55 @@ public void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken c | |
| pollingClients.Add(new PollingClient(subscription, client, HandleIncomingRequestAsync, log, cancellationToken, pollingReconnectRetryPolicy)); | ||
| } | ||
|
|
||
| public async Task PollForRPCOverQueueAsync(Uri queueOnlyEndpoint, CancellationToken cancellationToken) | ||
| { | ||
| if (queueOnlyEndpoint.Scheme.ToLowerInvariant() != QueueEndpointScheme) | ||
| { | ||
| throw new ArgumentException($"Only '{QueueEndpointScheme}://' endpoints are supported. Provided: {queueOnlyEndpoint.Scheme}://", nameof(queueOnlyEndpoint)); | ||
| } | ||
|
|
||
| var queue = GetQueue(queueOnlyEndpoint); | ||
| var log = logs.ForEndpoint(queueOnlyEndpoint); | ||
|
|
||
| log.Write(EventType.MessageExchange, $"Starting queue polling for endpoint: {queueOnlyEndpoint}"); | ||
|
|
||
| while (!cancellationToken.IsCancellationRequested) | ||
| { | ||
| try | ||
| { | ||
| var request = await queue.DequeueAsync(cancellationToken); | ||
|
|
||
| if (request != null) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like if there is no request we immediately poll again? Is it worth adding a delay when there is no request waiting? |
||
| { | ||
| ResponseMessage response; | ||
| try | ||
| { | ||
| response = await invoker.InvokeAsync(request.RequestMessage); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| log.WriteException(EventType.Error, $"Error executing queue request for {request.RequestMessage.ServiceName}.{request.RequestMessage.MethodName}", ex); | ||
| response = ResponseMessage.FromException(request.RequestMessage, ex); | ||
| } | ||
|
|
||
| await queue.ApplyResponse(response, request.RequestMessage.ActivityId); | ||
| } | ||
| } | ||
| catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) | ||
| { | ||
| log.Write(EventType.MessageExchange, $"Queue polling cancelled for endpoint: {queueOnlyEndpoint}"); | ||
| break; | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| log.WriteException(EventType.Error, $"Error in queue polling loop for endpoint: {queueOnlyEndpoint}", ex); | ||
| await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); | ||
| } | ||
| } | ||
|
|
||
| log.Write(EventType.MessageExchange, $"Queue polling stopped for endpoint: {queueOnlyEndpoint}"); | ||
| } | ||
|
|
||
| public async Task<ServiceEndPoint> DiscoverAsync(Uri uri, CancellationToken cancellationToken) | ||
| { | ||
| return await DiscoverAsync(new ServiceEndPoint(uri, null, TimeoutsAndLimits), cancellationToken); | ||
|
|
@@ -242,6 +292,7 @@ async Task<ResponseMessage> SendOutgoingRequestAsync(RequestMessage request, Met | |
| response = await SendOutgoingHttpsRequestAsync(request, cancellationToken).ConfigureAwait(false); | ||
| break; | ||
| case "poll": | ||
| case QueueEndpointScheme: | ||
| response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false); | ||
| break; | ||
| default: throw new ArgumentException("Unknown endpoint type: " + endPoint.BaseUri.Scheme); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm glad you added this section to the readme. This feels like a significant extra capability you've added to halibut here. I understand the value and our use case for it, I'm interested to hear how you are feeling about the conceptual overhead that this new capability adds.
For example I notice that in this readme you talk about "nodes" for the first time. Halibut is a client and server communication framework, so what is a "node" in this client serve model.
Also I notice in the "how it works" section. You talk about a client making an RPC call to a server... however in our use case of collecting in memory halibut logs, isn't it actually a client sending an RPC call to another client?
Lets use this thread to discuss the big picture question "How do we feel about the conceptual overhead we've added" and once we've covered that I can start new threads to nit pick the examples I've listed to give weight to my question.
I'm happy for you to merge while this discussion is ongoing as long as we're committed to addressing this question 😄