Skip to content

Commit a159be8

Browse files
JoshVanLdapr-botyaron2
authored andcommitted
Operator: Component server side scope filtering (dapr#7615)
* Operator: Component server side scope filtering Today, daprd's will receive Component from the operator, regardless of whether they are scoped for that Component or not. This means that clients do receive components (including its associated secrets) that they are not scoped for. Updates Operator API ComponentUpdate to perform service side Component Scope filtering based on the authenticated client App ID. When a Component is de-scoped, daprd will receive a DELETE for the previous Component manifest. When a Component is scoped-in, daprd will receive an CREATE for the new Component manifest. Updates Operator API ListComponents to perform service side Component Scope filtering based on the authenticated client App ID. Uses updated events batcher which ensures queue items are sent in order. Signed-off-by: joshvanl <[email protected]> * Add 2 daprds for operator informer tests Signed-off-by: joshvanl <[email protected]> * Fix control plane trust domain Signed-off-by: joshvanl <[email protected]> * Update github.com/dapr/kit to master Signed-off-by: joshvanl <[email protected]> * go mod tidy Signed-off-by: joshvanl <[email protected]> * Fix incorrect pointer manifest compare Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]> Co-authored-by: Dapr Bot <[email protected]> Co-authored-by: Yaron Schneider <[email protected]> Signed-off-by: Elena Kolevska <[email protected]>
1 parent fa54109 commit a159be8

File tree

37 files changed

+2064
-292
lines changed

37 files changed

+2064
-292
lines changed

pkg/apis/components/v1alpha1/types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package v1alpha1
1515

1616
import (
1717
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"sigs.k8s.io/controller-runtime/pkg/client"
1819

1920
"github.com/dapr/dapr/pkg/apis/common"
2021
"github.com/dapr/dapr/pkg/apis/components"
@@ -76,6 +77,14 @@ func (c Component) NameValuePairs() []common.NameValuePair {
7677
return c.Spec.Metadata
7778
}
7879

80+
func (c Component) ClientObject() client.Object {
81+
return &c
82+
}
83+
84+
func (c Component) GetScopes() []string {
85+
return c.Scopes
86+
}
87+
7988
// EmptyMetaDeepCopy returns a new instance of the component type with the
8089
// TypeMeta's Kind and APIVersion fields set.
8190
func (c Component) EmptyMetaDeepCopy() metav1.Object {

pkg/apis/httpEndpoint/v1alpha1/types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package v1alpha1
1515

1616
import (
1717
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"sigs.k8s.io/controller-runtime/pkg/client"
1819

1920
"github.com/dapr/dapr/pkg/apis/common"
2021
httpendpoint "github.com/dapr/dapr/pkg/apis/httpEndpoint"
@@ -108,6 +109,14 @@ func (h HTTPEndpoint) HasTLSPrivateKey() bool {
108109
return h.Spec.ClientTLS != nil && h.Spec.ClientTLS.PrivateKey != nil && h.Spec.ClientTLS.PrivateKey.Value != nil
109110
}
110111

112+
func (h HTTPEndpoint) ClientObject() client.Object {
113+
return &h
114+
}
115+
116+
func (h HTTPEndpoint) GetScopes() []string {
117+
return h.Scopes
118+
}
119+
111120
// EmptyMetaDeepCopy returns a new instance of the component type with the
112121
// TypeMeta's Kind and APIVersion fields set.
113122
func (h HTTPEndpoint) EmptyMetaDeepCopy() metav1.Object {

pkg/apis/subscriptions/v1alpha1/types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package v1alpha1
1515

1616
import (
1717
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"sigs.k8s.io/controller-runtime/pkg/client"
1819

1920
"github.com/dapr/dapr/pkg/apis/common"
2021
"github.com/dapr/dapr/pkg/apis/subscriptions"
@@ -107,3 +108,11 @@ func (s Subscription) LogName() string {
107108
func (s Subscription) NameValuePairs() []common.NameValuePair {
108109
return nil
109110
}
111+
112+
func (s Subscription) ClientObject() client.Object {
113+
return &s
114+
}
115+
116+
func (s Subscription) GetScopes() []string {
117+
return s.Scopes
118+
}

pkg/apis/subscriptions/v2alpha1/types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package v2alpha1
1515

1616
import (
1717
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"sigs.k8s.io/controller-runtime/pkg/client"
1819

1920
"github.com/dapr/dapr/pkg/apis/common"
2021
"github.com/dapr/dapr/pkg/apis/subscriptions"
@@ -138,3 +139,11 @@ func (s Subscription) LogName() string {
138139
func (s Subscription) NameValuePairs() []common.NameValuePair {
139140
return nil
140141
}
142+
143+
func (s Subscription) ClientObject() client.Object {
144+
return &s
145+
}
146+
147+
func (s Subscription) GetScopes() []string {
148+
return s.Scopes
149+
}

pkg/internal/apis/namevaluepair.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"fmt"
1818

1919
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"sigs.k8s.io/controller-runtime/pkg/client"
2021

2122
"github.com/dapr/dapr/pkg/apis/common"
2223
)
@@ -61,3 +62,11 @@ func (g GenericNameValueResource) LogName() string {
6162
func (g GenericNameValueResource) EmptyMetaDeepCopy() metav1.Object {
6263
return &metav1.ObjectMeta{Name: g.Name}
6364
}
65+
66+
func (g GenericNameValueResource) ClientObject() client.Object {
67+
return nil
68+
}
69+
70+
func (g GenericNameValueResource) GetScopes() []string {
71+
return nil
72+
}

pkg/operator/api/api.go

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@ import (
2323
"sync/atomic"
2424

2525
"google.golang.org/grpc"
26+
"sigs.k8s.io/controller-runtime/pkg/cache"
2627
"sigs.k8s.io/controller-runtime/pkg/client"
2728

2829
componentsapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
2930
httpendpointsapi "github.com/dapr/dapr/pkg/apis/httpEndpoint/v1alpha1"
3031
subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
32+
"github.com/dapr/dapr/pkg/operator/api/informer"
3133
operatorv1pb "github.com/dapr/dapr/pkg/proto/operator/v1"
3234
"github.com/dapr/dapr/pkg/security"
35+
"github.com/dapr/kit/concurrency"
3336
"github.com/dapr/kit/logger"
3437
)
3538

@@ -43,6 +46,7 @@ var log = logger.NewLogger("dapr.operator.api")
4346

4447
type Options struct {
4548
Client client.Client
49+
Cache cache.Cache
4650
Security security.Provider
4751
Port int
4852
}
@@ -51,33 +55,36 @@ type Options struct {
5155
type Server interface {
5256
Run(context.Context) error
5357
Ready(context.Context) error
54-
OnComponentUpdated(context.Context, operatorv1pb.ResourceEventType, *componentsapi.Component)
55-
OnHTTPEndpointUpdated(context.Context, *httpendpointsapi.HTTPEndpoint)
58+
5659
OnSubscriptionUpdated(context.Context, operatorv1pb.ResourceEventType, *subapi.Subscription)
60+
OnHTTPEndpointUpdated(context.Context, *httpendpointsapi.HTTPEndpoint)
5761
}
5862

5963
type apiServer struct {
6064
operatorv1pb.UnimplementedOperatorServer
6165
Client client.Client
6266
sec security.Provider
6367
port string
64-
// notify all dapr runtime
65-
connLock sync.Mutex
68+
69+
compInformer informer.Interface[componentsapi.Component]
70+
6671
endpointLock sync.Mutex
67-
allConnUpdateChan map[string]chan *ComponentUpdateEvent
6872
allEndpointsUpdateChan map[string]chan *httpendpointsapi.HTTPEndpoint
6973
allSubscriptionUpdateChan map[string]chan *SubscriptionUpdateEvent
74+
connLock sync.Mutex
7075
readyCh chan struct{}
7176
running atomic.Bool
7277
}
7378

7479
// NewAPIServer returns a new API server.
7580
func NewAPIServer(opts Options) Server {
7681
return &apiServer{
77-
Client: opts.Client,
82+
Client: opts.Client,
83+
compInformer: informer.New[componentsapi.Component](informer.Options{
84+
Cache: opts.Cache,
85+
}),
7886
sec: opts.Security,
7987
port: strconv.Itoa(opts.Port),
80-
allConnUpdateChan: make(map[string]chan *ComponentUpdateEvent),
8188
allEndpointsUpdateChan: make(map[string]chan *httpendpointsapi.HTTPEndpoint),
8289
allSubscriptionUpdateChan: make(map[string]chan *SubscriptionUpdateEvent),
8390
readyCh: make(chan struct{}),
@@ -106,39 +113,27 @@ func (a *apiServer) Run(ctx context.Context) error {
106113
}
107114
close(a.readyCh)
108115

109-
errCh := make(chan error)
110-
go func() {
111-
if rErr := s.Serve(lis); rErr != nil {
112-
errCh <- fmt.Errorf("gRPC server error: %w", rErr)
113-
return
114-
}
115-
errCh <- nil
116-
}()
117-
118-
// Block until context is done
119-
<-ctx.Done()
120-
121-
a.connLock.Lock()
122-
for key, ch := range a.allConnUpdateChan {
123-
close(ch)
124-
delete(a.allConnUpdateChan, key)
125-
}
126-
for key, ch := range a.allSubscriptionUpdateChan {
127-
close(ch)
128-
delete(a.allSubscriptionUpdateChan, key)
129-
}
130-
a.connLock.Unlock()
131-
132-
s.GracefulStop()
133-
err = <-errCh
134-
if err != nil {
135-
return err
136-
}
137-
err = lis.Close()
138-
if err != nil && !errors.Is(err, net.ErrClosed) {
139-
return fmt.Errorf("error closing listener: %w", err)
140-
}
141-
return nil
116+
return concurrency.NewRunnerManager(
117+
a.compInformer.Run,
118+
func(ctx context.Context) error {
119+
if err := s.Serve(lis); err != nil {
120+
return fmt.Errorf("gRPC server error: %w", err)
121+
}
122+
return nil
123+
},
124+
func(ctx context.Context) error {
125+
// Block until context is done
126+
<-ctx.Done()
127+
a.connLock.Lock()
128+
for key, ch := range a.allSubscriptionUpdateChan {
129+
close(ch)
130+
delete(a.allSubscriptionUpdateChan, key)
131+
}
132+
a.connLock.Unlock()
133+
s.GracefulStop()
134+
return nil
135+
},
136+
).Run(ctx)
142137
}
143138

144139
func (a *apiServer) Ready(ctx context.Context) error {

0 commit comments

Comments
 (0)