Skip to content

Commit 2cf3398

Browse files
GH-37 Refactor sproc executor call
1 parent 1d118ee commit 2cf3398

File tree

1 file changed

+18
-16
lines changed

1 file changed

+18
-16
lines changed

src/main/java/de/zalando/sprocwrapper/proxy/StoredProcedure.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ class StoredProcedure {
9191
private static final Executor SINGLE_ROW_SIMPLE_TYPE_EXECUTOR = new SingleRowSimpleTypeExecutor();
9292
private static final Executor SINGLE_ROW_TYPE_MAPPER_EXECUTOR = new SingleRowTypeMapperExecutor();
9393

94+
private static final ExecutorService PARALLEL_THREAD_POOL = Executors.newCachedThreadPool();
95+
9496
private final long timeout;
9597
private final AdvisoryLock adivsoryLock;
9698

@@ -376,18 +378,6 @@ private Map<Integer, Object[]> partitionArguments(final DataSourceProvider dataS
376378
return argumentsByShardId;
377379
}
378380

379-
private Callable<Object> with(final DataSource shardDs, final Object[] params, final InvocationContext invocation) {
380-
final StoredProcedure sproc = this;
381-
return new Callable<Object>() {
382-
@Override
383-
public Object call() throws Exception {
384-
return sproc.executor.executeSProc(shardDs, sproc.query, params, sproc.types, invocation, sproc.returnType);
385-
}
386-
};
387-
}
388-
389-
private static ExecutorService parallelThreadPool = Executors.newCachedThreadPool();
390-
391381
public Object execute(final DataSourceProvider dp, final InvocationContext invocation) {
392382

393383
List<Integer> shardIds = null;
@@ -443,7 +433,7 @@ public Object execute(final DataSourceProvider dp, final InvocationContext invoc
443433
}
444434

445435
// most common case: only one shard and no argument partitioning
446-
return executor.executeSProc(firstDs, query, paramValues.get(0), types, invocation, returnType);
436+
return execute(firstDs, paramValues.get(0), invocation);
447437
} else {
448438
Map<Integer, SameConnectionDatasource> transactionalDatasources = null;
449439
try {
@@ -528,8 +518,7 @@ private Object executeSequential(final DataSourceProvider dp, final InvocationCo
528518

529519
sprocResult = null;
530520
try {
531-
sprocResult = executor.executeSProc(shardDs, query, paramValues.get(i), types, invocation,
532-
returnType);
521+
sprocResult = execute(shardDs, paramValues.get(i), invocation);
533522
} catch (final Exception e) {
534523

535524
// remember all exceptions and go on
@@ -569,7 +558,7 @@ private Object executeInParallel(final DataSourceProvider dp, final InvocationCo
569558

570559
final FutureTask<Object> task = new FutureTask<>(with(shardDs, paramValues.get(i), invocation));
571560
tasks.put(shardId, task);
572-
parallelThreadPool.execute(task);
561+
PARALLEL_THREAD_POOL.execute(task);
573562
i++;
574563
}
575564

@@ -605,6 +594,19 @@ private Object executeInParallel(final DataSourceProvider dp, final InvocationCo
605594
return sprocResult;
606595
}
607596

597+
private Callable<Object> with(final DataSource shardDs, final Object[] params, final InvocationContext invocation) {
598+
return new Callable<Object>() {
599+
@Override
600+
public Object call() throws Exception {
601+
return StoredProcedure.this.execute(shardDs, params, invocation);
602+
}
603+
};
604+
}
605+
606+
private Object execute(final DataSource shardDs, final Object[] params, final InvocationContext invocation) {
607+
return executor.executeSProc(shardDs, query, params, types, invocation, returnType);
608+
}
609+
608610
@SuppressWarnings({ "rawtypes", "unchecked" })
609611
private boolean addResultsBreakWhenSharded(final Collection results, final Object sprocResult) {
610612
boolean breakSearch = false;

0 commit comments

Comments
 (0)