Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ public static class FineractTaskExecutor {
private int tenantUpgradeTaskExecutorCorePoolSize;
private int tenantUpgradeTaskExecutorMaxPoolSize;
private int tenantUpgradeTaskExecutorQueueCapacity;
private int eventTaskExecutorCorePoolSize;
private int eventTaskExecutorMaxPoolSize;
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,71 @@

package org.apache.fineract.infrastructure.core.config;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.MethodInvokingFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.context.SecurityContextHolderStrategy;
import org.springframework.security.task.DelegatingSecurityContextAsyncTaskExecutor;

@Configuration
public class SpringConfig {

private static final int AWAIT_TERMINATION_SECONDS = 60;

private final FineractProperties fineractProperties;

public SpringConfig(FineractProperties fineractProperties) {
this.fineractProperties = fineractProperties;
}

private int getEventExecutorCorePoolSize() {
int configured = fineractProperties.getTaskExecutor().getEventTaskExecutorCorePoolSize();
if (configured > 0) {
return configured;
}
return Runtime.getRuntime().availableProcessors() * 2;
}

private int getEventExecutorMaxPoolSize() {
int configured = fineractProperties.getTaskExecutor().getEventTaskExecutorMaxPoolSize();
if (configured > 0) {
return configured;
}
return Runtime.getRuntime().availableProcessors() * 5;
}

@Bean(name = "fineractEventExecutor")
public ThreadPoolTaskExecutor fineractEventExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(getEventExecutorCorePoolSize());
executor.setMaxPoolSize(getEventExecutorMaxPoolSize());
executor.setThreadNamePrefix("FineractEvent-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
return executor;
}

@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster saem = new SimpleApplicationEventMulticaster();
saem.setTaskExecutor(new SimpleAsyncTaskExecutor());
return saem;
@DependsOn("overrideSecurityContextHolderStrategy")
public SimpleApplicationEventMulticaster applicationEventMulticaster(
@Qualifier("fineractEventExecutor") ThreadPoolTaskExecutor taskExecutor) {
SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
multicaster.setTaskExecutor(new DelegatingSecurityContextAsyncTaskExecutor(taskExecutor));
return multicaster;
}

// The application events (for importing) rely on the inheritable thread local security context strategy
// This is NOT compatible with threadpools so if we use threadpools the below will need to be reworked
@Bean
public MethodInvokingFactoryBean overrideSecurityContextHolderStrategy() {
MethodInvokingFactoryBean mifb = new MethodInvokingFactoryBean();
mifb.setTargetClass(SecurityContextHolder.class);
mifb.setTargetMethod("setStrategyName");
mifb.setArguments("MODE_INHERITABLETHREADLOCAL");
return mifb;
MethodInvokingFactoryBean factoryBean = new MethodInvokingFactoryBean();
factoryBean.setTargetClass(SecurityContextHolder.class);
factoryBean.setTargetMethod("setStrategyName");
factoryBean.setArguments(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
return factoryBean;
}

@Bean
Expand Down
2 changes: 2 additions & 0 deletions fineract-provider/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ fineract.task-executor.tenant-upgrade-task-executor-core-pool-size=1
# This is intentionally restricted to a single thread due to an outstanding Liquibase thread-safety issue https://github.com/liquibase/liquibase/pull/7227
fineract.task-executor.tenant-upgrade-task-executor-max-pool-size=1
fineract.task-executor.tenant-upgrade-task-executor-queue-capacity=${FINERACT_TENANT_UPGRADE_TASK_EXECUTOR_QUEUE_CAPACITY:100}
fineract.task-executor.event-task-executor-core-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE:0}
fineract.task-executor.event-task-executor-max-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE:0}

fineract.idempotency-key-header-name=${FINERACT_IDEMPOTENCY_KEY_HEADER_NAME:Idempotency-Key}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.infrastructure.core.config;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@DisplayName("SpringConfig Thread Pool Tests")
class SpringConfigTest {

@Test
@DisplayName("SimpleAsyncTaskExecutor creates unbounded threads")
void simpleAsyncTaskExecutorCreatesUnboundedThreads() throws Exception {
String prefix = "SimpleAsync-";
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(prefix);

CountDownLatch startGate = new CountDownLatch(1);
CountDownLatch readyLatch = new CountDownLatch(100);
CountDownLatch doneLatch = new CountDownLatch(100);

for (int i = 0; i < 100; i++) {
executor.execute(() -> {
readyLatch.countDown();
try {
startGate.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneLatch.countDown();
}
});
}

boolean allStarted = readyLatch.await(5, TimeUnit.SECONDS);
assertThat(allStarted).as("All 100 threads should start").isTrue();

long asyncThreadCount = Thread.getAllStackTraces().keySet().stream().filter(t -> t.getName().startsWith(prefix)).count();

startGate.countDown();
boolean allFinished = doneLatch.await(5, TimeUnit.SECONDS);

assertThat(asyncThreadCount).as("Unbounded executor creates ~100 threads for 100 tasks").isGreaterThan(90);
assertThat(allFinished).isTrue();
}

@Test
@DisplayName("ThreadPoolTaskExecutor bounds thread creation at maxPoolSize")
void threadPoolTaskExecutorBoundsThreadCreation() throws Exception {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("Bounded-");
executor.initialize();

try {
CountDownLatch latch = new CountDownLatch(500);
Set<String> threadNames = ConcurrentHashMap.newKeySet();

for (int i = 0; i < 500; i++) {
executor.execute(() -> {
threadNames.add(Thread.currentThread().getName());
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
latch.countDown();
});
}
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();

assertThat(threadNames.size()).as("Thread count capped at maxPoolSize").isLessThanOrEqualTo(10);
assertThat(threadNames.size()).as("Parallelism proof: multiple threads used").isGreaterThan(1);
} finally {
executor.shutdown();
}
}

@Test
@DisplayName("FineractEventExecutor respects configured pool sizes from properties")
void fineractEventExecutorRespectsConfiguration() {
FineractProperties properties = mock(FineractProperties.class);
FineractProperties.FineractTaskExecutor taskExecutor = mock(FineractProperties.FineractTaskExecutor.class);
when(properties.getTaskExecutor()).thenReturn(taskExecutor);
when(taskExecutor.getEventTaskExecutorCorePoolSize()).thenReturn(16);
when(taskExecutor.getEventTaskExecutorMaxPoolSize()).thenReturn(40);

SpringConfig config = new SpringConfig(properties);
ThreadPoolTaskExecutor executor = config.fineractEventExecutor();

assertThat(executor.getCorePoolSize()).isEqualTo(16);
assertThat(executor.getMaxPoolSize()).isEqualTo(40);
}

@Test
@DisplayName("CallerRunsPolicy provides backpressure when pool is saturated")
void threadPoolWithCallerRunsPolicyProvidesBackpressure() throws Exception {
String mainThread = Thread.currentThread().getName();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(10);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();

try {
AtomicInteger callerExecutions = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(50);

for (int i = 0; i < 50; i++) {
executor.execute(() -> {
if (Thread.currentThread().getName().equals(mainThread)) {
callerExecutions.incrementAndGet();
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
latch.countDown();
});
}
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();

assertThat(callerExecutions.get()).as("Caller thread executed rejected tasks").isGreaterThan(0);
} finally {
executor.shutdown();
}
}

@ParameterizedTest(name = "Mode: read={0}, write={1}, batchWorker={2}, batchManager={3}")
@DisplayName("Pool sizing consistent across Fineract deployment modes")
@CsvSource({ "true, true, true, true", "true, false, false, false", "false, true, false, false", "false, false, true, true" })
void verifyPoolSizingAcrossFineractModes(boolean readEnabled, boolean writeEnabled, boolean batchWorker, boolean batchManager) {
FineractProperties properties = createPropertiesWithModes(readEnabled, writeEnabled, batchWorker, batchManager, 8, 20);

SpringConfig config = new SpringConfig(properties);
ThreadPoolTaskExecutor executor = config.fineractEventExecutor();

assertThat(executor.getCorePoolSize()).isEqualTo(8);
assertThat(executor.getMaxPoolSize()).isEqualTo(20);
}

private FineractProperties createPropertiesWithModes(boolean read, boolean write, boolean batchWorker, boolean batchManager,
int corePoolSize, int maxPoolSize) {
FineractProperties properties = mock(FineractProperties.class);
FineractProperties.FineractTaskExecutor taskExecutor = mock(FineractProperties.FineractTaskExecutor.class);
FineractProperties.FineractModeProperties modes = mock(FineractProperties.FineractModeProperties.class);

when(properties.getTaskExecutor()).thenReturn(taskExecutor);
when(properties.getMode()).thenReturn(modes);

when(taskExecutor.getEventTaskExecutorCorePoolSize()).thenReturn(corePoolSize);
when(taskExecutor.getEventTaskExecutorMaxPoolSize()).thenReturn(maxPoolSize);

when(modes.isReadEnabled()).thenReturn(read);
when(modes.isWriteEnabled()).thenReturn(write);
when(modes.isBatchWorkerEnabled()).thenReturn(batchWorker);
when(modes.isBatchManagerEnabled()).thenReturn(batchManager);

return properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ fineract.task-executor.tenant-upgrade-task-executor-core-pool-size=1
# This is intentionally restricted to a single thread due to an outstanding Liquibase thread-safety issue: https://github.com/liquibase/liquibase/pull/7227
fineract.task-executor.tenant-upgrade-task-executor-max-pool-size=1
fineract.task-executor.tenant-upgrade-task-executor-queue-capacity=${FINERACT_TENANT_UPGRADE_TASK_EXECUTOR_QUEUE_CAPACITY:100}
fineract.task-executor.event-task-executor-core-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE:0}
fineract.task-executor.event-task-executor-max-pool-size=${FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE:0}

fineract.loan.transactionprocessor.creocore.enabled=true
fineract.loan.transactionprocessor.early-repayment.enabled=true
Expand Down
Loading