
Commit b89a14d

merge with master

2 parents 3280ba3 + 710dccb


65 files changed (+6694, -5786 lines)

CHANGELOG.md

Lines changed: 9 additions & 3 deletions
@@ -9,13 +9,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ### Added

+- Support publishing events consumed from [NATS](https://nats.io) topics. See the [documentation](https://accenture.github.io/reactive-interaction-gateway/docs/event-streams.html#nats) for how to get started. [#297](https://github.com/Accenture/reactive-interaction-gateway/issues/297)
+- Added validation for the reverse proxy configuration. RIG now crashes on start when the configuration is invalid, and returns `400` when the configuration is updated via the REST API. [#277](https://github.com/Accenture/reactive-interaction-gateway/issues/277)
+- Added basic distributed tracing support following the [W3C Trace Context specification](https://www.w3.org/TR/trace-context/), with Jaeger and Openzipkin exporters. RIG opens a span at the API Gateway and emits trace context in CloudEvents following the [distributed tracing spec](https://github.com/cloudevents/spec/blob/v1.0/extensions/distributed-tracing.md). [#281](https://github.com/Accenture/reactive-interaction-gateway/issues/281)
+- Added the possibility to set the response code for `response_from` messages in the reverse proxy (`kafka` and `http_async`). [#321](https://github.com/Accenture/reactive-interaction-gateway/pull/321)
+- Added a new version - `v3` - for internal endpoints to support the response code in the `/responses` endpoint
 - Added Helm v3 template to the `deployment` folder [#288](https://github.com/Accenture/reactive-interaction-gateway/issues/288)

 ### Changed

-- Support publishing events consumed from [NATS](https://nats.io) topics. See the [documentation](https://accenture.github.io/reactive-interaction-gateway/docs/event-streams.html#nats) for how to get started. [#297](https://github.com/Accenture/reactive-interaction-gateway/issues/297)
-- Added validation for reverse proxy configuration. Now it crashes RIG on start when configuration is not valid or returns `400` when using REST API to update configuration. [#277](https://github.com/Accenture/reactive-interaction-gateway/issues/277)
-- Added basic distributed tracing support in [W3C Trace Context specification](https://www.w3.org/TR/trace-context/) with Jaeger and Openzipkin exporters. RIG opens a span at the API Gateway and emits trace context in Cloud Events following the [distributed tracing spec](https://github.com/cloudevents/spec/blob/v1.0/extensions/distributed-tracing.md). [#281](https://github.com/Accenture/reactive-interaction-gateway/issues/281)
+- Incorporated [cloudevents-ex](https://github.com/kevinbader/cloudevents-ex) to properly handle binary and structured modes for the [Kafka protocol binding](https://github.com/cloudevents/spec/blob/v1.0/kafka-protocol-binding.md). This introduces some **breaking changes**:
+  - Binary mode now uses the `ce_` prefix for CloudEvents context attribute headers, where it was `ce-` before, in line with the [Kafka protocol binding](https://github.com/cloudevents/spec/blob/v1.0/kafka-protocol-binding.md)
+  - The change above also affects the `"response_from": "kafka"` proxy functionality. RIG forwards only the Kafka body to clients, no headers; with binary mode, clients therefore receive only the data part, without the CloudEvents context attributes.
+- Changed the `response_from` handler to expect a message in binary format, **NOT** a CloudEvent (`kafka` and `http_async`). [#321](https://github.com/Accenture/reactive-interaction-gateway/pull/321)
 - Updated Helm v2 template, kubectl yaml file and instructions in the `deployment` folder [#288](https://github.com/Accenture/reactive-interaction-gateway/issues/288)

 ### Fixed
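
The NATS entry above boils down to a pair of environment variables. As a rough getting-started sketch (variable names follow the linked event-streams documentation; server address and topic values are placeholders, not part of this commit):

```bash
# Consume CloudEvents from a NATS topic and forward them to subscribed clients.
# NATS_SERVERS / NATS_SOURCE_TOPICS per the event-streams docs; values are placeholders.
docker run --name rig \
  -e NATS_SERVERS=nats:4222 \
  -e NATS_SOURCE_TOPICS=rig \
  accenture/reactive-interaction-gateway
```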

Dockerfile

Lines changed: 3 additions & 0 deletions
@@ -40,8 +40,11 @@ RUN apk add --no-cache bash
 ENV LANG C.UTF-8
 ENV LC_ALL C.UTF-8

+RUN addgroup -S rig -g 1000 && adduser -S rig -G rig --uid 1000
 WORKDIR /opt/sites/rig
 COPY --from=build /opt/sites/rig/_build/prod/rel/rig /opt/sites/rig/
+RUN chown -R rig:rig /opt/sites/rig
+USER rig

 # Proxy
 EXPOSE 4000
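
This change makes the container run as a dedicated `rig` user (uid/gid 1000) instead of root. A quick way to verify the effective user, assuming the published image name:

```bash
# Should print uid=1000(rig) gid=1000(rig) rather than uid=0(root).
docker run --rm --entrypoint id accenture/reactive-interaction-gateway
```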

aws.dockerfile

Lines changed: 3 additions & 0 deletions
@@ -49,9 +49,12 @@ ENV KINESIS_OTP_JAR=/opt/sites/rig/kinesis-client/local-maven-repo/org/erlang/ot
 # Install Java
 RUN apk add --no-cache openjdk8-jre

+RUN addgroup -S rig -g 1000 && adduser -S rig -G rig --uid 1000
 WORKDIR /opt/sites/rig
 COPY --from=elixir-build /opt/sites/rig/_build/prod/rel/rig /opt/sites/rig/
 COPY --from=java-build opt/sites/rig/kinesis-client /opt/sites/rig/kinesis-client
+RUN chown -R rig:rig /opt/sites/rig
+USER rig

 # Proxy
 EXPOSE 4000

config/config.exs

Lines changed: 3 additions & 1 deletion
@@ -102,7 +102,9 @@ config :logger, :console,
   format: "\n$time [$level] $levelpad$message\n$metadata\n",
   metadata: metadata |> Enum.uniq()

-config :rig, Rig.Application, log_level: {:system, :atom, "LOG_LEVEL", :debug}
+config :rig, Rig.Application,
+  log_level: {:system, :atom, "LOG_LEVEL", :debug},
+  schema_registry_host: {:system, "KAFKA_SCHEMA_REGISTRY_HOST", nil}

 # --------------------------------------
 # Session and Authorization
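
The new `schema_registry_host` setting is resolved from the `KAFKA_SCHEMA_REGISTRY_HOST` environment variable (default `nil`, i.e., no registry). A minimal wiring sketch with the Docker image; the host and port values are assumptions matching the Avro docs further below:

```bash
# Enable Avro (de)serialization backed by a Confluent-style schema registry.
docker run --name rig \
  -e KAFKA_BROKERS=kafka:9092 \
  -e KAFKA_SERIALIZER=avro \
  -e KAFKA_SCHEMA_REGISTRY_HOST=kafka-schema-registry:8081 \
  accenture/reactive-interaction-gateway
```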

config/rig_api/config.exs

Lines changed: 1 addition & 0 deletions
@@ -45,6 +45,7 @@ config :phoenix, :serve_endpoints, true

 config :rig, RigApi.V1.APIs, rig_proxy: RigInboundGateway.Proxy
 config :rig, RigApi.V2.APIs, rig_proxy: RigInboundGateway.Proxy
+config :rig, RigApi.V3.APIs, rig_proxy: RigInboundGateway.Proxy

 config :rig, :event_filter, Rig.EventFilter
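
With `RigApi.V3.APIs` wired to the proxy, the internal API gains a `v3` prefix. A hypothetical smoke test, assuming the route mirrors the existing v1/v2 convention (the exact path is an assumption, not confirmed by this diff):

```bash
# List the reverse proxy configuration via the new v3 internal API.
curl http://localhost:4010/v3/apis
```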

config/rig_api/test.exs

Lines changed: 1 addition & 0 deletions
@@ -10,5 +10,6 @@ config :rig, RigApi.Endpoint,

 config :rig, RigApi.V1.APIs, rig_proxy: RigInboundGateway.ProxyMock
 config :rig, RigApi.V2.APIs, rig_proxy: RigInboundGateway.ProxyMock
+config :rig, RigApi.V3.APIs, rig_proxy: RigInboundGateway.ProxyMock

 config :rig, :event_filter, Rig.EventFilterMock

config/rig_tests/test.exs

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ config :rig, RigTests.Proxy.ResponseFrom.KafkaTest,
   ssl_keyfile_pass: {:system, "KAFKA_SSL_KEYFILE_PASS", ""},
   # Credentials for SASL/Plain authentication. Example: "plain:myusername:mypassword"
   sasl: {:system, "KAFKA_SASL", nil},
-  response_topic: "rig-proxy-response"
+  response_topic: {:system, "PROXY_KAFKA_RESPONSE_TOPICS", "rig-proxy-response"}

 config :rig, RigTests.Proxy.PublishToEventStream.KafkaTest,
   server_id: :rig_proxy_publish_kafkatest_genserver,
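
The response topic for the Kafka `response_from` test is now overridable from the environment. A sketch of running the suite against a custom topic (the topic name is an arbitrary example; the exact test invocation or tags depend on the integration-test setup):

```bash
# Override the default "rig-proxy-response" topic for the proxy tests.
PROXY_KAFKA_RESPONSE_TOPICS=my-proxy-responses mix test
```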

docs/api-gateway.md

Lines changed: 28 additions & 17 deletions
@@ -46,11 +46,11 @@ This defines a single service called "my-service". The URL is read from an given
 As a demo service, we use a small Node.js script:

 ```js
-const http = require('http');
+const http = require("http");
 const port = 3000;
 const handler = (_req, res) => res.end("Hi, I'm a demo service!\n");
 const server = http.createServer(handler);
-server.listen(port, err => {
+server.listen(port, (err) => {
   if (err) {
     return console.error(err);
   }
@@ -180,7 +180,7 @@ The endpoint expects the following request format:
 ### Wait for response

 Sometimes it makes sense to provide a simple request-response API to something that runs asynchronously on the backend. For example, let's say there's a ticket reservation process that takes 10 seconds in total and involves three different services that communicate via message passing. For an external client, it may be simpler to wait 10 seconds for the response instead of polling for a response every other second.
-A behavior like this can be configured using an endpoints' `response_from` property. When set to `kafka`, the response to the request is not taken from the `target` (e.g., for `target` = `http` this means the backend's HTTP response is ignored), but instead it's read from a Kafka topic. In order to enable RIG to correlate the response from the topic with the original request, RIG adds a correlation ID to the request (using a query parameter in case of `target` = `http`, or backed into the produced CloudEvent otherwise). Backend services that work with the request need to include that correlation ID in their response; otherwise, RIG won't be able to forward it to the client (and times out).
+A behavior like this can be configured using an endpoint's `response_from` property. When set to `kafka`, the response to the request is not taken from the `target` (e.g., for `target` = `http` this means the backend's HTTP response is ignored), but instead it's read from a Kafka topic. In order to enable RIG to correlate the response from the topic with the original request, RIG adds a correlation ID to the request (using a query parameter in case of `target` = `http`, or baked into the produced CloudEvent otherwise). **Backend services that work with the request need to include that correlation ID in their response; otherwise, RIG won't be able to forward it to the client (and times out).**

 Configuration of such API endpoint might look like this:
@@ -205,26 +205,37 @@ Configuration of such API endpoint might look like this:

 > Note the presence of the `response_from` field. This tells RIG to wait for a different event with the same correlation ID.

-As an alternative you can set `response_from` to `http_async`. This means that correlated response has to be sent to internal `:4010/v2/responses` `POST` endpoint with a body like this:
+#### Supported combinations (`target` -> `response_from`)
+
+- HTTP -> `kafka`/`http_async`/`kinesis`
+- Kafka -> `kafka`
+- Kinesis -> **not supported**
+- Nats -> `nats`
+
+`http_async` means that the correlated response has to be sent to the internal `:4010/v3/responses` `POST` endpoint.
+
+#### Supported formats
+
+All `response_from` options support only binary mode.
+
+Message headers:
+
+```plaintext
+rig-correlation: "correlation_id_sent_by_rig"
+rig-response-code: "201"
+content-type: "application/json"
+```
+
+> All headers are required.
+
+Message body:

 ```json
 {
-  "id": "1",
-  "specversion": "0.2",
-  "source": "my-service",
-  "type": "com.example",
-  "rig": {
-    "correlation": "_id_"
-  },
-  "data": {
-    ...
-  }
-  ...
+  "foo": "bar"
 }
 ```

-> **NOTE:** Kinesis doesn't support `response_from` field yet.
-
 ## Auth

 RIG can do a simple auth check for endpoints. Currently supports JWT.
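
Circling back to the `http_async` format shown above: a correlated response could be delivered to RIG like this, where the correlation ID value is a placeholder for whatever RIG attached to the forwarded request:

```bash
# Complete a waiting http_async request by POSTing the correlated response.
# The rig-correlation value is a placeholder; use the ID RIG attached to the request.
curl -X POST http://localhost:4010/v3/responses \
  -H 'rig-correlation: correlation_id_sent_by_rig' \
  -H 'rig-response-code: 201' \
  -H 'content-type: application/json' \
  -d '{"foo": "bar"}'
```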

docs/avro.md

Lines changed: 12 additions & 12 deletions
@@ -24,37 +24,37 @@ Adopting Avro for event (de)serialization is fairly straightforward. First you n
 ## RIG as a Kafka producer

 - producer evaluates whether serialization is turned on by checking the `KAFKA_SERIALIZER` environment variable and whether its value is `avro`
-- If it is, creates headers for Kafka event by appending `ce-` prefix for every field, besides `data` field
+- If it is, creates headers for the Kafka event by appending the `ce_` prefix to every field except the `data` field - **binary mode**
 - **nested context attributes are stringified**, since Kafka headers don't support nested values (this is common when using CloudEvents extensions)
 - after that, the `data` field is serialized using the schema name (the function for getting schemas from the registry is cached in-memory)
 - producer sends the event with the created headers and data (in binary format `<<0, 0, 0, 0, 1, 5, 3, 8, ...>>`) to Kafka

-> If `KAFKA_SERIALIZER` is not set to `avro`, producer sets **only** `ce-contenttype` or `ce-contentType` for kafka event
+> If `KAFKA_SERIALIZER` is not set to `avro`, the producer sets **only** `ce_contenttype` or `ce_contentType` for the Kafka event

 ## RIG as a Kafka consumer

-- when consuming Kafka event, RIG checks headers of such event and removes `ce-` prefix
-- based on headers decides cloud events version and content type
-- In case content type is `avro/binary`, schema ID is taken from event value and deserialized
-- If content type is **not** present, RIG checks for Avro format (`<<0, 0, 0, 0, 1, 5, 3, 8, ...>>`) and attempts for deserialization, otherwise event is sent to client as it is without any deserialization
+Event parsing is based on the [Kafka Transport Binding for CloudEvents v1.0](https://github.com/cloudevents/spec/blob/v1.0/kafka-protocol-binding.md) implemented via [cloudevents-ex](https://github.com/kevinbader/cloudevents-ex). Check the [Event format](./event-format.md#kafka-transport-binding) section.

 ## Example 1: producing to and consuming from the same topic

 In this example we'll have RIG send a message to itself to see whether RIG's producing and consuming parts work correctly. The idea is that RIG produces a serialized event as a result of an HTTP request and, a few moments later, consumes that same event (and deserializes it correctly).

 ```bash
+
 ## 1. Start Kafka with Zookeeper and Kafka Schema Registry
+
 KAFKA_PORT_PLAIN=17092 KAFKA_PORT_SSL=17093 HOST=localhost docker-compose -f integration_tests/kafka_tests/docker-compose.yml up -d

 ## 2. Start Rig
+
 # Here we say to use Avro, consume on topic "rigRequest" and use the "rigRequest-value" schema from the Kafka Schema Registry
 # Proxy is turned on to be able to produce Kafka events with headers (needed for cloud events)
 docker run --name rig \
   -e KAFKA_BROKERS=kafka:9292 \
   -e KAFKA_SERIALIZER=avro \
   -e KAFKA_SCHEMA_REGISTRY_HOST=kafka-schema-registry:8081 \
   -e KAFKA_SOURCE_TOPICS=rigRequest \
-  -e PROXY_CONFIG_FILE=proxy/proxy.test.json \
+  -e PROXY_CONFIG_FILE='[{"id":"my-api","name":"my-api","versioned":false,"version_data":{"default":{"endpoints":[{"id":"post-myapi-publish-async","path":"/myapi/publish-async","method":"POST","target":"kafka"}]}},"proxy":{"use_env":true,"target_url":"KAFKA_HOST","port":9092}}]' \
   -e PROXY_KAFKA_REQUEST_TOPIC=rigRequest \
   -e PROXY_KAFKA_REQUEST_AVRO=rigRequest-value \
   -e LOG_LEVEL=debug \
@@ -63,25 +63,25 @@ docker run --name rig \
   accenture/reactive-interaction-gateway

 ## 3. Register Avro schema in Kafka Schema Registry
+
 curl -d '{"schema":"{\"name\":\"rigproducer\",\"type\":\"record\",\"fields\":[{\"name\":\"example\",\"type\":\"string\"}]}"}' -H "Content-Type: application/vnd.schemaregistry.v1+json" -X POST http://localhost:8081/subjects/rigRequest-value/versions

 ## 4. Send HTTP request to RIG proxy
+
 # Request will produce a serialized Kafka event to Kafka
 curl -d '{"event":{"id":"069711bf-3946-4661-984f-c667657b8d85","type":"com.example","time":"2018-04-05T17:31:00Z","specversion":"0.2","source":"\/cli","contenttype":"avro\/binary","data":{"example":"test"}},"partition":"test_key"}' -H "Content-Type: application/json" -X POST http://localhost:4000/myapi/publish-async

 ## 5. In the terminal you should see something like the output below -- in a nutshell it means the event was successfully consumed, deserialized and forwarded to the UI client
-16:46:31.549 [debug] Decoded Avro message="{\"example\":\"test\"}"
-application=rig_kafka module=RigKafka.Avro function=decode/1 file=lib/rig_kafka/avro.ex line=28 pid=<0.419.0>

-16:46:31.550 [debug] [:start_object, {:string, "contenttype"}, :colon, {:string, "avro/binary"}, :comma, {:string, "data"}, :colon, :start_object, {:string, "example"}, :colon, {:string, "test"}, :end_object, :comma, {:string, "id"}, :colon, {:string, "069711bf-3946-4661-984f-c667657b8d85"}, :comma, {:string, "rig"}, :colon, :start_object, {:string, "correlation"}, :colon, {:string, "g2dkAA1ub25vZGVAbm9ob3N0AAADzAAAAAAA"}, :comma, {:string, "headers"}, :colon, :start_array, :start_array, {:string, "accept"}, :end_array, :comma, :start_array, {:string, "*/*"}, :end_array, :comma, :start_array, {:string, "content-length"}, :end_array, :comma, :start_array, {:string, "221"}, :end_array, :comma, :start_array, {:string, "content-type"}, :end_array, :comma, :start_array, {:string, ...}, :end_array, ...]
-application=rig module=Rig.EventStream.KafkaToFilter function=kafka_handler/1 file=lib/rig/event_stream/kafka_to_filter.ex line=20 pid=<0.419.0>
+21:54:52.869 module=Avrora.Storage.Registry [debug] obtaining schema with global id `1`
+21:54:52.870 module=Rig.EventStream.KafkaToFilter [debug] %Cloudevents.Format.V_0_2.Event{contenttype: "avro/binary", data: %{"example" => "test"}, extensions: %{"rig" => %{"correlation" => "Ve1d-XF0Qi46lwh47X5IqI7m_FCIqCLsqyV0KTCxg28Hnd7ytczBe1cASZYPxA7GNFCZ4AzDC0QX1w0=", "headers" => [["accept", "*/*"], ["content-length", "221"], ["content-type", "application/json"], ["host", "localhost:4000"], ["user-agent", "curl/7.54.0"]], "host" => "localhost", "method" => "POST", "path" => "/myapi/publish-async", "port" => 4000, "query" => "", "remoteip" => "172.28.0.1", "scheme" => "http"}}, id: "069711bf-3946-4661-984f-c667657b8d85", schemaurl: nil, source: "/cli", specversion: "0.2", time: "2018-04-05T17:31:00Z", type: "com.example"}
 ```

 ## Example 2: Kafka schema Registry CLI

 To check if it works also with the native serializer we can leverage the CLI shipped with the Kafka Schema Registry image.

-```bash
+``` bash
 # 1. Get inside Kafka Schema Registry container
 docker exec -it kafka-schema-registry bash
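
Aside from the registry CLI, the binary-mode `ce_` headers described earlier can be inspected directly on the topic. A sketch using kafkacat (assuming kafkacat >= 1.4 for header support; broker port taken from the compose file in Example 1):

```bash
# Print each message's headers and payload; in binary mode, expect
# ce_-prefixed CloudEvents attributes such as ce_type and ce_specversion.
kafkacat -C -b localhost:17092 -t rigRequest -f 'headers: %h | payload: %s\n'
```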
