Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions examples/grpc.echo/.github/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@ set -x

EXIT=0

# Test EchoService health (should be SERVING)
PORT="7151"
INPUT='{"service": "example.EchoService"}'
EXPECTED='{
"status": "SERVING"
}'
echo "# Testing EchoService health"
OUTPUT=$(docker compose run --rm grpcurl -plaintext -proto health.proto -d '{"service": "example.EchoService"}' zilla.examples.dev:7151 grpc.health.v1.Health.Check)
if [ "$OUTPUT" = "$EXPECTED" ]; then
echo ✅ "EchoService is SERVING"
else
echo ❌ "Unexpected EchoService health status: $OUTPUT"
EXIT=1
fi

# GIVEN
PORT="7151"
INPUT='{"message":"Hello World"}'
Expand Down Expand Up @@ -58,3 +73,4 @@ else
fi

exit $EXIT

6 changes: 5 additions & 1 deletion examples/grpc.echo/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ services:
hostname: zilla.examples.dev
ports:
- 7151:7151
- 4444:4444
healthcheck:
interval: 5s
timeout: 3s
retries: 5
test: ["CMD", "bash", "-c", "echo -n '' > /dev/tcp/127.0.0.1/7151"]
environment:
ZILLA_INCUBATOR_ENABLED: "true"
JAVA_TOOL_OPTIONS: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:4444"
volumes:
- ./etc:/etc/zilla
command: start -v -e

command: start -v -e
grpcurl:
image: fullstorydev/grpcurl
stdin_open: true
Expand All @@ -25,6 +27,8 @@ services:
- on-demand
volumes:
- ./etc/protos/echo.proto:/echo.proto
- ./etc/protos/health.proto:/health.proto


networks:
default:
Expand Down
78 changes: 78 additions & 0 deletions examples/grpc.echo/etc/protos/health.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2021-2024 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

// Copyright 2015 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// The canonical version of this proto can be found at
// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto

syntax = "proto3";

package grpc.health.v1;

option csharp_namespace = "Grpc.Health.V1";
option go_package = "google.golang.org/grpc/health/grpc_health_v1";
option java_multiple_files = true;
option java_outer_classname = "HealthProto";
option java_package = "io.grpc.health.v1";

message HealthCheckRequest {
string service = 1;
}

message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}

service Health {
// If the requested service is unknown, the call will fail with status
// NOT_FOUND.
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

// Performs a watch for the serving status of the requested service.
// The server will immediately send back a message indicating the current
// serving status. It will then subsequently send a new message whenever
// the service's serving status changes.
//
// If the requested service is unknown when the call is received, the
// server will send a message setting the serving status to
// SERVICE_UNKNOWN but will *not* terminate the call. If at some
// future point, the serving status of the service becomes known, the
// server will send a new message with the service's serving status.
//
// If the call terminates with status UNIMPLEMENTED, then clients
// should assume this method is not supported and should not retry the
// call. If the call terminates with any other status (including OK),
// clients should retry the call with appropriate exponential backoff.
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
5 changes: 5 additions & 0 deletions examples/grpc.echo/etc/zilla.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ catalogs:
subjects:
echo:
path: protos/echo.proto

bindings:
north_tcp_server:
type: tcp
Expand All @@ -30,6 +31,9 @@ bindings:
north_grpc_server:
type: grpc
kind: server
options:
services:
- grpc.health.v1.Health
catalog:
host_filesystem:
- subject: echo
Expand All @@ -40,6 +44,7 @@ bindings:
north_echo_server:
type: echo
kind: server

telemetry:
exporters:
stdout_logs_exporter:
Expand Down
1 change: 1 addition & 0 deletions runtime/binding-grpc/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ WARRANTIES OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.

This project includes:
Protocol Buffers [Core] under BSD-3-Clause

6 changes: 6 additions & 0 deletions runtime/binding-grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@
<artifactId>jmh-generator-annprocess</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.5</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,28 @@
package io.aklivity.zilla.runtime.binding.grpc.config;

import java.util.List;
import java.util.function.Function;

import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public final class GrpcOptionsConfig extends OptionsConfig
{
public final List<GrpcProtobufConfig> protobufs;
public final List<String> services;

public GrpcOptionsConfig(
List<GrpcProtobufConfig> protobufs)
public static GrpcOptionsConfigBuilder<GrpcOptionsConfig> builder()
{
this.protobufs = protobufs;
return new GrpcOptionsConfigBuilder<>(GrpcOptionsConfig.class::cast);
}

public static <T> GrpcOptionsConfigBuilder<T> builder(
Function<OptionsConfig, T> mapper)
{
return new GrpcOptionsConfigBuilder<>(mapper);
}

GrpcOptionsConfig(
List<String> services)
{
this.services = services;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2021-2024 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.aklivity.zilla.runtime.binding.grpc.config;

import java.util.List;
import java.util.function.Function;

import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public final class GrpcOptionsConfigBuilder<T> extends ConfigBuilder<T, GrpcOptionsConfigBuilder<T>>
{
private final Function<OptionsConfig, T> mapper;

private List<String> services;

GrpcOptionsConfigBuilder(
Function<OptionsConfig, T> mapper)
{
this.mapper = mapper;
}

@Override
@SuppressWarnings("unchecked")
protected Class<GrpcOptionsConfigBuilder<T>> thisType()
{
return (Class<GrpcOptionsConfigBuilder<T>>) getClass();
}


public GrpcOptionsConfigBuilder<T> services(
List<String> services)
{
this.services = services;
return this;
}

@Override
public T build()
{
return mapper.apply(new GrpcOptionsConfig(services));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.aklivity.zilla.runtime.binding.grpc.internal.config;

import static io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.GrpcType.BASE64;
Expand All @@ -20,6 +21,10 @@
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -55,6 +60,7 @@
import io.aklivity.zilla.runtime.engine.config.KindConfig;
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;


public final class GrpcBindingConfig
{
private static final Pattern METHOD_PATTERN = Pattern.compile("/(?<ServiceName>.*?)/(?<Method>.*)");
Expand All @@ -71,9 +77,11 @@ public final class GrpcBindingConfig
public final GrpcOptionsConfig options;
public final List<GrpcRouteConfig> routes;


private final GrpcProtobufParser parser;
private final HttpGrpcHeaderHelper helper;
private final Set<GrpcCatalogSchema> catalogs;
public final List<GrpcProtobufConfig> grpcServices;

public GrpcBindingConfig(
BindingConfig binding,
Expand All @@ -97,6 +105,35 @@ public GrpcBindingConfig(
}
}
this.catalogs = catalogs;

this.grpcServices = new ArrayList<>();

for (String serviceName : options.services)
{
if ("grpc.health.v1.Health".equals(serviceName))
{
String schema = loadProtoSchema("health.proto");
GrpcProtobufConfig protobufConfig = this.parser.parse("health.proto", schema);
this.grpcServices.add(protobufConfig);
}
}

}

private String loadProtoSchema(String resourcePath)
{
try (InputStream inputStream = getClass().getResourceAsStream(resourcePath))
{
if (inputStream == null)
{
throw new IllegalArgumentException("Could not find " + resourcePath + " on classpath");
}
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
catch (IOException e)
{
throw new RuntimeException("Failed to load proto schema", e);
}
}

public GrpcRouteConfig resolve(
Expand Down Expand Up @@ -129,12 +166,13 @@ public GrpcMethodResult resolveMethod(
final String methodName = matcher.group(METHOD);

GrpcMethodConfig method = resolveMethod(catalogs, serviceName, methodName);

if (method == null && options != null)
if (method == null)
{
method = resolveMethod(options.protobufs, serviceName, methodName);
// TODO: this may not the best
method = resolveMethod(grpcServices, serviceName, methodName);
}


if (method != null)
{
methodResolver = new GrpcMethodResult(
Expand Down
Loading
Loading