Skip to content

Commit fbf41f0

Browse files
aliokevankanderson
andauthored
Update sample source guide (#2475)
* Make sample source guide up-to-date * Docs for adding event source to eventing-contrib * Get rid of some whitespace * Fix broken markdown * Update docs/eventing/samples/writing-receive-adapter-source/03-controller.md Co-authored-by: Evan Anderson <[email protected]> * Update docs/eventing/samples/writing-receive-adapter-source/03-controller.md Co-authored-by: Evan Anderson <[email protected]> * Remove an empty line Co-authored-by: Evan Anderson <[email protected]>
1 parent 5d1305d commit fbf41f0

File tree

8 files changed

+211
-142
lines changed

8 files changed

+211
-142
lines changed

docs/eventing/samples/writing-receive-adapter-source/01-theory.md

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ Specifically, the `clientset`, `cache`, `informers`, and `listers` can all be ge
7070
import (
7171
// ...
7272
sampleSourceClient "knative.dev/sample-source/pkg/client/injection/client"
73-
samplesourceinformer knative.dev/sample-source/pkg/client/injection/informers/samples/v1alpha1/samplesource"
73+
samplesourceinformer "knative.dev/sample-source/pkg/client/injection/informers/samples/v1alpha1/samplesource"
7474
)
7575
// ...
7676
func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
@@ -83,22 +83,21 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
8383
// ...
8484
}
8585
```
86-
Ensure that the specific source subdirectory has been added to the injection portion of the `hack/update-codegen.sh` script.
87-
88-
```patch
89-
90-
# Sources
91-
+API_DIRS_SOURCES=(github/pkg camel/source/pkg kafka/source/pkg awssqs/pkg couchdb/source/pkg prometheus/pkg YourSourceHere/pkg)
92-
-API_DIRS_SOURCES=(github/pkg camel/source/pkg kafka/source/pkg awssqs/pkg couchdb/source/pkg prometheus/pkg)
93-
94-
```
95-
and
96-
```patch
97-
-i knative.dev/eventing-contrib/camel/source/pkg/apis \
98-
- -i knative.dev/eventing-contrib/github/pkg/apis
99-
+ -i knative.dev/eventing-contrib/github/pkg/apis \
100-
+ -i knative.dev/eventing-contrib/YourSourceHere/pkg/apis
10186
87+
Sample source's [`update-codegen.sh`](https://github.com/knative/sample-source/blob/master/hack/update-codegen.sh) have the configuration
88+
to have the required things above generated and injected:
89+
```bash
90+
# Generation
91+
${CODEGEN_PKG}/generate-groups.sh "deepcopy,client,informer,lister" \
92+
knative.dev/sample-source/pkg/client knative.dev/sample-source/pkg/apis \
93+
"samples:v1alpha1" \
94+
--go-header-file ${REPO_ROOT}/hack/boilerplate/boilerplate.go.txt
95+
96+
# Injection
97+
${KNATIVE_CODEGEN_PKG}/hack/generate-knative.sh "injection" \
98+
knative.dev/sample-source/pkg/client knative.dev/sample-source/pkg/apis \
99+
"samples:v1alpha1" \
100+
--go-header-file ${REPO_ROOT}/hack/boilerplate/boilerplate.go.txt
102101
```
103102
104103
File Layout & Hierarchy:

docs/eventing/samples/writing-receive-adapter-source/02-lifecycle-and-types.md

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,27 @@ type SampleSource struct {
3232

3333
// SampleSourceSpec holds the desired state of the SampleSource (from the client).
3434
type SampleSourceSpec struct {
35-
// ServiceAccountName holds the name of the Kubernetes service account
36-
// as which the underlying K8s resources should be run. If unspecified
37-
// this will default to the "default" service account for the namespace
38-
// in which the SampleSource exists.
39-
// +optional
40-
ServiceAccountName string `json:"serviceAccountName,omitempty"`
35+
// inherits duck/v1 SourceSpec, which currently provides:
36+
// * Sink - a reference to an object that will resolve to a domain name or
37+
// a URI directly to use as the sink.
38+
// * CloudEventOverrides - defines overrides to control the output format
39+
// and modifications of the event sent to the sink.
40+
duckv1.SourceSpec `json:",inline"`
4141

42-
// Interval is the time interval between events.
43-
//
44-
// The string format is a sequence of decimal numbers, each with optional
45-
// fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m". Valid time
46-
// units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
47-
Interval string `json:"interval"`
42+
// ServiceAccountName holds the name of the Kubernetes service account
43+
// as which the underlying K8s resources should be run. If unspecified
44+
// this will default to the "default" service account for the namespace
45+
// in which the SampleSource exists.
46+
// +optional
47+
ServiceAccountName string `json:"serviceAccountName,omitempty"`
4848

49-
// Sink is a reference to an object that will resolve to a host
50-
// name to use as the sink.
51-
Sink *duckv1.Destination `json:"sink"`
49+
// Interval is the time interval between events.
50+
//
51+
// The string format is a sequence of decimal numbers, each with optional
52+
// fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m". Valid time
53+
// units are "ns", "us" (or "µs"), "ms", "s", "m", "h". If unspecified
54+
// this will default to "10s".
55+
Interval string `json:"interval"`
5256
}
5357

5458
// SampleSourceStatus communicates the observed state of the SampleSource (from the controller).
@@ -72,7 +76,7 @@ const (
7276

7377
```
7478
Define the functions that will be called from the Reconciler functions to set the lifecycle conditions. This is typically done in
75-
`pkg/apis/samples/VERSION/sampleservice_lifecycle.go`
79+
`pkg/apis/samples/VERSION/samplesource_lifecycle.go`
7680

7781
```go
7882
// InitializeConditions sets relevant unset conditions to Unknown state.

docs/eventing/samples/writing-receive-adapter-source/03-controller.md

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,44 +11,55 @@ Pass the new controller implementation to the shared main
1111
```go
1212
import (
1313
// The set of controllers this controller process runs.
14-
"knative.dev/sample-source/pkg/reconciler"
14+
"knative.dev/sample-source/pkg/reconciler/sample"
1515

1616
// This defines the shared main for injected controllers.
1717
"knative.dev/pkg/injection/sharedmain"
1818
)
1919

2020
func main() {
21-
sharedmain.Main("sample-source-controller",
22-
reconciler.NewController
23-
)
21+
sharedmain.Main("sample-source-controller", sample.NewController)
2422
}
2523
```
26-
Define the NewController implementation, it will be passed a configmap.Watcher, as well as a context which the injected listers will use for the reconciler struct arguments
24+
Define the NewController implementation, it will be passed a `configmap.Watcher`, as well as a context which the injected listers will use for the reconciler struct arguments
2725
```go
2826
func NewController(
2927
ctx context.Context,
3028
cmw configmap.Watcher,
3129
) *controller.Impl {
3230
// ...
31+
deploymentInformer := deploymentinformer.Get(ctx)
32+
sinkBindingInformer := sinkbindinginformer.Get(ctx)
3333
sampleSourceInformer := samplesourceinformer.Get(ctx)
3434

3535
r := &Reconciler{
36-
KubeClientSet: kubeclient.Get(ctx),
37-
EventingClientSet: eventingclient.Get(ctx),
38-
samplesourceLister: sampleSourceInformer.Lister(),
39-
deploymentLister: deploymentInformer.Lister(),
40-
samplesourceClientSet: samplesourceClient.Get(ctx),
41-
}
36+
dr: &reconciler.DeploymentReconciler{KubeClientSet: kubeclient.Get(ctx)},
37+
sbr: &reconciler.SinkBindingReconciler{EventingClientSet: eventingclient.Get(ctx)},
38+
// Config accessor takes care of tracing/config/logging config propagation to the receive adapter
39+
configAccessor: reconcilersource.WatchConfigurations(ctx, "sample-source", cmw),
40+
}
4241
```
4342
The base reconciler is imported from the knative.dev/pkg dependency:
4443
```go
4544
import (
4645
// ...
47-
"knative.dev/eventing/pkg/reconciler"
46+
reconcilersource "knative.dev/eventing/pkg/reconciler/source"
4847
// ...
4948
)
5049
```
5150
Ensure the correct informers have EventHandlers filtered to them
5251
```go
5352
sampleSourceInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
5453
```
54+
Controller for the `SampleSource` uses `Deployment` and `SinkBinding` resources to deploy and also bind the event source and the receive adapter. Also ensure the informers are set up correctly for these secondary resources
55+
```go
56+
deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
57+
FilterFunc: controller.FilterGroupKind(v1alpha1.Kind("SampleSource")),
58+
Handler: controller.HandleAll(impl.EnqueueControllerOf),
59+
})
60+
61+
sinkBindingInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
62+
FilterFunc: controller.FilterGroupKind(v1alpha1.Kind("SampleSource")),
63+
Handler: controller.HandleAll(impl.EnqueueControllerOf),
64+
})
65+
```

docs/eventing/samples/writing-receive-adapter-source/04-reconciler.md

Lines changed: 68 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,64 +7,100 @@ type: "docs"
77

88
## Reconciler Functionality
99
General steps the reconciliation process needs to cover:
10-
1. Target the specific samplesource via the `sampleServiceClientSet`:
10+
1. Update the `ObservedGeneration` and initialize the `Status` conditions (as defined in `samplesource_lifecycle.go` and `samplesource_types.go`)
1111
```go
12-
// Get the resource with this namespace/name.
13-
original, err := r.Lister.SampleSources(namespace).Get(name)
12+
src.Status.InitializeConditions()
13+
src.Status.ObservedGeneration = src.Generation
14+
```
15+
2. Create/reconcile the Receive Adapter (detailed below)
16+
3. If successful, update the `Status` and `MarkDeployed`
17+
```go
18+
src.Status.PropagateDeploymentAvailability(ra)
19+
```
20+
4. Create/reconcile the `SinkBinding` for the Receive Adapter targeting the `Sink` (detailed below)
21+
5. MarkSink with the result
22+
```go
23+
src.Status.MarkSink(sb.Status.SinkURI)
24+
```
25+
6. Return a new reconciler event stating that the process is done
26+
```go
27+
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SampleSourceReconciled", "SampleSource reconciled: \"%s/%s\"", namespace, name)
1428
```
15-
2. Update the ObservedGeneration
16-
Initialize the Status conditions (as defined in `samplesource_lifecycle.go` and `samplesource_types.go`)
17-
3. Reconcile the Sink and MarkSink with the result
18-
Create the Receive Adapter (detailed below)
19-
3. If successful, update the Status and MarkDeployed
20-
4. Reconcile the EventTypes and corresponding Status
21-
Creation and deletion of the events is done with the inherited `EventingClientSet().EventingV1alpha1()` api
22-
5. Update the full status field from the resulting reconcile attempt via the source’s clientset and api
23-
`r.samplesourceClientSet.SamplesV1alpha1().SampleSources(desired.Namespace).UpdateStatus(existing)`
24-
2529

2630
## Reconcile/Create The Receive Adapter
2731
As part of the source reconciliation, we have to create and deploy
28-
(and update if necessary) the underlying receive adapter. The two
29-
client sets used in this process is the `kubeClientSet` for the
30-
Deployment tracking, and the `EventingClientSet` for the event
31-
recording.
32+
(and update if necessary) the underlying receive adapter.
3233

3334
Verify the specified kubernetes resources are valid, and update the `Status` accordingly
3435

3536
Assemble the ReceiveAdapterArgs
3637
```go
3738
raArgs := resources.ReceiveAdapterArgs{
38-
EventSource: eventSource,
39-
Image: r.receiveAdapterImage,
40-
Source: src,
41-
Labels: resources.GetLabels(src.Name),
42-
SinkURI: sinkURI,
39+
EventSource: src.Namespace + "/" + src.Name,
40+
Image: r.ReceiveAdapterImage,
41+
Source: src,
42+
Labels: resources.Labels(src.Name),
43+
AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
4344
}
4445
```
4546
NB The exact arguments may change based on functional requirements
4647
Create the underlying deployment from the arguments provided, matching pod templates, labels, owner references, etc as needed to fill out the deployment
47-
Example: [pkg/reconciler/resources/receive_adapter.go](https://github.com/knative/sample-source/tree/master/pkg/reconciler/resources/receive_adapter.go)
48+
Example: [pkg/reconciler/sample/resources/receive_adapter.go](https://github.com/knative/sample-source/blob/master/pkg/reconciler/sample/resources/receive_adapter.go)
4849

4950
1. Fetch the existing receive adapter deployment
5051
```go
51-
ra, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).Get(expected.Name, metav1.GetOptions{})
52+
namespace := owner.GetObjectMeta().GetNamespace()
53+
ra, err := r.KubeClientSet.AppsV1().Deployments(namespace).Get(expected.Name, metav1.GetOptions{})
5254
```
5355
2. Otherwise, create the deployment
5456
```go
55-
ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected)
57+
ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Create(expected)
5658
```
5759
3. Check if the expected vs existing spec is different, and update the deployment if required
5860
```go
59-
} else if podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) {
60-
ra.Spec.Template.Spec = expected.Spec.Template.Spec
61-
if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra); err != nil {
62-
return ra, err
63-
}
61+
} else if r.podSpecImageSync(expected.Spec.Template.Spec, ra.Spec.Template.Spec) {
62+
ra.Spec.Template.Spec = expected.Spec.Template.Spec
63+
if ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Update(ra); err != nil {
64+
return ra, err
65+
}
6466
```
6567
4. If updated, record the event
6668
```go
67-
r.Recorder.Eventf(src, corev1.EventTypeNormal, samplesourceDeploymentUpdated, "Deployment updated")
68-
return ra, nil
69+
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "DeploymentUpdated", "updated deployment: \"%s/%s\"", namespace, name)
6970
```
7071
72+
## Reconcile/Create The SinkBinding
73+
Instead of directly giving the details of the sink to the receive adapter, use a `SinkBinding` to bind the receive adapter with the sink.
74+
75+
Steps here are almost the same with the `Deployment` reconciliation above, but it is for another resource, `SinkBinding`.
76+
77+
1. Create a `Reference` for the receive adapter deployment. This deployment will be `SinkBinding`'s source:
78+
```go
79+
tracker.Reference{
80+
APIVersion: appsv1.SchemeGroupVersion.String(),
81+
Kind: "Deployment",
82+
Namespace: ra.Namespace,
83+
Name: ra.Name,
84+
}
85+
```
86+
2. Fetch the existing `SinkBinding`
87+
```go
88+
namespace := owner.GetObjectMeta().GetNamespace()
89+
sb, err := r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Get(expected.Name, metav1.GetOptions{})
90+
```
91+
2. If it doesn't exist, create it
92+
```go
93+
sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Create(expected)
94+
```
95+
3. Check if the expected vs existing spec is different, and update the `SinkBinding` if required
96+
```go
97+
else if r.specChanged(sb.Spec, expected.Spec) {
98+
sb.Spec = expected.Spec
99+
if sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Update(sb); err != nil {
100+
return sb, err
101+
}
102+
```
103+
4. If updated, record the event
104+
```go
105+
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SinkBindingUpdated", "updated SinkBinding: \"%s/%s\"", namespace, name)
106+
```

0 commit comments

Comments
 (0)