Skip to content
This repository was archived by the owner on Aug 28, 2025. It is now read-only.

Commit 60894cd

Browse files
committed
feat: introduce ability to subscribe to a single item
1 parent 1630e65 commit 60894cd

File tree

5 files changed

+268
-123
lines changed

5 files changed

+268
-123
lines changed

cmd/start.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"sigs.k8s.io/controller-runtime/pkg/cache"
1515
"sigs.k8s.io/controller-runtime/pkg/client"
1616

17+
jenxv1 "github.tools.sap/automaticd/automaticd/operators/jenx/api/v1"
1718
jirav1alpha1 "github.tools.sap/automaticd/automaticd/operators/jira/api/v1alpha1"
1819
authzv1 "k8s.io/api/authorization/v1"
1920
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -30,6 +31,7 @@ var startCmd = &cobra.Command{
3031
authzv1.AddToScheme(schema)
3132

3233
jirav1alpha1.AddToScheme(schema)
34+
jenxv1.AddToScheme(schema)
3335

3436
k8sCache, err := cache.New(cfg, cache.Options{
3537
Scheme: schema,

gateway/gateway.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,7 @@ func New(ctx context.Context, conf Config) (graphql.Schema, error) {
399399
}
400400

401401
capitalizedSingular := cases.Title(language.English).String(crd.Spec.Names.Singular)
402+
capitalizedPlural := cases.Title(language.English).String(crd.Spec.Names.Plural)
402403

403404
versionedMutationType.AddFieldConfig("delete"+capitalizedSingular, &graphql.Field{
404405
Type: graphql.Boolean,
@@ -439,6 +440,29 @@ func New(ctx context.Context, conf Config) (graphql.Schema, error) {
439440
}
440441

441442
subscriptions[group+typeInformation.Name+capitalizedSingular] = &graphql.Field{
443+
Type: crdType,
444+
Args: graphql.FieldConfigArgument{
445+
"name": &graphql.ArgumentConfig{
446+
Type: graphql.NewNonNull(graphql.String),
447+
Description: "the metadata.name of the object you want to watch",
448+
},
449+
"namespace": &graphql.ArgumentConfig{
450+
Type: graphql.NewNonNull(graphql.String),
451+
Description: "the metadata.namesapce of the objects you want to watch",
452+
},
453+
"emitOnlyFieldChanges": &graphql.ArgumentConfig{
454+
Type: graphql.Boolean,
455+
DefaultValue: false,
456+
Description: "only emit events if the fields that are requested have changed",
457+
},
458+
},
459+
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
460+
return p.Source, nil
461+
},
462+
Subscribe: resolver.subscribeItem(crd, typeInformation),
463+
}
464+
465+
subscriptions[group+typeInformation.Name+capitalizedPlural] = &graphql.Field{
442466
Type: graphql.NewList(crdType),
443467
Args: graphql.FieldConfigArgument{
444468
"namespace": &graphql.ArgumentConfig{
@@ -459,6 +483,28 @@ func New(ctx context.Context, conf Config) (graphql.Schema, error) {
459483

460484
if typeInformation.Storage {
461485
subscriptions[group+capitalizedSingular] = &graphql.Field{
486+
Type: crdType,
487+
Args: graphql.FieldConfigArgument{
488+
"name": &graphql.ArgumentConfig{
489+
Type: graphql.NewNonNull(graphql.String),
490+
Description: "the metadata.name of the object you want to watch",
491+
},
492+
"namespace": &graphql.ArgumentConfig{
493+
Type: graphql.NewNonNull(graphql.String),
494+
Description: "the metadata.namesapce of the objects you want to watch",
495+
},
496+
"emitOnlyFieldChanges": &graphql.ArgumentConfig{
497+
Type: graphql.Boolean,
498+
DefaultValue: false,
499+
Description: "only emit events if the fields that are requested have changed",
500+
},
501+
},
502+
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
503+
return p.Source, nil
504+
},
505+
Subscribe: resolver.subscribeItem(crd, typeInformation),
506+
}
507+
subscriptions[group+capitalizedPlural] = &graphql.Field{
462508
Type: graphql.NewList(crdType),
463509
Args: graphql.FieldConfigArgument{
464510
"namespace": &graphql.ArgumentConfig{

gateway/resolver.go

Lines changed: 137 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"slices"
88
"strings"
99

10+
"github.com/google/go-cmp/cmp"
1011
"github.com/graphql-go/graphql"
1112
"github.com/mitchellh/mapstructure"
1213
"go.opentelemetry.io/otel"
@@ -283,9 +284,94 @@ func (r *resolver) updateItem(crd apiextensionsv1.CustomResourceDefinition, type
283284
}
284285
}
285286

287+
func (r *resolver) subscribeItem(crd apiextensionsv1.CustomResourceDefinition, typeInformation apiextensionsv1.CustomResourceDefinitionVersion) func(p graphql.ResolveParams) (interface{}, error) {
288+
logger := slog.With(slog.String("operation", "subribeItem"), slog.String("kind", crd.Spec.Names.Kind), slog.String("version", typeInformation.Name))
289+
return func(p graphql.ResolveParams) (interface{}, error) {
290+
ctx, span := otel.Tracer("").Start(p.Context, "SubscribeForObject", trace.WithAttributes(attribute.String("kind", crd.Spec.Names.Kind)))
291+
defer span.End()
292+
293+
var metadatInput MetadatInput
294+
if err := mapstructure.Decode(p.Args, &metadatInput); err != nil {
295+
logger.Error("unable to decode metadata input", "error", err)
296+
return nil, err
297+
}
298+
299+
listType, ok := r.conf.pluralToListType[crd.Spec.Names.Plural]
300+
if !ok {
301+
logger.Error("no typed client available for the reuqested type")
302+
return nil, errors.New("no typed client available for the reuqested type")
303+
}
304+
305+
if err := isAuthorized(ctx, r.conf.Client, authzv1.ResourceAttributes{
306+
Verb: "watch",
307+
Group: crd.Spec.Group,
308+
Version: typeInformation.Name,
309+
Resource: crd.Spec.Names.Plural,
310+
Namespace: metadatInput.Namespace,
311+
Name: metadatInput.Name,
312+
}); err != nil {
313+
return nil, err
314+
}
315+
316+
listWatch, err := r.conf.Client.Watch(ctx, listType(), client.InNamespace(metadatInput.Namespace), client.MatchingFields{"metadata.name": metadatInput.Name})
317+
if err != nil {
318+
logger.Error("unable to watch object", slog.Any("error", err))
319+
return nil, err
320+
}
321+
322+
resultChannel := make(chan interface{})
323+
324+
go func() {
325+
var item client.Object
326+
for ev := range listWatch.ResultChan() {
327+
changed := false
328+
select {
329+
case <-ctx.Done():
330+
slog.Info("stopping watch due to client cancel")
331+
listWatch.Stop()
332+
close(resultChannel)
333+
return
334+
default:
335+
switch ev.Type {
336+
case watch.Added:
337+
item = ev.Object.(client.Object)
338+
changed = true
339+
case watch.Modified:
340+
341+
emitOnlyFieldChanges, ok := p.Args["emitOnlyFieldChanges"].(bool)
342+
343+
changed = determineFieldChanged(ok && emitOnlyFieldChanges, ev.Object.(client.Object), resultChannel, p, item)
344+
item = ev.Object.(client.Object)
345+
case watch.Deleted:
346+
itemType, ok := r.conf.pluralToObjectType[crd.Spec.Names.Plural]
347+
if !ok {
348+
logger.Error("no typed client available for the reuqested type")
349+
listWatch.Stop()
350+
close(resultChannel)
351+
}
352+
353+
item = itemType()
354+
default:
355+
logger.Info("skipping event", "event", ev.Type, "object", ev.Object)
356+
continue
357+
}
358+
}
359+
360+
if val, ok := p.Args["emitOnlyFieldChanges"].(bool); ok && val && changed {
361+
resultChannel <- item
362+
} else if !ok || !val {
363+
resultChannel <- item
364+
}
365+
}
366+
}()
367+
368+
return resultChannel, nil
369+
}
370+
}
371+
286372
func (r *resolver) subscribeItems(crd apiextensionsv1.CustomResourceDefinition, typeInformation apiextensionsv1.CustomResourceDefinitionVersion) func(p graphql.ResolveParams) (interface{}, error) {
287373
return func(p graphql.ResolveParams) (interface{}, error) {
288-
ctx, span := otel.Tracer("").Start(p.Context, "Subscribe", trace.WithAttributes(attribute.String("kind", crd.Spec.Names.Kind)))
374+
ctx, span := otel.Tracer("").Start(p.Context, "SubscribeForNamespace", trace.WithAttributes(attribute.String("kind", crd.Spec.Names.Kind)))
289375
defer span.End()
290376

291377
listType, ok := r.conf.pluralToListType[crd.Spec.Names.Plural]
@@ -330,47 +416,9 @@ func (r *resolver) subscribeItems(crd apiextensionsv1.CustomResourceDefinition,
330416
for i, item := range items {
331417
if item.GetName() == ev.Object.(client.Object).GetName() {
332418

333-
if val, ok := p.Args["emitOnlyFieldChanges"].(bool); ok && val {
334-
unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ev.Object)
335-
if err != nil {
336-
// TODO: handle error
337-
close(resultChannel)
338-
}
339-
340-
fields := getRequestedFields(p)
341-
342-
currentItemUnstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(item)
343-
if err != nil {
344-
// TODO: handle error
345-
close(resultChannel)
346-
}
347-
348-
for _, field := range fields {
349-
fieldValue, found, err := unstructured.NestedFieldNoCopy(unstructuredObj, strings.Split(field, ".")...)
350-
if err != nil {
351-
// TODO: handle error
352-
slog.Error("unable to get field value", "error", err)
353-
close(resultChannel)
354-
}
355-
356-
currentFieldValue, currentFound, err := unstructured.NestedFieldNoCopy(currentItemUnstructured, strings.Split(field, ".")...)
357-
if err != nil {
358-
// TODO: handle error
359-
slog.Error("unable to get field value", "error", err)
360-
close(resultChannel)
361-
}
362-
363-
if !found || !currentFound {
364-
continue
365-
}
366-
if fieldValue == currentFieldValue {
367-
continue
368-
}
369-
370-
changed = true
371-
372-
}
373-
}
419+
emitOnlyFieldChanges, ok := p.Args["emitOnlyFieldChanges"].(bool)
420+
421+
changed = determineFieldChanged(ok && emitOnlyFieldChanges, ev.Object.(client.Object), resultChannel, p, item)
374422

375423
items[i] = ev.Object.(client.Object)
376424
break
@@ -402,6 +450,53 @@ func (r *resolver) subscribeItems(crd apiextensionsv1.CustomResourceDefinition,
402450
}
403451
}
404452

453+
func determineFieldChanged(emitOnlyFieldChanges bool, obj client.Object, resultChannel chan interface{}, p graphql.ResolveParams, currentItem client.Object) bool {
454+
if !emitOnlyFieldChanges {
455+
return true
456+
}
457+
458+
unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
459+
if err != nil {
460+
// TODO: handle error
461+
close(resultChannel)
462+
}
463+
464+
fields := getRequestedFields(p)
465+
466+
currentItemUnstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentItem)
467+
if err != nil {
468+
// TODO: handle error
469+
close(resultChannel)
470+
}
471+
472+
for _, field := range fields {
473+
fieldValue, found, err := unstructured.NestedFieldNoCopy(unstructuredObj, strings.Split(field, ".")...)
474+
if err != nil {
475+
// TODO: handle error
476+
slog.Error("unable to get field value", "error", err)
477+
close(resultChannel)
478+
}
479+
480+
currentFieldValue, currentFound, err := unstructured.NestedFieldNoCopy(currentItemUnstructured, strings.Split(field, ".")...)
481+
if err != nil {
482+
// TODO: handle error
483+
slog.Error("unable to get field value", "error", err)
484+
close(resultChannel)
485+
}
486+
487+
if !found || !currentFound {
488+
continue
489+
}
490+
if cmp.Equal(fieldValue, currentFieldValue) {
491+
continue
492+
}
493+
494+
return true
495+
}
496+
497+
return false
498+
}
499+
405500
func isAuthorized(ctx context.Context, c client.Client, resourceAttributes authzv1.ResourceAttributes) error {
406501
ctx, span := otel.Tracer("").Start(ctx, "AuthorizationCheck")
407502
defer span.End()

go.mod

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
module github.com/openmfp/crd-gql-gateway
22

3-
go 1.22.3
3+
go 1.22.5
44

55
require (
66
github.com/golang-jwt/jwt/v5 v5.2.1
7+
github.com/google/go-cmp v0.6.0
78
github.com/graphql-go/graphql v0.8.1
89
github.com/graphql-go/handler v0.2.3
910
github.com/mitchellh/mapstructure v1.5.0
10-
github.com/spf13/cobra v1.8.0
11+
github.com/spf13/cobra v1.8.1
12+
github.tools.sap/automaticd/automaticd/operators/jenx v0.0.0-20240726034443-228245d73830
1113
github.tools.sap/automaticd/automaticd/operators/jira v0.0.0-20240513130407-09009e9c881c
12-
go.opentelemetry.io/otel v1.27.0
13-
go.opentelemetry.io/otel/trace v1.27.0
14-
golang.org/x/text v0.15.0
15-
k8s.io/api v0.30.1
16-
k8s.io/apiextensions-apiserver v0.30.1
17-
k8s.io/apimachinery v0.30.1
18-
k8s.io/client-go v0.30.1
19-
sigs.k8s.io/controller-runtime v0.18.3
14+
go.opentelemetry.io/otel v1.28.0
15+
go.opentelemetry.io/otel/trace v1.28.0
16+
golang.org/x/text v0.16.0
17+
k8s.io/api v0.30.3
18+
k8s.io/apiextensions-apiserver v0.30.2
19+
k8s.io/apimachinery v0.30.3
20+
k8s.io/client-go v0.30.3
21+
sigs.k8s.io/controller-runtime v0.18.4
2022
)
2123

2224
require (
@@ -36,7 +38,6 @@ require (
3638
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
3739
github.com/golang/protobuf v1.5.4 // indirect
3840
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
39-
github.com/google/go-cmp v0.6.0 // indirect
4041
github.com/google/gofuzz v1.2.0 // indirect
4142
github.com/google/uuid v1.6.0 // indirect
4243
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
@@ -48,36 +49,35 @@ require (
4849
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
4950
github.com/modern-go/reflect2 v1.0.2 // indirect
5051
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
51-
github.com/onsi/ginkgo/v2 v2.19.0 // indirect
5252
github.com/pkg/errors v0.9.1 // indirect
5353
github.com/prometheus/client_golang v1.19.1 // indirect
5454
github.com/prometheus/client_model v0.6.1 // indirect
5555
github.com/prometheus/common v0.53.0 // indirect
5656
github.com/prometheus/procfs v0.15.0 // indirect
5757
github.com/spf13/pflag v1.0.5 // indirect
58-
github.tools.sap/automaticd/automaticd/shared v0.39.0 // indirect
59-
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 // indirect
60-
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.27.0 // indirect
61-
go.opentelemetry.io/otel/metric v1.27.0 // indirect
62-
go.opentelemetry.io/otel/sdk v1.27.0 // indirect
63-
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
64-
golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d // indirect
65-
golang.org/x/net v0.25.0 // indirect
58+
github.tools.sap/automaticd/automaticd/shared v0.41.1 // indirect
59+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
60+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 // indirect
61+
go.opentelemetry.io/otel/metric v1.28.0 // indirect
62+
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
63+
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
64+
golang.org/x/exp v0.0.0-20240707233637-46b078467d37 // indirect
65+
golang.org/x/net v0.27.0 // indirect
6666
golang.org/x/oauth2 v0.21.0 // indirect
67-
golang.org/x/sys v0.20.0 // indirect
68-
golang.org/x/term v0.20.0 // indirect
67+
golang.org/x/sys v0.22.0 // indirect
68+
golang.org/x/term v0.22.0 // indirect
6969
golang.org/x/time v0.5.0 // indirect
7070
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
71-
google.golang.org/genproto/googleapis/api v0.0.0-20240520151616-dc85e6b867a5 // indirect
72-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect
73-
google.golang.org/grpc v1.64.0 // indirect
74-
google.golang.org/protobuf v1.34.1 // indirect
71+
google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect
72+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240708141625-4ad9e859172b // indirect
73+
google.golang.org/grpc v1.65.0 // indirect
74+
google.golang.org/protobuf v1.34.2 // indirect
7575
gopkg.in/inf.v0 v0.9.1 // indirect
7676
gopkg.in/yaml.v2 v2.4.0 // indirect
7777
gopkg.in/yaml.v3 v3.0.1 // indirect
78-
k8s.io/klog/v2 v2.120.1 // indirect
78+
k8s.io/klog/v2 v2.130.1 // indirect
7979
k8s.io/kube-openapi v0.0.0-20240430033511-f0e62f92d13f // indirect
80-
k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 // indirect
80+
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
8181
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
8282
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
8383
sigs.k8s.io/yaml v1.4.0 // indirect

0 commit comments

Comments
 (0)