diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 31b0ad8f..bde0532d 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "net" "net/http" "os" "strings" @@ -25,6 +26,7 @@ import ( "k8s.io/client-go/kubernetes" clientgocorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/jetstack/preflight/api" @@ -50,7 +52,12 @@ const schemaVersion string = "v2.0.0" // Run starts the agent process func Run(cmd *cobra.Command, args []string) { logs.Log.Printf("Preflight agent version: %s (%s)", version.PreflightVersion, version.Commit) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel( + klog.NewContext( + context.Background(), + klog.Background(), + ), + ) defer cancel() file, err := os.Open(Flags.ConfigFilePath) @@ -83,11 +90,13 @@ func Run(cmd *cobra.Command, args []string) { } }() - group.Go(func() error { + { server := http.NewServeMux() + const serverAddress = ":8081" + log := klog.FromContext(ctx).WithName("APIServer").WithValues("addr", serverAddress) if Flags.Profiling { - logs.Log.Printf("pprof profiling was enabled.") + log.Info("Profiling endpoints enabled", "path", "/debug/pprof") server.HandleFunc("/debug/pprof/", pprof.Index) server.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) server.HandleFunc("/debug/pprof/profile", pprof.Profile) @@ -96,7 +105,7 @@ func Run(cmd *cobra.Command, args []string) { } if Flags.Prometheus { - logs.Log.Printf("Prometheus was enabled.\nRunning prometheus on port :8081") + log.Info("Metrics endpoints enabled", "path", "/metrics") prometheus.MustRegister(metricPayloadSize) server.Handle("/metrics", promhttp.Handler()) } @@ -105,21 +114,32 @@ func Run(cmd *cobra.Command, args []string) { // what "ready" means for the agent, we just return 200 OK inconditionally. // The goal is to satisfy some Kubernetes distributions, like OpenShift, // that require a liveness and health probe to be present for each pod. + log.Info("Healthz endpoints enabled", "path", "/healthz") server.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) + log.Info("Readyz endpoints enabled", "path", "/readyz") server.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) - err := http.ListenAndServe(":8081", server) - if err != nil && !errors.Is(err, http.ErrServerClosed) { - return fmt.Errorf("failed to run the health check server: %s", err) - } - // The agent must stop if the management server stops - cancel() - return nil - }) + group.Go(func() error { + err := listenAndServe( + klog.NewContext(gctx, log), + &http.Server{ + Addr: serverAddress, + Handler: server, + BaseContext: func(_ net.Listener) context.Context { + return gctx + }, + }, + ) + if err != nil { + return fmt.Errorf("APIServer: %s", err) + } + return nil + }) + } _, isVenConn := preflightClient.(*client.VenConnClient) if isVenConn { @@ -412,3 +432,48 @@ func postData(config CombinedConfig, preflightClient client.Client, readings []* return nil } + +// listenAndServe starts the supplied HTTP server and stops it gracefully when +// the supplied context is cancelled. +// It returns when the graceful server shutdown is complete or when the server +// exits with an error. +// If the server fails to start, it returns the server error. +// If the server fails to shutdown gracefully, it returns the shutdown error. +// The server is given 3 seconds to shutdown gracefully before it is stopped +// forcefully. +func listenAndServe(ctx context.Context, server *http.Server) error { + log := klog.FromContext(ctx).WithName("ListenAndServe") + + log.V(1).Info("Starting") + + listenCTX, listenCancelCause := context.WithCancelCause(context.WithoutCancel(ctx)) + go func() { + err := server.ListenAndServe() + listenCancelCause(fmt.Errorf("ListenAndServe: %s", err)) + }() + + select { + case <-listenCTX.Done(): + log.V(1).Info("Shutdown skipped", "reason", "Server already stopped") + return context.Cause(listenCTX) + + case <-ctx.Done(): + log.V(1).Info("Shutting down") + } + + shutdownCTX, shutdownCancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*3) + shutdownErr := server.Shutdown(shutdownCTX) + shutdownCancel() + if shutdownErr != nil { + shutdownErr = fmt.Errorf("Shutdown: %s", shutdownErr) + } + + closeErr := server.Close() + if closeErr != nil { + closeErr = fmt.Errorf("Close: %s", closeErr) + } + + log.V(1).Info("Shutdown complete") + + return errors.Join(shutdownErr, closeErr) +} diff --git a/pkg/agent/run_test.go b/pkg/agent/run_test.go new file mode 100644 index 00000000..69025950 --- /dev/null +++ b/pkg/agent/run_test.go @@ -0,0 +1,72 @@ +package agent + +import ( + "bytes" + "context" + "os" + "os/exec" + "testing" + "time" + + "github.com/spf13/cobra" + "github.com/stretchr/testify/require" + "k8s.io/klog/v2" + + "github.com/jetstack/preflight/pkg/logs" +) + +// TestRunOneShot runs the agent in `--one-shot` mode and verifies that it exits +// after the first data gathering iteration. +func TestRunOneShot(t *testing.T) { + if _, found := os.LookupEnv("GO_CHILD"); found { + // Silence the warning about missing pod name for event generation + // TODO(wallrj): This should not be required when an `--input-file` has been supplied. + t.Setenv("POD_NAME", "venafi-kubernetes-e2e") + // Silence the error about missing kubeconfig. + // TODO(wallrj): This should not be required when an `--input-file` has been supplied. + t.Setenv("KUBECONFIG", "testdata/one-shot/success/kubeconfig.yaml") + + c := &cobra.Command{} + InitAgentCmdFlags(c, &Flags) + logs.AddFlags(c.Flags()) + + err := c.ParseFlags([]string{ + "--one-shot", + // TODO(wallrj): This should not be required when an `--input-file` has been supplied. + "--api-token=should-not-be-required", + // TODO(wallrj): This should not be required when an `--input-file` has been supplied. + "--install-namespace=default", + "--agent-config-file=testdata/one-shot/success/config.yaml", + "--input-path=testdata/one-shot/success/input.json", + "--output-path=/dev/null", + "-v=1", + }) + require.NoError(t, err) + + logs.Initialize() + Run(c, nil) + klog.Flush() + return + } + t.Log("Running child process") + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=^TestRunOneShot$") + var ( + stdout bytes.Buffer + stderr bytes.Buffer + ) + cmd.Stdout = &stdout + cmd.Stderr = &stderr + cmd.Env = append( + os.Environ(), + "GO_CHILD=true", + ) + err := cmd.Run() + + stdoutStr := stdout.String() + stderrStr := stderr.String() + t.Logf("STDOUT\n%s\n", stdoutStr) + t.Logf("STDERR\n%s\n", stderrStr) + require.NoError(t, err, context.Cause(ctx)) +} diff --git a/pkg/agent/testdata/one-shot/success/config.yaml b/pkg/agent/testdata/one-shot/success/config.yaml new file mode 100644 index 00000000..51688b26 --- /dev/null +++ b/pkg/agent/testdata/one-shot/success/config.yaml @@ -0,0 +1,4 @@ +# Just enough venafi-kubernetes-agent config to allow it to run with an input +# file in one-shot mode. +cluster_id: "venafi-kubernetes-agent-e2e" +organization_id: "venafi-kubernetes-agent-e2e" diff --git a/pkg/agent/testdata/one-shot/success/input.json b/pkg/agent/testdata/one-shot/success/input.json new file mode 100644 index 00000000..fe51488c --- /dev/null +++ b/pkg/agent/testdata/one-shot/success/input.json @@ -0,0 +1 @@ +[] diff --git a/pkg/agent/testdata/one-shot/success/kubeconfig.yaml b/pkg/agent/testdata/one-shot/success/kubeconfig.yaml new file mode 100644 index 00000000..993dcec3 --- /dev/null +++ b/pkg/agent/testdata/one-shot/success/kubeconfig.yaml @@ -0,0 +1,15 @@ +# Just enough kubeconfig to satisfy client-go +apiVersion: v1 +kind: Config +current-context: cluster-1 +contexts: +- name: cluster-1 + context: + cluster: cluster-1 + user: user-1 +clusters: +- name: cluster-1 + cluster: + server: https://192.0.2.1:8443 +preferences: {} +users: []