Skip to content

Commit db73d59

Browse files
GH-37 Use immutable Callable in executeInParallel to avoid race condition
1 parent 4e9e5db commit db73d59

File tree

1 file changed

+32
-11
lines changed

1 file changed

+32
-11
lines changed

src/main/java/de/zalando/sprocwrapper/proxy/StoredProcedure.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.FutureTask;
2020

2121
import javax.sql.DataSource;
22+
import javax.annotation.concurrent.Immutable;
2223

2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
@@ -383,24 +384,37 @@ private Map<Integer, Object[]> partitionArguments(final DataSourceProvider dataS
383384
return argumentsByShardId;
384385
}
385386

386-
private static class Call implements Callable<Object> {
387-
private final StoredProcedure sproc;
387+
@Immutable
388+
private static final class Call implements Callable<Object> {
389+
390+
private final Executor executor;
388391
private final DataSource shardDs;
392+
private final String query;
389393
private final Object[] params;
394+
private final int[] types;
390395
private final InvocationContext invocation;
396+
private final Class<?> returnType;
397+
398+
Call(final Executor executor,
399+
final DataSource shardDs,
400+
final String query,
401+
final Object[] params,
402+
final int[] types,
403+
final InvocationContext invocation,
404+
final Class<?> returnType) {
391405

392-
public Call(final StoredProcedure sproc, final DataSource shardDs, final Object[] params,
393-
final InvocationContext invocation) {
394-
this.sproc = sproc;
406+
this.executor = executor;
395407
this.shardDs = shardDs;
408+
this.query = query;
396409
this.params = params;
410+
this.types = types;
397411
this.invocation = invocation;
412+
this.returnType = returnType;
398413
}
399414

400415
@Override
401416
public Object call() throws Exception {
402-
return sproc.executor.executeSProc(shardDs, sproc.getQuery(), params, sproc.getTypes(), invocation,
403-
sproc.returnType);
417+
return executor.executeSProc(shardDs, query, params, types, invocation, returnType);
404418
}
405419

406420
}
@@ -576,18 +590,25 @@ private Object executeInParallel(final DataSourceProvider dp, final InvocationCo
576590
final List<Integer> shardIds, final List<Object[]> paramValues,
577591
final Map<Integer, SameConnectionDatasource> transactionalDatasources, final List<?> results,
578592
Object sprocResult) {
579-
DataSource shardDs;
593+
580594
final Map<Integer, FutureTask<Object>> tasks = Maps.newHashMapWithExpectedSize(shardIds.size());
581-
FutureTask<Object> task;
582595
int i = 0;
583596

584597
for (final int shardId : shardIds) {
585-
shardDs = getShardDs(dp, transactionalDatasources, shardId);
598+
final DataSource shardDs = getShardDs(dp, transactionalDatasources, shardId);
586599
if (LOG.isDebugEnabled()) {
587600
LOG.debug(getDebugLog(paramValues.get(i)));
588601
}
589602

590-
task = new FutureTask<Object>(new Call(this, shardDs, paramValues.get(i), invocation));
603+
final FutureTask<Object> task = new FutureTask<>(new Call(
604+
executor,
605+
shardDs,
606+
getQuery(),
607+
paramValues.get(i),
608+
getTypes(),
609+
invocation,
610+
returnType
611+
));
591612
tasks.put(shardId, task);
592613
parallelThreadPool.execute(task);
593614
i++;

0 commit comments

Comments
 (0)