Pull consumers are not re-subscribing on Reconnect of NATS connection #1347
Replies: 5 comments
-
| Which version of the client are you using? A new bug was introduced in 2.21.3. | 
-
| I am using client version 2.20.5 | 
-
| Here is the sample consumer code I am using: import io.nats.client.*  | 
-
| The following test works fine for me. Try upgrading to the latest version of the library. 
 import io.nats.client.*;
import io.nats.client.api.*;
public class Test1374 {
    public static void main(String[] args) throws Exception {
        Options options = new Options.Builder()
            .server(Options.DEFAULT_URL)
            .build();
        String stream = "stream1347";
        String subject = "subject1347";
        String conName = "con1347";
        try (Connection nc = Nats.connect(options)) {
            // delete the stream for a fresh start.
            // catch exception we don't care if it didn't exist
            try {
                nc.jetStreamManagement().deleteStream(stream);
            }
            catch (Exception ignore) {}
            // make the stream
            nc.jetStreamManagement()
                .addStream(
                    StreamConfiguration.builder()
                        .name(stream)
                        .subjects(subject)
                        .storageType(StorageType.File) // HAS TO BE FILE
                        .build());
            // consume options are not necessary for the example
            ConsumeOptions consumeOptions = ConsumeOptions.builder()
                .batchSize(15000)
                .expiresIn(1000)
                .build();
            StreamContext streamContext = nc.getStreamContext(stream);
            ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(
                ConsumerConfiguration.builder()
                    .name(conName)
                    .deliverPolicy(DeliverPolicy.LastPerSubject)
                    .ackPolicy(AckPolicy.None)
                    .filterSubjects(">")
                    .build());
            MessageHandler handler = msg -> System.out.println("Received message: " + new String(msg.getData()));
            //noinspection resource this should really be done in a try-resource, but it matches the original code.
            MessageConsumer mcon = consumerContext.consume(consumeOptions, handler);
            JetStream js = nc.jetStream();
            int x = 0;
            //noinspection InfiniteLoopStatement just run until the user kills the program.
            while (true) {
                try {
                    String data = "Data" + (++x);
                    js.publish(subject, data.getBytes());
                    System.out.println("Published message: " + data);
                    //noinspection BusyWait
                    Thread.sleep(250);
                }
                catch (Exception e) {
                    // publish can fail during disconnect
                    // try again in a bit
                    //noinspection BusyWait
                    Thread.sleep(1000);
                }
            }
        }
    }
} | 
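
The publish loop above catches every exception, sleeps, and moves on to the next message. A bounded retry makes that idea explicit. The following is a minimal sketch using only the JDK; `RetrySketch` and its `retry` method are hypothetical names introduced here for illustration and are not part of the NATS client.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper (not part of the NATS client): bounded retry with a fixed wait. */
final class RetrySketch {
    private RetrySketch() {}

    /**
     * Calls the action until it succeeds or maxAttempts is reached.
     * Sleeps waitMillis between failed attempts and rethrows the last failure.
     */
    static <T> T retry(Callable<T> action, int maxAttempts, long waitMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            }
            catch (InterruptedException e) {
                // restore the interrupt flag and stop retrying
                Thread.currentThread().interrupt();
                throw e;
            }
            catch (Exception e) {
                // e.g. a publish that failed while the connection was down
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(waitMillis);
                }
            }
        }
        throw last; // non-null here: every attempt failed
    }
}
```

In the loop above, the publish call could then be written as `RetrySketch.retry(() -> js.publish(subject, data.getBytes()), 5, 1000)`, assuming five attempts one second apart suit the application. Unlike the original loop, this retries the same message rather than skipping to the next one.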
-
| As far as I can tell, the current release works, and I've made sure it also works in the upcoming release. | 
-
Hi,
I am creating a consumer as shown in the example code:
https://github.com/nats-io/nats.java/blob/main/src/examples/java/io/nats/examples/jetstream/simple/MessageConsumerExample.java
However, when the connection to the NATS server is re-established, the consumer is not resubscribed and I keep getting heartbeatAlarm errors. Is this the expected behavior, and do I need to handle the reconnect event and re-create the consumer myself?
Thanks,
Gopinadh.