Skip to content

Commit 4ae92d3

Browse files
authored
Ensure namespace exists on worker startup (#2609)
1 parent 94007cc commit 4ae92d3

File tree

3 files changed

+90
-16
lines changed

3 files changed

+90
-16
lines changed

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,26 @@
44
import com.google.common.base.Preconditions;
55
import com.google.common.base.Strings;
66
import com.uber.m3.tally.Scope;
7+
import io.temporal.api.workflowservice.v1.DescribeNamespaceRequest;
8+
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
79
import io.temporal.client.WorkflowClient;
810
import io.temporal.client.WorkflowClientOptions;
911
import io.temporal.common.converter.DataConverter;
1012
import io.temporal.internal.client.WorkflowClientInternal;
1113
import io.temporal.internal.sync.WorkflowThreadExecutor;
1214
import io.temporal.internal.task.VirtualThreadDelegate;
13-
import io.temporal.internal.worker.*;
15+
import io.temporal.internal.worker.ShutdownManager;
1416
import io.temporal.internal.worker.WorkflowExecutorCache;
17+
import io.temporal.internal.worker.WorkflowRunLockManager;
1518
import io.temporal.serviceclient.MetricsTag;
1619
import java.util.HashMap;
1720
import java.util.Map;
1821
import java.util.Objects;
19-
import java.util.concurrent.*;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.SynchronousQueue;
25+
import java.util.concurrent.ThreadPoolExecutor;
26+
import java.util.concurrent.TimeUnit;
2027
import java.util.concurrent.atomic.AtomicInteger;
2128
import javax.annotation.Nonnull;
2229
import javax.annotation.Nullable;
@@ -196,9 +203,14 @@ public synchronized void start() {
196203

197204
// Workers check and require that Temporal Server is available during start to fail-fast in case
198205
// of configuration issues.
199-
// TODO(https://github.com/temporalio/sdk-java/issues/2060) consider using describeNamespace as
200-
// a connection check.
201-
workflowClient.getWorkflowServiceStubs().getServerCapabilities();
206+
DescribeNamespaceResponse response =
207+
workflowClient
208+
.getWorkflowServiceStubs()
209+
.blockingStub()
210+
.describeNamespace(
211+
DescribeNamespaceRequest.newBuilder()
212+
.setNamespace(workflowClient.getOptions().getNamespace())
213+
.build());
202214

203215
for (Worker worker : workers.values()) {
204216
worker.start();

temporal-sdk/src/test/java/io/temporal/workerFactory/WorkerFactoryTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
package io.temporal.workerFactory;
22

3+
import static org.junit.Assert.assertEquals;
34
import static org.junit.Assert.assertFalse;
5+
import static org.junit.Assert.assertThrows;
46
import static org.junit.Assert.assertTrue;
57

8+
import io.grpc.Status;
9+
import io.grpc.StatusRuntimeException;
610
import io.temporal.client.WorkflowClient;
11+
import io.temporal.client.WorkflowClientOptions;
712
import io.temporal.serviceclient.WorkflowServiceStubs;
813
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
914
import io.temporal.worker.WorkerFactory;
@@ -128,4 +133,24 @@ public void factoryCanBeShutdownMoreThanOnce() {
128133
factory.shutdown();
129134
factory.awaitTermination(1, TimeUnit.MILLISECONDS);
130135
}
136+
137+
@Test
138+
public void startFailsOnNonexistentNamespace() {
139+
WorkflowServiceStubs serviceLocal =
140+
WorkflowServiceStubs.newServiceStubs(
141+
WorkflowServiceStubsOptions.newBuilder().setTarget(serviceAddress).build());
142+
WorkflowClient clientLocal =
143+
WorkflowClient.newInstance(
144+
serviceLocal, WorkflowClientOptions.newBuilder().setNamespace("i_dont_exist").build());
145+
WorkerFactory factoryLocal = WorkerFactory.newInstance(clientLocal);
146+
factoryLocal.newWorker("task-queue");
147+
148+
StatusRuntimeException ex = assertThrows(StatusRuntimeException.class, factoryLocal::start);
149+
assertEquals(Status.Code.NOT_FOUND, ex.getStatus().getCode());
150+
151+
factoryLocal.shutdownNow();
152+
factoryLocal.awaitTermination(5, TimeUnit.SECONDS);
153+
serviceLocal.shutdownNow();
154+
serviceLocal.awaitTermination(5, TimeUnit.SECONDS);
155+
}
131156
}

temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/WorkerVersioningTest.java

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import static org.junit.jupiter.api.Assertions.assertTrue;
44

5+
import io.grpc.Status;
6+
import io.grpc.StatusRuntimeException;
57
import io.temporal.api.common.v1.WorkflowExecution;
68
import io.temporal.api.enums.v1.EventType;
79
import io.temporal.api.enums.v1.VersioningBehavior;
@@ -11,7 +13,14 @@
1113
import io.temporal.common.WorkflowExecutionHistory;
1214
import io.temporal.spring.boot.autoconfigure.workerversioning.TestWorkflow;
1315
import io.temporal.spring.boot.autoconfigure.workerversioning.TestWorkflow2;
14-
import org.junit.jupiter.api.*;
16+
import io.temporal.worker.WorkerFactory;
17+
import java.time.Duration;
18+
import org.junit.jupiter.api.Assumptions;
19+
import org.junit.jupiter.api.BeforeAll;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.Test;
22+
import org.junit.jupiter.api.TestInstance;
23+
import org.junit.jupiter.api.Timeout;
1524
import org.springframework.beans.factory.annotation.Autowired;
1625
import org.springframework.boot.test.context.SpringBootTest;
1726
import org.springframework.context.ConfigurableApplicationContext;
@@ -20,7 +29,7 @@
2029
import org.springframework.test.context.ActiveProfiles;
2130

2231
@SpringBootTest(classes = WorkerVersioningTest.Configuration.class)
23-
@ActiveProfiles(profiles = "worker-versioning")
32+
@ActiveProfiles(profiles = {"worker-versioning", "disable-start-workers"})
2433
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
2534
public class WorkerVersioningTest {
2635
@Autowired ConfigurableApplicationContext applicationContext;
@@ -43,15 +52,13 @@ void setUp() {
4352
@Test
4453
@Timeout(value = 10)
4554
public void testAutoDiscovery() {
46-
workflowClient
47-
.getWorkflowServiceStubs()
48-
.blockingStub()
49-
.setWorkerDeploymentCurrentVersion(
50-
SetWorkerDeploymentCurrentVersionRequest.newBuilder()
51-
.setNamespace(workflowClient.getOptions().getNamespace())
52-
.setDeploymentName("dname")
53-
.setVersion("dname.bid")
54-
.build());
55+
// Manually start the worker because we disable automatic worker start, due to
56+
// automatic worker start running prior to the docker check, which causes namespace
57+
// errors when running in-mem unit tests
58+
WorkerFactory workerFactory = applicationContext.getBean(WorkerFactory.class);
59+
workerFactory.start();
60+
61+
setCurrentVersionWithRetry();
5562

5663
TestWorkflow testWorkflow =
5764
workflowClient.newWorkflowStub(
@@ -84,6 +91,36 @@ public void testAutoDiscovery() {
8491
== VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE));
8592
}
8693

94+
@SuppressWarnings("deprecation")
95+
private void setCurrentVersionWithRetry() {
96+
long deadline = System.currentTimeMillis() + Duration.ofSeconds(10).toMillis();
97+
while (true) {
98+
try {
99+
workflowClient
100+
.getWorkflowServiceStubs()
101+
.blockingStub()
102+
.setWorkerDeploymentCurrentVersion(
103+
SetWorkerDeploymentCurrentVersionRequest.newBuilder()
104+
.setNamespace(workflowClient.getOptions().getNamespace())
105+
.setDeploymentName("dname")
106+
.setVersion("dname.bid")
107+
.build());
108+
return;
109+
} catch (StatusRuntimeException e) {
110+
if (e.getStatus().getCode() != Status.Code.NOT_FOUND
111+
|| System.currentTimeMillis() > deadline) {
112+
throw e;
113+
}
114+
try {
115+
Thread.sleep(100);
116+
} catch (InterruptedException ie) {
117+
Thread.currentThread().interrupt();
118+
throw new RuntimeException(ie);
119+
}
120+
}
121+
}
122+
}
123+
87124
@ComponentScan(
88125
excludeFilters =
89126
@ComponentScan.Filter(

0 commit comments

Comments
 (0)