diff --git a/pkg/certificates/cli.go b/pkg/certificates/cli.go index 967b40844..7639644d6 100644 --- a/pkg/certificates/cli.go +++ b/pkg/certificates/cli.go @@ -15,6 +15,8 @@ import ( "github.com/spf13/viper" ) +const receptorCertificatesAppName = "receptor-certificates" + // InitCA Initialize Certificate Authority. func InitCA(opts *CertOptions, certOut, keyOut string, osWrapper Oser) error { ca, err := CreateCA(opts, &RsaWrapper{}) @@ -273,10 +275,10 @@ func init() { if version > 1 { return } - cmdline.RegisterConfigTypeForApp("receptor-certificates", + cmdline.RegisterConfigTypeForApp(receptorCertificatesAppName, "cert-init", "Initialize PKI CA", InitCAConfig{}, cmdline.Exclusive, cmdline.Section(certSection)) - cmdline.RegisterConfigTypeForApp("receptor-certificates", + cmdline.RegisterConfigTypeForApp(receptorCertificatesAppName, "cert-makereq", "Create certificate request", MakeReqConfig{}, cmdline.Exclusive, cmdline.Section(certSection)) - cmdline.RegisterConfigTypeForApp("receptor-certificates", + cmdline.RegisterConfigTypeForApp(receptorCertificatesAppName, "cert-signreq", "Sign request and produce certificate", SignReqConfig{}, cmdline.Exclusive, cmdline.Section(certSection)) } diff --git a/pkg/netceptor/external_backend.go b/pkg/netceptor/external_backend.go index 86cb4ae78..c4a6705a6 100644 --- a/pkg/netceptor/external_backend.go +++ b/pkg/netceptor/external_backend.go @@ -12,6 +12,8 @@ import ( "github.com/gorilla/websocket" ) +const errSessionClosed = "session closed: %s" + // ExternalBackend is a backend implementation for the situation when non-Receptor code // is initiating connections, outside the control of a Receptor-managed accept loop. type ExternalBackend struct { @@ -36,7 +38,7 @@ func MessageConnFromNetConn(conn net.Conn) MessageConn { // WriteMessage writes a message to the connection. func (mc *netMessageConn) WriteMessage(ctx context.Context, data []byte) error { if ctx.Err() != nil { - return fmt.Errorf("session closed: %s", ctx.Err()) + return fmt.Errorf(errSessionClosed, ctx.Err()) } buf := mc.framer.SendData(data) n, err := mc.conn.Write(buf) @@ -59,7 +61,7 @@ func (mc *netMessageConn) ReadMessage(ctx context.Context, timeout time.Duration } for { if ctx.Err() != nil { - return nil, fmt.Errorf("session closed: %s", ctx.Err()) + return nil, fmt.Errorf(errSessionClosed, ctx.Err()) } if mc.framer.MessageReady() { break @@ -108,7 +110,7 @@ func MessageConnFromWebsocketConn(conn *websocket.Conn) MessageConn { // WriteMessage writes a message to the connection. func (mc *websocketMessageConn) WriteMessage(ctx context.Context, data []byte) error { if ctx.Err() != nil { - return fmt.Errorf("session closed: %s", ctx.Err()) + return fmt.Errorf(errSessionClosed, ctx.Err()) } return mc.conn.WriteMessage(websocket.BinaryMessage, data) @@ -117,7 +119,7 @@ func (mc *websocketMessageConn) WriteMessage(ctx context.Context, data []byte) e // ReadMessage reads a message from the connection. func (mc *websocketMessageConn) ReadMessage(ctx context.Context, _ time.Duration) ([]byte, error) { if ctx.Err() != nil { - return nil, fmt.Errorf("session closed: %s", ctx.Err()) + return nil, fmt.Errorf(errSessionClosed, ctx.Err()) } messageType, data, err := mc.conn.ReadMessage() if messageType != websocket.BinaryMessage { diff --git a/pkg/netceptor/netceptor.go b/pkg/netceptor/netceptor.go index 362056f62..9b943cb1a 100644 --- a/pkg/netceptor/netceptor.go +++ b/pkg/netceptor/netceptor.go @@ -7,6 +7,7 @@ import ( "crypto/tls" "encoding/binary" "encoding/json" + "errors" "fmt" "io" "math" @@ -42,6 +43,8 @@ const defaultMaxForwardingHops = 30 // defaultMaxConnectionIdleTime is the maximum time a connection can go without data before we consider it failed. const defaultMaxConnectionIdleTime = 2*defaultRouteUpdateTime + 1*time.Second +const errMustProvideName = "must provide a name" + // MainInstance is the global instance of Netceptor instantiated by the command-line main() function. var MainInstance *Netceptor @@ -932,7 +935,7 @@ func (s *Netceptor) GetServerTLSConfig(name string) (*tls.Config, error) { // AddWorkCommand records a work command so it can be included in service announcements. func (s *Netceptor) AddWorkCommand(command string, secure bool) error { if command == "" { - return fmt.Errorf("must provide a name") + return errors.New(errMustProvideName) } wC := WorkCommand{WorkType: command, Secure: secure} s.workCommandsLock.Lock() @@ -945,7 +948,7 @@ func (s *Netceptor) AddWorkCommand(command string, secure bool) error { // SetServerTLSConfig stores a server TLS config by name. func (s *Netceptor) SetServerTLSConfig(name string, config *tls.Config) error { if name == "" { - return fmt.Errorf("must provide a name") + return errors.New(errMustProvideName) } s.serverTLSConfigs[name] = config @@ -984,7 +987,7 @@ func (s *Netceptor) GetClientTLSConfig(name string, expectedHostName string, exp // SetClientTLSConfig stores a client TLS config by name. func (s *Netceptor) SetClientTLSConfig(name string, config *tls.Config, pinnedFingerprints [][]byte) error { if name == "" { - return fmt.Errorf("must provide a name") + return errors.New(errMustProvideName) } s.clientTLSConfigs[name] = config s.clientPinnedFingerprints[name] = pinnedFingerprints diff --git a/pkg/workceptor/command.go b/pkg/workceptor/command.go index 8c3fefae1..f95141b13 100644 --- a/pkg/workceptor/command.go +++ b/pkg/workceptor/command.go @@ -24,7 +24,10 @@ import ( "github.com/spf13/viper" ) -const errMsgStatusFileUpdate = "Error updating status file %s: %s" +const ( + errMsgStatusFileUpdate = "Error updating status file %s: %s" + receptorWorkersAppName = "receptor-workers" +) type BaseWorkUnitForWorkUnit interface { CancelContext() @@ -537,12 +540,12 @@ func init() { if version > 1 { return } - cmdline.RegisterConfigTypeForApp("receptor-workers", + cmdline.RegisterConfigTypeForApp(receptorWorkersAppName, "work-signing", "Private key to sign work submissions", SigningKeyPrivateCfg{}, cmdline.Singleton, cmdline.Section(workersSection)) - cmdline.RegisterConfigTypeForApp("receptor-workers", + cmdline.RegisterConfigTypeForApp(receptorWorkersAppName, "work-verification", "Public key to verify work submissions", VerifyingKeyPublicCfg{}, cmdline.Singleton, cmdline.Section(workersSection)) - cmdline.RegisterConfigTypeForApp("receptor-workers", + cmdline.RegisterConfigTypeForApp(receptorWorkersAppName, "work-command", "Run a worker using an external command", CommandWorkerCfg{}, cmdline.Section(workersSection)) - cmdline.RegisterConfigTypeForApp("receptor-workers", + cmdline.RegisterConfigTypeForApp(receptorWorkersAppName, "command-runner", "Wrapper around a process invocation", commandRunnerCfg{}, cmdline.Hidden) } diff --git a/pkg/workceptor/controlsvc.go b/pkg/workceptor/controlsvc.go index 6b5e8c6e6..83fa932c5 100644 --- a/pkg/workceptor/controlsvc.go +++ b/pkg/workceptor/controlsvc.go @@ -24,6 +24,18 @@ type workceptorCommand struct { params map[string]interface{} } +const ( + errFieldMissing = "field %s missing" +) + +const ( + cmdCancel = "cancel" + cmdRelease = "release" + cmdForceRelease = "force-release" + cmdStatus = "status" + cmdList = "list" +) + func (t *workceptorCommandType) InitFromString(params string) (controlsvc.ControlCommand, error) { tokens := strings.Split(params, " ") if len(tokens) == 0 { @@ -44,11 +56,11 @@ func (t *workceptorCommandType) InitFromString(params string) (controlsvc.Contro if len(tokens) > 3 { c.params["params"] = strings.Join(tokens[3:], " ") } - case "list": + case cmdList: if len(tokens) > 1 { c.params["unitid"] = tokens[1] } - case "status", "cancel", "release", "force-release": + case cmdStatus, cmdCancel, cmdRelease, cmdForceRelease: if len(tokens) < 2 { return nil, fmt.Errorf("work %s requires a unit ID", c.subcommand) } @@ -82,7 +94,7 @@ func (t *workceptorCommandType) InitFromString(params string) (controlsvc.Contro func strFromMap(config map[string]interface{}, name string) (string, error) { value, ok := config[name] if !ok { - return "", fmt.Errorf("field %s missing", name) + return "", fmt.Errorf(errFieldMissing, name) } valueStr, ok := value.(string) if !ok { @@ -96,7 +108,7 @@ func strFromMap(config map[string]interface{}, name string) (string, error) { func intFromMap(config map[string]interface{}, name string) (int64, error) { value, ok := config[name] if !ok { - return 0, fmt.Errorf("field %s missing", name) + return 0, fmt.Errorf(errFieldMissing, name) } valueInt, ok := value.(int64) if ok { @@ -120,7 +132,7 @@ func intFromMap(config map[string]interface{}, name string) (int64, error) { func boolFromMap(config map[string]interface{}, name string) (bool, error) { value, ok := config[name] if !ok { - return false, fmt.Errorf("field %s missing", name) + return false, fmt.Errorf(errFieldMissing, name) } valueBoolStr, ok := value.(string) if !ok { @@ -163,7 +175,7 @@ func (t *workceptorCommandType) InitFromJSON(config map[string]interface{}) (con if err != nil { return nil, err } - case "status", "cancel", "release", "force-release": + case cmdStatus, cmdCancel, cmdRelease, cmdForceRelease: c.params["unitid"], err = strFromMap(config, "unitid") if err != nil { return nil, err @@ -172,7 +184,7 @@ func (t *workceptorCommandType) InitFromJSON(config map[string]interface{}) (con if err == nil { c.params["signature"] = signature } - case "list": + case cmdList: unitID, err := strFromMap(config, "unitid") if err == nil { c.params["unitid"] = unitID @@ -327,7 +339,7 @@ func (c *workceptorCommand) ControlFunc(ctx context.Context, nc controlsvc.Netce } return cfr, nil - case "list": + case cmdList: var unitList []string targetUnitID, ok := c.params["unitid"].(string) if ok { @@ -346,7 +358,7 @@ func (c *workceptorCommand) ControlFunc(ctx context.Context, nc controlsvc.Netce } return cfr, nil - case "status": + case cmdStatus: unitid, err := strFromMap(c.params, "unitid") if err != nil { return nil, err @@ -357,7 +369,7 @@ func (c *workceptorCommand) ControlFunc(ctx context.Context, nc controlsvc.Netce } return cfr, nil - case "cancel", "release", "force-release": + case cmdCancel, cmdRelease, cmdForceRelease: unitid, err := strFromMap(c.params, "unitid") if err != nil { return nil, err @@ -369,7 +381,7 @@ func (c *workceptorCommand) ControlFunc(ctx context.Context, nc controlsvc.Netce cfr := make(map[string]interface{}) var pendingMsg string var completeMsg string - if c.subcommand == "cancel" { + if c.subcommand == cmdCancel { pendingMsg = "cancel pending" completeMsg = "cancelled" } else { @@ -388,10 +400,10 @@ func (c *workceptorCommand) ControlFunc(ctx context.Context, nc controlsvc.Netce if err != nil { return nil, err } - if c.subcommand == "cancel" { + if c.subcommand == cmdCancel { err = unit.Cancel() } else { - err = unit.Release(c.subcommand == "force-release") + err = unit.Release(c.subcommand == cmdForceRelease) } if err != nil && !IsPending(err) { return nil, err diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go index b3b1e433a..be23314e0 100644 --- a/pkg/workceptor/kubernetes.go +++ b/pkg/workceptor/kubernetes.go @@ -179,7 +179,11 @@ var ErrPodFailed = fmt.Errorf("pod failed to start") // ErrImagePullBackOff is returned when the image for the container in the Pod cannot be pulled. var ErrImagePullBackOff = fmt.Errorf("container failed to start") -const WorkerContainerName = "worker" +const ( + WorkerContainerName = "worker" + errOpenStdout = "Error opening stdout file: %s" + statusPodRunning = "Pod Running" +) // podRunningAndReady is a completion criterion for pod ready to be attached to. func podRunningAndReady(kw KubeUnit) func(event watch.Event) (bool, error) { @@ -765,7 +769,7 @@ func (kw *KubeUnit) CreatePod(env map[string]string) error { } else if err != nil { // any other error besides ErrPodCompleted stdout, err2 := NewStdoutWriter(FileSystem{}, kw.UnitDir()) if err2 != nil { - errMsg := fmt.Sprintf("Error opening stdout file: %s", err2) + errMsg := fmt.Sprintf(errOpenStdout, err2) kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0) @@ -962,7 +966,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() { // open stdout writer that writes to work unit's data directory stdout, err := NewStdoutWriter(FileSystem{}, kw.UnitDir()) if err != nil { - errMsg := fmt.Sprintf("Error opening stdout file: %s", err) + errMsg := fmt.Sprintf(errOpenStdout, err) kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0) @@ -989,7 +993,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() { streamWait.Add(2) if skipStdin { - kw.UpdateBasicStatus(WorkStateRunning, "Pod Running", stdout.Size()) + kw.UpdateBasicStatus(WorkStateRunning, statusPodRunning, stdout.Size()) streamWait.Done() } else { retryCount := kw.GetKubeRetryCount() @@ -1125,7 +1129,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() { close(stdinErrChan) // signal STDOUT goroutine to stop } else { if stdin.Error() == io.EOF { - kw.UpdateBasicStatus(WorkStateRunning, "Pod Running", stdout.Size()) + kw.UpdateBasicStatus(WorkStateRunning, statusPodRunning, stdout.Size()) } else { // this is probably not possible... errMsg := fmt.Sprintf("Error reading stdin: %s", stdin.Error()) @@ -1421,7 +1425,7 @@ func (kw *KubeUnit) runWorkUsingTCP() { // Open stdout writer stdout, err := NewStdoutWriter(FileSystem{}, kw.UnitDir()) if err != nil { - errMsg := fmt.Sprintf("Error opening stdout file: %s", err) + errMsg := fmt.Sprintf(errOpenStdout, err) kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0) cancel() @@ -1465,7 +1469,7 @@ func (kw *KubeUnit) runWorkUsingTCP() { case <-stdin.Done(): err := stdin.Error() if err == io.EOF { - kw.UpdateBasicStatus(WorkStateRunning, "Pod Running", stdout.Size()) + kw.UpdateBasicStatus(WorkStateRunning, statusPodRunning, stdout.Size()) } else { kw.UpdateBasicStatus(WorkStateFailed, fmt.Sprintf("Error reading stdin: %s", err), stdout.Size()) cancel() diff --git a/pkg/workceptor/remote_work.go b/pkg/workceptor/remote_work.go index 3532e5068..669abf43f 100644 --- a/pkg/workceptor/remote_work.go +++ b/pkg/workceptor/remote_work.go @@ -22,7 +22,10 @@ import ( "github.com/ansible/receptor/pkg/utils" ) -const errMsgRemoteExtraDataMissing = "remote ExtraData missing" +const ( + errMsgRemoteExtraDataMissing = "remote ExtraData missing" + errRemoteRead = "read error reading from %s: %s" +) // remoteUnit implements the WorkUnit interface for the Receptor remote worker plugin. type remoteUnit struct { @@ -205,7 +208,7 @@ func (rw *remoteUnit) StartRemoteUnit(ctx context.Context, conn net.Conn, reader } response, err := utils.ReadStringContext(ctx, reader, '\n') if err != nil { - return fmt.Errorf("read error reading from %s: %s", red.RemoteNode, err) + return fmt.Errorf(errRemoteRead, red.RemoteNode, err) } submitIDRegex := regexp.MustCompile(`with ID ([a-zA-Z0-9]+)\.`) match := submitIDRegex.FindSubmatch([]byte(response)) @@ -237,7 +240,7 @@ func (rw *remoteUnit) StartRemoteUnit(ctx context.Context, conn net.Conn, reader } response, err = utils.ReadStringContext(ctx, reader, '\n') if err != nil { - return fmt.Errorf("read error reading from %s: %s", red.RemoteNode, err) + return fmt.Errorf(errRemoteRead, red.RemoteNode, err) } resultErrorRegex := regexp.MustCompile("ERROR: (.*)") match = resultErrorRegex.FindSubmatch([]byte(response)) @@ -286,7 +289,7 @@ func (rw *remoteUnit) cancelOrReleaseRemoteUnit(ctx context.Context, conn net.Co } response, err := utils.ReadStringContext(ctx, reader, '\n') if err != nil { - return fmt.Errorf("read error reading from %s: %s", red.RemoteNode, err) + return fmt.Errorf(errRemoteRead, red.RemoteNode, err) } if response[:5] == "ERROR" { return fmt.Errorf("error cancelling remote unit: %s", response[6:])