Skip to content

Commit 3c51b95

Browse files
committed
use context to transport cluster, workspace and now recorder
On-behalf-of: @SAP [email protected]
1 parent 9605045 commit 3c51b95

File tree

5 files changed

+82
-37
lines changed

5 files changed

+82
-37
lines changed

internal/controller/sync/controller.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request)
205205
return reconcile.Result{}, nil
206206
}
207207

208-
cInfo := sync.NewClusterInfo(logicalcluster.Name(request.ClusterName))
208+
recorder := cl.GetEventRecorderFor(ControllerName)
209209

210-
// if desired, fetch the cluster path as well (some downstream service providers might make use of it,
210+
ctx = sync.WithClusterName(ctx, logicalcluster.Name(request.ClusterName))
211+
ctx = sync.WithEventRecorder(ctx, recorder)
212+
213+
// if desired, fetch the workspace path as well (some downstream service providers might make use of it,
211214
// but since it requires an additional permission claim, it's optional)
212215
if r.pubRes.Spec.EnableWorkspacePaths {
213216
lc := &kcpdevcorev1alpha1.LogicalCluster{}
@@ -216,7 +219,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request)
216219
}
217220

218221
path := lc.Annotations[kcpcore.LogicalClusterPathAnnotationKey]
219-
cInfo = cInfo.WithWorkspacePath(logicalcluster.NewPath(path))
222+
ctx = sync.WithWorkspacePath(ctx, logicalcluster.NewPath(path))
220223
}
221224

222225
// sync main object
@@ -225,7 +228,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request)
225228
return reconcile.Result{}, fmt.Errorf("failed to create syncer: %w", err)
226229
}
227230

228-
requeue, err := syncer.Process(ctx, cInfo, remoteObj)
231+
requeue, err := syncer.Process(ctx, remoteObj)
229232
if err != nil {
230233
return reconcile.Result{}, err
231234
}

internal/sync/context.go

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,56 @@ limitations under the License.
1717
package sync
1818

1919
import (
20+
"context"
21+
2022
"github.com/kcp-dev/logicalcluster/v3"
23+
24+
"k8s.io/client-go/tools/record"
25+
)
26+
27+
type contextKey int
28+
29+
const (
30+
clusterContextKey contextKey = iota
31+
workspaceContextKey
32+
recorderContextKey
2133
)
2234

23-
type clusterInfo struct {
24-
clusterName logicalcluster.Name
25-
workspacePath logicalcluster.Path
35+
func WithClusterName(ctx context.Context, cluster logicalcluster.Name) context.Context {
36+
return context.WithValue(ctx, clusterContextKey, cluster)
2637
}
2738

28-
func NewClusterInfo(clusterName logicalcluster.Name) clusterInfo {
29-
return clusterInfo{
30-
clusterName: clusterName,
39+
func clusterFromContext(ctx context.Context) logicalcluster.Name {
40+
cluster, ok := ctx.Value(clusterContextKey).(logicalcluster.Name)
41+
if !ok {
42+
return ""
3143
}
44+
45+
return cluster
46+
}
47+
48+
func WithWorkspacePath(ctx context.Context, path logicalcluster.Path) context.Context {
49+
return context.WithValue(ctx, workspaceContextKey, path)
3250
}
3351

34-
func (c *clusterInfo) WithWorkspacePath(path logicalcluster.Path) clusterInfo {
35-
return clusterInfo{
36-
clusterName: c.clusterName,
37-
workspacePath: path,
52+
func workspacePathFromContext(ctx context.Context) logicalcluster.Path {
53+
path, ok := ctx.Value(workspaceContextKey).(logicalcluster.Path)
54+
if !ok {
55+
return logicalcluster.None
3856
}
57+
58+
return path
59+
}
60+
61+
func WithEventRecorder(ctx context.Context, recorder record.EventRecorder) context.Context {
62+
return context.WithValue(ctx, recorderContextKey, recorder)
63+
}
64+
65+
func recorderFromContext(ctx context.Context) record.EventRecorder {
66+
cluster, ok := ctx.Value(recorderContextKey).(record.EventRecorder)
67+
if !ok {
68+
return nil
69+
}
70+
71+
return cluster
3972
}

internal/sync/syncer.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222

23+
"github.com/kcp-dev/logicalcluster/v3"
2324
"go.uber.org/zap"
2425

2526
"github.com/kcp-dev/api-syncagent/internal/mutation"
@@ -136,11 +137,16 @@ func NewResourceSyncer(
136137
// Each of these steps can potentially end the current processing and return (true, nil). In this
137138
// case, the caller should re-fetch the remote object and call Process() again (most likely in the
138139
// next reconciliation). Only when (false, nil) is returned is the entire process finished.
139-
func (s *ResourceSyncer) Process(ctx context.Context, info clusterInfo, remoteObj *unstructured.Unstructured) (requeue bool, err error) {
140-
log := s.log.With("source-object", newObjectKey(remoteObj, info.clusterName, info.workspacePath))
140+
// The context must contain a cluster name and event recorder, optionally a workspace path.
141+
func (s *ResourceSyncer) Process(ctx context.Context, remoteObj *unstructured.Unstructured) (requeue bool, err error) {
142+
clusterName := clusterFromContext(ctx)
143+
workspacePath := workspacePathFromContext(ctx)
144+
objectKey := newObjectKey(remoteObj, clusterName, workspacePath)
145+
146+
log := s.log.With("source-object", objectKey)
141147

142148
// find the local equivalent object in the local service cluster
143-
localObj, err := s.findLocalObject(ctx, info, remoteObj)
149+
localObj, err := s.findLocalObject(ctx, objectKey)
144150
if err != nil {
145151
return false, fmt.Errorf("failed to find local equivalent: %w", err)
146152
}
@@ -151,8 +157,8 @@ func (s *ResourceSyncer) Process(ctx context.Context, info clusterInfo, remoteOb
151157
// Prepare object sync sides.
152158

153159
sourceSide := syncSide{
154-
clusterName: info.clusterName,
155-
workspacePath: info.workspacePath,
160+
clusterName: clusterName,
161+
workspacePath: workspacePath,
156162
client: s.remoteClient,
157163
object: remoteObj,
158164
}
@@ -172,7 +178,7 @@ func (s *ResourceSyncer) Process(ctx context.Context, info clusterInfo, remoteOb
172178
agentName: s.agentName,
173179
subresources: s.subresources,
174180
// use the projection and renaming rules configured in the PublishedResource
175-
destCreator: s.newLocalObjectCreator(info),
181+
destCreator: s.newLocalObjectCreator(clusterName, workspacePath),
176182
// for the main resource, status subresource handling is enabled (this
177183
// means _allowing_ status back-syncing, it still depends on whether the
178184
// status subresource even exists whether an update happens)
@@ -210,8 +216,8 @@ func (s *ResourceSyncer) Process(ctx context.Context, info clusterInfo, remoteOb
210216
return s.processRelatedResources(ctx, log, stateStore, sourceSide, destSide)
211217
}
212218

213-
func (s *ResourceSyncer) findLocalObject(ctx context.Context, info clusterInfo, remoteObj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
214-
localSelector := labels.SelectorFromSet(newObjectKey(remoteObj, info.clusterName, info.workspacePath).Labels())
219+
func (s *ResourceSyncer) findLocalObject(ctx context.Context, objectKey objectKey) (*unstructured.Unstructured, error) {
220+
localSelector := labels.SelectorFromSet(objectKey.Labels())
215221

216222
localObjects := &unstructured.UnstructuredList{}
217223
localObjects.SetAPIVersion(s.destDummy.GetAPIVersion())
@@ -234,7 +240,7 @@ func (s *ResourceSyncer) findLocalObject(ctx context.Context, info clusterInfo,
234240
}
235241
}
236242

237-
func (s *ResourceSyncer) newLocalObjectCreator(info clusterInfo) objectCreatorFunc {
243+
func (s *ResourceSyncer) newLocalObjectCreator(clusterName logicalcluster.Name, workspacePath logicalcluster.Path) objectCreatorFunc {
238244
return func(remoteObj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
239245
// map from the remote API into the actual, local API group
240246
destObj := remoteObj.DeepCopy()
@@ -244,7 +250,7 @@ func (s *ResourceSyncer) newLocalObjectCreator(info clusterInfo) objectCreatorFu
244250
destScope := syncagentv1alpha1.ResourceScope(s.localCRD.Spec.Scope)
245251

246252
// map namespace/name
247-
mappedName, err := templating.GenerateLocalObjectName(s.pubRes, remoteObj, info.clusterName, info.workspacePath)
253+
mappedName, err := templating.GenerateLocalObjectName(s.pubRes, remoteObj, clusterName, workspacePath)
248254
if err != nil {
249255
return nil, fmt.Errorf("failed to generate local object name: %w", err)
250256
}

internal/sync/syncer_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3838
"k8s.io/apimachinery/pkg/runtime"
3939
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
40+
"k8s.io/client-go/tools/record"
4041
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
4142
fakectrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
4243
)
@@ -910,7 +911,8 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
910911
}
911912

912913
ctx := t.Context()
913-
cInfo := NewClusterInfo(clusterName)
914+
ctx = WithClusterName(ctx, clusterName)
915+
ctx = WithEventRecorder(ctx, record.NewFakeRecorder(99))
914916

915917
// setup a custom state backend that we can prime
916918
var backend *kubernetesBackend
@@ -940,7 +942,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
940942
t.Fatalf("Detected potential infinite loop, stopping after %d requeues.", i)
941943
}
942944

943-
requeue, err = syncer.Process(ctx, cInfo, target)
945+
requeue, err = syncer.Process(ctx, target)
944946
if err != nil {
945947
break
946948
}
@@ -960,7 +962,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
960962
}
961963
}
962964
} else {
963-
requeue, err = syncer.Process(ctx, cInfo, testcase.remoteObject)
965+
requeue, err = syncer.Process(ctx, testcase.remoteObject)
964966
}
965967

966968
finalRemoteObject, getErr := getFinalObjectVersion(ctx, remoteClient, testcase.remoteObject, testcase.expectedRemoteObject)
@@ -1217,7 +1219,8 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
12171219
}
12181220

12191221
ctx := t.Context()
1220-
cInfo := NewClusterInfo(clusterName)
1222+
ctx = WithClusterName(ctx, clusterName)
1223+
ctx = WithEventRecorder(ctx, record.NewFakeRecorder(99))
12211224

12221225
// setup a custom state backend that we can prime
12231226
var backend *kubernetesBackend
@@ -1247,7 +1250,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
12471250
t.Fatalf("Detected potential infinite loop, stopping after %d requeues.", i)
12481251
}
12491252

1250-
requeue, err = syncer.Process(ctx, cInfo, target)
1253+
requeue, err = syncer.Process(ctx, target)
12511254
if err != nil {
12521255
break
12531256
}
@@ -1267,7 +1270,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
12671270
}
12681271
}
12691272
} else {
1270-
requeue, err = syncer.Process(ctx, cInfo, testcase.remoteObject)
1273+
requeue, err = syncer.Process(ctx, testcase.remoteObject)
12711274
}
12721275

12731276
finalRemoteObject, getErr := getFinalObjectVersion(ctx, remoteClient, testcase.remoteObject, testcase.expectedRemoteObject)

internal/sync/templating/naming.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ type localObjectNamingContext struct {
4040
ClusterPath logicalcluster.Path
4141
}
4242

43-
func newLocalObjectNamingContext(object *unstructured.Unstructured, clusterName logicalcluster.Name, clusterPath logicalcluster.Path) localObjectNamingContext {
43+
func newLocalObjectNamingContext(object *unstructured.Unstructured, clusterName logicalcluster.Name, workspacePath logicalcluster.Path) localObjectNamingContext {
4444
return localObjectNamingContext{
4545
Object: object.Object,
4646
ClusterName: clusterName,
47-
ClusterPath: clusterPath,
47+
ClusterPath: workspacePath,
4848
}
4949
}
5050

@@ -53,7 +53,7 @@ var defaultNamingScheme = syncagentv1alpha1.ResourceNaming{
5353
Name: "{{ .Object.metadata.namespace | sha3short }}-{{ .Object.metadata.name | sha3short }}",
5454
}
5555

56-
func GenerateLocalObjectName(pr *syncagentv1alpha1.PublishedResource, object *unstructured.Unstructured, clusterName logicalcluster.Name, clusterPath logicalcluster.Path) (types.NamespacedName, error) {
56+
func GenerateLocalObjectName(pr *syncagentv1alpha1.PublishedResource, object *unstructured.Unstructured, clusterName logicalcluster.Name, workspacePath logicalcluster.Path) (types.NamespacedName, error) {
5757
naming := pr.Spec.Naming
5858
if naming == nil {
5959
naming = &syncagentv1alpha1.ResourceNaming{}
@@ -65,7 +65,7 @@ func GenerateLocalObjectName(pr *syncagentv1alpha1.PublishedResource, object *un
6565
if pattern == "" {
6666
pattern = defaultNamingScheme.Namespace
6767
}
68-
rendered, err := generateLocalObjectIdentifier(pattern, object, clusterName, clusterPath)
68+
rendered, err := generateLocalObjectIdentifier(pattern, object, clusterName, workspacePath)
6969
if err != nil {
7070
return result, fmt.Errorf("invalid namespace naming: %w", err)
7171
}
@@ -76,7 +76,7 @@ func GenerateLocalObjectName(pr *syncagentv1alpha1.PublishedResource, object *un
7676
if pattern == "" {
7777
pattern = defaultNamingScheme.Name
7878
}
79-
rendered, err = generateLocalObjectIdentifier(pattern, object, clusterName, clusterPath)
79+
rendered, err = generateLocalObjectIdentifier(pattern, object, clusterName, workspacePath)
8080
if err != nil {
8181
return result, fmt.Errorf("invalid name naming: %w", err)
8282
}
@@ -86,10 +86,10 @@ func GenerateLocalObjectName(pr *syncagentv1alpha1.PublishedResource, object *un
8686
return result, nil
8787
}
8888

89-
func generateLocalObjectIdentifier(pattern string, object *unstructured.Unstructured, clusterName logicalcluster.Name, clusterPath logicalcluster.Path) (string, error) {
89+
func generateLocalObjectIdentifier(pattern string, object *unstructured.Unstructured, clusterName logicalcluster.Name, workspacePath logicalcluster.Path) (string, error) {
9090
// modern Go template style
9191
if strings.Contains(pattern, "{{") {
92-
return Render(pattern, newLocalObjectNamingContext(object, clusterName, clusterPath))
92+
return Render(pattern, newLocalObjectNamingContext(object, clusterName, workspacePath))
9393
}
9494

9595
// Legacy $variable style, does also not support clusterPath;

0 commit comments

Comments
 (0)