Skip to content

Commit 1d77529

Browse files
committed
use context to transport cluster, workspace and now recorder
On-behalf-of: @SAP [email protected]
1 parent 9605045 commit 1d77529

File tree

5 files changed

+79
-39
lines changed

5 files changed

+79
-39
lines changed

internal/controller/sync/controller.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request)
205205
return reconcile.Result{}, nil
206206
}
207207

208-
cInfo := sync.NewClusterInfo(logicalcluster.Name(request.ClusterName))
208+
recorder := cl.GetEventRecorderFor(ControllerName)
209209

210-
// if desired, fetch the cluster path as well (some downstream service providers might make use of it,
210+
ctx = sync.WithClusterName(ctx, logicalcluster.Name(request.ClusterName))
211+
ctx = sync.WithEventRecorder(ctx, recorder)
212+
213+
// if desired, fetch the workspace path as well (some downstream service providers might make use of it,
211214
// but since it requires an additional permission claim, it's optional)
212215
if r.pubRes.Spec.EnableWorkspacePaths {
213216
lc := &kcpdevcorev1alpha1.LogicalCluster{}
@@ -216,7 +219,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request)
216219
}
217220

218221
path := lc.Annotations[kcpcore.LogicalClusterPathAnnotationKey]
219-
cInfo = cInfo.WithWorkspacePath(logicalcluster.NewPath(path))
222+
ctx = sync.WithWorkspacePath(ctx, logicalcluster.NewPath(path))
220223
}
221224

222225
// sync main object
@@ -225,7 +228,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request)
225228
return reconcile.Result{}, fmt.Errorf("failed to create syncer: %w", err)
226229
}
227230

228-
requeue, err := syncer.Process(ctx, cInfo, remoteObj)
231+
requeue, err := syncer.Process(ctx, remoteObj)
229232
if err != nil {
230233
return reconcile.Result{}, err
231234
}

internal/sync/context.go

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,56 @@ limitations under the License.
1717
package sync
1818

1919
import (
20+
"context"
21+
2022
"github.com/kcp-dev/logicalcluster/v3"
23+
24+
"k8s.io/client-go/tools/record"
25+
)
26+
27+
type contextKey int
28+
29+
const (
30+
clusterContextKey contextKey = iota
31+
workspaceContextKey
32+
recorderContextKey
2133
)
2234

23-
type clusterInfo struct {
24-
clusterName logicalcluster.Name
25-
workspacePath logicalcluster.Path
35+
func WithClusterName(ctx context.Context, cluster logicalcluster.Name) context.Context {
36+
return context.WithValue(ctx, clusterContextKey, cluster)
2637
}
2738

28-
func NewClusterInfo(clusterName logicalcluster.Name) clusterInfo {
29-
return clusterInfo{
30-
clusterName: clusterName,
39+
func clusterFromContext(ctx context.Context) logicalcluster.Name {
40+
cluster, ok := ctx.Value(clusterContextKey).(logicalcluster.Name)
41+
if !ok {
42+
return ""
3143
}
44+
45+
return cluster
46+
}
47+
48+
func WithWorkspacePath(ctx context.Context, path logicalcluster.Path) context.Context {
49+
return context.WithValue(ctx, workspaceContextKey, path)
3250
}
3351

34-
func (c *clusterInfo) WithWorkspacePath(path logicalcluster.Path) clusterInfo {
35-
return clusterInfo{
36-
clusterName: c.clusterName,
37-
workspacePath: path,
52+
func workspacePathFromContext(ctx context.Context) logicalcluster.Path {
53+
path, ok := ctx.Value(workspaceContextKey).(logicalcluster.Path)
54+
if !ok {
55+
return logicalcluster.None
3856
}
57+
58+
return path
59+
}
60+
61+
func WithEventRecorder(ctx context.Context, recorder record.EventRecorder) context.Context {
62+
return context.WithValue(ctx, recorderContextKey, recorder)
63+
}
64+
65+
func recorderFromContext(ctx context.Context) record.EventRecorder {
66+
cluster, ok := ctx.Value(clusterContextKey).(record.EventRecorder)
67+
if !ok {
68+
return nil
69+
}
70+
71+
return cluster
3972
}

internal/sync/syncer.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222

23+
"github.com/kcp-dev/logicalcluster/v3"
2324
"go.uber.org/zap"
2425

2526
"github.com/kcp-dev/api-syncagent/internal/mutation"
@@ -136,11 +137,16 @@ func NewResourceSyncer(
136137
// Each of these steps can potentially end the current processing and return (true, nil). In this
137138
// case, the caller should re-fetch the remote object and call Process() again (most likely in the
138139
// next reconciliation). Only when (false, nil) is returned is the entire process finished.
139-
func (s *ResourceSyncer) Process(ctx context.Context, info clusterInfo, remoteObj *unstructured.Unstructured) (requeue bool, err error) {
140-
log := s.log.With("source-object", newObjectKey(remoteObj, info.clusterName, info.workspacePath))
140+
// The context must contain a cluster name and event recorder, optionally a workspace path.
141+
func (s *ResourceSyncer) Process(ctx context.Context, remoteObj *unstructured.Unstructured) (requeue bool, err error) {
142+
clusterName := clusterFromContext(ctx)
143+
workspacePath := workspacePathFromContext(ctx)
144+
objectKey := newObjectKey(remoteObj, clusterName, workspacePath)
145+
146+
log := s.log.With("source-object", objectKey)
141147

142148
// find the local equivalent object in the local service cluster
143-
localObj, err := s.findLocalObject(ctx, info, remoteObj)
149+
localObj, err := s.findLocalObject(ctx, objectKey)
144150
if err != nil {
145151
return false, fmt.Errorf("failed to find local equivalent: %w", err)
146152
}
@@ -151,8 +157,8 @@ func (s *ResourceSyncer) Process(ctx context.Context, info clusterInfo, remoteOb
151157
// Prepare object sync sides.
152158

153159
sourceSide := syncSide{
154-
clusterName: info.clusterName,
155-
workspacePath: info.workspacePath,
160+
clusterName: clusterName,
161+
workspacePath: workspacePath,
156162
client: s.remoteClient,
157163
object: remoteObj,
158164
}
@@ -172,7 +178,7 @@ func (s *ResourceSyncer) Process(ctx context.Context, info clusterInfo, remoteOb
172178
agentName: s.agentName,
173179
subresources: s.subresources,
174180
// use the projection and renaming rules configured in the PublishedResource
175-
destCreator: s.newLocalObjectCreator(info),
181+
destCreator: s.newLocalObjectCreator(clusterName, workspacePath),
176182
// for the main resource, status subresource handling is enabled (this
177183
// means _allowing_ status back-syncing, it still depends on whether the
178184
// status subresource even exists whether an update happens)
@@ -210,8 +216,8 @@ func (s *ResourceSyncer) Process(ctx context.Context, info clusterInfo, remoteOb
210216
return s.processRelatedResources(ctx, log, stateStore, sourceSide, destSide)
211217
}
212218

213-
func (s *ResourceSyncer) findLocalObject(ctx context.Context, info clusterInfo, remoteObj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
214-
localSelector := labels.SelectorFromSet(newObjectKey(remoteObj, info.clusterName, info.workspacePath).Labels())
219+
func (s *ResourceSyncer) findLocalObject(ctx context.Context, objectKey objectKey) (*unstructured.Unstructured, error) {
220+
localSelector := labels.SelectorFromSet(objectKey.Labels())
215221

216222
localObjects := &unstructured.UnstructuredList{}
217223
localObjects.SetAPIVersion(s.destDummy.GetAPIVersion())
@@ -234,7 +240,7 @@ func (s *ResourceSyncer) findLocalObject(ctx context.Context, info clusterInfo,
234240
}
235241
}
236242

237-
func (s *ResourceSyncer) newLocalObjectCreator(info clusterInfo) objectCreatorFunc {
243+
func (s *ResourceSyncer) newLocalObjectCreator(clusterName logicalcluster.Name, workspacePath logicalcluster.Path) objectCreatorFunc {
238244
return func(remoteObj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
239245
// map from the remote API into the actual, local API group
240246
destObj := remoteObj.DeepCopy()
@@ -244,7 +250,7 @@ func (s *ResourceSyncer) newLocalObjectCreator(info clusterInfo) objectCreatorFu
244250
destScope := syncagentv1alpha1.ResourceScope(s.localCRD.Spec.Scope)
245251

246252
// map namespace/name
247-
mappedName, err := templating.GenerateLocalObjectName(s.pubRes, remoteObj, info.clusterName, info.workspacePath)
253+
mappedName, err := templating.GenerateLocalObjectName(s.pubRes, remoteObj, clusterName, workspacePath)
248254
if err != nil {
249255
return nil, fmt.Errorf("failed to generate local object name: %w", err)
250256
}

internal/sync/syncer_test.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -909,8 +909,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
909909
t.Fatalf("Failed to create syncer: %v", err)
910910
}
911911

912-
ctx := t.Context()
913-
cInfo := NewClusterInfo(clusterName)
912+
ctx := WithClusterName(t.Context(), clusterName)
914913

915914
// setup a custom state backend that we can prime
916915
var backend *kubernetesBackend
@@ -940,7 +939,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
940939
t.Fatalf("Detected potential infinite loop, stopping after %d requeues.", i)
941940
}
942941

943-
requeue, err = syncer.Process(ctx, cInfo, target)
942+
requeue, err = syncer.Process(ctx, target)
944943
if err != nil {
945944
break
946945
}
@@ -960,7 +959,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
960959
}
961960
}
962961
} else {
963-
requeue, err = syncer.Process(ctx, cInfo, testcase.remoteObject)
962+
requeue, err = syncer.Process(ctx, testcase.remoteObject)
964963
}
965964

966965
finalRemoteObject, getErr := getFinalObjectVersion(ctx, remoteClient, testcase.remoteObject, testcase.expectedRemoteObject)
@@ -1216,8 +1215,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
12161215
t.Fatalf("Failed to create syncer: %v", err)
12171216
}
12181217

1219-
ctx := t.Context()
1220-
cInfo := NewClusterInfo(clusterName)
1218+
ctx := WithClusterName(t.Context(), clusterName)
12211219

12221220
// setup a custom state backend that we can prime
12231221
var backend *kubernetesBackend
@@ -1247,7 +1245,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
12471245
t.Fatalf("Detected potential infinite loop, stopping after %d requeues.", i)
12481246
}
12491247

1250-
requeue, err = syncer.Process(ctx, cInfo, target)
1248+
requeue, err = syncer.Process(ctx, target)
12511249
if err != nil {
12521250
break
12531251
}
@@ -1267,7 +1265,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
12671265
}
12681266
}
12691267
} else {
1270-
requeue, err = syncer.Process(ctx, cInfo, testcase.remoteObject)
1268+
requeue, err = syncer.Process(ctx, testcase.remoteObject)
12711269
}
12721270

12731271
finalRemoteObject, getErr := getFinalObjectVersion(ctx, remoteClient, testcase.remoteObject, testcase.expectedRemoteObject)

internal/sync/templating/naming.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ type localObjectNamingContext struct {
4040
ClusterPath logicalcluster.Path
4141
}
4242

43-
func newLocalObjectNamingContext(object *unstructured.Unstructured, clusterName logicalcluster.Name, clusterPath logicalcluster.Path) localObjectNamingContext {
43+
func newLocalObjectNamingContext(object *unstructured.Unstructured, clusterName logicalcluster.Name, workspacePath logicalcluster.Path) localObjectNamingContext {
4444
return localObjectNamingContext{
4545
Object: object.Object,
4646
ClusterName: clusterName,
47-
ClusterPath: clusterPath,
47+
ClusterPath: workspacePath,
4848
}
4949
}
5050

@@ -53,7 +53,7 @@ var defaultNamingScheme = syncagentv1alpha1.ResourceNaming{
5353
Name: "{{ .Object.metadata.namespace | sha3short }}-{{ .Object.metadata.name | sha3short }}",
5454
}
5555

56-
func GenerateLocalObjectName(pr *syncagentv1alpha1.PublishedResource, object *unstructured.Unstructured, clusterName logicalcluster.Name, clusterPath logicalcluster.Path) (types.NamespacedName, error) {
56+
func GenerateLocalObjectName(pr *syncagentv1alpha1.PublishedResource, object *unstructured.Unstructured, clusterName logicalcluster.Name, workspacePath logicalcluster.Path) (types.NamespacedName, error) {
5757
naming := pr.Spec.Naming
5858
if naming == nil {
5959
naming = &syncagentv1alpha1.ResourceNaming{}
@@ -65,7 +65,7 @@ func GenerateLocalObjectName(pr *syncagentv1alpha1.PublishedResource, object *un
6565
if pattern == "" {
6666
pattern = defaultNamingScheme.Namespace
6767
}
68-
rendered, err := generateLocalObjectIdentifier(pattern, object, clusterName, clusterPath)
68+
rendered, err := generateLocalObjectIdentifier(pattern, object, clusterName, workspacePath)
6969
if err != nil {
7070
return result, fmt.Errorf("invalid namespace naming: %w", err)
7171
}
@@ -76,7 +76,7 @@ func GenerateLocalObjectName(pr *syncagentv1alpha1.PublishedResource, object *un
7676
if pattern == "" {
7777
pattern = defaultNamingScheme.Name
7878
}
79-
rendered, err = generateLocalObjectIdentifier(pattern, object, clusterName, clusterPath)
79+
rendered, err = generateLocalObjectIdentifier(pattern, object, clusterName, workspacePath)
8080
if err != nil {
8181
return result, fmt.Errorf("invalid name naming: %w", err)
8282
}
@@ -86,10 +86,10 @@ func GenerateLocalObjectName(pr *syncagentv1alpha1.PublishedResource, object *un
8686
return result, nil
8787
}
8888

89-
func generateLocalObjectIdentifier(pattern string, object *unstructured.Unstructured, clusterName logicalcluster.Name, clusterPath logicalcluster.Path) (string, error) {
89+
func generateLocalObjectIdentifier(pattern string, object *unstructured.Unstructured, clusterName logicalcluster.Name, workspacePath logicalcluster.Path) (string, error) {
9090
// modern Go template style
9191
if strings.Contains(pattern, "{{") {
92-
return Render(pattern, newLocalObjectNamingContext(object, clusterName, clusterPath))
92+
return Render(pattern, newLocalObjectNamingContext(object, clusterName, workspacePath))
9393
}
9494

9595
// Legacy $variable style, does also not support clusterPath;

0 commit comments

Comments
 (0)