Replies: 7 comments 9 replies
-
Converted to discussion
-
@Exagram Can you please repost your latest code, particularly the consumer configuration and your app loop?
-
One way to reduce confusion is to give the stream and the subjects different names. Using the same name for both is legal, but it's confusing.
-
Take a look at this example and see if it helps:

using System;
using System.Text;
using System.Threading;
using NATS.Client;
using NATS.Client.JetStream;

namespace NATSExamples
{
    internal static class JetStreamStarter
    {
        static void Main(string[] args)
        {
            using (IConnection connection1 = new ConnectionFactory().CreateConnection("nats://localhost:4222"))
            using (IConnection connection2 = new ConnectionFactory().CreateConnection("nats://localhost:4222"))
            {
                IJetStreamManagement jsm = connection1.CreateJetStreamManagementContext();

                // start off with a clean stream for the example
                try { jsm.DeleteStream("STREAM"); } catch (NATSJetStreamException) { }

                jsm.AddStream(StreamConfiguration.Builder()
                    .WithName("STREAM")
                    .WithSubjects("SUBJECT.>")
                    .Build());

                jsm.AddOrUpdateConsumer("STREAM", ConsumerConfiguration.Builder()
                    .WithDurable("WORKER")
                    .WithFilterSubject("SUBJECT.A")
                    .Build()
                );

                Console.WriteLine();

                int count = 10;
                new Thread(() =>
                {
                    IJetStream context2 = connection2.CreateJetStreamContext();
                    PullSubscribeOptions options = PullSubscribeOptions.BindTo("STREAM", "WORKER");

                    // subscribe subject must be an exact match to filter subject
                    IJetStreamPullSubscription subscription = context2.PullSubscribe("SUBJECT.A", options);

                    int red = 0;
                    while (red < count)
                    {
                        ConsumerInfo loopInfo = subscription.GetConsumerInformation();
                        Console.WriteLine(
                            "\nConsumerInfo{" +
                            "Name='" + loopInfo.Name + '\'' +
                            ", NumPending=" + loopInfo.NumPending +
                            ", NumWaiting=" + loopInfo.NumWaiting +
                            ", Delivered=" + loopInfo.Delivered +
                            ", AckFloor=" + loopInfo.AckFloor +
                            '}');

                        subscription.PullExpiresIn(1, 500);
                        try
                        {
                            Msg msg = subscription.NextMessage(600);
                            Console.WriteLine("Got Message: " + msg.Subject + " " + Encoding.UTF8.GetString(msg.Data));
                            red++;
                        }
                        catch (NATSTimeoutException)
                        {
                            Console.WriteLine("Normal timeout waiting for message. Probably no messages currently.");
                        }
                    }

                    ConsumerInfo afterInfo = subscription.GetConsumerInformation();
                    Console.WriteLine(
                        "\nConsumerInfo{" +
                        "Name='" + afterInfo.Name + '\'' +
                        ", NumPending=" + afterInfo.NumPending +
                        ", NumWaiting=" + afterInfo.NumWaiting +
                        ", Delivered=" + afterInfo.Delivered +
                        ", AckFloor=" + afterInfo.AckFloor +
                        "}\n\n");
                }).Start();

                IJetStream context1 = connection2.CreateJetStreamContext();
                for (int x = 1; x <= count; x++)
                {
                    context1.Publish("SUBJECT.A", Encoding.UTF8.GetBytes("DATA-A-" + x));
                    context1.Publish("SUBJECT.B", Encoding.UTF8.GetBytes("DATA-B-" + x));
                    Thread.Sleep(2000);
                }
            }
        }
    }
}
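A note for readers following the example above: the stream stores everything published under SUBJECT.>, but the WORKER consumer filters on SUBJECT.A, so only the DATA-A messages should reach the pull subscriber. The sketch below illustrates the NATS subject wildcard rules behind that ('*' matches exactly one token, '>' matches one or more trailing tokens). SubjectMatches is a hypothetical helper written for illustration only; it is not part of NATS.Client, and the server does this matching itself.

```csharp
using System;

internal static class SubjectFilterSketch
{
    // Hypothetical helper (NOT part of NATS.Client): returns true when
    // `subject` matches `pattern` under NATS wildcard rules.
    internal static bool SubjectMatches(string pattern, string subject)
    {
        string[] p = pattern.Split('.');
        string[] s = subject.Split('.');

        for (int i = 0; i < p.Length; i++)
        {
            if (p[i] == ">")
            {
                // '>' must be the last pattern token and needs
                // at least one subject token left to match.
                return i == p.Length - 1 && s.Length > i;
            }
            if (i >= s.Length)
            {
                return false; // subject ran out of tokens
            }
            if (p[i] != "*" && p[i] != s[i])
            {
                return false; // literal token mismatch
            }
        }

        // No '>' seen: token counts must be equal.
        return p.Length == s.Length;
    }

    static void Main()
    {
        // Stream subjects "SUBJECT.>" capture both published subjects.
        Console.WriteLine(SubjectMatches("SUBJECT.>", "SUBJECT.A")); // True
        Console.WriteLine(SubjectMatches("SUBJECT.>", "SUBJECT.B")); // True

        // Consumer filter "SUBJECT.A" lets only the A messages through.
        Console.WriteLine(SubjectMatches("SUBJECT.A", "SUBJECT.A")); // True
        Console.WriteLine(SubjectMatches("SUBJECT.A", "SUBJECT.B")); // False
    }
}
```

If the consumer's filter subject and the subject passed to PullSubscribe differ, it is worth checking both against the stream's subject list in this way before looking elsewhere.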
-
Good news! I was able to get the above console tests to pass, as well as my .NET web app consumer.
Unfortunately, I wasn't able to figure out the root cause.
-
Update: I might not be receiving messages because I'm exceeding NATS Server resources.
NATS Client 0.14.5 error:
NATS Server error:
Diagnosis:
-
I keep a NATS connection open for long periods (a couple of hours, up to ~1 day). Suddenly I'm having trouble connecting to it. Restarting both the NATS server and my .NET application doesn't fix it. Thoughts?
In PowerShell I ran the server:
My .NET code using the default connection logic:
Exception thrown on this line:
return connectionFactory.CreateConnection(options);
I moved away from .NET to two PowerShell windows:
nats publish "filter.blah.test" "{}"
nats consumer next STREAM WORKER
Observations from nats consumer info STREAM WORKER:
After nats consumer next STREAM WORKER, Waiting Pulls goes up from 70 to 72 (up by 2). What are Waiting Pulls? I notice that on each next they increase by seemingly random amounts.