Skip to content

Commit e584e26

Browse files
oleksii-novikov-onixadamsaghy
authored andcommitted
FINERACT-2191: ThreadLocal context handling during job execution
1 parent 0f1d53c commit e584e26

File tree

20 files changed

+295
-260
lines changed

20 files changed

+295
-260
lines changed

fineract-core/src/main/java/org/apache/fineract/infrastructure/core/domain/FineractContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.HashMap;
2424
import lombok.AllArgsConstructor;
2525
import lombok.Builder;
26+
import lombok.EqualsAndHashCode;
2627
import lombok.Getter;
2728
import lombok.extern.jackson.Jacksonized;
2829
import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
@@ -31,11 +32,13 @@
3132
@Jacksonized
3233
@Builder
3334
@Getter
35+
@EqualsAndHashCode
3436
public class FineractContext implements Serializable {
3537

3638
private final String contextHolder;
3739
private final FineractPlatformTenant tenantContext;
3840
private final String authTokenContext;
3941
private final HashMap<BusinessDateType, LocalDate> businessDateContext;
4042
private final ActionContext actionContext;
43+
4144
}

fineract-core/src/main/java/org/apache/fineract/infrastructure/core/domain/FineractPlatformTenant.java

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@
2020

2121
import java.io.Serializable;
2222
import lombok.Builder;
23+
import lombok.EqualsAndHashCode;
24+
import lombok.Getter;
25+
import lombok.RequiredArgsConstructor;
2326
import lombok.extern.jackson.Jacksonized;
2427

2528
@Jacksonized
2629
@Builder
30+
@EqualsAndHashCode
31+
@RequiredArgsConstructor
32+
@Getter
2733
public class FineractPlatformTenant implements Serializable {
2834

2935
private final Long id;
@@ -32,32 +38,4 @@ public class FineractPlatformTenant implements Serializable {
3238
private final String timezoneId;
3339
private final FineractPlatformTenantConnection connection;
3440

35-
public FineractPlatformTenant(final Long id, final String tenantIdentifier, final String name, final String timezoneId,
36-
final FineractPlatformTenantConnection connection) {
37-
this.id = id;
38-
this.tenantIdentifier = tenantIdentifier;
39-
this.name = name;
40-
this.timezoneId = timezoneId;
41-
this.connection = connection;
42-
}
43-
44-
public Long getId() {
45-
return this.id;
46-
}
47-
48-
public String getTenantIdentifier() {
49-
return this.tenantIdentifier;
50-
}
51-
52-
public String getName() {
53-
return this.name;
54-
}
55-
56-
public String getTimezoneId() {
57-
return this.timezoneId;
58-
}
59-
60-
public FineractPlatformTenantConnection getConnection() {
61-
return connection;
62-
}
6341
}

fineract-core/src/main/java/org/apache/fineract/infrastructure/core/service/ThreadLocalContextUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
import org.springframework.util.Assert;
2828

2929
/**
30-
*
30+
* A utility class for managing ThreadLocal context in the application. Provides methods for context initialization and
31+
* cleanup.
3132
*/
3233
public final class ThreadLocalContextUtil {
3334

@@ -124,4 +125,5 @@ public static void reset() {
124125
businessDateContext.remove();
125126
actionContext.remove();
126127
}
128+
127129
}

fineract-core/src/main/java/org/apache/fineract/infrastructure/event/business/service/BusinessEventNotifierServiceImpl.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.fineract.infrastructure.event.business.domain.NoExternalEvent;
3737
import org.apache.fineract.infrastructure.event.external.service.ExternalEventService;
3838
import org.springframework.beans.factory.InitializingBean;
39+
import org.springframework.lang.NonNull;
3940
import org.springframework.stereotype.Service;
4041
import org.springframework.transaction.TransactionExecution;
4142
import org.springframework.transaction.TransactionExecutionListener;
@@ -198,19 +199,29 @@ public void afterBegin(TransactionExecution transaction, Throwable beginFailure)
198199
}
199200

200201
@Override
201-
public void beforeCommit(TransactionExecution transaction) {
202-
List<BusinessEventWithContext> businessEventWithContexts = transactionBusinessEvents.get().peek();
203-
if (!businessEventWithContexts.isEmpty()) {
204-
FineractContext originalContext = ThreadLocalContextUtil.getContext();
202+
public void beforeCommit(@NonNull final TransactionExecution transaction) {
203+
final List<BusinessEventWithContext> businessEventWithContexts = transactionBusinessEvents.get().peek();
204+
if (businessEventWithContexts.isEmpty()) {
205+
return;
206+
}
207+
final FineractContext originalContext = ThreadLocalContextUtil.getContext();
208+
businessEventWithContexts.forEach(businessEventWithContext -> {
209+
final FineractContext currentContext = businessEventWithContext.getFineractContext();
210+
boolean swappedContext = false;
205211
try {
206-
for (BusinessEventWithContext businessEventWithContext : businessEventWithContexts) {
207-
ThreadLocalContextUtil.init(businessEventWithContext.getFineractContext());
208-
externalEventService.postEvent(businessEventWithContext.getEvent());
212+
if (!originalContext.equals(currentContext)) {
213+
swappedContext = true;
214+
ThreadLocalContextUtil.init(currentContext);
209215
}
216+
externalEventService.postEvent(businessEventWithContext.getEvent());
210217
} finally {
211-
ThreadLocalContextUtil.init(originalContext);
218+
// Back to original context if we swapped it. We should restore the original context rather than reset
219+
// it completely
220+
if (swappedContext) {
221+
ThreadLocalContextUtil.init(originalContext);
222+
}
212223
}
213-
}
224+
});
214225
}
215226

216227
@Override

fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -109,28 +109,31 @@ private void sendEventsToProducer(Map<Long, List<byte[]>> partitions) {
109109
eventProducer.sendEvents(partitions);
110110
}
111111

112-
private void markEventsAsSent(List<Long> eventIds) {
112+
private void markEventsAsSent(final List<Long> eventIds) {
113113
OffsetDateTime sentAt = DateUtils.getAuditOffsetDateTime();
114114

115115
// Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters
116116
final int partitionSize = fineractProperties.getEvents().getExternal().getPartitionSize();
117117
List<List<Long>> partitions = Lists.partition(eventIds, partitionSize);
118118
List<Future<?>> tasks = new ArrayList<>();
119119
final FineractContext context = ThreadLocalContextUtil.getContext();
120-
partitions //
121-
.forEach(partitionedEventIds -> {
122-
tasks.add(threadPoolTaskExecutor.submit(() -> {
123-
ThreadLocalContextUtil.init(context);
124-
transactionTemplate.execute((status) -> {
125-
measure(() -> {
126-
repository.markEventsSent(partitionedEventIds, sentAt);
127-
}, timeTaken -> {
128-
log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
129-
});
130-
return null;
120+
partitions.forEach(partitionedEventIds -> {
121+
tasks.add(threadPoolTaskExecutor.submit(() -> {
122+
try {
123+
ThreadLocalContextUtil.init(context);
124+
transactionTemplate.execute((status) -> {
125+
measure(() -> {
126+
repository.markEventsSent(partitionedEventIds, sentAt);
127+
}, timeTaken -> {
128+
log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
131129
});
132-
}));
133-
});
130+
return null;
131+
});
132+
} finally {
133+
ThreadLocalContextUtil.reset();
134+
}
135+
}));
136+
});
134137
for (Future<?> task : tasks) {
135138
try {
136139
task.get();

fineract-loan/src/main/java/org/apache/fineract/portfolio/loanaccount/service/RecalculateInterestPoster.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@ public class RecalculateInterestPoster implements Callable<Void> {
4141

4242
@Override
4343
public Void call() throws JobExecutionException {
44-
ThreadLocalContextUtil.init(fineractContext);
45-
if (!loanIds.isEmpty()) {
46-
List<Throwable> errors = new ArrayList<>();
44+
if (loanIds.isEmpty()) {
45+
return null;
46+
}
47+
try {
48+
ThreadLocalContextUtil.init(fineractContext);
49+
final List<Throwable> errors = new ArrayList<>();
4750
for (Long loanId : loanIds) {
4851
log.debug("Loan ID {}", loanId);
4952
try {
@@ -55,6 +58,8 @@ public Void call() throws JobExecutionException {
5558
if (!errors.isEmpty()) {
5659
throw new JobExecutionException(errors);
5760
}
61+
} finally {
62+
ThreadLocalContextUtil.reset();
5863
}
5964
return null;
6065
}

fineract-provider/src/main/java/org/apache/fineract/cob/common/InitialisationTasklet.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
3939
import org.springframework.security.core.context.SecurityContextHolder;
4040

41+
/**
42+
* Tasklet to initialize the thread local context for job execution
43+
*/
4144
@Slf4j
4245
@RequiredArgsConstructor
4346
public class InitialisationTasklet implements Tasklet {
@@ -51,14 +54,17 @@ public RepeatStatus execute(@NotNull StepContribution contribution, @NotNull Chu
5154
UsernamePasswordAuthenticationToken auth = new UsernamePasswordAuthenticationToken(user, user.getPassword(), user.getAuthorities());
5255
SecurityContextHolder.getContext().setAuthentication(auth);
5356
ThreadLocalContextUtil.setActionContext(ActionContext.COB);
57+
5458
String businessDateString = Objects.requireNonNull((String) chunkContext.getStepContext().getStepExecution().getJobExecution()
5559
.getExecutionContext().get(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME));
5660
LocalDate businessDate = LocalDate.parse(businessDateString, DateTimeFormatter.ISO_DATE);
61+
5762
businessDates.put(BusinessDateType.COB_DATE, businessDate);
5863
businessDates.put(BusinessDateType.BUSINESS_DATE, businessDate.plusDays(1));
5964
ThreadLocalContextUtil.setBusinessDates(businessDates);
60-
log.debug("Initialisation with Business Date [{}], COB Date [{}] and Action Context [{}]", businessDate.plusDays(1), businessDate,
61-
ThreadLocalContextUtil.getActionContext());
65+
66+
log.debug("Initialized context with Business Date [{}], COB Date [{}] and Action Context [{}]", businessDate.plusDays(1),
67+
businessDate, ThreadLocalContextUtil.getActionContext());
6268
return RepeatStatus.FINISHED;
6369
}
6470
}

fineract-provider/src/main/java/org/apache/fineract/cob/loan/ContextAwareTaskDecorator.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,30 @@
1818
*/
1919
package org.apache.fineract.cob.loan;
2020

21+
import lombok.extern.slf4j.Slf4j;
2122
import org.apache.fineract.infrastructure.core.domain.FineractContext;
2223
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
23-
import org.jetbrains.annotations.NotNull;
2424
import org.springframework.core.task.TaskDecorator;
25+
import org.springframework.lang.NonNull;
2526

27+
/**
28+
* Task decorator to ensure proper thread context propagation and cleanup
29+
*/
30+
@Slf4j
2631
public class ContextAwareTaskDecorator implements TaskDecorator {
2732

33+
@NonNull
2834
@Override
29-
public Runnable decorate(@NotNull Runnable runnable) {
35+
public Runnable decorate(@NonNull final Runnable runnable) {
3036
final FineractContext context = ThreadLocalContextUtil.getContext();
3137
return () -> {
32-
ThreadLocalContextUtil.init(context);
33-
runnable.run();
38+
try {
39+
log.debug("Initializing thread context for decorated task");
40+
ThreadLocalContextUtil.init(context);
41+
runnable.run();
42+
} finally {
43+
ThreadLocalContextUtil.reset();
44+
}
3445
};
3546
}
3647

fineract-provider/src/main/java/org/apache/fineract/cob/service/AsyncLoanCOBExecutorServiceImpl.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.fineract.cob.service;
2020

21-
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2221
import java.time.LocalDate;
2322
import java.time.format.DateTimeFormatter;
2423
import java.util.Collections;
@@ -37,11 +36,11 @@
3736
import org.apache.fineract.infrastructure.core.service.DateUtils;
3837
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
3938
import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
40-
import org.apache.fineract.infrastructure.jobs.domain.JobParameterRepository;
4139
import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail;
4240
import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetailRepository;
4341
import org.apache.fineract.infrastructure.jobs.exception.JobNotFoundException;
4442
import org.apache.fineract.infrastructure.jobs.service.JobStarter;
43+
import org.apache.fineract.infrastructure.jobs.service.SchedulerServiceConstants;
4544
import org.quartz.JobExecutionException;
4645
import org.springframework.batch.core.Job;
4746
import org.springframework.batch.core.JobParametersInvalidException;
@@ -63,12 +62,10 @@ public class AsyncLoanCOBExecutorServiceImpl implements AsyncLoanCOBExecutorServ
6362
private final JobLocator jobLocator;
6463
private final ScheduledJobDetailRepository scheduledJobDetailRepository;
6564
private final JobStarter jobStarter;
66-
private final JobParameterRepository jobParameterRepository;
6765
private final RetrieveLoanIdService retrieveLoanIdService;
6866

6967
@Override
7068
@Async(TaskExecutorConstant.LOAN_COB_CATCH_UP_TASK_EXECUTOR_BEAN_NAME)
71-
@SuppressFBWarnings("SLF4J_SIGN_ONLY_FORMAT")
7269
public void executeLoanCOBCatchUpAsync(FineractContext context) {
7370
try {
7471
ThreadLocalContextUtil.init(context);
@@ -80,35 +77,36 @@ public void executeLoanCOBCatchUpAsync(FineractContext context) {
8077
? loanIdAndLastClosedBusinessDate.get(0).getLastClosedBusinessDate()
8178
: cobBusinessDate;
8279
if (DateUtils.isBefore(oldestCOBProcessedDate, cobBusinessDate)) {
83-
executeLoanCOBDayByDayUntilCOBBusinessDate(oldestCOBProcessedDate, cobBusinessDate, context);
80+
executeLoanCOBDayByDayUntilCOBBusinessDate(oldestCOBProcessedDate, cobBusinessDate);
8481
}
8582
} catch (NoSuchJobException e) {
8683
// Throwing an error here is useless as it will be swallowed hence it is async method
87-
log.error("", new JobNotFoundException(LoanCOBConstant.JOB_NAME, e));
84+
log.error("Job not found: {}", LoanCOBConstant.JOB_NAME, new JobNotFoundException(LoanCOBConstant.JOB_NAME, e));
8885
} catch (JobInstanceAlreadyCompleteException | JobRestartException | JobParametersInvalidException
8986
| JobExecutionAlreadyRunningException | JobExecutionException e) {
9087
// Throwing an error here is useless as it will be swallowed hence it is async method
91-
log.error("", e);
88+
log.error("Error executing job", e);
9289
} finally {
9390
ThreadLocalContextUtil.reset();
9491
}
9592
}
9693

97-
private void executeLoanCOBDayByDayUntilCOBBusinessDate(LocalDate oldestCOBProcessedDate, LocalDate cobBusinessDate,
98-
FineractContext context) throws NoSuchJobException, JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
94+
private void executeLoanCOBDayByDayUntilCOBBusinessDate(LocalDate oldestCOBProcessedDate, LocalDate cobBusinessDate)
95+
throws NoSuchJobException, JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
9996
JobParametersInvalidException, JobRestartException, JobExecutionException {
10097
Job job = jobLocator.getJob(LoanCOBConstant.JOB_NAME);
10198
ScheduledJobDetail scheduledJobDetail = scheduledJobDetailRepository.findByJobName(LoanCOBConstant.JOB_HUMAN_READABLE_NAME);
10299
LocalDate executingBusinessDate = oldestCOBProcessedDate.plusDays(1);
100+
String tenantIdentifier = ThreadLocalContextUtil.getTenant().getTenantIdentifier();
101+
103102
while (!DateUtils.isAfter(executingBusinessDate, cobBusinessDate)) {
104-
// Need to reinitialize the thread-local tenant info because after running the job, it resets the thread
105-
ThreadLocalContextUtil.init(context);
106103
JobParameterDTO jobParameterDTO = new JobParameterDTO(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME,
107104
executingBusinessDate.format(DateTimeFormatter.ISO_DATE));
108105
JobParameterDTO jobParameterCatchUpDTO = new JobParameterDTO(LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME, "true");
106+
JobParameterDTO tenantParameterDTO = new JobParameterDTO(SchedulerServiceConstants.TENANT_IDENTIFIER, tenantIdentifier);
109107
Set<JobParameterDTO> jobParameters = new HashSet<>();
110-
Collections.addAll(jobParameters, jobParameterDTO, jobParameterCatchUpDTO);
111-
jobStarter.run(job, scheduledJobDetail, jobParameters, ThreadLocalContextUtil.getTenant().getTenantIdentifier());
108+
Collections.addAll(jobParameters, jobParameterDTO, jobParameterCatchUpDTO, tenantParameterDTO);
109+
jobStarter.run(job, scheduledJobDetail, jobParameters, tenantIdentifier);
112110
executingBusinessDate = executingBusinessDate.plusDays(1);
113111
}
114112
}

0 commit comments

Comments
 (0)