diff --git a/examples/cred-sync-secert.yaml b/examples/cred-sync-secert.yaml index cea89ab3..8c5c944f 100644 --- a/examples/cred-sync-secert.yaml +++ b/examples/cred-sync-secert.yaml @@ -4,9 +4,9 @@ data: SERVICE-CRED-1: e2VudGl0eU5hbWU6ZGIsIHVzZXJOYW1lOnRlc3R1c2VyLHBhc3N3b3JkOnRlc3Rwd2R9Cg== # CERTS-: `echo '{"entityName":"customer-client", "certIndetifier":"capten1","caCert":"LS0tLS1CRUdJTiB", "cert": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JS", "key":"LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBL"}' | base64 -w 0` CERTS-1: eyJlbnRpdHlOYW1lIjoiY3VzdG9tZXItY2xpZW50IiwgImNlcnRJbmRldGlmaWVyIjoiY2FwdGVuMSIsImNhQ2VydCI6IkxTMHRMUzFDUlVkSlRpQiIsICJjZXJ0IjogIkxTMHRMUzFDUlVkSlRpQkRSVkpVU1VaSlEwRlVSUzB0TFMwdENrMUpTIiwgImtleSI6IkxTMHRMUzFDUlVkSlRpQlNVMEVnVUZKSlZrRlVSU0JMIn0K + CLUSTER-CRED-1: eyJjcmVkZW50aWFsVHlwZSI6ImNsdXN0ZXItY3JlZCIsImVudGl0eU5hbWUiOiJhc3RyYSIsICJjcmVkSW5kZXRpZmllciI6ImF1dGhUb2tlbiIsICJjcmVkZW50aWFsIjp7ImNsdXN0ZXJJZCI6ICJiN2YxNjQwZS01NDg4LTRmYzQtOTIzMC0xMGY1OGUxOTVlMWEiLCJ0b2tlbiI6IkFzdHJhQ1M6ZFFPVUd5TERyeEJzTEpVUGJkUnF6d0RzOmNlMzUzZDg3ZjE0NGM0NmQ3NDBiNDg4OWNhYTg0MGMwMzI1YWEwZjhiYjIwZWVmODkxYzllZWZiYTA1NTEzMmIifX0K + #GENERIC-1: `echo '{"credentialType":"cluster-cred","entityName":"astra", "credIndetifier":"authToken", "credential":{"clusterId": "b7f1640e-5488-4fc4-9230-10f58e195e1a","token":"AstraCS:dQOUGyLDrxBsLJUPbdRqzwDs:ce353d87f144c46d740b4889caa840c0325aa0f8bb20eef891c9eefba055132b"}}' | base64 -w 0` - #GENERIC-1: `echo '{"credentialType":"cluster","entityName":"astra", "credIndetifier":"authToken", "credential":{"clusterId": "b7f1640e-5488-4fc4-9230-10f58e195e1a","token":"AstraCS:dQOUGyLDrxBsLJUPbdRqzwDs:ce353d87f144c46d740b4889caa840c0325aa0f8bb20eef891c9eefba055132b"}}' | base64 -w 0` - GENERIC-1: eyJjcmVkZW50aWFsVHlwZSI6ImNsdXN0ZXIiLCJlbnRpdHlOYW1lIjoiYXN0cmEiLCAiY3JlZEluZGV0aWZpZXIiOiJhdXRoVG9rZW4iLCAiY3JlZGVudGlhbCI6eyJjbHVzdGVySWQiOiAiYjdmMTY0MGUtNTQ4OC00ZmM0LTkyMzAtMTBmNThlMTk1ZTFhIiwidG9rZW4iOiJBc3RyYUNTOmRRT1VHeUxEcnhCc0xKVVBiZFJxendEczpjZTM1M2Q4N2YxNDRjNDZkNzQwYjQ4ODljYWE4NDBjMDMyNWFhMGY4YmIyMGVlZjg5MWM5ZWVmYmEwNTUxMzJiIn19Cg== kind: Secret metadata: diff --git a/internal/client/inform-change.go b/internal/client/inform-change.go new file mode 100644 index 00000000..b3abf9ef --- /dev/null +++ b/internal/client/inform-change.go @@ -0,0 +1,32 @@ +package client + +import ( + "time" + + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" +) + +type AddObjectFunc func(obj interface{}) +type UpdateObjectFunc func(oldObj, newObj interface{}) +type DeleteObjectFunc func(obj interface{}) + +func (k *K8SClient) RegisterConfigMapChangeHandler(addFunc AddObjectFunc, + updateFn UpdateObjectFunc, deleteFunc DeleteObjectFunc) { + informerFactory := informers.NewSharedInformerFactory(k.client, time.Second*30) + configMapInformer := informerFactory.Core().V1().ConfigMaps().Informer() + configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: addFunc, + UpdateFunc: updateFn, + DeleteFunc: deleteFunc, + }) + k.configMapInformer = configMapInformer +} + +func (k *K8SClient) StartObjectChangeInformer() { + stopCh := make(chan struct{}) + defer close(stopCh) + go k.informerFactory.Start(stopCh) + k.informerFactory.WaitForCacheSync(stopCh) + <-stopCh +} diff --git a/internal/client/k8s.go b/internal/client/k8s.go index a0a0f5be..be07006c 100644 --- a/internal/client/k8s.go +++ b/internal/client/k8s.go @@ -4,7 +4,7 @@ import ( "context" "strings" "time" - + "k8s.io/client-go/tools/cache" "github.com/intelops/go-common/logging" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" @@ -12,11 +12,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/informers" ) type K8SClient struct { client *kubernetes.Clientset log logging.Logger + configMapInformer cache.SharedIndexInformer + informerFactory informers.SharedInformerFactory } type ConfigMapData struct { diff --git a/internal/job/vault-cred-sync.go b/internal/job/vault-cred-sync.go index 70c084b6..80716e22 100644 --- a/internal/job/vault-cred-sync.go +++ b/internal/job/vault-cred-sync.go @@ -189,6 +189,7 @@ func (v *VaultCredSync) storeGenericCredential(ctx context.Context, vc *client.V } cred := map[string]string{} + for key, val := range genericCredData.Credential { cred[key] = val } diff --git a/server/server.go b/server/server.go index 685f4cec..439062ce 100644 --- a/server/server.go +++ b/server/server.go @@ -7,6 +7,10 @@ import ( "os/signal" "syscall" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/workqueue" + + "github.com/intelops/vault-cred/internal/client" "github.com/intelops/vault-cred/internal/job" "github.com/intelops/go-common/logging" @@ -50,14 +54,14 @@ func Start() { } }() - s := initScheduler(log, cfg) - s.Start() - + //s := initScheduler(log, cfg) + //s.Start() + startConfigMapChangeHandler(log) signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) <-signals - s.Stop() + //s.Stop() grpcServer.Stop() log.Debug("exiting vault-cred server") } @@ -100,3 +104,53 @@ func initScheduler(log logging.Logger, cfg config.Configuration) (s *job.Schedul } return } +func startConfigMapChangeHandler(log logging.Logger) { + k8sClient, err := client.NewK8SClient(log) + if k8sClient == nil { + log.Errorf("K8sClient", k8sClient) + } + + if err != nil { + log.Errorf("Error while connecting to k8s", k8sClient) + } + //k8sClient, _ := k8s.NewK8SClient(log) + workQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + + addFunc := func(obj interface{}) { + configMap := obj.(*v1.ConfigMap) + fmt.Printf("ConfigMap added: %s\n", configMap.Name) + workQueue.Add(configMap.Name) + } + + updateFunc := func(oldObj, newObj interface{}) { + newConfigMap := newObj.(*v1.ConfigMap) + fmt.Printf("ConfigMap updated: %s\n", newConfigMap.Name) + workQueue.Add(newConfigMap.Name) + } + + deleteFunc := func(obj interface{}) { + configMap := obj.(*v1.ConfigMap) + fmt.Printf("ConfigMap deleted: %s\n", configMap.Name) + workQueue.Add(configMap.Name) + } + + k8sClient.RegisterConfigMapChangeHandler(addFunc, updateFunc, deleteFunc) + go k8sClient.StartObjectChangeInformer() + go processEvents(workQueue) +} +func processEvents(workQueue workqueue.RateLimitingInterface) { + for { + // Retrieve an item from the work queue + item, shutdown := workQueue.Get() + if shutdown { + return + } + + // Handle the item (perform your desired actions here) + configMapName := item.(string) + fmt.Printf("Processing event for ConfigMap: %s\n", configMapName) + + // Mark the item as processed + workQueue.Done(item) + } +}