Skip to content

Commit b1664e2

Browse files
committed
Update kafka-connector chart for async invocations with backpressure
Signed-off-by: Han Verstraete (OpenFaaS Ltd) <[email protected]>
1 parent a382352 commit b1664e2

File tree

5 files changed

+185
-37
lines changed

5 files changed

+185
-37
lines changed

chart/kafka-connector/README.md

Lines changed: 92 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ $ helm repo add openfaas https://openfaas.github.io/faas-netes/
4747

4848
Prepare a custom [values.yaml](values.yaml) with:
4949

50-
* brokerHosts - comma separted list of host:port
51-
* topics - the topics to subscribe to
52-
* replicas - this should match the partition size, so if the size is 3, set this to 3
50+
- brokerHosts - comma separted list of host:port
51+
- topics - the topics to subscribe to
52+
- replicas - this should match the partition size, so if the size is 3, set this to 3
5353

5454
Then you will need to read up on the encryption and authentication options and update the settings accordingly.
5555

@@ -65,46 +65,103 @@ $ helm repo update && \
6565
6666
## Encryption options
6767

68-
1) TLS off (default)
69-
2) TLS on
68+
1. TLS off (default)
69+
2. TLS on
7070

7171
## Authentication options
7272

73-
1) TLS with SASL using CA from the default trust store
74-
3) TLS with SASL using a custom CA
75-
4) TLS with client certificates
73+
1. TLS with SASL using CA from the default trust store
74+
2. TLS with SASL using a custom CA
75+
3. TLS with client certificates
76+
77+
## Async invocations
78+
79+
The connector can be configured to invoke function asynchronously. This lets you use [OpenFaaS async](https://docs.openfaas.com/reference/async/) features like retries.
80+
To prevent the connector from consuming all Kafka messages at once and submitting them to the OpenFaaS async queue a limit on the number of inflight async invocations can be configured.
81+
82+
Configure the connector for async invocations:
83+
84+
```yaml
85+
# Invoke functions asynchronously.
86+
asyncInvocation: true
87+
88+
async:
89+
# Limit the number of inflight async invocations for the connector.
90+
# A value of 0 indicates no concurrency limit.
91+
maxInflight: 0
92+
93+
# Configure an externally-managed NATS server.
94+
# NATS is used for async invocations and is required when
95+
# setting the 'async.maxInflight' parameter to a value other than 0.
96+
# By default the OpenFaaS embedded nats deployment is used.
97+
nats:
98+
external:
99+
enabled: false
100+
host: ""
101+
port: ""
102+
```
103+
104+
### Reset the inflight concurrency counter
105+
106+
If the inflight counter gets out if sync for some reason, e.g a misconfiguration, network issues, it can be forcefully reset.
107+
The connecter checks if a Lease object exists on startup and resets the counter if the Lease does not exist.
108+
109+
Remove the lease and restart the connector to reset the counter.
110+
111+
1. Remove the lease
112+
113+
```sh
114+
$ kubectl get lease -n openfaas
115+
116+
NAME HOLDER AGE
117+
kafka-connector 18m
118+
```
119+
120+
```sh
121+
kubectl delete lease kafka-connector -n openfaas
122+
```
123+
124+
2. Restart the connector
125+
126+
```sh
127+
kubectl rollout restart deploy/kafka-connector -n openfaas
128+
```
76129

77130
## Configuration
78131

79132
Additional kafka-connector options in `values.yaml`.
80133

81-
| Parameter | Description | Default |
82-
|------------------------|------------------------------------------------------------------------------------------------------------------------------------|--------------------------------|
83-
| `topics` | A single topic or list of comma separated topics to consume. | `faas-request` |
84-
| `replicas` | The number of replicas of this connector, should be set to the size of the partition for the given topic, or a higher lower value. | `1` |
85-
| `brokerHosts` | Host and port for the Kafka bootstrap server, multiple servers can be specified as a comma-separated list. | `kafka:9092` |
86-
| `asyncInvocation` | For long running or slow functions, offload to asychronous function invocations and carry on processing the stream | `false` |
87-
| `upstreamTimeout` | Maximum timeout for upstream function call, must be a Go formatted duration string. | `2m` |
88-
| `rebuildInterval` | Interval for rebuilding function to topic map, must be a Go formatted duration string. | `30s` |
89-
| `gatewayURL` | The URL for the API gateway. | `http://gateway.openfaas:8080` |
90-
| `printResponse` | Output the response of calling a function in the logs. | `true` |
91-
| `printResponseBody` | Output to the logs the response body when calling a function. | `false` |
92-
| `printRequestBody` | Output to the logs the request body when calling a function. | `false` |
93-
| `fullnameOverride` | Override the name value used for the Connector Deployment object. | `` |
94-
| `tls` | Connect to the broker server(s) using TLS encryption | `true` |
95-
| `sasl` | Enable auth with a SASL username/password | `false` |
96-
| `brokerPasswordSecret` | Name of secret for SASL password | `kafka-broker-password` |
97-
| `brokerUsernameSecret` | Name of secret for SASL username | `kafka-broker-username` |
98-
| `caSecret` | Name secret for TLS CA - leave empty to disable | `kafka-broker-ca` |
99-
| `certSecret` | Name secret for TLS client certificate cert - leave empty to disable | `kafka-broker-cert` |
100-
| `keySecret` | Name secret for TLS client certificate private key - leave empty to disable | `kafka-broker-key` |
101-
| `contentType` | Set a HTTP Content Type during function invocation. | `""` |
102-
| `group` | Set the Kafka consumer group name. | `""` |
103-
| `maxBytes` | Set the maximum size of messages from the Kafka broker. | `1024*1024` |
104-
| `sessionLogging` | Enable detailed logging from the consumer group. | `"false"` |
105-
| `initialOffset` | Either newest or oldest. | `"oldest"` |
106-
| `logs.debug` | Print debug logs | `false` |
107-
| `logs.format` | The log encoding format. Supported values: `json` or `console` | `console` |
134+
| Parameter | Description | Default |
135+
| ----------------------- | ---------------------------------------------------------------------------------------------------------------------------------- | ------------------------------ |
136+
| `topics` | A single topic or list of comma separated topics to consume. | `faas-request` |
137+
| `replicas` | The number of replicas of this connector, should be set to the size of the partition for the given topic, or a higher lower value. | `1` |
138+
| `brokerHosts` | Host and port for the Kafka bootstrap server, multiple servers can be specified as a comma-separated list. | `kafka:9092` |
139+
| `asyncInvocation` | Invoke function asychronously and carry on processing the stream | `false` |
140+
| `async.maxInflight` | Limit the number of inflight async invocations for the connector. A value of 0 indicates no concurrency limit. | `0` |
141+
| `nats.external.enabled` | Whether to use an externally-managed NATS server. | `false` |
142+
| `nats.external.host` | The host at which the externally-managed NATS server can be reached | `""` |
143+
| `nats.external.port` | The port at which the externally-managed NATS server can be reached | `""` |
144+
| `upstreamTimeout` | Maximum timeout for upstream function call, must be a Go formatted duration string. | `2m` |
145+
| `rebuildInterval` | Interval for rebuilding function to topic map, must be a Go formatted duration string. | `30s` |
146+
| `gatewayURL` | The URL for the API gateway. | `http://gateway.openfaas:8080` |
147+
| `printResponse` | Output the response of calling a function in the logs. | `true` |
148+
| `printResponseBody` | Output to the logs the response body when calling a function. | `false` |
149+
| `printRequestBody` | Output to the logs the request body when calling a function. | `false` |
150+
| `fullnameOverride` | Override the name value used for the Connector Deployment object. | `""` |
151+
| `tls` | Connect to the broker server(s) using TLS encryption | `true` |
152+
| `sasl` | Enable auth with a SASL username/password | `false` |
153+
| `brokerPasswordSecret` | Name of secret for SASL password | `kafka-broker-password` |
154+
| `brokerUsernameSecret` | Name of secret for SASL username | `kafka-broker-username` |
155+
| `caSecret` | Name secret for TLS CA - leave empty to disable | `kafka-broker-ca` |
156+
| `certSecret` | Name secret for TLS client certificate cert - leave empty to disable | `kafka-broker-cert` |
157+
| `keySecret` | Name secret for TLS client certificate private key - leave empty to disable | `kafka-broker-key` |
158+
| `contentType` | Set a HTTP Content Type during function invocation. | `""` |
159+
| `group` | Set the Kafka consumer group name. | `""` |
160+
| `maxBytes` | Set the maximum size of messages from the Kafka broker. | `1024*1024` |
161+
| `sessionLogging` | Enable detailed logging from the consumer group. | `"false"` |
162+
| `initialOffset` | Either newest or oldest. | `"oldest"` |
163+
| `logs.debug` | Print debug logs | `false` |
164+
| `logs.format` | The log encoding format. Supported values: `json` or `console` | `console` |
108165

109166
Specify each parameter using the `--set key=value[,key=value]` argument to `helm install`. See `values.yaml` for the default configuration.
110167

chart/kafka-connector/templates/deployment.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ spec:
3232
app: {{ template "connector.name" . }}
3333
component: kafka-connector
3434
spec:
35+
serviceAccountName: {{ template "connector.fullname" . }}
3536
volumes:
3637
- name: openfaas-license
3738
secret:
@@ -87,6 +88,12 @@ spec:
8788
- "-key-file=/var/secrets/broker-key/broker-key"
8889
{{- end }}
8990
env:
91+
- name: connector_id
92+
value: "{{template "connector.fullname" . }}"
93+
- name: namespace
94+
valueFrom:
95+
fieldRef:
96+
fieldPath: metadata.namespace
9097
- name: gateway_url
9198
value: {{ .Values.gatewayURL | quote }}
9299
- name: topics
@@ -99,6 +106,17 @@ spec:
99106
value: {{ .Values.printRequestBody | quote }}
100107
- name: asynchronous_invocation
101108
value: {{ .Values.asyncInvocation | quote }}
109+
- name: async_max_inflight
110+
value: {{ .Values.asyncMaxInflight | quote }}
111+
- name: async_callback_url
112+
value: "http://{{ template "connector.fullname" . }}.{{ .Release.Namespace }}:8080/api/v1/callback"
113+
- name: nats_url
114+
{{- if .Values.nats.external.enabled }}
115+
value: "nats://{{ .Values.nats.external.host}}:{{ .Values.nats.external.port }}"
116+
{{- else }}
117+
value: "nats://nats.openfaas:4222"
118+
{{- end }}
119+
value: {{ .Values.natsURL | quote }}
102120
{{- if .Values.basic_auth }}
103121
- name: basic_auth
104122
value: "true"
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
apiVersion: v1
2+
kind: ServiceAccount
3+
metadata:
4+
name: {{ template "connector.fullname" . }}
5+
namespace: {{ .Release.Namespace | quote }}
6+
labels:
7+
app: {{ template "connector.fullname" . }}
8+
component: kafka-connector
9+
---
10+
apiVersion: rbac.authorization.k8s.io/v1
11+
kind: Role
12+
metadata:
13+
name: {{ template "connector.fullname" . }}
14+
namespace: {{ .Release.Namespace | quote }}
15+
labels:
16+
app: {{ template "connector.name" . }}
17+
component: kafka-connector
18+
rules:
19+
- apiGroups: ["coordination.k8s.io"]
20+
resources: ["leases"]
21+
verbs: ["get", "create"]
22+
---
23+
apiVersion: rbac.authorization.k8s.io/v1
24+
kind: RoleBinding
25+
metadata:
26+
name: {{ template "connector.fullname" . }}
27+
namespace: {{ .Release.Namespace | quote }}
28+
labels:
29+
app: {{ template "connector.fullname" . }}
30+
component: kafka-connector
31+
subjects:
32+
- kind: ServiceAccount
33+
name: {{ template "connector.fullname" . }}
34+
namespace: {{ .Release.Namespace | quote }}
35+
roleRef:
36+
kind: Role
37+
name: {{ template "connector.fullname" . }}
38+
apiGroup: rbac.authorization.k8s.io
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
apiVersion: v1
2+
kind: Service
3+
metadata:
4+
labels:
5+
app: {{ template "connector.name" . }}
6+
component: kafka-connector
7+
chart: {{ .Chart.Name }}-{{ .Chart.Version }}
8+
heritage: {{ .Release.Service }}
9+
release: {{ .Release.Name }}
10+
name: {{ template "connector.fullname" . }}
11+
namespace: {{ .Release.Namespace | quote }}
12+
spec:
13+
type: ClusterIP
14+
ports:
15+
- name: http
16+
port: 8080
17+
protocol: TCP
18+
targetPort: 8080
19+
selector:
20+
app: kafka-connector

chart/kafka-connector/values.yaml

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,14 @@ upstreamTimeout: 2m
3030
# interval for rebuilding the map of functions and topics
3131
rebuildInterval: 30s
3232

33-
# Use with slow consumers or long running functions
33+
# Invoke functions asynchronously.
3434
asyncInvocation: false
3535

36+
async:
37+
# Limit the number of inflight async invocations for the connector.
38+
# A value of 0 indicates no concurrency limit.
39+
maxInflight: 0
40+
3641
# 1MB = 1024 bytes * 1024
3742
maxBytes: "1048576"
3843

@@ -76,6 +81,16 @@ gatewayURL: http://gateway.openfaas:8080
7681
# Basic auth for the gateway
7782
basic_auth: true
7883

84+
# NATS is used for async invocations and is required when
85+
# setting the 'async.maxInflight' parameter to a value other than 0.
86+
nats:
87+
# Configure an externally-managed NATS server.
88+
# When disabled the OpenFaaS embedded nats deployment is used.
89+
external:
90+
enabled: false
91+
host: ""
92+
port: ""
93+
7994
nodeSelector: {}
8095

8196
tolerations: []
@@ -138,4 +153,4 @@ keySecret: ""
138153

139154
# caSecret: kafka-broker-ca
140155
# certSecret: kafka-broker-cert
141-
# keySecret: kafka-broker-key
156+
# keySecret: kafka-broker-key

0 commit comments

Comments
 (0)