Skip to content

Commit 0b365da

Browse files
committed
Setup dynamic secret eventing system
1 parent 268b84f commit 0b365da

File tree

4 files changed

+288
-15
lines changed

4 files changed

+288
-15
lines changed

controllers/instant_updates.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,21 @@ func (v vaultWebsocketConnAdapter) Close(code websocket.StatusCode, reason strin
5959
return v.conn.Close(code, reason)
6060
}
6161

62+
// eventMsg is used to extract the relevant fields from an event message sent
63+
// from Vault
64+
type eventMsg struct {
65+
Data struct {
66+
Event struct {
67+
Metadata struct {
68+
Path string `json:"path"`
69+
Modified string `json:"modified"`
70+
Operation string `json:"operation"`
71+
} `json:"metadata"`
72+
} `json:"event"`
73+
Namespace string `json:"namespace"`
74+
} `json:"data"`
75+
}
76+
6277
// StreamSecretEventsFunc streams Vault events for the provided object.
6378
type StreamSecretEventsFunc func(context.Context, client.Object, websocketConnector) error
6479

controllers/vaultdynamicsecret_controller.go

Lines changed: 195 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,19 @@ import (
1212
"maps"
1313
"net/http"
1414
"os"
15+
"strings"
1516
"time"
1617

1718
"github.com/cenkalti/backoff/v4"
19+
"github.com/hashicorp/go-secure-stdlib/parseutil"
1820
"github.com/hashicorp/vault/api"
1921
corev1 "k8s.io/api/core/v1"
2022
apierrors "k8s.io/apimachinery/pkg/api/errors"
2123
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2224
"k8s.io/apimachinery/pkg/runtime"
2325
"k8s.io/apimachinery/pkg/types"
2426
"k8s.io/client-go/tools/record"
27+
"nhooyr.io/websocket"
2528

2629
ctrl "sigs.k8s.io/controller-runtime"
2730
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -44,6 +47,14 @@ const (
4447
vaultDynamicSecretFinalizer = "vaultdynamicsecret.secrets.hashicorp.com/finalizer"
4548
)
4649

50+
func dynamicSecretEventPath(o *secretsv1beta1.VaultDynamicSecret) string {
51+
mount := strings.Trim(o.Spec.Mount, "/")
52+
if mount == "" {
53+
return "/v1/sys/events/subscribe/*"
54+
}
55+
return fmt.Sprintf("/v1/sys/events/subscribe/%s*", mount)
56+
}
57+
4758
// staticCredsJitterHorizon should be used when computing the jitter
4859
// duration for the static-creds rotation time horizon.
4960
var (
@@ -67,7 +78,8 @@ type VaultDynamicSecretReconciler struct {
6778
// sourceCh is used to trigger a requeue of resource instances from an
6879
// external source. Should be set on a source.Channel in SetupWithManager.
6980
// This channel should be closed when the controller is stopped.
70-
SourceCh chan event.GenericEvent
81+
SourceCh chan event.GenericEvent
82+
eventWatcherRegistry *eventWatcherRegistry
7183
// runtimePodUID should always be set when updating resource's Status.
7284
// This is done via the downwardAPI. We get the current Pod's UID from either the
7385
// OPERATOR_POD_UID environment variable, or the /var/run/podinfo/uid file; in that order.
@@ -389,6 +401,47 @@ func (r *VaultDynamicSecretReconciler) Reconcile(ctx context.Context, req ctrl.R
389401
}
390402
}
391403

404+
if o.Spec.SyncConfig != nil && o.Spec.SyncConfig.InstantUpdates {
405+
logger.V(consts.LogLevelDebug).Info("Event watcher enabled for VaultDynamicSecret")
406+
err := EnsureEventWatcher(ctx, &InstantUpdateConfig{
407+
Secret: o,
408+
Client: vClient,
409+
WatchPath: dynamicSecretEventPath(o),
410+
Registry: r.eventWatcherRegistry,
411+
BackOffRegistry: r.BackOffRegistry,
412+
SourceCh: r.SourceCh,
413+
Recorder: r.Recorder,
414+
EventObjectFactory: func(key types.NamespacedName) client.Object {
415+
return &secretsv1beta1.VaultDynamicSecret{
416+
ObjectMeta: metav1.ObjectMeta{
417+
Namespace: key.Namespace,
418+
Name: key.Name,
419+
},
420+
}
421+
},
422+
StreamSecretEvents: func(watchCtx context.Context, obj client.Object, wsClient websocketConnector) error {
423+
vds, ok := obj.(*secretsv1beta1.VaultDynamicSecret)
424+
if !ok {
425+
return fmt.Errorf("unexpected object type %T", obj)
426+
}
427+
return r.streamDynamicSecretEvents(watchCtx, vds, wsClient)
428+
},
429+
NewClientFunc: func(watchCtx context.Context, obj client.Object) (vault.Client, error) {
430+
vds, ok := obj.(*secretsv1beta1.VaultDynamicSecret)
431+
if !ok {
432+
return nil, fmt.Errorf("unexpected object type %T", obj)
433+
}
434+
return r.ClientFactory.Get(watchCtx, r.Client, vds)
435+
},
436+
})
437+
if err != nil {
438+
r.Recorder.Eventf(o, corev1.EventTypeWarning, consts.ReasonEventWatcherError,
439+
"Failed to watch events: %s", err)
440+
}
441+
} else {
442+
UnwatchEvents(r.eventWatcherRegistry, o)
443+
}
444+
392445
if ok := r.SyncRegistry.Delete(req.NamespacedName); ok {
393446
logger.V(consts.LogLevelDebug).Info("Deleted object from SyncRegistry",
394447
"obj", req.NamespacedName)
@@ -700,6 +753,7 @@ func (r *VaultDynamicSecretReconciler) SetupWithManager(mgr ctrl.Manager, opts c
700753

701754
// TODO: close this channel when the controller is stopped.
702755
r.SourceCh = make(chan event.GenericEvent)
756+
r.eventWatcherRegistry = newEventWatcherRegistry()
703757
m := ctrl.NewControllerManagedBy(mgr).
704758
For(&secretsv1beta1.VaultDynamicSecret{}).
705759
WithOptions(opts).
@@ -748,6 +802,7 @@ func (r *VaultDynamicSecretReconciler) handleDeletion(ctx context.Context, o *se
748802
r.SyncRegistry.Delete(objKey)
749803
r.BackOffRegistry.Delete(objKey)
750804
r.referenceCache.Remove(SecretTransformation, objKey)
805+
UnwatchEvents(r.eventWatcherRegistry, o)
751806
if controllerutil.ContainsFinalizer(o, vaultDynamicSecretFinalizer) {
752807
logger.Info("Removing finalizer")
753808
if controllerutil.RemoveFinalizer(o, vaultDynamicSecretFinalizer) {
@@ -761,6 +816,135 @@ func (r *VaultDynamicSecretReconciler) handleDeletion(ctx context.Context, o *se
761816
return nil
762817
}
763818

819+
func (r *VaultDynamicSecretReconciler) streamDynamicSecretEvents(ctx context.Context, o *secretsv1beta1.VaultDynamicSecret, wsClient websocketConnector) error {
820+
logger := log.FromContext(ctx).WithName("streamDynamicSecretEvents")
821+
conn, err := wsClient.Connect(ctx)
822+
if err != nil {
823+
return fmt.Errorf("failed to connect to vault websocket: %w", err)
824+
}
825+
defer conn.Close(websocket.StatusNormalClosure, "closing event watcher")
826+
827+
// We made it past the initial websocket connection, so emit a "good" event
828+
// status
829+
r.Recorder.Event(o, corev1.EventTypeNormal, consts.ReasonEventWatcherStarted, "Started watching events")
830+
831+
for {
832+
select {
833+
case <-ctx.Done():
834+
logger.V(consts.LogLevelDebug).Info("Context done, closing websocket",
835+
"namespace", o.Namespace, "name", o.Name)
836+
return nil
837+
default:
838+
msgType, message, err := conn.Read(ctx)
839+
if err != nil {
840+
return fmt.Errorf("failed to read from websocket: %w, message: %q",
841+
err, string(message))
842+
}
843+
messageMap := eventMsg{}
844+
err = json.Unmarshal(message, &messageMap)
845+
if err != nil {
846+
return fmt.Errorf("failed to unmarshal event message: %w", err)
847+
}
848+
logger.V(consts.LogLevelDebug).Info("Received message",
849+
"message type", msgType, "message", messageMap)
850+
851+
modified, err := parseutil.ParseBool(messageMap.Data.Event.Metadata.Modified)
852+
if err != nil {
853+
return fmt.Errorf("failed to parse modified field: %w", err)
854+
}
855+
operation, err := parseutil.ParseString(messageMap.Data.Event.Metadata.Operation)
856+
if err != nil {
857+
return fmt.Errorf("failed to parse operation field: %w", err)
858+
}
859+
860+
// Skip non-modified events early
861+
if !modified {
862+
logger.V(consts.LogLevelTrace).Info("Non-modified event received from Vault, ignoring",
863+
"message", messageMap)
864+
continue
865+
}
866+
867+
path := messageMap.Data.Event.Metadata.Path
868+
vdsPath := vault.JoinPath(o.Spec.Mount, o.Spec.Path)
869+
870+
logger.Info("Modified event received from Vault",
871+
"path", path, "operation", operation, "vdsPath", vdsPath)
872+
873+
// Handle credential creation/update events
874+
if r.isCreateOrUpdateEvent(operation) && path == vdsPath {
875+
logger.Info("Create/update event received on VaultDynamicSecret, triggering reconciliation",
876+
"operation", operation, "path", path)
877+
878+
r.triggerVDSReconciliation(o)
879+
continue
880+
}
881+
882+
// Handle role deletion events
883+
if r.isRoleDeletionEvent(operation) {
884+
expectedRolePath := r.getRolePathForCredentialPath(vdsPath)
885+
if path == expectedRolePath {
886+
logger.Info("Role deletion event affects our VDS, triggering deletion",
887+
"operation", operation, "path", path, "expectedRolePath", expectedRolePath)
888+
889+
// Handle role deletion by removing finalizers and deleting the object
890+
if err := r.handleRoleDeletion(ctx, o); err != nil {
891+
logger.Error(err, "Failed to handle VDS deletion after role deletion", "path", path)
892+
r.Recorder.Eventf(o, corev1.EventTypeWarning, consts.ReasonEventWatcherError,
893+
"Failed to handle VDS deletion for role path %s: %s", path, err)
894+
} else {
895+
r.Recorder.Eventf(o, corev1.EventTypeNormal, "RoleDeleted",
896+
"VaultDynamicSecret deleted due to corresponding role deletion in Vault")
897+
}
898+
return nil // Exit the event loop as we're deleting the VDS
899+
}
900+
}
901+
902+
// Log events that don't match our criteria
903+
logger.V(consts.LogLevelTrace).Info("Event does not match our VDS criteria, ignoring",
904+
"operation", operation, "path", path, "vdsPath", vdsPath)
905+
}
906+
}
907+
}
908+
909+
func (r *VaultDynamicSecretReconciler) triggerVDSReconciliation(o *secretsv1beta1.VaultDynamicSecret) {
910+
if r.SourceCh == nil {
911+
return
912+
}
913+
objKey := client.ObjectKeyFromObject(o)
914+
if r.SyncRegistry != nil {
915+
r.SyncRegistry.Add(objKey)
916+
}
917+
r.SourceCh <- event.GenericEvent{
918+
Object: &secretsv1beta1.VaultDynamicSecret{
919+
ObjectMeta: metav1.ObjectMeta{
920+
Namespace: o.Namespace,
921+
Name: o.Name,
922+
},
923+
},
924+
}
925+
}
926+
927+
func (r *VaultDynamicSecretReconciler) getRolePathForCredentialPath(credentialPath string) string {
928+
if strings.Contains(credentialPath, "/static-creds/") {
929+
return strings.Replace(credentialPath, "/static-creds/", "/static-roles/", 1)
930+
}
931+
if strings.Contains(credentialPath, "/creds/") {
932+
return strings.Replace(credentialPath, "/creds/", "/roles/", 1)
933+
}
934+
return credentialPath
935+
}
936+
937+
func (r *VaultDynamicSecretReconciler) handleRoleDeletion(ctx context.Context, o *secretsv1beta1.VaultDynamicSecret) error {
938+
cur := &secretsv1beta1.VaultDynamicSecret{}
939+
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(o), cur); err != nil {
940+
if apierrors.IsNotFound(err) {
941+
return nil
942+
}
943+
return err
944+
}
945+
return client.IgnoreNotFound(r.Client.Delete(ctx, cur))
946+
}
947+
764948
// revokeLease revokes the VDS secret's lease.
765949
// NOTE: Enabling revocation requires the VaultAuthMethod referenced by `o.Spec.VaultAuthRef` to have a policy
766950
// that includes `path "sys/leases/revoke" { capabilities = ["update"] }`, otherwise this will fail with permission
@@ -922,6 +1106,16 @@ func (r *VaultDynamicSecretReconciler) vaultClientCallback(ctx context.Context,
9221106
}
9231107
}
9241108

1109+
// isCreateOrUpdateEvent checks if the operation is a credential creation/update event
1110+
func (r *VaultDynamicSecretReconciler) isCreateOrUpdateEvent(operation string) bool {
1111+
return operation == "creds-create" || operation == "static-roles-create" || operation == "static-roles-update"
1112+
}
1113+
1114+
// isRoleDeletionEvent checks if the operation is a role deletion event
1115+
func (r *VaultDynamicSecretReconciler) isRoleDeletionEvent(operation string) bool {
1116+
return operation == "role-delete" || operation == "static-role-delete"
1117+
}
1118+
9251119
func computeRotationTime(o *secretsv1beta1.VaultDynamicSecret) time.Time {
9261120
var ts int64
9271121
var horizon time.Duration

controllers/vaultdynamicsecret_controller_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/stretchr/testify/require"
1717
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1818
"k8s.io/apimachinery/pkg/types"
19+
"k8s.io/client-go/tools/record"
1920
"k8s.io/client-go/util/workqueue"
2021
"sigs.k8s.io/controller-runtime/pkg/client"
2122
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -995,6 +996,83 @@ func TestVaultDynamicSecretReconciler_computePostSyncHorizon(t *testing.T) {
995996
}
996997
}
997998

999+
func Test_dynamicSecretEventPath(t *testing.T) {
1000+
t.Parallel()
1001+
tests := []struct {
1002+
name string
1003+
mount string
1004+
want string
1005+
}{
1006+
{
1007+
name: "mount-without-slashes",
1008+
mount: "database",
1009+
want: "/v1/sys/events/subscribe/database*",
1010+
},
1011+
{
1012+
name: "mount-with-leading-slash",
1013+
mount: "/pki",
1014+
want: "/v1/sys/events/subscribe/pki*",
1015+
},
1016+
{
1017+
name: "empty-mount",
1018+
mount: "",
1019+
want: "/v1/sys/events/subscribe/*",
1020+
},
1021+
}
1022+
1023+
for _, tt := range tests {
1024+
t.Run(tt.name, func(t *testing.T) {
1025+
assert.Equal(t, tt.want, dynamicSecretEventPath)
1026+
})
1027+
}
1028+
}
1029+
1030+
func TestVaultDynamicSecretReconciler_streamDynamicSecretEvents(t *testing.T) {
1031+
t.Parallel()
1032+
1033+
sourceCh := make(chan event.GenericEvent, 1)
1034+
r := &VaultDynamicSecretReconciler{
1035+
Recorder: record.NewFakeRecorder(5),
1036+
SourceCh: sourceCh,
1037+
SyncRegistry: NewSyncRegistry(),
1038+
}
1039+
1040+
vds := &secretsv1beta1.VaultDynamicSecret{
1041+
ObjectMeta: metav1.ObjectMeta{
1042+
Name: "example",
1043+
Namespace: "default",
1044+
},
1045+
Spec: secretsv1beta1.VaultDynamicSecretSpec{
1046+
Mount: "database",
1047+
Path: "creds/app",
1048+
},
1049+
}
1050+
1051+
eventJSON := []byte(`{"data":{"event":{"metadata":{"path":"database/creds/app","modified":"true","operation":"creds-create"}},"namespace":"/"}}`)
1052+
wsClient := &fakeWebsocketClient{
1053+
conn: newFakeWebsocketConn(eventJSON),
1054+
}
1055+
1056+
ctx, cancel := context.WithCancel(context.Background())
1057+
t.Cleanup(cancel)
1058+
1059+
errCh := make(chan error, 1)
1060+
go func() {
1061+
errCh <- r.streamDynamicSecretEvents(ctx, vds, wsClient)
1062+
}()
1063+
1064+
select {
1065+
case evt := <-sourceCh:
1066+
assert.Equal(t, vds.GetName(), evt.Object.GetName())
1067+
assert.Equal(t, vds.GetNamespace(), evt.Object.GetNamespace())
1068+
case <-time.After(2 * time.Second):
1069+
t.Fatal("expected reconciliation event")
1070+
}
1071+
1072+
cancel()
1073+
require.Error(t, <-errCh)
1074+
}
1075+
9981076
type stubVaultClient struct {
9991077
vault.Client
10001078
cacheKey vault.ClientCacheKey

controllers/vaultstaticsecret_controller.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -371,20 +371,6 @@ func (r *VaultStaticSecretReconciler) handleDeletion(ctx context.Context, o clie
371371
return nil
372372
}
373373

374-
// eventMsg is used to extract the relevant fields from an event message sent
375-
// from Vault
376-
type eventMsg struct {
377-
Data struct {
378-
Event struct {
379-
Metadata struct {
380-
Path string `json:"path"`
381-
Modified string `json:"modified"`
382-
} `json:"metadata"`
383-
} `json:"event"`
384-
Namespace string `json:"namespace"`
385-
} `json:"data"`
386-
}
387-
388374
func (r *VaultStaticSecretReconciler) streamStaticSecretEvents(ctx context.Context, o *secretsv1beta1.VaultStaticSecret, wsClient websocketConnector) error {
389375
logger := log.FromContext(ctx).WithName("streamStaticSecretEvents")
390376
conn, err := wsClient.Connect(ctx)

0 commit comments

Comments
 (0)