ASB SDK has suspected memory leak in its consumers #442

@amritsis

We have seen memory usage grow in both the JVM heap and the pod while there is no load on the application. The pod starts with low memory usage, but within 1-2 days at most it reaches the maximum pod memory.
Our application uses ASB SDK version 5.17.1 and Spring Boot parent 3.3.5.

Dependency (groupId:artifactId): com.azure.spring:spring-cloud-azure-starter-servicebus-jms

When we took a heap dump, we saw:
15 instances of ‘reactor.core.publisher.SinkOneMulticast’, loaded by ‘org.springframework.boot.loader.launch.LaunchedClassLoader @ 0x1003ee128218’ occupy 198,475,448 (66.35%) bytes.

Biggest instances:
•reactor.core.publisher.SinkOneMulticast @ 0x10000cb3ac10 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10000cb53758 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10000cb6c578 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x1000171afae8 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10002c131290 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x1000326ab800 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x1000326c4348 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x1000326dd3a0 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x1000326f5ee8 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x100032740460 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10003270edd0 - 13,471,144 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x100016034bc8 - 13,470,096 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10002c0ff9a8 - 13,470,096 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10002c118748 - 13,466,600 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10001f01d800 - 9,884,552 (3.30%) bytes.
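As a quick cross-check of the figures above, the 15 listed retained sizes can be summed and compared with the reported total. The sketch below uses only the numbers from the heap dump report; the class and method names are made up for illustration, and the implied heap size is only approximate because the 66.35% figure is rounded.

```java
public class RetainedSizeCheck {

    // Retained sizes in bytes, copied from the "Biggest instances" list above.
    static final long[] RETAINED_BYTES = {
        13_471_296L, 13_471_296L, 13_471_296L, 13_471_296L, 13_471_296L,
        13_471_296L, 13_471_296L, 13_471_296L, 13_471_296L, 13_471_296L,
        13_471_144L, 13_470_096L, 13_470_096L, 13_466_600L, 9_884_552L
    };

    /** Sums the retained sizes of all listed SinkOneMulticast instances. */
    static long totalRetainedBytes() {
        long total = 0;
        for (long size : RETAINED_BYTES) {
            total += size;
        }
        return total;
    }

    /** Estimates the whole heap size from the total and its reported share (66.35%). */
    static double impliedHeapBytes(long totalRetained, double sharePercent) {
        return totalRetained / (sharePercent / 100.0);
    }

    public static void main(String[] args) {
        long total = totalRetainedBytes();
        System.out.println("instances = " + RETAINED_BYTES.length);
        System.out.println("total retained bytes = " + total);
        System.out.println("implied heap bytes (approx.) = " + Math.round(impliedHeapBytes(total, 66.35)));
    }
}
```

The sum equals the reported 198,475,448 bytes, so the 15 instances listed (one per consumer) account for the entire 66.35% share, at roughly 13 MB each.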

This particular application has 15 consumers that consume from an ASB topic and post back to an ASB topic.
Our standard ASB consumer code is:

@PostConstruct
protected void init() {

    this.serviceBusProcessorClient = new ServiceBusClientBuilder()
        .connectionString(connectionString)
        .sessionProcessor()
        .topicName(topicName)
        .subscriptionName(subscriptionName)
        .prefetchCount(10)
        .maxConcurrentSessions(20)
        .processMessage(this::processMessage)
        .processError(this::processError)
        .maxAutoLockRenewDuration(Duration.ofMinutes(4))
        .buildProcessorClient();
}

void processError(ServiceBusErrorContext context) {
    var exception = context.getException().getCause();
    if (exception instanceof MessageConversionException) {
        var specificCause = ((MessageConversionException) exception).getMostSpecificCause();
        log.error("Message Conversion Error when processing topic {}, Error Message: {}",
            topicName, specificCause.getMessage());
    } else if (exception instanceof AmqpException) {
        log.error("AMQP error detected: {}", exception.getMessage());
        if (serviceBusProcessorClient != null) {
            serviceBusProcessorClient.stop();
            serviceBusProcessorClient.start();
            log.info("Restarted Service Bus Processor Client {} after AMQP error.", serviceBusProcessorClient.getSubscriptionName());
        }
    } else {
        log.error("Unexpected Error when processing topic {}, Error Message: {}",
            topicName, exception.getMessage(), exception);
    }
}
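One side note on processError above, not necessarily related to the leak: Throwable.getCause() returns null when an exception has no wrapped cause. In that case both instanceof checks are false and exception.getMessage() in the final else branch would throw a NullPointerException. A minimal sketch of a null-safe unwrap follows; the helper name causeOrSelf is hypothetical and is not part of the ASB SDK.

```java
public class ErrorCauseUnwrap {

    /**
     * Returns the wrapped cause when there is one, otherwise the error itself,
     * so callers never have to handle a null Throwable.
     */
    static Throwable causeOrSelf(Throwable error) {
        Throwable cause = error.getCause();
        return cause != null ? cause : error;
    }

    public static void main(String[] args) {
        // An exception without a cause: getCause() is null, so the error itself is used.
        Throwable bare = new IllegalStateException("no cause attached");
        System.out.println(causeOrSelf(bare).getMessage());

        // An exception with a cause: the cause is unwrapped, as in the original code.
        Throwable wrapped = new RuntimeException("wrapper", new IllegalArgumentException("inner"));
        System.out.println(causeOrSelf(wrapped).getMessage());
    }
}
```

In the handler, this would mean replacing `context.getException().getCause()` with `causeOrSelf(context.getException())`, assuming the same branching is kept.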

@SneakyThrows
void processMessage(ServiceBusReceivedMessageContext context) {
    ServiceBusReceivedMessage message = context.getMessage();
    String body = getMessageBody(message);

    log.debug("Processing topic {}, Contents: {}",
        topicName, body);
    try {
        method.invoke(new GenericMessage<>(body, message.getApplicationProperties()));
    } catch (ApplicationException ex) {
        log.warn("Error when processing topic {}, Error Message: {}",
            topicName, ex.getMessage());
    }
}

@Override
public void start() {
    serviceBusProcessorClient.start();
}

@Override
public void stop() {
    serviceBusProcessorClient.stop();
}

@Override
public boolean isRunning() {
    return serviceBusProcessorClient.isRunning();
}

This application logs only errors, yet the pod's container memory consumption still increases within a few hours.
However, if we reduce the consumer count to 1-2, it survives for maybe 2-3 weeks. All of this happens while the application is mainly idle.
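To tell JVM heap growth apart from overall container memory growth while the application is idle, heap and non-heap usage can be sampled from inside the JVM with the standard java.lang.management API. This is only a sketch: the class name, the sample count, and the interval are arbitrary choices for illustration, and in a real service the values would more likely go to the existing logger or a metrics endpoint.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class HeapSampler {

    private static final MemoryMXBean MEMORY = ManagementFactory.getMemoryMXBean();

    /** Heap bytes currently in use, as reported by the JVM. */
    static long usedHeapBytes() {
        return MEMORY.getHeapMemoryUsage().getUsed();
    }

    /** Non-heap bytes currently in use (metaspace, code cache, and similar areas). */
    static long usedNonHeapBytes() {
        return MEMORY.getNonHeapMemoryUsage().getUsed();
    }

    /** Formats one sample in whole MiB for a compact log line. */
    static String formatSample(long heapBytes, long nonHeapBytes) {
        return "heapUsed=" + (heapBytes >> 20) + " MiB nonHeapUsed=" + (nonHeapBytes >> 20) + " MiB";
    }

    public static void main(String[] args) throws InterruptedException {
        int samples = 3;            // arbitrary for the demo
        long intervalMillis = 1000; // arbitrary for the demo
        for (int i = 0; i < samples; i++) {
            System.out.println(formatSample(usedHeapBytes(), usedNonHeapBytes()));
            Thread.sleep(intervalMillis);
        }
    }
}
```

If the used heap keeps climbing across samples taken after garbage collections while no messages are flowing, that would be consistent with the heap dump findings above; if the heap stays flat while pod memory grows, the growth is more likely outside the Java heap.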
