---
title: "Create a Knative-based Function to Interact with Middleware via Dapr Components"
linkTitle: "Create a Knative-based Function to Interact with Middleware via Dapr Components"
weight: 5200
description: >
  Learn how to create a Knative-based function that interacts with middleware via Dapr components.
---

This document describes how to create a Knative-based function that interacts with middleware via Dapr components.

## Overview

Similar to asynchronous functions, functions based on the Knative runtime can interact with middleware via Dapr components. This document uses two functions, `function-front` and `kafka-input`, for demonstration.

The following diagram illustrates the relationship between these functions.

## Prerequisites

- You have [installed OpenFunction](../../getting-started/installation/).
- You have [created a secret](../../getting-started/quickstarts/prerequisites/).

## Create a Kafka Server and Topic

1. Run the following commands to install [strimzi-kafka-operator](https://github.com/strimzi/strimzi-kafka-operator) in the default namespace.

   ```shell
   helm repo add strimzi https://strimzi.io/charts/
   helm install kafka-operator -n default strimzi/strimzi-kafka-operator
   ```

2. Use the following content to create a file `kafka.yaml`.

   ```yaml
   apiVersion: kafka.strimzi.io/v1beta2
   kind: Kafka
   metadata:
     name: kafka-server
     namespace: default
   spec:
     kafka:
       version: 3.3.1
       replicas: 1
       listeners:
         - name: plain
           port: 9092
           type: internal
           tls: false
         - name: tls
           port: 9093
           type: internal
           tls: true
       config:
         offsets.topic.replication.factor: 1
         transaction.state.log.replication.factor: 1
         transaction.state.log.min.isr: 1
         default.replication.factor: 1
         min.insync.replicas: 1
         inter.broker.protocol.version: "3.1"
       storage:
         type: ephemeral
     zookeeper:
       replicas: 1
       storage:
         type: ephemeral
     entityOperator:
       topicOperator: {}
       userOperator: {}
   ---
   apiVersion: kafka.strimzi.io/v1beta2
   kind: KafkaTopic
   metadata:
     name: sample-topic
     namespace: default
     labels:
       strimzi.io/cluster: kafka-server
   spec:
     partitions: 10
     replicas: 1
     config:
       retention.ms: 7200000
       segment.bytes: 1073741824
   ```

3. Run the following command to deploy a 1-replica Kafka server named `kafka-server` and a 1-replica Kafka topic named `sample-topic` in the default namespace.

   ```shell
   kubectl apply -f kafka.yaml
   ```

4. Run the following command to check the pod status and wait for Kafka and Zookeeper to be up and running.

   ```shell
   $ kubectl get po
   NAME                                            READY   STATUS    RESTARTS   AGE
   kafka-server-entity-operator-568957ff84-nmtlw   3/3     Running   0          8m42s
   kafka-server-kafka-0                            1/1     Running   0          9m13s
   kafka-server-zookeeper-0                        1/1     Running   0          9m46s
   strimzi-cluster-operator-687fdd6f77-cwmgm       1/1     Running   0          11m
   ```

5. Run the following commands to view the metadata of the Kafka cluster.

   ```shell
   # Start a utility pod.
   $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
   # Check the metadata of the Kafka cluster.
   $ kafkacat -L -b kafka-server-kafka-brokers:9092
   ```

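For reference, the topic settings in `kafka.yaml` above translate into more familiar units. The quick arithmetic check below is only illustrative and is not part of the deployment:

```python
# retention.ms and segment.bytes from the KafkaTopic manifest above
retention_ms = 7200000
segment_bytes = 1073741824

retention_hours = retention_ms / (1000 * 60 * 60)  # milliseconds -> hours
segment_gib = segment_bytes / 2**30                # bytes -> GiB

print(retention_hours)  # 2.0: messages are retained for 2 hours
print(segment_gib)      # 1.0: log segments roll at 1 GiB
```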
## Create Functions

1. Use the following example YAML file to create a manifest `kafka-input.yaml`, and modify the value of `spec.image` to use your own image registry address. The field `spec.serving.triggers.dapr` defines an input source that points to a Dapr component of the Kafka server. This means the `kafka-input` function will be driven by events in the topic `sample-topic` of the Kafka server.

   ```yaml
   apiVersion: core.openfunction.io/v1beta2
   kind: Function
   metadata:
     name: kafka-input
   spec:
     version: "v1.0.0"
     image: <your registry name>/kafka-input:latest
     imageCredentials:
       name: push-secret
     build:
       builder: openfunction/builder-go:latest
       env:
         FUNC_NAME: "HandleKafkaInput"
         FUNC_CLEAR_SOURCE: "true"
       srcRepo:
         url: "https://github.com/OpenFunction/samples.git"
         sourceSubPath: "functions/async/bindings/kafka-input"
         revision: "main"
     serving:
       scaleOptions:
         minReplicas: 0
         maxReplicas: 10
         keda:
           triggers:
             - type: kafka
               metadata:
                 topic: sample-topic
                 bootstrapServers: kafka-server-kafka-brokers.default.svc:9092
                 consumerGroup: kafka-input
                 lagThreshold: "20"
           scaledObject:
             pollingInterval: 15
             cooldownPeriod: 60
             advanced:
               horizontalPodAutoscalerConfig:
                 behavior:
                   scaleDown:
                     stabilizationWindowSeconds: 45
                     policies:
                       - type: Percent
                         value: 50
                         periodSeconds: 15
                   scaleUp:
                     stabilizationWindowSeconds: 0
       triggers:
         dapr:
           - name: target-topic
             type: bindings.kafka
       bindings:
         target-topic:
           type: bindings.kafka
           version: v1
           metadata:
             - name: brokers
               value: "kafka-server-kafka-brokers:9092"
             - name: topics
               value: "sample-topic"
             - name: consumerGroup
               value: "kafka-input"
             - name: publishTopic
               value: "sample-topic"
             - name: authRequired
               value: "false"
       template:
         containers:
           - name: function
             imagePullPolicy: Always
   ```

2. Run the following command to create the function `kafka-input`.

   ```shell
   kubectl apply -f kafka-input.yaml
   ```

3. Use the following example YAML file to create a manifest `function-front.yaml`, and modify the value of `spec.image` to use your own image registry address.

   ```yaml
   apiVersion: core.openfunction.io/v1beta2
   kind: Function
   metadata:
     name: function-front
   spec:
     version: "v1.0.0"
     image: "<your registry name>/sample-knative-dapr:latest"
     imageCredentials:
       name: push-secret
     build:
       builder: openfunction/builder-go:latest
       env:
         FUNC_NAME: "ForwardToKafka"
         FUNC_CLEAR_SOURCE: "true"
       srcRepo:
         url: "https://github.com/OpenFunction/samples.git"
         sourceSubPath: "functions/knative/with-output-binding"
         revision: "main"
     serving:
       hooks:
         pre:
           - plugin-custom
           - plugin-example
         post:
           - plugin-example
           - plugin-custom
       scaleOptions:
         minReplicas: 0
         maxReplicas: 5
       outputs:
         - dapr:
             name: kafka-server
             operation: "create"
       bindings:
         kafka-server:
           type: bindings.kafka
           version: v1
           metadata:
             - name: brokers
               value: "kafka-server-kafka-brokers:9092"
             - name: authRequired
               value: "false"
             - name: publishTopic
               value: "sample-topic"
             - name: topics
               value: "sample-topic"
             - name: consumerGroup
               value: "function-front"
       template:
         containers:
           - name: function
             imagePullPolicy: Always
   ```

   {{% alert title="Note" color="success" %}}

   `spec.serving.hooks.pre` defines the order of plugins to be invoked before the user function runs, and `spec.serving.hooks.post` defines the order of plugins to be invoked after it. For more information about the logic of these two plugins and the effects of running them, see [Plugin mechanism](https://github.com/OpenFunction/samples/blob/main/functions-framework/README.md#plugin-mechanism).

   {{% /alert %}}

4. In the manifest, `spec.serving.outputs` defines an output that points to a Dapr component of the Kafka server. This allows you to send custom content to the output `target` in the `function-front` function.

   ```go
   func Sender(ctx ofctx.Context, in []byte) (ofctx.Out, error) {
   	...
   	_, err := ctx.Send("target", greeting)
   	...
   }
   ```

5. Run the following command to create the function `function-front`.

   ```shell
   kubectl apply -f function-front.yaml
   ```

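The pre/post hook ordering declared under `spec.serving.hooks` in `function-front.yaml` can be sketched as follows. This is an illustrative simulation of "run pre hooks in declared order, then the user function, then post hooks in declared order"; the `plugin` and `serve` helpers are hypothetical stand-ins, not the real OpenFunction functions-framework code.

```python
# Simulate the hook ordering declared in function-front.yaml.
calls = []

def plugin(name):
    # A hook is just a callable that records its own invocation here.
    return lambda: calls.append(name)

pre_hooks = [plugin("plugin-custom"), plugin("plugin-example")]   # hooks.pre
post_hooks = [plugin("plugin-example"), plugin("plugin-custom")]  # hooks.post

def serve(user_function):
    for hook in pre_hooks:   # pre hooks run before the user function
        hook()
    user_function()
    for hook in post_hooks:  # post hooks run after the user function
        hook()

serve(lambda: calls.append("user-function"))
print(calls)
```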
## Check Results

1. Run the following command to view the status of the functions.

   ```shell
   $ kubectl get functions.core.openfunction.io

   NAME             BUILDSTATE   SERVINGSTATE   BUILDER         SERVING         URL                                              AGE
   function-front   Succeeded    Running        builder-bhbtk   serving-vc6jw   https://openfunction.io/default/function-front   2m41s
   kafka-input      Succeeded    Running        builder-dprfd   serving-75vrt                                                    2m21s
   ```

   {{% alert title="Note" color="success" %}}

   The `URL`, provided by the OpenFunction Domain, is the address that can be accessed. To access the function through this URL, make sure that DNS can resolve the address.

   {{% /alert %}}

2. Run the following command to create a pod in the cluster for accessing the function.

   ```shell
   kubectl run curl --image=radial/busyboxplus:curl -i --tty --rm
   ```

3. Run the following command to access the function through the `URL`.

   ```shell
   [ root@curl:/ ]$ curl -d '{"message":"Awesome OpenFunction!"}' -H "Content-Type: application/json" -X POST http://openfunction.io.svc.cluster.local/default/function-front
   ```

4. Run the following command to view the logs of `function-front`.

   ```shell
   kubectl logs -f \
     $(kubectl get po -l \
     openfunction.io/serving=$(kubectl get functions function-front -o jsonpath='{.status.serving.resourceRef}') \
     -o jsonpath='{.items[0].metadata.name}') \
     function
   ```

   The output looks as follows.

   ```shell
   dapr client initializing for: 127.0.0.1:50001
   I0125 06:51:55.584973       1 framework.go:107] Plugins for pre-hook stage:
   I0125 06:51:55.585044       1 framework.go:110] - plugin-custom
   I0125 06:51:55.585052       1 framework.go:110] - plugin-example
   I0125 06:51:55.585057       1 framework.go:115] Plugins for post-hook stage:
   I0125 06:51:55.585062       1 framework.go:118] - plugin-custom
   I0125 06:51:55.585067       1 framework.go:118] - plugin-example
   I0125 06:51:55.585179       1 knative.go:46] Knative Function serving http: listening on port 8080
   2022/01/25 06:52:02 http - Data: {"message":"Awesome OpenFunction!"}
   I0125 06:52:02.246450       1 plugin-example.go:83] the sum is: 2
   ```

5. Run the following command to view the logs of `kafka-input`.

   ```shell
   kubectl logs -f \
     $(kubectl get po -l \
     openfunction.io/serving=$(kubectl get functions kafka-input -o jsonpath='{.status.serving.resourceRef}') \
     -o jsonpath='{.items[0].metadata.name}') \
     function
   ```

   The output looks as follows.

   ```shell
   dapr client initializing for: 127.0.0.1:50001
   I0125 06:35:28.332381       1 framework.go:107] Plugins for pre-hook stage:
   I0125 06:35:28.332863       1 framework.go:115] Plugins for post-hook stage:
   I0125 06:35:28.333749       1 async.go:39] Async Function serving grpc: listening on port 8080
   message from Kafka '{Awesome OpenFunction!}'
   ```
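
The data flow verified by the logs above (`function-front` forwards the HTTP body to the Kafka topic, and `kafka-input` consumes it) can be sketched with an in-memory stand-in for the broker. The `broker` dict and both functions below are hypothetical substitutes for illustration only, not the Dapr bindings or the functions-framework API.

```python
# In-memory sketch of the demo's data flow: function-front publishes the
# request body to sample-topic, and kafka-input consumes it.
broker = {"sample-topic": []}  # stand-in for Kafka plus the Dapr bindings

def forward_to_kafka(body):
    """Stand-in for function-front: publish the HTTP body to the output topic."""
    broker["sample-topic"].append(body)

def handle_kafka_input(event):
    """Stand-in for kafka-input: log the event delivered by the input binding."""
    return "message from Kafka '%s'" % event

forward_to_kafka('{"message":"Awesome OpenFunction!"}')
logs = [handle_kafka_input(e) for e in broker["sample-topic"]]
print(logs[0])
```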