|
| 1 | +[id="serverless-containersource-custom-sources_{context}"] |
| 2 | += Creating custom event sources by using a container source |
| 3 | + |
| 4 | +You can use a container source to create and manage a container for your custom event source image. |
| 5 | + |
| 6 | +To implement a custom event source by using a container source, you must first create a container image of your event source, and then create a container source that specifies the correct configuration, including the container image URI. |
| 7 | + |
| 8 | +== Guidelines for creating a container image |
| 9 | + |
| 10 | +* A container image can be developed using any language, and can be built and published with any tool that you prefer. |
| 11 | + |
| 12 | +* The main process of the container image must accept parameters from arguments and environment variables. |
| 13 | + |
| 14 | +* Two environment variables are injected by the container source controller: `K_SINK` and `K_CE_OVERRIDES`. These variables are resolved from the `sink` and `ceOverrides` spec, respectively. |
| 15 | + |
| 16 | +* Event messages are sent to the sink URI specified in the `K_SINK` environment variable. The event message can be in any format; however, using the link:https://cloudevents.io/[`CloudEvent`] spec is recommended. |
| 17 | + |
| 18 | +== Example container images |
| 19 | + |
| 20 | +The following is an example of a heartbeats container image: |
| 21 | + |
| 22 | +[source,go] |
| 23 | +---- |
| 24 | +package main |
| 25 | +
|
| 26 | +import ( |
| 27 | + "context" |
| 28 | + "encoding/json" |
| 29 | + "flag" |
| 30 | + "fmt" |
| 31 | + "log" |
| 32 | + "os" |
| 33 | + "strconv" |
| 34 | + "time" |
| 35 | +
|
| 36 | + duckv1 "knative.dev/pkg/apis/duck/v1" |
| 37 | +
|
| 38 | + cloudevents "github.com/cloudevents/sdk-go/v2" |
| 39 | + "github.com/kelseyhightower/envconfig" |
| 40 | +) |
| 41 | +
|
| 42 | +type Heartbeat struct { |
| 43 | + Sequence int `json:"id"` |
| 44 | + Label string `json:"label"` |
| 45 | +} |
| 46 | +
|
| 47 | +var ( |
| 48 | + eventSource string |
| 49 | + eventType string |
| 50 | + sink string |
| 51 | + label string |
| 52 | + periodStr string |
| 53 | +) |
| 54 | +
|
| 55 | +func init() { |
| 56 | + flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)") |
| 57 | + flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)") |
| 58 | + flag.StringVar(&sink, "sink", "", "the host url to heartbeat to") |
| 59 | + flag.StringVar(&label, "label", "", "a special label") |
| 60 | + flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") |
| 61 | +} |
| 62 | +
|
| 63 | +type envConfig struct { |
| 64 | + // Sink URL where to send heartbeat cloud events |
| 65 | + Sink string `envconfig:"K_SINK"` |
| 66 | +
|
| 67 | + // CEOverrides are the CloudEvents overrides to be applied to the outbound event. |
| 68 | + CEOverrides string `envconfig:"K_CE_OVERRIDES"` |
| 69 | +
|
| 70 | + // Name of this pod. |
| 71 | + Name string `envconfig:"POD_NAME" required:"true"` |
| 72 | +
|
| 73 | + // Namespace this pod exists in. |
| 74 | + Namespace string `envconfig:"POD_NAMESPACE" required:"true"` |
| 75 | +
|
| 76 | + // Whether to run continuously or exit. |
| 77 | + OneShot bool `envconfig:"ONE_SHOT" default:"false"` |
| 78 | +} |
| 79 | +
|
| 80 | +func main() { |
| 81 | + flag.Parse() |
| 82 | +
|
| 83 | + var env envConfig |
| 84 | + if err := envconfig.Process("", &env); err != nil { |
| 85 | + log.Printf("[ERROR] Failed to process env var: %s", err) |
| 86 | + os.Exit(1) |
| 87 | + } |
| 88 | +
|
| 89 | + if env.Sink != "" { |
| 90 | + sink = env.Sink |
| 91 | + } |
| 92 | +
|
| 93 | + var ceOverrides *duckv1.CloudEventOverrides |
| 94 | + if len(env.CEOverrides) > 0 { |
| 95 | + overrides := duckv1.CloudEventOverrides{} |
| 96 | + err := json.Unmarshal([]byte(env.CEOverrides), &overrides) |
| 97 | + if err != nil { |
| 98 | + log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err) |
| 99 | + os.Exit(1) |
| 100 | + } |
| 101 | + ceOverrides = &overrides |
| 102 | + } |
| 103 | +
|
| 104 | + p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink)) |
| 105 | + if err != nil { |
| 106 | + log.Fatalf("failed to create http protocol: %s", err.Error()) |
| 107 | + } |
| 108 | +
|
| 109 | + c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow()) |
| 110 | + if err != nil { |
| 111 | + log.Fatalf("failed to create client: %s", err.Error()) |
| 112 | + } |
| 113 | +
|
| 114 | + var period time.Duration |
| 115 | + if p, err := strconv.Atoi(periodStr); err != nil { |
| 116 | + period = time.Duration(5) * time.Second |
| 117 | + } else { |
| 118 | + period = time.Duration(p) * time.Second |
| 119 | + } |
| 120 | +
|
| 121 | + if eventSource == "" { |
| 122 | + eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name) |
| 123 | + log.Printf("Heartbeats Source: %s", eventSource) |
| 124 | + } |
| 125 | +
|
| 126 | + if len(label) > 0 && label[0] == '"' { |
| 127 | + label, _ = strconv.Unquote(label) |
| 128 | + } |
| 129 | + hb := &Heartbeat{ |
| 130 | + Sequence: 0, |
| 131 | + Label: label, |
| 132 | + } |
| 133 | + ticker := time.NewTicker(period) |
| 134 | + for { |
| 135 | + hb.Sequence++ |
| 136 | +
|
| 137 | + event := cloudevents.NewEvent("1.0") |
| 138 | + event.SetType(eventType) |
| 139 | + event.SetSource(eventSource) |
| 140 | + event.SetExtension("the", 42) |
| 141 | + event.SetExtension("heart", "yes") |
| 142 | + event.SetExtension("beats", true) |
| 143 | +
|
| 144 | + if ceOverrides != nil && ceOverrides.Extensions != nil { |
| 145 | + for n, v := range ceOverrides.Extensions { |
| 146 | + event.SetExtension(n, v) |
| 147 | + } |
| 148 | + } |
| 149 | +
|
| 150 | + if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil { |
| 151 | + log.Printf("failed to set cloudevents data: %s", err.Error()) |
| 152 | + } |
| 153 | +
|
| 154 | + log.Printf("sending cloudevent to %s", sink) |
| 155 | + if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) { |
| 156 | + log.Printf("failed to send cloudevent: %v", res) |
| 157 | + } |
| 158 | +
|
| 159 | + if env.OneShot { |
| 160 | + return |
| 161 | + } |
| 162 | +
|
| 163 | + // Wait for next tick |
| 164 | + <-ticker.C |
| 165 | + } |
| 166 | +} |
| 167 | +---- |
| 168 | + |
| 169 | +The following is an example of a container source that references the previous heartbeats container image: |
| 170 | + |
| 171 | +[source,yaml] |
| 172 | +---- |
| 173 | +apiVersion: sources.knative.dev/v1 |
| 174 | +kind: ContainerSource |
| 175 | +metadata: |
| 176 | + name: test-heartbeats |
| 177 | +spec: |
| 178 | + template: |
| 179 | + spec: |
| 180 | + containers: |
| 181 | + # This corresponds to a heartbeats image URI that you have built and published |
| 182 | + - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats |
| 183 | + name: heartbeats |
| 184 | + args: |
| 185 | + - --period=1 |
| 186 | + env: |
| 187 | + - name: POD_NAME |
| 188 | + value: "example-pod" |
| 189 | + - name: POD_NAMESPACE |
| 190 | + value: "event-test" |
| 191 | + sink: |
| 192 | + ref: |
| 193 | + apiVersion: serving.knative.dev/v1 |
| 194 | + kind: Service |
| 195 | + name: example-service |
| 196 | +... |
| 197 | +---- |
0 commit comments