Skip to content

Commit cfe1e54

Browse files
anfelbarUnnar
andauthored
Client is able to forward notification and invocation data (#32)
* Added ficti grpc stub excample from grpc library for testing purposes * Added server example to forward notifications and invocations It uses a basic protobuf and mannagedchannel for receiving/sending data from/to the client * Added option for passing forwarding endpoint to the client * Added license information to demo server files * Added documentation about forwarding to external server * Corrected typo by Unnar Co-authored-by: Unnar Freyr Erlendsson <[email protected]> * Added license file for demoserver, modified version from grpc-java Co-authored-by: Unnar Freyr Erlendsson <[email protected]>
1 parent 8462eee commit cfe1e54

File tree

9 files changed

+567
-19
lines changed

9 files changed

+567
-19
lines changed

java/com/engflow/notificationqueue/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ java_binary(
4545
":engflowapis_java_proto",
4646
":eventstore_java_grpc",
4747
":notification_queue_java_grpc",
48+
"//java/com/engflow/notificationqueue/demoserver:server_java_grpc_proto",
49+
"//java/com/engflow/notificationqueue/demoserver:server_java_proto",
50+
"//java/com/engflow/notificationqueue/demoserver:server_proto",
4851
"@com_google_protobuf//:any_proto",
4952
"@com_google_protobuf//java/core",
5053
"@io_grpc_grpc_java//api",

java/com/engflow/notificationqueue/Client.java

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@
99
import com.engflow.notification.v1.PullNotificationRequest;
1010
import com.engflow.notification.v1.PullNotificationResponse;
1111
import com.engflow.notification.v1.QueueId;
12+
import com.engflow.notificationqueue.demoserver.EngFlowRequest;
13+
import com.engflow.notificationqueue.demoserver.EngFlowResponse;
14+
import com.engflow.notificationqueue.demoserver.ForwardingGrpc;
1215
import com.google.common.base.Preconditions;
1316
import com.google.common.base.Strings;
1417
import com.google.protobuf.Any;
1518
import com.google.protobuf.InvalidProtocolBufferException;
1619
import io.grpc.ManagedChannel;
1720
import io.grpc.Metadata;
21+
import io.grpc.StatusRuntimeException;
1822
import io.grpc.netty.GrpcSslContexts;
1923
import io.grpc.netty.NegotiationType;
2024
import io.grpc.netty.NettyChannelBuilder;
@@ -38,7 +42,9 @@ public static void main(String[] args) throws Exception {
3842
System.err.println(e);
3943
System.err.println(" --notification_queue_endpoint");
4044
System.err.println(" --queue_name");
41-
System.err.println("Please provide also authentication credentials");
45+
System.err.println(
46+
"Please provide also authentication credentials "
47+
+ "and if you want to forward then add another external server endpoint");
4248
return;
4349
}
4450

@@ -62,17 +68,31 @@ public static void main(String[] args) throws Exception {
6268
throw new IllegalStateException(e);
6369
}
6470

71+
ManagedChannel forwardChannel = null;
72+
if (!Strings.isNullOrEmpty(clientOptions.getOption("forward"))) {
73+
try {
74+
forwardChannel = createChannel(clientOptions.getOption("forward"), null, null);
75+
} catch (IllegalArgumentException e) {
76+
System.err.println("Could not open forwarding channel");
77+
} catch (IllegalStateException e) {
78+
System.err.println("Could not open forwarding channel");
79+
} catch (IOException e) {
80+
System.err.println("Could not open forwarding channel");
81+
}
82+
}
6583
try {
6684
final Metadata header = new Metadata();
6785
Metadata.Key<String> userKey =
6886
Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER);
6987
header.put(userKey, "Bearer " + clientOptions.getOption("token"));
70-
pull(channel, clientOptions.getOption("queue_name"), header);
88+
pull(channel, clientOptions.getOption("queue_name"), header, forwardChannel);
7189
} finally {
7290
if (channel != null) {
7391
channel.shutdownNow();
92+
forwardChannel.shutdownNow();
7493
try {
7594
channel.awaitTermination(10, TimeUnit.SECONDS);
95+
forwardChannel.awaitTermination(10, TimeUnit.SECONDS);
7696
} catch (InterruptedException e) {
7797
System.out.println("Could not shut down channel within timeout");
7898
}
@@ -89,18 +109,30 @@ public static void main(String[] args) throws Exception {
89109
* @param header metadata for token authentication (if needed)
90110
* @throws InterruptedException
91111
*/
92-
private static void pull(ManagedChannel channel, String queueName, Metadata header)
112+
private static void pull(
113+
ManagedChannel channel, String queueName, Metadata header, ManagedChannel forwardChannel)
93114
throws InterruptedException {
115+
94116
NotificationQueueGrpc.NotificationQueueStub asyncStub = NotificationQueueGrpc.newStub(channel);
95117
asyncStub = MetadataUtils.attachHeaders(asyncStub, header);
96118
final CountDownLatch finishLatch = new CountDownLatch(1);
119+
System.out.println("Listening for build events...");
97120
StreamObserver<PullNotificationRequest> requestObserver =
98121
asyncStub.pull(
99122
new StreamObserver<PullNotificationResponse>() {
100123
@Override
101124
public void onNext(PullNotificationResponse response) {
102125
Notification streamedNotification = response.getNotification().getNotification();
103-
System.out.println("Notification: " + streamedNotification);
126+
System.out.println("Notification: " + streamedNotification.toString());
127+
try {
128+
/** Forward notification data to external server */
129+
forwardToBESStub(
130+
forwardChannel,
131+
streamedNotification.getId().toString(),
132+
streamedNotification.getPayload().toString());
133+
} catch (Exception e) {
134+
System.err.println("Could not forward notification to external sever...");
135+
}
104136
Any notificationContent = streamedNotification.getPayload();
105137
try {
106138
BuildLifecycleEventNotification lifeCycleEvent =
@@ -116,7 +148,7 @@ public void onNext(PullNotificationResponse response) {
116148
* Fetch the invocation using the grpc {@link EventStoreGrpc} stub using the
117149
* acquired invocation id
118150
*/
119-
getInvocations(channel, invocation, header);
151+
getInvocations(channel, invocation, header, forwardChannel);
120152
} catch (InterruptedException e) {
121153
System.err.println("Could not get invocation with uuid " + invocation);
122154
}
@@ -162,7 +194,8 @@ public void onCompleted() {
162194
* @param header metadata for token authentication (if needed)
163195
* @throws InterruptedException
164196
*/
165-
public static void getInvocations(ManagedChannel channel, String invocationId, Metadata header)
197+
private static void getInvocations(
198+
ManagedChannel channel, String invocationId, Metadata header, ManagedChannel forwardChannel)
166199
throws InterruptedException {
167200
EventStoreGrpc.EventStoreStub asyncStub = EventStoreGrpc.newStub(channel);
168201
asyncStub = MetadataUtils.attachHeaders(asyncStub, header);
@@ -172,6 +205,13 @@ public static void getInvocations(ManagedChannel channel, String invocationId, M
172205
@Override
173206
public void onNext(StreamedBuildEvent response) {
174207
System.out.println("Invocation: " + response.toString());
208+
String buildEvent = response.getEvent().toString();
209+
try {
210+
/** Forward invocation data to external server */
211+
forwardToBESStub(forwardChannel, invocationId, buildEvent);
212+
} catch (Exception e) {
213+
System.err.println("Could not forward invocation to external sever...");
214+
}
175215
}
176216

177217
@Override
@@ -186,6 +226,30 @@ public void onCompleted() {
186226
});
187227
}
188228

229+
/**
230+
* Forwards data to an external grpc stub.
231+
*
232+
* @param channel a grpc channel for connection
233+
* @param id the id of the data to send
234+
* @param payload the payload
235+
*/
236+
private static void forwardToBESStub(ManagedChannel channel, String id, String payload) {
237+
if (channel == null) {
238+
return;
239+
}
240+
final ForwardingGrpc.ForwardingBlockingStub blockingStub =
241+
ForwardingGrpc.newBlockingStub(channel);
242+
EngFlowRequest request = EngFlowRequest.newBuilder().setId(id).setPayload(payload).build();
243+
EngFlowResponse response;
244+
try {
245+
response = blockingStub.forwardStream(request);
246+
System.out.println("Forwarding: " + response.getMessage());
247+
} catch (StatusRuntimeException e) {
248+
System.out.println("Could not forward data to external server.");
249+
return;
250+
}
251+
}
252+
189253
private static ManagedChannel createChannel(
190254
String endpoint, @Nullable String clientCertificate, @Nullable String clientKey)
191255
throws IOException {

java/com/engflow/notificationqueue/NotificationOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,5 +91,16 @@ private void instantiateOptions() {
9191
.desc("Token for open access clusters.")
9292
.build();
9393
options.addOption(remoteHeader);
94+
95+
Option forwardSever =
96+
Option.builder()
97+
.longOpt("forward")
98+
.argName("property=value")
99+
.hasArgs()
100+
.valueSeparator()
101+
.numberOfArgs(2)
102+
.desc("The external service endpoint in grpc://host:port format.")
103+
.build();
104+
options.addOption(forwardSever);
94105
}
95106
}

java/com/engflow/notificationqueue/README.md

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ dependency (see [WORKSPACE](../../../../WORKSPACE)).
1111
The first argument you must have to execute the client is the cluster `grpc` endpoint
1212
you want to listen to.
1313
The second argument is the queue name. As in this example we are interested in
14-
getting lifecycle events, we pull from the queue called `lifecycle-events`.
14+
getting lifecycle events from the cluster, we pull from the queue called `lifecycle-events`.
1515

1616
1. `--notification_queue_endpoint=CLUSTER_URL` the URL of the cluster gRPC
1717
server. Must start with `grpc://` or `grpcs://`
@@ -26,7 +26,7 @@ but are required by the cluster, the connection is rejected.
2626
### Using certificates to run the client
2727

2828
In the first case you should have valid credentials in `.crt` and `.key` files. Add
29-
the full path of your certificate files into following arguments
29+
the full path of your certificate files into the following options
3030

3131
1. `--tls_certificate=certificate file containing your public key`
3232
holds the path of the crt file to access the cluster.
@@ -37,8 +37,8 @@ the full path of your certificate files into following arguments
3737

3838
Run the client using
3939

40-
```
41-
bazel run //java/com/engflow/notificationqueue/client -- \
40+
```bash
41+
bazel run //java/com/engflow/notificationqueue:client -- \
4242
'--notification_queue_endpoint=grpcs://example.cluster.engflow.com' '--queue_name=eventstore/lifecycle-events' \
4343
'--tls_certificate=example_client.crt' '--tls_key=example_client.key'
4444
```
@@ -59,36 +59,61 @@ the argument
5959

6060
Run the client using
6161

62-
```
63-
bazel run //java/com/engflow/notificationqueue/client -- \
62+
```bash
63+
bazel run //java/com/engflow/notificationqueue:client -- \
6464
'--notification_queue_endpoint=grpcs://envoy.cluster.engflow.com' '--queue_name=eventstore/lifecycle-events' \
65-
'--token=ghs_vHu2hAHwhg2EjBXrs4koOxk5PfSKVb2lzAUM
65+
'--token=ghs_vHu2hAHwhg2EjBXrs4koOxk5PfSKVb2lzAUM'
6666
```
6767

68-
At this point, the client should be listening to the lifecycle events of the cluster. It then remains
69-
to build something and to get its notifications and invocations.
68+
Note: The token provided in the example is not valid. You should count with a
69+
valid envoy-mobile token to use envoy-cluster. Change your target cluster and
70+
acquire a valid token for executing the client.
71+
72+
### Forwarding data to external server
73+
74+
One useful use case for the **notification** and **event store APIs** is forwarding the
75+
received data to external servers, for instance another _build event store_ or _result store_ sever.
76+
The current client example implements communication
77+
with a demo grpc stub. To do so, the client uses `Request` and `Response`
78+
abstractions provided by a [server proto definition].
7079

80+
To execute a client that forwards information to an external server you add the flag
81+
`--forward`. Let us first execute a demonstration server that will receive the data from
82+
the client. To execute the server run
83+
84+
```bash
85+
bazel run //java/com/engflow/notificationqueue/demoserver:server
86+
```
87+
88+
This will start the demo server listening at `localhost:50051`. Now you can start the client with a given
89+
authentication method and the forwarding endpoint
90+
91+
```bash
92+
bazel run //java/com/engflow/notificationqueue:client -- \
93+
'--notification_queue_endpoint=grpcs://example.cluster.engflow.com' '--queue_name=eventstore/lifecycle-events' \
94+
'--tls_certificate=example_client.crt' '--tls_key=example_client.key' '--forward=grpc://localhost:50051'
95+
```
7196

7297
# Executing a foo application
7398

7499
You may now build any targets on the cluster. Open your favorite foo project and build it using
75100
the runtime flags `remote_cache` and `bes_backend` arguments like this
76101

77-
```
78-
bazel build //... --remote_cache=CLUSTER_URL --bes_backend=CLUSTER_URL
102+
```bash
103+
bazel build //... '--remote_cache=grpcs://example.cluster.engflow.com' '--bes_backend=grpcs://example.cluster.engflow.com'
79104
```
80105

81106
You should see a series of notifications like this one
82107

83-
```
108+
```json
84109
type_url: "type.googleapis.com/engflow.eventstore.v1.BuildLifecycleEventNotification"
85110
value: "\022$e03d2afe-1a78-4f14-a0f7-85ae65e7e856\"%user_keyword=engflow:StreamSource=BES\"/user_keyword=engflow:StreamType=ClientBEPStream\272\006&\n$1e4f34ee-4669-4ce0-a3fe-5e115ad4772e"
86111
```
87112

88113
The value, with some garbage characters, contains the uuid for one invocation.
89114
Using this uuid we may get an invocation like this one
90115

91-
```
116+
```json
92117
StreamedBuildEvent: continuation_token: "CiQyMWFjMDlkNC0zZWIzLTQ2MzQtODI0MS0yMzk0Y2JhN2UwMGEQARjSCiAB"
93118
event {
94119
stream_id {
@@ -107,3 +132,6 @@ event {
107132
}
108133
}
109134
```
135+
136+
137+
[server proto definition]: demoserver/server.proto
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package(default_visibility = ["//visibility:public"])
2+
3+
load("@io_grpc_grpc_java//:java_grpc_library.bzl", "java_grpc_library")
4+
5+
proto_library(
6+
name = "server_proto",
7+
srcs = ["server.proto"],
8+
)
9+
10+
java_proto_library(
11+
name = "server_java_proto",
12+
deps = [":server_proto"],
13+
)
14+
15+
java_grpc_library(
16+
name = "server_java_grpc_proto",
17+
srcs = [
18+
":server_proto",
19+
],
20+
deps = [
21+
":server_java_proto",
22+
],
23+
)
24+
25+
java_binary(
26+
name = "server",
27+
srcs = [
28+
"DemoServer.java",
29+
],
30+
main_class = "com.engflow.notificationqueue.demoserver.DemoServer",
31+
deps = [
32+
":server_java_grpc_proto",
33+
":server_java_proto",
34+
":server_proto",
35+
"@io_grpc_grpc_java//api",
36+
"@io_grpc_grpc_java//netty",
37+
"@io_grpc_grpc_java//stub",
38+
],
39+
)

0 commit comments

Comments
 (0)