Commit 535540f

Merge pull request #308 from Accenture/307-use-ce-lib

2 parents 925d764 + df549cc

43 files changed: +4996 / -5685 lines


CHANGELOG.md

Lines changed: 5 additions & 1 deletion

````diff
@@ -13,7 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Added validation for reverse proxy configuration. Now it crashes RIG on start when configuration is not valid or returns `400` when using REST API to update configuration. [#277](https://github.com/Accenture/reactive-interaction-gateway/issues/277)
 - Added basic distributed tracing support in [W3C Trace Context specification](https://www.w3.org/TR/trace-context/) with Jaeger and Openzipkin exporters. RIG opens a span at the API Gateway and emits trace context in Cloud Events following the [distributed tracing spec](https://github.com/cloudevents/spec/blob/v1.0/extensions/distributed-tracing.md). [#281](https://github.com/Accenture/reactive-interaction-gateway/issues/281)
 
-<!-- ### Changed -->
+### Changed
+
+- Incorporated [cloudevents-ex](https://github.com/kevinbader/cloudevents-ex) to handle binary and structured modes for the [Kafka protocol binding](https://github.com/cloudevents/spec/blob/v1.0/kafka-protocol-binding.md) in a proper way. This introduces some **breaking changes**:
+  - Binary mode now uses the `ce_` prefix for CloudEvents context attribute headers (previously `ce-`), as required by the [Kafka protocol binding](https://github.com/cloudevents/spec/blob/v1.0/kafka-protocol-binding.md).
+  - The change above also affects the `"response_from": "kafka"` proxy functionality: RIG forwards only the Kafka body to clients, not the headers. This means that, when using binary mode, clients receive only the data part and no CloudEvents context attributes.
 
 ### Fixed
````
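The `ce-` to `ce_` rename above can be illustrated with a small sketch of how binary-mode Kafka headers are derived from a CloudEvent. This is a hypothetical helper for illustration only, not RIG's actual code; it follows the Kafka protocol binding referenced in the changelog entry.

```python
import json

def to_binary_kafka_headers(event):
    """Map CloudEvents context attributes to Kafka headers (binary mode).

    Per the CloudEvents Kafka protocol binding, attribute headers use the
    "ce_" prefix (not "ce-", which the HTTP binding uses); the "data" field
    travels in the message body instead of a header.
    """
    headers = []
    for key, value in event.items():
        if key == "data":
            continue  # body, not a header
        # Non-string values (e.g. extension objects) are stringified,
        # since Kafka headers only carry bytes.
        text = value if isinstance(value, str) else json.dumps(value)
        headers.append(("ce_" + key, text.encode("utf-8")))
    return headers

event = {"specversion": "1.0", "type": "com.example", "id": "1",
         "source": "/cli", "data": {"x": 1}}
headers = to_binary_kafka_headers(event)
print(sorted(name for name, _ in headers))
# → ['ce_id', 'ce_source', 'ce_specversion', 'ce_type']
```

Note that every context attribute, including extensions, ends up underscore-prefixed, which is exactly why clients that matched on `ce-`-prefixed headers break after this commit.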

config/config.exs

Lines changed: 3 additions & 1 deletion

````diff
@@ -102,7 +102,9 @@ config :logger, :console,
   format: "\n$time [$level] $levelpad$message\n$metadata\n",
   metadata: metadata |> Enum.uniq()
 
-config :rig, Rig.Application, log_level: {:system, :atom, "LOG_LEVEL", :debug}
+config :rig, Rig.Application,
+  log_level: {:system, :atom, "LOG_LEVEL", :debug},
+  schema_registry_host: {:system, "KAFKA_SCHEMA_REGISTRY_HOST", nil}
 
 # --------------------------------------
 # Session and Authorization
````
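The `{:system, ...}` tuples in this config resolve environment variables at runtime with a fallback default. A rough Python equivalent of that lookup-with-default pattern, purely illustrative and not part of RIG:

```python
import os

def resolve_system(var, default, cast=str):
    """Rough equivalent of RIG's {:system, "VAR", default} config tuples:
    read VAR from the environment, fall back to the default when unset,
    cast the raw string otherwise."""
    raw = os.environ.get(var)
    return default if raw is None else cast(raw)

# With KAFKA_SCHEMA_REGISTRY_HOST unset, the new setting resolves to None,
# i.e. no Schema Registry is configured.
print(resolve_system("KAFKA_SCHEMA_REGISTRY_HOST", None))
```

The `nil` default means the new `schema_registry_host` setting is opt-in: RIG behaves as before unless the variable is provided.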

config/rig_tests/test.exs

Lines changed: 1 addition & 1 deletion

````diff
@@ -23,7 +23,7 @@ config :rig, RigTests.Proxy.ResponseFrom.KafkaTest,
   ssl_keyfile_pass: {:system, "KAFKA_SSL_KEYFILE_PASS", ""},
   # Credentials for SASL/Plain authentication. Example: "plain:myusername:mypassword"
   sasl: {:system, "KAFKA_SASL", nil},
-  response_topic: "rig-proxy-response"
+  response_topic: {:system, "PROXY_KAFKA_RESPONSE_TOPICS", "rig-proxy-response"}
 
 config :rig, RigTests.Proxy.PublishToEventStream.KafkaTest,
   server_id: :rig_proxy_publish_kafkatest_genserver,
````

docs/api-gateway.md

Lines changed: 4 additions & 2 deletions

````diff
@@ -46,11 +46,11 @@ This defines a single service called "my-service". The URL is read from an given
 As a demo service, we use a small Node.js script:
 
 ```js
-const http = require('http');
+const http = require("http");
 const port = 3000;
 const handler = (_req, res) => res.end("Hi, I'm a demo service!\n");
 const server = http.createServer(handler);
-server.listen(port, err => {
+server.listen(port, (err) => {
   if (err) {
     return console.error(err);
   }
@@ -182,6 +182,8 @@ The endpoint expects the following request format:
 Sometimes it makes sense to provide a simple request-response API to something that runs asynchronously on the backend. For example, let's say there's a ticket reservation process that takes 10 seconds in total and involves three different services that communicate via message passing. For an external client, it may be simpler to wait 10 seconds for the response instead of polling for a response every other second.
 A behavior like this can be configured using an endpoint's `response_from` property. When set to `kafka`, the response to the request is not taken from the `target` (e.g., for `target` = `http` this means the backend's HTTP response is ignored) but is instead read from a Kafka topic. To enable RIG to correlate the response from the topic with the original request, RIG adds a correlation ID to the request (using a query parameter in case of `target` = `http`, or baked into the produced CloudEvent otherwise). Backend services that work with the request need to include that correlation ID in their response; otherwise, RIG won't be able to forward it to the client (and times out).
 
+> In case you want to use the _binary_ transport mode, make sure that the `rig` extension (containing the correlation ID) is prefixed with `ce_` as well.
+
 Configuration of such an API endpoint might look like this:
 
 ```json
````
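The correlation requirement described in that diff hunk can be sketched as follows. This is a hypothetical backend helper, not an official RIG client; the placement of the correlation ID in a `rig` extension attribute is an assumption based on the event structure shown in the RIG docs.

```python
def build_response_event(request_event, response_data):
    """Build the event a backend would publish to RIG's response topic.

    RIG correlates the response with the original request via the
    correlation ID it injected, so the response must carry that ID back
    unchanged. Here the ID is assumed to live in the request event's
    "rig" extension attribute.
    """
    rig_ext = request_event.get("rig")
    if rig_ext is None or "correlation" not in rig_ext:
        raise ValueError("request carries no correlation ID; RIG would time out")
    return {
        "specversion": request_event["specversion"],
        "type": "com.example.response",       # hypothetical event type
        "id": "response-" + request_event["id"],
        "source": "/ticket-service",          # hypothetical source
        "rig": {"correlation": rig_ext["correlation"]},  # echoed back verbatim
        "data": response_data,
    }
```

In binary transport mode the `rig` extension would travel as a `ce_rig` Kafka header rather than inside the body, which is what the added note about the `ce_` prefix is warning about.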

docs/avro.md

Lines changed: 12 additions & 12 deletions

````diff
@@ -24,37 +24,37 @@ Adopting Avro for event (de)serialization is fairly straightforward. First you n
 ## RIG as a Kafka producer
 
 - producer evaluates whether serialization is turned on by checking the `KAFKA_SERIALIZER` environment variable and whether its value is `avro`
-  - If it is, creates headers for Kafka event by appending `ce-` prefix for every field, besides `data` field
+  - If it is, creates headers for the Kafka event by prepending the `ce_` prefix to every field except the `data` field - **binary mode**
   - **nested context attributes are stringified**, since Kafka headers don't support nested values (this is common when using CloudEvents extensions)
 - after that, the `data` field is serialized using the schema name (the function for getting schemas from the registry is cached in-memory)
 - producer sends the event with the created headers and data (in binary format `<<0, 0, 0, 0, 1, 5, 3, 8, ...>>`) to Kafka
 
-> If `KAFKA_SERIALIZER` is not set to `avro`, producer sets **only** `ce-contenttype` or `ce-contentType` for kafka event
+> If `KAFKA_SERIALIZER` is not set to `avro`, the producer sets **only** `ce_contenttype` or `ce_contentType` for the Kafka event
 
 ## RIG as a Kafka consumer
 
-- when consuming Kafka event, RIG checks headers of such event and removes `ce-` prefix
-- based on headers decides cloud events version and content type
-- In case content type is `avro/binary`, schema ID is taken from event value and deserialized
-- If content type is **not** present, RIG checks for Avro format (`<<0, 0, 0, 0, 1, 5, 3, 8, ...>>`) and attempts for deserialization, otherwise event is sent to client as it is without any deserialization
+Event parsing is based on the [Kafka Transport Binding for CloudEvents v1.0](https://github.com/cloudevents/spec/blob/v1.0/kafka-protocol-binding.md) implemented via [cloudevents-ex](https://github.com/kevinbader/cloudevents-ex). Check the [Event format](./event-format.md#kafka-transport-binding) section.
 
 ## Example 1: producing to and consuming from the same topic
 
 In this example we'll have RIG send a message to itself to see whether RIG's producing and consuming parts work correctly. The idea is that RIG produces a serialized event as the result of an HTTP request and, a few moments later, consumes that same event (and deserializes it correctly).
 
 ```bash
+
 ## 1. Start Kafka with Zookeeper and Kafka Schema Registry
+
 KAFKA_PORT_PLAIN=17092 KAFKA_PORT_SSL=17093 HOST=localhost docker-compose -f integration_tests/kafka_tests/docker-compose.yml up -d
 
 ## 2. Start RIG
+
 # Here we say to use Avro, consume on topic "rigRequest" and use the "rigRequest-value" schema from the Kafka Schema Registry
 # Proxy is turned on to be able to produce Kafka events with headers (needed for cloud events)
 docker run --name rig \
   -e KAFKA_BROKERS=kafka:9292 \
   -e KAFKA_SERIALIZER=avro \
   -e KAFKA_SCHEMA_REGISTRY_HOST=kafka-schema-registry:8081 \
   -e KAFKA_SOURCE_TOPICS=rigRequest \
-  -e PROXY_CONFIG_FILE=proxy/proxy.test.json \
+  -e PROXY_CONFIG_FILE='[{"id":"my-api","name":"my-api","versioned":false,"version_data":{"default":{"endpoints":[{"id":"post-myapi-publish-async","path":"/myapi/publish-async","method":"POST","target":"kafka"}]}},"proxy":{"use_env":true,"target_url":"KAFKA_HOST","port":9092}}]' \
   -e PROXY_KAFKA_REQUEST_TOPIC=rigRequest \
   -e PROXY_KAFKA_REQUEST_AVRO=rigRequest-value \
   -e LOG_LEVEL=debug \
@@ -63,25 +63,25 @@ docker run --name rig \
   accenture/reactive-interaction-gateway
 
 ## 3. Register Avro schema in Kafka Schema Registry
+
 curl -d '{"schema":"{\"name\":\"rigproducer\",\"type\":\"record\",\"fields\":[{\"name\":\"example\",\"type\":\"string\"}]}"}' -H "Content-Type: application/vnd.schemaregistry.v1+json" -X POST http://localhost:8081/subjects/rigRequest-value/versions
 
 ## 4. Send HTTP request to RIG proxy
+
 # Request will produce a serialized Kafka event to Kafka
 curl -d '{"event":{"id":"069711bf-3946-4661-984f-c667657b8d85","type":"com.example","time":"2018-04-05T17:31:00Z","specversion":"0.2","source":"\/cli","contenttype":"avro\/binary","data":{"example":"test"}},"partition":"test_key"}' -H "Content-Type: application/json" -X POST http://localhost:4000/myapi/publish-async
 
 ## 5. In the terminal you should see something like below -- in a nutshell it means the event was successfully consumed, deserialized and forwarded to the UI client
-16:46:31.549 [debug] Decoded Avro message="{\"example\":\"test\"}"
-application=rig_kafka module=RigKafka.Avro function=decode/1 file=lib/rig_kafka/avro.ex line=28 pid=<0.419.0>
 
-16:46:31.550 [debug] [:start_object, {:string, "contenttype"}, :colon, {:string, "avro/binary"}, :comma, {:string, "data"}, :colon, :start_object, {:string, "example"}, :colon, {:string, "test"}, :end_object, :comma, {:string, "id"}, :colon, {:string, "069711bf-3946-4661-984f-c667657b8d85"}, :comma, {:string, "rig"}, :colon, :start_object, {:string, "correlation"}, :colon, {:string, "g2dkAA1ub25vZGVAbm9ob3N0AAADzAAAAAAA"}, :comma, {:string, "headers"}, :colon, :start_array, :start_array, {:string, "accept"}, :end_array, :comma, :start_array, {:string, "*/*"}, :end_array, :comma, :start_array, {:string, "content-length"}, :end_array, :comma, :start_array, {:string, "221"}, :end_array, :comma, :start_array, {:string, "content-type"}, :end_array, :comma, :start_array, {:string, ...}, :end_array, ...]
-application=rig module=Rig.EventStream.KafkaToFilter function=kafka_handler/1 file=lib/rig/event_stream/kafka_to_filter.ex line=20 pid=<0.419.0>
+21:54:52.869 module=Avrora.Storage.Registry [debug] obtaining schema with global id `1`
+21:54:52.870 module=Rig.EventStream.KafkaToFilter [debug] %Cloudevents.Format.V_0_2.Event{contenttype: "avro/binary", data: %{"example" => "test"}, extensions: %{"rig" => %{"correlation" => "Ve1d-XF0Qi46lwh47X5IqI7m_FCIqCLsqyV0KTCxg28Hnd7ytczBe1cASZYPxA7GNFCZ4AzDC0QX1w0=", "headers" => [["accept", "*/*"], ["content-length", "221"], ["content-type", "application/json"], ["host", "localhost:4000"], ["user-agent", "curl/7.54.0"]], "host" => "localhost", "method" => "POST", "path" => "/myapi/publish-async", "port" => 4000, "query" => "", "remoteip" => "172.28.0.1", "scheme" => "http"}}, id: "069711bf-3946-4661-984f-c667657b8d85", schemaurl: nil, source: "/cli", specversion: "0.2", time: "2018-04-05T17:31:00Z", type: "com.example"}
 ```
 
 ## Example 2: Kafka Schema Registry CLI
 
 To check that it also works with the native serializer, we can leverage the CLI shipped with the Kafka Schema Registry image.
 
-```bash
+``` bash
 # 1. Get inside the Kafka Schema Registry container
 docker exec -it kafka-schema-registry bash
````
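The `<<0, 0, 0, 0, 1, 5, 3, 8, ...>>` byte prefix shown in this diff matches the Schema Registry wire format: one zero magic byte, a 4-byte big-endian schema ID (here `1`, as in the `obtaining schema with global id `1`` log line), then the Avro-encoded payload. A small illustrative sketch of that framing, not RIG's actual decoder:

```python
import struct

def split_wire_format(message):
    """Split a Schema Registry framed message into (schema_id, avro_payload).

    Raises when the magic byte is missing, mirroring the format check the
    docs describe for messages arriving without a content type.
    """
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not in Schema Registry wire format")
    (schema_id,) = struct.unpack(">I", message[1:5])  # 4-byte big-endian ID
    return schema_id, message[5:]

# The <<0, 0, 0, 0, 1, 5, 3, 8>> example from the docs:
print(split_wire_format(bytes([0, 0, 0, 0, 1, 5, 3, 8])))
# → (1, b'\x05\x03\x08')
```

The schema ID is what the consumer sends to the registry (`KAFKA_SCHEMA_REGISTRY_HOST`) to fetch the writer schema before deserializing the payload.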

docs/event-format.md

Lines changed: 21 additions & 23 deletions

````diff
@@ -14,17 +14,17 @@ We aim at following the [CloudEvents specification](https://github.com/cloudeven
 
 ```json
 {
-  "specversion" : "0.2",
-  "type" : "com.github.pull.create",
-  "source" : "https://github.com/cloudevents/spec/pull/123",
-  "id" : "A234-1234-1234",
-  "time" : "2018-04-05T17:31:00Z",
-  "comexampleextension1" : "value",
-  "comexampleextension2" : {
-    "othervalue": 5
-  },
-  "contenttype" : "text/xml",
-  "data" : "<much wow=\"xml\"/>"
+  "specversion": "0.2",
+  "type": "com.github.pull.create",
+  "source": "https://github.com/cloudevents/spec/pull/123",
+  "id": "A234-1234-1234",
+  "time": "2018-04-05T17:31:00Z",
+  "comexampleextension1": "value",
+  "comexampleextension2": {
+    "othervalue": 5
+  },
+  "contenttype": "text/xml",
+  "data": "<much wow=\"xml\"/>"
 }
 ```
 
@@ -41,8 +41,6 @@ For further details and examples, checkout the dedicated [Section on Avro](avro)
 
 ## Transport Bindings
 
-While the HTTP transport binding seems solid already, the Kafka transport mode is not yet available. To support Kafka anyway, we have adapted the HTTP transport modes for Kafka. Consequently, the Kafka transport mode implementation may change in the future, depending on the outcome of the [standardization process](https://github.com/cloudevents/spec/pull/337).
-
 When sending or receiving an event, you have a couple of options:
 
 - Send the event as-is, or send only the "data" (= payload) in the "body" and the rest (the so-called "context attributes") in headers. The former is called _structured_ transport mode, while the latter is known as _binary_ transport mode.
@@ -90,7 +88,7 @@ Request body:
 
 ### Binary
 
-In binary mode the request body only contains the `data` value of the corresponding CloudEvent. The _context attributes_ - i.e., all other fields - are moved into the HTTP header. The data/body encoding is determined by the `content-type` header. At the time of writing there are two content types supported: `application/json` and `avro/binary`.
+In binary mode the request body only contains the `data` value of the corresponding CloudEvent. The _context attributes_ - i.e., all other fields - are moved into the HTTP header (**this includes [extensions](https://github.com/cloudevents/spec/blob/v1.0/spec.md#extension-context-attributes)**). The data/body encoding is determined by the `content-type` header. At the time of writing, two content types are supported: `application/json` and `avro/binary`.
 
 <details>
 <summary>Same example event, sent using HTTP request in binary mode</summary>
@@ -119,7 +117,7 @@ Request body:
 
 ## Kafka Transport Binding
 
-As mentioned above, the corresponding CloudEvents transport binding specification is still under [active development](https://github.com/cloudevents/spec/pull/337/files). We follow an approach very similar to the HTTP transport binding outlined above. We utilize Kafka headers that have been introduced in Kafka version `0.11`. In order to support older Kafka versions as well, RIG defaults to structured mode and does not require any headers at all (see below).
+Implemented using the [Kafka Transport Binding for CloudEvents v1.0](https://github.com/cloudevents/spec/blob/v1.0/kafka-protocol-binding.md). We utilize Kafka headers, which were introduced in Kafka version `0.11`. In order to support older Kafka versions as well, RIG defaults to structured mode and does not require any headers at all (see below).
 
 Like with the HTTP transport binding, we define two modes of operation: structured and binary.
 
@@ -158,7 +156,7 @@ Message body:
 
 ### Binary
 
-In binary mode the message body only contains the `data` value of the corresponding CloudEvent. The _context attributes_ - i.e., all other fields - are moved into the message header. The data/body encoding is determined by the `content-type` header. In this mode there is no default for `content-type` and RIG rejects messages that come without it. At the time of writing there are two content types supported: `application/json` and `avro/binary`.
+In binary mode the message body only contains the `data` value of the corresponding CloudEvent. The _context attributes_ - i.e., all other fields - are moved into the message header (**this includes [extensions](https://github.com/cloudevents/spec/blob/v1.0/spec.md#extension-context-attributes)**). The data/body encoding is determined by the `content-type` header. In this mode there is no default for `content-type` and RIG rejects messages that come without it. At the time of writing, two content types are supported: `application/json` and `avro/binary`.
 
 <details>
 <summary>Same example in binary mode</summary>
@@ -167,10 +165,10 @@ In binary mode the message body only contains the `data` value of the correspond
 In binary mode the message header contains all context attributes. It also announces the body encoding:
 
 ```plaintext
-ce-specversion: 0.2
-ce-type: com.example.someevent
-ce-source: example
-ce-id: 80dc037c-fb24-43e9-9759-94f91f310a4b1
+ce_specversion: 0.2
+ce_type: com.example.someevent
+ce_source: example
+ce_id: 80dc037c-fb24-43e9-9759-94f91f310a4b1
 Content-Type: application/json; charset=UTF-8
 ```
 
@@ -185,6 +183,6 @@ Message body:
 </p>
 </details>
 
-[CloudEvents]: https://cloudevents.io/
-[CNCF]: https://www.cncf.io/
-[Apache Avro]: https://avro.apache.org/
+[cloudevents]: https://cloudevents.io/
+[cncf]: https://www.cncf.io/
+[apache avro]: https://avro.apache.org/
````
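The structured/binary distinction that this file's diff keeps referring to can be sketched as a simple conversion between the two Kafka message shapes. This is an illustrative simplification (JSON-only, and stringified extensions are not restored to their original types on the way back), not the cloudevents-ex implementation:

```python
import json

def structured_to_binary(body):
    """Turn a structured-mode message (the whole CloudEvent as a JSON body)
    into binary mode: ce_-prefixed headers plus a data-only body."""
    event = json.loads(body)
    data = event.pop("data", None)
    headers = {"ce_" + k: v if isinstance(v, str) else json.dumps(v)
               for k, v in event.items()}
    headers["content-type"] = "application/json"  # binary mode requires it
    return headers, json.dumps(data).encode()

def binary_to_structured(headers, body):
    """Inverse direction: fold the ce_ headers and the data body back into
    one structured-mode JSON message."""
    event = {k[len("ce_"):]: v for k, v in headers.items()
             if k.startswith("ce_")}
    event["data"] = json.loads(body)
    return json.dumps(event).encode()

structured = json.dumps({"specversion": "1.0", "type": "com.example",
                         "id": "1", "source": "/cli",
                         "data": {"example": "test"}}).encode()
hdrs, body = structured_to_binary(structured)
print(hdrs["ce_type"], body)
# → com.example b'{"example": "test"}'
```

The lossy round trip (non-string attributes come back as JSON strings) is the same effect the Avro docs mention when they note that nested context attributes are stringified in Kafka headers.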
