
ASB SDK consumers suspect a memory leak #44228

@amritsis

Description


We have seen heap and pod memory increase even when there is no load on the application. The pod starts with low memory usage, but within at most 1-2 days it reaches the pod's memory limit.
Our application uses ASB SDK version 5.17.1 and Spring Boot parent 3.3.5:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-servicebus-jms</artifactId>
</dependency>

When we took a heap dump we saw:
15 instances of ‘reactor.core.publisher.SinkOneMulticast’, loaded by ‘org.springframework.boot.loader.launch.LaunchedClassLoader @ 0x1003ee128218’ occupy 198,475,448 (66.35%) bytes.

Biggest instances:
•reactor.core.publisher.SinkOneMulticast @ 0x10000cb3ac10 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10000cb53758 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10000cb6c578 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x1000171afae8 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10002c131290 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x1000326ab800 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x1000326c4348 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x1000326dd3a0 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x1000326f5ee8 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x100032740460 - 13,471,296 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10003270edd0 - 13,471,144 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x100016034bc8 - 13,470,096 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10002c0ff9a8 - 13,470,096 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10002c118748 - 13,466,600 (4.50%) bytes.
•reactor.core.publisher.SinkOneMulticast @ 0x10001f01d800 - 9,884,552 (3.30%) bytes.
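
A minimal sketch of capturing such a dump from inside the pod via the standard HotSpot diagnostic MBean (the HeapDumper wrapper class is illustrative only; HotSpotDiagnosticMXBean and its dumpHeap method are the standard com.sun.management API):

import com.sun.management.HotSpotDiagnosticMXBean;

import java.io.IOException;
import java.lang.management.ManagementFactory;

// Illustrative helper to write a .hprof heap dump from inside the running JVM.
public final class HeapDumper {

    private HeapDumper() {
    }

    public static void dump(String outputFile, boolean liveObjectsOnly) throws IOException {
        HotSpotDiagnosticMXBean diagnosticBean = ManagementFactory.newPlatformMXBeanProxy(
            ManagementFactory.getPlatformMBeanServer(),
            "com.sun.management:type=HotSpotDiagnostic",
            HotSpotDiagnosticMXBean.class);
        // On recent JDKs the output file name must end in .hprof.
        // liveObjectsOnly = true triggers a full GC first and dumps only reachable objects.
        diagnosticBean.dumpHeap(outputFile, liveObjectsOnly);
    }
}

Opening the resulting .hprof file in a dominator-tree tool such as Eclipse MAT produces summaries like the one quoted above.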

This particular application has 15 consumers that consume from an ASB topic and post back to an ASB topic.
Our standard ASB consumer code is:

@PostConstruct
protected void init() {
    this.serviceBusProcessorClient = new ServiceBusClientBuilder()
        .connectionString(connectionString)
        .sessionProcessor()
        .topicName(topicName)
        .subscriptionName(subscriptionName)
        .prefetchCount(10)
        .maxConcurrentSessions(20)
        .processMessage(this::processMessage)
        .processError(this::processError)
        .maxAutoLockRenewDuration(Duration.ofMinutes(4))
        .buildProcessorClient();
}
void processError(ServiceBusErrorContext context) {
    var exception = context.getException().getCause();
    if (exception instanceof MessageConversionException) {
        var specificCause = ((MessageConversionException) exception).getMostSpecificCause();
        log.error("Message Conversion Error when processing topic {}, Error Message: {}",
            topicName, specificCause.getMessage());
    } else if (exception instanceof AmqpException) {
        log.error("AMQP error detected: {}", exception.getMessage());
        if (serviceBusProcessorClient != null) {
            serviceBusProcessorClient.stop();
            serviceBusProcessorClient.start();
            log.info("Restarted Service Bus Processor Client {} after AMQP error.",
                serviceBusProcessorClient.getSubscriptionName());
        }
    } else {
        log.error("Unexpected Error when processing topic {}, Error Message: {}",
            topicName, exception.getMessage(), exception);
    }
}

@SneakyThrows
void processMessage(ServiceBusReceivedMessageContext context) {
    ServiceBusReceivedMessage message = context.getMessage();
    String body = getMessageBody(message);

    log.debug("Processing topic {}, Contents: {}",
        topicName, body);
    try {
        method.invoke(new GenericMessage<>(body, message.getApplicationProperties()));
    } catch (ApplicationException ex) {
        log.warn("Error when processing topic {}, Error Message: {}",
            topicName, ex.getMessage());
    }
}

@Override
public void start() {
    serviceBusProcessorClient.start();
}

@Override
public void stop() {
    serviceBusProcessorClient.stop();
}

@Override
public boolean isRunning() {
    return serviceBusProcessorClient.isRunning();
}
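
The @Override annotations on start(), stop() and isRunning() suggest the class implements Spring's Lifecycle/SmartLifecycle contract. A minimal sketch of how these pieces fit together, assuming SmartLifecycle and one bean per subscription (the class name and constructor wiring are illustrative, and the @PreDestroy close() at the end is not in our current code; ServiceBusProcessorClient does implement AutoCloseable):

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.springframework.context.SmartLifecycle;

import java.time.Duration;

// Illustrative skeleton around the methods shown above; names and wiring are assumptions.
public class SessionTopicConsumer implements SmartLifecycle {

    private final String connectionString;
    private final String topicName;
    private final String subscriptionName;

    private ServiceBusProcessorClient serviceBusProcessorClient;

    public SessionTopicConsumer(String connectionString, String topicName, String subscriptionName) {
        this.connectionString = connectionString;
        this.topicName = topicName;
        this.subscriptionName = subscriptionName;
    }

    @PostConstruct
    protected void init() {
        // Same builder call as in the snippet above.
        this.serviceBusProcessorClient = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sessionProcessor()
            .topicName(topicName)
            .subscriptionName(subscriptionName)
            .prefetchCount(10)
            .maxConcurrentSessions(20)
            .processMessage(this::processMessage)
            .processError(this::processError)
            .maxAutoLockRenewDuration(Duration.ofMinutes(4))
            .buildProcessorClient();
    }

    void processMessage(ServiceBusReceivedMessageContext context) {
        // Message handling as in the snippet above.
    }

    void processError(ServiceBusErrorContext context) {
        // Error handling as in the snippet above.
    }

    @Override
    public void start() {
        serviceBusProcessorClient.start();
    }

    @Override
    public void stop() {
        serviceBusProcessorClient.stop();
    }

    @Override
    public boolean isRunning() {
        return serviceBusProcessorClient.isRunning();
    }

    // Not in the snippet above; releases the underlying AMQP resources on shutdown.
    @PreDestroy
    public void shutdown() {
        if (serviceBusProcessorClient != null) {
            serviceBusProcessorClient.close();
        }
    }
}

Note that 15 consumers built this way create 15 independent ServiceBusClientBuilder instances and, as far as we understand, 15 separate AMQP connections; the heap dump above likewise reports 15 SinkOneMulticast instances.
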
This application logs only errors, yet pod container memory consumption still increases within a few hours.
However, if we reduce the consumer count to 1-2, the pod survives for maybe 2-3 weeks. All of this happens while the application is mostly idle or under minimal load. We do see some AMQP exceptions during session lock renewal, but we are not sure whether they can cause a leak.
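
Since it is unclear whether these session-lock-renewal errors and the stop()/start() restarts in processError are related to the growth, a minimal, illustrative diagnostic like the following could help correlate the two over time (ProcessorRestartStats is not part of the SDK or of our code; it would be called from the AmqpException branch of processError just before the restart):

import java.util.concurrent.atomic.AtomicLong;

// Illustrative helper: counts processor restarts and samples used heap so restart
// frequency can be lined up with the pod memory graphs.
public final class ProcessorRestartStats {

    private static final AtomicLong RESTARTS = new AtomicLong();

    private ProcessorRestartStats() {
    }

    public static void recordRestart(String subscriptionName) {
        long count = RESTARTS.incrementAndGet();
        Runtime rt = Runtime.getRuntime();
        long usedHeapMb = (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024);
        // Replace System.out with the application's logger in real code.
        System.out.printf("Service Bus processor restart #%d for subscription %s, used heap ~%d MB%n",
            count, subscriptionName, usedHeapMb);
    }
}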

Labels

Client, Service Bus, customer-reported, needs-team-attention, question
