# Consuming AWS SQS Events with Knative Eventing

**Author: Matthias Weßendorf, Senior Principal Software Engineer @ Red Hat**

_In a [previous post](/blog/articles/consuming_s3_data_with_knative){:target="_blank"} we discussed the consumption of notifications from an AWS S3 bucket using Apache Camel K. While this is a good approach for getting data from cloud providers, like AWS, into Knative, the Knative Eventing team is aiming to integrate this at the core of its offering with a new CRD, the `IntegrationSource`. This post describes how to receive SQS notifications and forward them to a regular Knative Broker for further processing._

## Installation

The `IntegrationSource` will be part of Knative Eventing in a future release. Currently it is under development, but already included in the `main` branch. To install Knative Eventing from source, you can follow the [development guide](https://github.com/knative/eventing/blob/main/DEVELOPMENT.md){:target="_blank"}.

!!! note

    Installing Knative Eventing from the source repository is not recommended for production use cases. The purpose of this blog post is to give an early introduction to the new `IntegrationSource` CRD.
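
The guide has the authoritative steps, but roughly the installation boils down to the following sketch, assuming a configured cluster, the [`ko`](https://ko.build){:target="_blank"} tool, and a container registry you can push images to:

```
# ko builds the images from source and pushes them to this registry
# (adjust to a registry you can push to)
export KO_DOCKER_REPO=docker.io/<your-user>

git clone https://github.com/knative/eventing.git
cd eventing

# build and install the Knative Eventing CRDs and components from source
ko apply -Rf config/
```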

## Creating a Knative Broker instance

Once the `main` branch of Knative Eventing is installed, we use a Knative Broker as the heart of our system, acting as an [Event Mesh](https://knative.dev/docs/eventing/event-mesh/){:target="_blank"} for both event producers and event consumers:

```yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  namespace: default
  name: my-broker
```

Now event producers can send events to it and event consumers can receive events.
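
To create the Broker and verify that it became ready, you can apply the manifest like this (`broker.yaml` is just an assumed filename; the ingress `URL` shown by the command depends on the Broker class in use):

```
$ kubectl apply -f broker.yaml
$ kubectl -n default get broker my-broker
```

Once the `READY` column shows `True`, the Broker is ready to accept events.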

## Using IntegrationSource for AWS SQS

In order to send data from AWS SQS to a Knative component, like the `my-broker` we created above, we use the new `IntegrationSource` CRD. It allows you to declaratively move data from a system, like AWS SQS, _towards_ a Knative resource, like our Broker from above:

```yaml
apiVersion: sources.knative.dev/v1alpha1
kind: IntegrationSource
metadata:
  name: aws-sqs-source
spec:
  aws:
    sqs:
      queueNameOrArn: "my-queue"
      region: "eu-north-1"
      visibilityTimeout: 20
    auth:
      secret:
        ref:
          name: "my-secret"
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: my-broker
```

The `IntegrationSource` has an `aws` field for defining different Amazon Web Services, such as `s3`, `ddb-streams` or, like in this case, `sqs`. Underneath the `aws` property is also a reference to a _Kubernetes Secret_, which contains the credentials for connecting to AWS. All SQS notifications are processed by the source and forwarded as CloudEvents to the provided `sink`.
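
The source can be applied like any other Kubernetes resource. A quick way to check on it is the following sketch (`aws-sqs-source.yaml` is an assumed filename; the source will only become ready once the referenced `Secret`, created in the next section, exists):

```
$ kubectl apply -f aws-sqs-source.yaml
$ kubectl get integrationsources.sources.knative.dev aws-sqs-source
```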

!!! note

    If you compare the `IntegrationSource` to the `Pipe` from Apache Camel in the [previous article](/blog/articles/consuming_s3_data_with_knative){:target="_blank"}, you will notice that the new resource is less verbose and directly follows established Knative development principles, like any other Knative Event Source.

## Creating the Kubernetes Secret for the IntegrationSource

For connecting to any AWS service, the `IntegrationSource` uses regular Kubernetes `Secret`s present in the namespace of the resource. The `Secret` can be created like this:

```
$ kubectl -n <namespace> create secret generic <secret-name> --from-literal=aws.accessKey=<...> --from-literal=aws.secretKey=<...>
```
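
For the manifests in this post the `IntegrationSource` lives in the `default` namespace and references `my-secret`, so a concrete invocation would look like this (the credentials below are AWS's documented example values, not real keys):

```
$ kubectl -n default create secret generic my-secret \
    --from-literal=aws.accessKey=AKIAIOSFODNN7EXAMPLE \
    --from-literal=aws.secretKey=wJalrXUtnFiXEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
```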

## Setting up the Consumer application

Now that we have the `Broker` and the `IntegrationSource` connected to it, it is time to define an application that receives _and_ processes the SQS notifications:

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: log-receiver
  labels:
    app: log-receiver
spec:
  containers:
    - name: log-receiver
      image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
      imagePullPolicy: Always
      ports:
        - containerPort: 8080
          protocol: TCP
          name: log-receiver
---
apiVersion: v1
kind: Service
metadata:
  name: log-receiver
spec:
  selector:
    app: log-receiver
  ports:
    - port: 80
      protocol: TCP
      targetPort: log-receiver
      name: http
```

Here we define a simple `Pod` and its `Service`, which point to an HTTP server that receives the CloudEvents. As you can see, this is **not** an AWS SQS-specific consumer. Basically _any_ HTTP web server, written in any language, can be used for processing the CloudEvents coming from a Knative Broker.

## Subscribing the Consumer application to AWS SQS events

In order to receive the SQS event notifications, we need to create a `Trigger` for our _Consumer application_:

```yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: aws-sqs-trigger
spec:
  broker: my-broker
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: log-receiver
```

For debugging purposes we create a `Trigger` without any `filters`, so it forwards _all_ CloudEvents to the `log-receiver` application (a filtered variant is sketched at the end of this section). Once deployed, we should see the following in the log of the `log-receiver` pod for any SQS notification produced on our queue:

```
☁️ cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.connector.event.aws-sqs
  source: dev.knative.eventing.aws-sqs-source
  subject: aws-sqs-source
  id: 9CC70D09569020C-0000000000000001
  time: 2024-11-08T07:34:16.413Z
  datacontenttype: application/json
Extensions,
  knativearrivaltime: 2024-11-08T07:34:16.487697262Z
Data,
  <test data notification>
```
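
To produce such a notification for testing, you can for example send a message to the queue with the AWS CLI (the queue URL is a placeholder for your account and region):

```
$ aws sqs send-message \
    --queue-url https://sqs.eu-north-1.amazonaws.com/<account-id>/my-queue \
    --message-body '{"greeting": "Hello from SQS!"}'
```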
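
Beyond debugging, a `Trigger` would usually narrow down the events it forwards. A sketch that only delivers the SQS events from above, using an exact-match `filter` on the CloudEvent `type` attribute, could look like this:

```yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: aws-sqs-trigger-filtered
spec:
  broker: my-broker
  filter:
    attributes:
      type: dev.knative.connector.event.aws-sqs
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: log-receiver
```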

## Conclusion and Outlook

With the new `IntegrationSource` we will have a good way to integrate services from public cloud providers, like AWS, by leveraging Apache Camel Kamelets behind the scenes. The initial set of supported services is on AWS: `s3`, `sqs` and `ddb-streams`. However, we are planning to add support for further services and providers.

Since Apache Camel Kamelets can also act as `Sink`s, the team is working on providing an `IntegrationSink`, following the same principles.