Replies: 2 comments
-
ANSWER FROM SLACK: I just happened to work on a similar POC with dotnet 6. Relevant code snippets
Worker.cs
NatsMessaging.cs
|
Beta Was this translation helpful? Give feedback.
0 replies
-
The above example describes a push-based consumer. For pull-based consumer I would recommend replacing Worker.cs: namespace Service;
public class Worker : IHostedService
{
private readonly ILogger<Worker> _logger;
private readonly IMessaging _messaging;
public Worker(ILogger<Worker> logger, IMessaging messaging)
{
_logger = logger;
_messaging = messaging;
}
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var msgs = _messaging.Subscribe();
_logger.LogWarning("Add logic to handle the messages: " + msgs);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_messaging.Dispose();
return Task.CompletedTask;
}
} NatsMessaging.cs: public class NatsMessaging : IMessaging
{
private readonly ILogger<NatsMessaging> _logger;
private IConnection? _connection;
private bool _disposed;
public NatsMessaging(ILogger<NatsMessaging> logger)
{
_logger = logger;
}
public IMessage Subscribe()
{
var subject = "my-subject";
var service = "my-service";
EnsureConnection();
EnsureStream("my-stream", subject);
var consumerConf = ConsumerConfiguration.Builder()
.WithDurable(service)
.Build();
var subscribeOptions = PullSubscribeOptions.Builder()
.WithConfiguration(consumerConf)
.Build();
// This will create the stream consumer in NATS if it does not already exist.
var subscription = _connection!.CreateJetStreamContext().PullSubscribe(
subject,
subscribeOptions);
var msgs = subscription.Fetch(1, 1000); // TODO: Sane?
if (msgs.Count() != 1)
{
return null;
}
var msg = msgs[0];
return new NatsMessage { Subject = msg.Subject, Body = msg.Body };
}
}
public interface IMessaging : IDisposable
{
IMessage Subscribe();
}
public interface IMessage
{
string Subject { get; set; }
string Body { get; set; }
}
public class NatsMessage : IMessage
{
public string Subject { get; set; }
public string Body { get; set; }
} |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
COPIED FROM SLACK:
Our org is looking to move more towards event based service integration, and I’m busy trying to spin up a proof of concept using JetStream & dotnet. The local POC is using docker-compose. Down the line, it makes sense to have dedicated services for the subscribers, but that the moment I’m wanting to simply add a stream subscription to the aspnet minimal API project as a HostedService / BackgroundService, so API calls to one of the services could publish events that other services could consume, but at the same time background workers could listen & consume messages from other subjects.
I’ve looked at the Samples, but haven’t seen anything that’s using a BackgroundService as a long running subscription, so was wondering if anyone could hep me out. I’m also quite new to dotnet and the fundamentals of BackgroundServices so that’s probably making it a bit harder for me to simply re-implement the examples from the samples directory and to get it to work.
I guess the simplest version of what I need to try work out is how can I add a long running JetStream subscription to the Worker class created when running dotnet new worker --name <Project.Name> (some microsoft docs here: https://docs.microsoft.com/en-us/dotnet/core/extensions/queue-service).
Beta Was this translation helpful? Give feedback.
All reactions