Skip to content

Commit fe261ba

Browse files
Remove checked exceptions and simplify handling of errors
1 parent c036ec1 commit fe261ba

File tree

5 files changed

+47
-85
lines changed

5 files changed

+47
-85
lines changed

temporal-sdk/src/main/java/io/temporal/common/SimplePlugin.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Collections;
4242
import java.util.List;
4343
import java.util.Objects;
44+
import java.util.concurrent.Callable;
4445
import java.util.function.BiConsumer;
4546
import java.util.function.Consumer;
4647
import java.util.function.Supplier;
@@ -304,8 +305,8 @@ public void initializeWorker(@Nonnull String taskQueue, @Nonnull Worker worker)
304305
}
305306

306307
@Override
307-
public void startWorker(@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next)
308-
throws Exception {
308+
public void startWorker(
309+
@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next) {
309310
next.run();
310311
for (BiConsumer<String, Worker> callback : workerStartCallbacks) {
311312
callback.accept(taskQueue, worker);
@@ -328,15 +329,15 @@ public WorkflowServiceStubs connectServiceClient(
328329
}
329330

330331
@Override
331-
public void startWorkerFactory(WorkerFactory factory, Runnable next) throws Exception {
332+
public void startWorkerFactory(WorkerFactory factory, Runnable next) {
332333
next.run();
333334
for (Consumer<WorkerFactory> callback : workerFactoryStartCallbacks) {
334335
callback.accept(factory);
335336
}
336337
}
337338

338339
@Override
339-
public void shutdownWorkerFactory(WorkerFactory factory, Runnable next) throws Exception {
340+
public void shutdownWorkerFactory(WorkerFactory factory, Runnable next) {
340341
for (Consumer<WorkerFactory> callback : workerFactoryShutdownCallbacks) {
341342
callback.accept(factory);
342343
}
@@ -345,9 +346,11 @@ public void shutdownWorkerFactory(WorkerFactory factory, Runnable next) throws E
345346

346347
@Override
347348
public void replayWorkflowExecution(
348-
@Nonnull Worker worker, @Nonnull WorkflowExecutionHistory history, @Nonnull Runnable next)
349+
@Nonnull Worker worker,
350+
@Nonnull WorkflowExecutionHistory history,
351+
@Nonnull Callable<Void> next)
349352
throws Exception {
350-
next.run();
353+
next.call();
351354
for (BiConsumer<Worker, WorkflowExecutionHistory> callback : replayExecutionCallbacks) {
352355
callback.accept(worker, history);
353356
}

temporal-sdk/src/main/java/io/temporal/worker/Worker.java

Lines changed: 13 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Map;
2727
import java.util.Objects;
2828
import java.util.UUID;
29+
import java.util.concurrent.Callable;
2930
import java.util.concurrent.CompletableFuture;
3031
import java.util.concurrent.TimeUnit;
3132
import java.util.concurrent.atomic.AtomicBoolean;
@@ -478,51 +479,28 @@ public void replayWorkflowExecution(io.temporal.internal.common.WorkflowExecutio
478479
history.getHistory(), history.getWorkflowExecution().getWorkflowId());
479480

480481
// Build plugin chain in reverse order (first plugin wraps all others)
481-
// Wrap checked exception in RuntimeException for Runnable compatibility
482-
Runnable chain =
482+
Callable<Void> chain =
483483
() -> {
484-
try {
485-
workflowWorker.queryWorkflowExecution(
486-
history,
487-
WorkflowClient.QUERY_TYPE_REPLAY_ONLY,
488-
String.class,
489-
String.class,
490-
new Object[] {});
491-
} catch (Exception e) {
492-
throw new ReplayException(e);
493-
}
484+
workflowWorker.queryWorkflowExecution(
485+
history,
486+
WorkflowClient.QUERY_TYPE_REPLAY_ONLY,
487+
String.class,
488+
String.class,
489+
new Object[] {});
490+
return null;
494491
};
495492

496493
for (int i = plugins.size() - 1; i >= 0; i--) {
497494
WorkerPlugin plugin = plugins.get(i);
498-
Runnable next = chain;
495+
Callable<Void> next = chain;
499496
chain =
500497
() -> {
501-
try {
502-
plugin.replayWorkflowExecution(this, publicHistory, next);
503-
} catch (Exception e) {
504-
throw new ReplayException(e);
505-
}
498+
plugin.replayWorkflowExecution(this, publicHistory, next);
499+
return null;
506500
};
507501
}
508502

509-
try {
510-
chain.run();
511-
} catch (ReplayException e) {
512-
throw e.getCause();
513-
}
514-
}
515-
516-
/** Internal exception to wrap checked exceptions during replay. */
517-
private static class ReplayException extends RuntimeException {
518-
ReplayException(Exception cause) {
519-
super(cause);
520-
}
521-
522-
@Override
523-
public synchronized Exception getCause() {
524-
return (Exception) super.getCause();
525-
}
503+
chain.call();
526504
}
527505

528506
/**

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -255,17 +255,7 @@ public synchronized void start() {
255255
for (int i = plugins.size() - 1; i >= 0; i--) {
256256
final Runnable next = startChain;
257257
final WorkerPlugin workerPlugin = plugins.get(i);
258-
startChain =
259-
() -> {
260-
try {
261-
workerPlugin.startWorkerFactory(this, next);
262-
} catch (RuntimeException e) {
263-
throw e;
264-
} catch (Exception e) {
265-
throw new RuntimeException(
266-
"Plugin " + workerPlugin.getName() + " failed during startup", e);
267-
}
268-
};
258+
startChain = () -> workerPlugin.startWorkerFactory(this, next);
269259
}
270260

271261
// Execute the chain
@@ -284,21 +274,7 @@ private void doStart() {
284274
for (int i = plugins.size() - 1; i >= 0; i--) {
285275
final Runnable next = startChain;
286276
final WorkerPlugin workerPlugin = plugins.get(i);
287-
startChain =
288-
() -> {
289-
try {
290-
workerPlugin.startWorker(taskQueue, worker, next);
291-
} catch (RuntimeException e) {
292-
throw e;
293-
} catch (Exception e) {
294-
throw new RuntimeException(
295-
"Plugin "
296-
+ workerPlugin.getName()
297-
+ " failed during worker startup for task queue "
298-
+ taskQueue,
299-
e);
300-
}
301-
};
277+
startChain = () -> workerPlugin.startWorker(taskQueue, worker, next);
302278
}
303279

304280
// Execute the chain for this worker
@@ -386,7 +362,7 @@ private void shutdownInternal(boolean interruptUserTasks) {
386362
() -> {
387363
try {
388364
workerPlugin.shutdownWorkerFactory(this, next);
389-
} catch (Exception e) {
365+
} catch (RuntimeException e) {
390366
log.warn("Plugin {} failed during shutdown", workerPlugin.getName(), e);
391367
// Still try to continue shutdown
392368
next.run();
@@ -423,7 +399,7 @@ private void doShutdown(boolean interruptUserTasks) {
423399
() -> {
424400
try {
425401
workerPlugin.shutdownWorker(taskQueue, worker, next);
426-
} catch (Exception e) {
402+
} catch (RuntimeException e) {
427403
log.warn(
428404
"Plugin {} failed during worker shutdown for task queue {}",
429405
workerPlugin.getName(),

temporal-sdk/src/main/java/io/temporal/worker/WorkerPlugin.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.temporal.common.Experimental;
2424
import io.temporal.common.SimplePlugin;
2525
import io.temporal.common.WorkflowExecutionHistory;
26+
import java.util.concurrent.Callable;
2627
import javax.annotation.Nonnull;
2728

2829
/**
@@ -48,7 +49,7 @@
4849
* }
4950
*
5051
* @Override
51-
* public void startWorkerFactory(WorkerFactory factory, Runnable next) throws Exception {
52+
* public void startWorkerFactory(WorkerFactory factory, Runnable next) {
5253
* registry.recordWorkerStart();
5354
* try {
5455
* next.run();
@@ -127,7 +128,7 @@ public interface WorkerPlugin {
127128
*
128129
* <pre>{@code
129130
* @Override
130-
* public void startWorker(String taskQueue, Worker worker, Runnable next) throws Exception {
131+
* public void startWorker(String taskQueue, Worker worker, Runnable next) {
131132
* logger.info("Starting worker for task queue: {}", taskQueue);
132133
* perWorkerResources.put(taskQueue, new ResourcePool());
133134
* next.run();
@@ -137,10 +138,8 @@ public interface WorkerPlugin {
137138
* @param taskQueue the task queue name for the worker
138139
* @param worker the worker being started
139140
* @param next runnable that starts the next in chain (eventually starts the actual worker)
140-
* @throws Exception if startup fails
141141
*/
142-
void startWorker(@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next)
143-
throws Exception;
142+
void startWorker(@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next);
144143

145144
/**
146145
* Allows the plugin to wrap individual worker shutdown. Called during shutdown phase in reverse
@@ -182,7 +181,7 @@ void startWorker(@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Run
182181
*
183182
* <pre>{@code
184183
* @Override
185-
* public void startWorkerFactory(WorkerFactory factory, Runnable next) throws Exception {
184+
* public void startWorkerFactory(WorkerFactory factory, Runnable next) {
186185
* logger.info("Starting workers...");
187186
* next.run();
188187
* logger.info("Workers started");
@@ -191,9 +190,8 @@ void startWorker(@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Run
191190
*
192191
* @param factory the worker factory being started
193192
* @param next runnable that starts the next in chain (eventually starts actual workers)
194-
* @throws Exception if startup fails
195193
*/
196-
void startWorkerFactory(@Nonnull WorkerFactory factory, @Nonnull Runnable next) throws Exception;
194+
void startWorkerFactory(@Nonnull WorkerFactory factory, @Nonnull Runnable next);
197195

198196
/**
199197
* Allows the plugin to wrap worker factory shutdown. Called during shutdown phase in reverse
@@ -217,8 +215,7 @@ void startWorker(@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Run
217215
* @param factory the worker factory being shut down
218216
* @param next runnable that shuts down the next in chain (eventually shuts down actual workers)
219217
*/
220-
void shutdownWorkerFactory(@Nonnull WorkerFactory factory, @Nonnull Runnable next)
221-
throws Exception;
218+
void shutdownWorkerFactory(@Nonnull WorkerFactory factory, @Nonnull Runnable next);
222219

223220
// ==================== Replay Methods ====================
224221

@@ -234,11 +231,11 @@ void shutdownWorkerFactory(@Nonnull WorkerFactory factory, @Nonnull Runnable nex
234231
* <pre>{@code
235232
* @Override
236233
* public void replayWorkflowExecution(
237-
* Worker worker, WorkflowExecutionHistory history, Runnable next) throws Exception {
234+
* Worker worker, WorkflowExecutionHistory history, Callable<Void> next) throws Exception {
238235
* logger.info("Replaying workflow: {}", history.getWorkflowExecution().getWorkflowId());
239236
* long start = System.currentTimeMillis();
240237
* try {
241-
* next.run();
238+
* next.call();
242239
* logger.info("Replay succeeded in {}ms", System.currentTimeMillis() - start);
243240
* } catch (Exception e) {
244241
* logger.error("Replay failed after {}ms", System.currentTimeMillis() - start, e);
@@ -249,10 +246,12 @@ void shutdownWorkerFactory(@Nonnull WorkerFactory factory, @Nonnull Runnable nex
249246
*
250247
* @param worker the worker performing the replay
251248
* @param history the workflow execution history being replayed
252-
* @param next runnable that performs the next in chain (eventually performs the actual replay)
249+
* @param next callable that performs the next in chain (eventually performs the actual replay)
253250
* @throws Exception if replay fails
254251
*/
255252
void replayWorkflowExecution(
256-
@Nonnull Worker worker, @Nonnull WorkflowExecutionHistory history, @Nonnull Runnable next)
253+
@Nonnull Worker worker,
254+
@Nonnull WorkflowExecutionHistory history,
255+
@Nonnull Callable<Void> next)
257256
throws Exception;
258257
}

temporal-sdk/src/test/java/io/temporal/common/SimplePluginBuilderTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,13 @@ public void testOnReplayWorkflowExecution() throws Exception {
541541
AtomicBoolean nextCalled = new AtomicBoolean(false);
542542

543543
((WorkerPlugin) plugin)
544-
.replayWorkflowExecution(mockWorker, mockHistory, () -> nextCalled.set(true));
544+
.replayWorkflowExecution(
545+
mockWorker,
546+
mockHistory,
547+
() -> {
548+
nextCalled.set(true);
549+
return null;
550+
});
545551

546552
assertTrue("next should be called", nextCalled.get());
547553
assertTrue("Callback should have been called", callbackCalled.get());
@@ -563,7 +569,7 @@ public void testMultipleOnReplayWorkflowExecutionCallbacks() throws Exception {
563569
Worker mockWorker = mock(Worker.class);
564570
WorkflowExecutionHistory mockHistory = mock(WorkflowExecutionHistory.class);
565571

566-
((WorkerPlugin) plugin).replayWorkflowExecution(mockWorker, mockHistory, () -> {});
572+
((WorkerPlugin) plugin).replayWorkflowExecution(mockWorker, mockHistory, () -> null);
567573

568574
assertEquals("All callbacks should be called", 3, callCount.get());
569575
}

0 commit comments

Comments
 (0)