Skip to content

Commit fd65ea9

Browse files
Inject namespace header (#2400)
Inject namespace header
1 parent de33b3a commit fd65ea9

File tree

5 files changed

+135
-6
lines changed

5 files changed

+135
-6
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.client;
22+
23+
import io.grpc.ManagedChannel;
24+
import io.grpc.Metadata;
25+
import io.grpc.health.v1.HealthCheckResponse;
26+
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
27+
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
28+
import io.temporal.serviceclient.GrpcMetadataProviderInterceptor;
29+
import io.temporal.serviceclient.WorkflowServiceStubs;
30+
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
31+
import java.time.Duration;
32+
import java.util.Collections;
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.function.Supplier;
35+
import javax.annotation.Nullable;
36+
37+
/** Inject the namespace into the gRPC header */
38+
class NamespaceInjectWorkflowServiceStubs implements WorkflowServiceStubs {
39+
private static Metadata.Key<String> TEMPORAL_NAMESPACE_HEADER_KEY =
40+
Metadata.Key.of("temporal-namespace", Metadata.ASCII_STRING_MARSHALLER);
41+
private final Metadata metadata;
42+
private final WorkflowServiceStubs next;
43+
44+
public NamespaceInjectWorkflowServiceStubs(WorkflowServiceStubs next, String namespace) {
45+
this.next = next;
46+
this.metadata = new Metadata();
47+
metadata.put(TEMPORAL_NAMESPACE_HEADER_KEY, namespace);
48+
}
49+
50+
@Override
51+
public WorkflowServiceStubsOptions getOptions() {
52+
return next.getOptions();
53+
}
54+
55+
@Override
56+
public WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub() {
57+
return next.blockingStub()
58+
.withInterceptors(
59+
new GrpcMetadataProviderInterceptor(Collections.singleton(() -> metadata)));
60+
}
61+
62+
@Override
63+
public WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub() {
64+
return next.futureStub()
65+
.withInterceptors(
66+
new GrpcMetadataProviderInterceptor(Collections.singleton(() -> metadata)));
67+
}
68+
69+
@Override
70+
public ManagedChannel getRawChannel() {
71+
return next.getRawChannel();
72+
}
73+
74+
@Override
75+
public void shutdown() {
76+
next.shutdown();
77+
}
78+
79+
@Override
80+
public void shutdownNow() {
81+
next.shutdownNow();
82+
}
83+
84+
@Override
85+
public boolean isShutdown() {
86+
return next.isShutdown();
87+
}
88+
89+
@Override
90+
public boolean isTerminated() {
91+
return next.isTerminated();
92+
}
93+
94+
@Override
95+
public boolean awaitTermination(long timeout, TimeUnit unit) {
96+
return next.awaitTermination(timeout, unit);
97+
}
98+
99+
@Override
100+
public void connect(@Nullable Duration timeout) {
101+
next.connect(timeout);
102+
}
103+
104+
@Override
105+
public HealthCheckResponse healthCheck() {
106+
return next.healthCheck();
107+
}
108+
109+
@Override
110+
public Supplier<GetSystemInfoResponse.Capabilities> getServerCapabilities() {
111+
return next.getServerCapabilities();
112+
}
113+
}

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ public static WorkflowClient newInstance(
8989
WorkflowClientInternalImpl(
9090
WorkflowServiceStubs workflowServiceStubs, WorkflowClientOptions options) {
9191
options = WorkflowClientOptions.newBuilder(options).validateAndBuildWithDefaults();
92+
workflowServiceStubs =
93+
new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace());
9294
this.options = options;
9395
this.workflowServiceStubs = workflowServiceStubs;
9496
this.metricsScope =

temporal-sdk/src/test/java/io/temporal/authorization/AuthorizationTokenTest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.temporal.workflow.shared.TestWorkflows;
3434
import java.time.Duration;
3535
import java.util.ArrayList;
36+
import java.util.Arrays;
3637
import java.util.Collections;
3738
import java.util.List;
3839
import org.junit.After;
@@ -43,6 +44,8 @@
4344
import org.junit.runner.Description;
4445

4546
public class AuthorizationTokenTest {
47+
private static Metadata.Key<String> TEMPORAL_NAMESPACE_HEADER_KEY =
48+
Metadata.Key.of("temporal-namespace", Metadata.ASCII_STRING_MARSHALLER);
4649
private static final String TASK_QUEUE = "test-workflow";
4750
private static final String AUTH_TOKEN = "Bearer <token>";
4851

@@ -89,7 +92,8 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
8992
new GrpcRequest(
9093
method.getBareMethodName(),
9194
headers.get(
92-
AuthorizationGrpcMetadataProvider.AUTHORIZATION_HEADER_KEY)));
95+
AuthorizationGrpcMetadataProvider.AUTHORIZATION_HEADER_KEY),
96+
headers.get(TEMPORAL_NAMESPACE_HEADER_KEY)));
9397
super.start(responseListener, headers);
9498
}
9599
}
@@ -121,9 +125,18 @@ public void allRequestsShouldHaveAnAuthToken() {
121125
assertEquals("TestWorkflow1-input1", result);
122126

123127
assertFalse(loggedRequests.isEmpty());
128+
// These methods are not namespace specific
129+
List<String> methodsToSkip =
130+
Arrays.asList("GetSystemInfo", "Check", "UnlockTimeSkipping", "LockTimeSkipping");
124131
for (GrpcRequest grpcRequest : loggedRequests) {
125132
assertEquals(
126133
"All requests should have an auth token", AUTH_TOKEN, grpcRequest.authTokenValue);
134+
if (!methodsToSkip.contains(grpcRequest.methodName)) {
135+
assertEquals(
136+
"All requests should have a namespace " + grpcRequest.methodName,
137+
testEnvironment.getNamespace(),
138+
grpcRequest.namespace);
139+
}
127140
}
128141
}
129142

@@ -139,10 +152,12 @@ public String execute(String input) {
139152
private static class GrpcRequest {
140153
final String methodName;
141154
final String authTokenValue;
155+
final String namespace;
142156

143-
GrpcRequest(String methodName, String authTokenValue) {
157+
GrpcRequest(String methodName, String authTokenValue, String namespace) {
144158
this.methodName = methodName;
145159
this.authTokenValue = authTokenValue;
160+
this.namespace = namespace;
146161
}
147162
}
148163
}

temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ public void retryUntilDurable() {
396396
WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub =
397397
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
398398
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);
399+
when(blockingStub.withInterceptors(any())).thenReturn(blockingStub);
399400
when(blockingStub.withDeadline(any())).thenReturn(blockingStub);
400401

401402
Scope scope = mock(Scope.class);
@@ -457,6 +458,7 @@ public void handleSuccessfulStartButUpdateOnlyErr() {
457458
WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub =
458459
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
459460
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);
461+
when(blockingStub.withInterceptors(any())).thenReturn(blockingStub);
460462
when(blockingStub.withDeadline(any())).thenReturn(blockingStub);
461463

462464
Scope scope = mock(Scope.class);

temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -634,10 +634,7 @@ public T addGrpcMetadataProvider(GrpcMetadataProvider grpcMetadataProvider) {
634634
*/
635635
public T addApiKey(AuthorizationTokenSupplier apiKey) {
636636
addGrpcMetadataProvider(
637-
new AuthorizationGrpcMetadataProvider(
638-
() -> {
639-
return "Bearer " + apiKey.supply();
640-
}));
637+
new AuthorizationGrpcMetadataProvider(() -> "Bearer " + apiKey.supply()));
641638
return self();
642639
}
643640

0 commit comments

Comments
 (0)