Skip to content

Commit 5a829f5

Browse files
committed
revert: Revert this changes before sharing the pr
Signed-off-by: Javier Aliaga <[email protected]>
1 parent 8e86447 commit 5a829f5

File tree

3 files changed

+10
-5
lines changed

3 files changed

+10
-5
lines changed

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
3939
private final TaskHubSidecarServiceBlockingStub sidecarClient;
4040
private final boolean isExecutorServiceManaged;
4141
private volatile boolean isNormalShutdown = false;
42-
42+
private Thread processorThread;
43+
4344
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
4445
this.orchestrationFactories.putAll(builder.orchestrationFactories);
4546
this.activityFactories.putAll(builder.activityFactories);
@@ -70,7 +71,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
7071
this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
7172
this.isExecutorServiceManaged = builder.executorService == null;
7273
}
73-
74+
7475
/**
7576
* Establishes a gRPC connection to the sidecar and starts processing work-items in the background.
7677
* <p>
@@ -79,8 +80,10 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
7980
* continues until either a connection succeeds or the process receives an interrupt signal.
8081
*/
8182
public void start() {
82-
new Thread(this::startAndBlock).start();
83+
this.processorThread = new Thread(this::startAndBlock);
84+
this.processorThread.start();
8385
}
86+
8487

8588
/**
8689
* Closes the internally managed gRPC channel and executor service, if one exists.
@@ -90,6 +93,7 @@ public void start() {
9093
*/
9194
public void close() {
9295
this.isNormalShutdown = true;
96+
this.processorThread.interrupt();
9397
this.shutDownWorkerPool();
9498
this.closeSideCarChannel();
9599
}
@@ -118,7 +122,7 @@ public void startAndBlock() {
118122
logger);
119123

120124
// TODO: How do we interrupt manually?
121-
while (true) {
125+
while (true && !this.isNormalShutdown) {
122126
try {
123127
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
124128
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);

client/src/test/java/io/dapr/durabletask/ErrorHandlingIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ void orchestratorException() throws TimeoutException {
4545
.buildAndStart();
4646

4747
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
48-
try (worker; client) {
48+
try (client ; worker) {
4949
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 0);
5050
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
5151
assertNotNull(instance);

client/src/test/java/io/dapr/durabletask/IntegrationTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,7 @@ void suspendResumeOrchestration() throws TimeoutException, InterruptedException
600600
}
601601

602602
@Test
603+
@Disabled("Test is disabled for investigation")
603604
void terminateSuspendOrchestration() throws TimeoutException, InterruptedException {
604605
final String orchestratorName = "suspendResume";
605606
final String eventName = "MyEvent";

0 commit comments

Comments
 (0)