Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/certificates/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/spf13/viper"
)

const receptorCertificatesAppName = "receptor-certificates"

// InitCA Initialize Certificate Authority.
func InitCA(opts *CertOptions, certOut, keyOut string, osWrapper Oser) error {
ca, err := CreateCA(opts, &RsaWrapper{})
Expand Down Expand Up @@ -273,10 +275,10 @@ func init() {
if version > 1 {
return
}
cmdline.RegisterConfigTypeForApp("receptor-certificates",
cmdline.RegisterConfigTypeForApp(receptorCertificatesAppName,
"cert-init", "Initialize PKI CA", InitCAConfig{}, cmdline.Exclusive, cmdline.Section(certSection))
cmdline.RegisterConfigTypeForApp("receptor-certificates",
cmdline.RegisterConfigTypeForApp(receptorCertificatesAppName,
"cert-makereq", "Create certificate request", MakeReqConfig{}, cmdline.Exclusive, cmdline.Section(certSection))
cmdline.RegisterConfigTypeForApp("receptor-certificates",
cmdline.RegisterConfigTypeForApp(receptorCertificatesAppName,
"cert-signreq", "Sign request and produce certificate", SignReqConfig{}, cmdline.Exclusive, cmdline.Section(certSection))
}
10 changes: 6 additions & 4 deletions pkg/netceptor/external_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/gorilla/websocket"
)

const errSessionClosed = "session closed: %s"

// ExternalBackend is a backend implementation for the situation when non-Receptor code
// is initiating connections, outside the control of a Receptor-managed accept loop.
type ExternalBackend struct {
Expand All @@ -36,7 +38,7 @@ func MessageConnFromNetConn(conn net.Conn) MessageConn {
// WriteMessage writes a message to the connection.
func (mc *netMessageConn) WriteMessage(ctx context.Context, data []byte) error {
if ctx.Err() != nil {
return fmt.Errorf("session closed: %s", ctx.Err())
return fmt.Errorf(errSessionClosed, ctx.Err())
}
buf := mc.framer.SendData(data)
n, err := mc.conn.Write(buf)
Expand All @@ -59,7 +61,7 @@ func (mc *netMessageConn) ReadMessage(ctx context.Context, timeout time.Duration
}
for {
if ctx.Err() != nil {
return nil, fmt.Errorf("session closed: %s", ctx.Err())
return nil, fmt.Errorf(errSessionClosed, ctx.Err())
}
if mc.framer.MessageReady() {
break
Expand Down Expand Up @@ -108,7 +110,7 @@ func MessageConnFromWebsocketConn(conn *websocket.Conn) MessageConn {
// WriteMessage writes a message to the connection.
func (mc *websocketMessageConn) WriteMessage(ctx context.Context, data []byte) error {
if ctx.Err() != nil {
return fmt.Errorf("session closed: %s", ctx.Err())
return fmt.Errorf(errSessionClosed, ctx.Err())
}

return mc.conn.WriteMessage(websocket.BinaryMessage, data)
Expand All @@ -117,7 +119,7 @@ func (mc *websocketMessageConn) WriteMessage(ctx context.Context, data []byte) e
// ReadMessage reads a message from the connection.
func (mc *websocketMessageConn) ReadMessage(ctx context.Context, _ time.Duration) ([]byte, error) {
if ctx.Err() != nil {
return nil, fmt.Errorf("session closed: %s", ctx.Err())
return nil, fmt.Errorf(errSessionClosed, ctx.Err())
}
messageType, data, err := mc.conn.ReadMessage()
if messageType != websocket.BinaryMessage {
Expand Down
9 changes: 6 additions & 3 deletions pkg/netceptor/netceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/tls"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -42,6 +43,8 @@ const defaultMaxForwardingHops = 30
// defaultMaxConnectionIdleTime is the maximum time a connection can go without data before we consider it failed.
const defaultMaxConnectionIdleTime = 2*defaultRouteUpdateTime + 1*time.Second

const errMustProvideName = "must provide a name"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const errMustProvideName = "must provide a name"
const errMustProvideName = "must provide a %s name"


// MainInstance is the global instance of Netceptor instantiated by the command-line main() function.
var MainInstance *Netceptor

Expand Down Expand Up @@ -932,7 +935,7 @@ func (s *Netceptor) GetServerTLSConfig(name string) (*tls.Config, error) {
// AddWorkCommand records a work command so it can be included in service announcements.
func (s *Netceptor) AddWorkCommand(command string, secure bool) error {
if command == "" {
return fmt.Errorf("must provide a name")
return errors.New(errMustProvideName)
Copy link
Copy Markdown
Contributor

@arrestle arrestle Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. I dislike that all these error messages are identical. Can we take this opportunity to mention which name is missing?

Suggested change
return errors.New(errMustProvideName)
return errors.fmt(errMustProvideName,"Work Command")

}
wC := WorkCommand{WorkType: command, Secure: secure}
s.workCommandsLock.Lock()
Expand All @@ -945,7 +948,7 @@ func (s *Netceptor) AddWorkCommand(command string, secure bool) error {
// SetServerTLSConfig stores a server TLS config by name.
func (s *Netceptor) SetServerTLSConfig(name string, config *tls.Config) error {
if name == "" {
return fmt.Errorf("must provide a name")
return errors.New(errMustProvideName)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return errors.New(errMustProvideName)
return errors.fmt(errMustProvideName,"Server TLS Config")

}
s.serverTLSConfigs[name] = config

Expand Down Expand Up @@ -984,7 +987,7 @@ func (s *Netceptor) GetClientTLSConfig(name string, expectedHostName string, exp
// SetClientTLSConfig stores a client TLS config by name.
func (s *Netceptor) SetClientTLSConfig(name string, config *tls.Config, pinnedFingerprints [][]byte) error {
if name == "" {
return fmt.Errorf("must provide a name")
return errors.New(errMustProvideName)
}
s.clientTLSConfigs[name] = config
s.clientPinnedFingerprints[name] = pinnedFingerprints
Expand Down
13 changes: 8 additions & 5 deletions pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import (
"github.com/spf13/viper"
)

const errMsgStatusFileUpdate = "Error updating status file %s: %s"
const (
errMsgStatusFileUpdate = "Error updating status file %s: %s"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I see this error message has parameters.

receptorWorkersAppName = "receptor-workers"
)

type BaseWorkUnitForWorkUnit interface {
CancelContext()
Expand Down Expand Up @@ -537,12 +540,12 @@ func init() {
if version > 1 {
return
}
cmdline.RegisterConfigTypeForApp("receptor-workers",
cmdline.RegisterConfigTypeForApp(receptorWorkersAppName,
"work-signing", "Private key to sign work submissions", SigningKeyPrivateCfg{}, cmdline.Singleton, cmdline.Section(workersSection))
cmdline.RegisterConfigTypeForApp("receptor-workers",
cmdline.RegisterConfigTypeForApp(receptorWorkersAppName,
"work-verification", "Public key to verify work submissions", VerifyingKeyPublicCfg{}, cmdline.Singleton, cmdline.Section(workersSection))
cmdline.RegisterConfigTypeForApp("receptor-workers",
cmdline.RegisterConfigTypeForApp(receptorWorkersAppName,
"work-command", "Run a worker using an external command", CommandWorkerCfg{}, cmdline.Section(workersSection))
cmdline.RegisterConfigTypeForApp("receptor-workers",
cmdline.RegisterConfigTypeForApp(receptorWorkersAppName,
"command-runner", "Wrapper around a process invocation", commandRunnerCfg{}, cmdline.Hidden)
}
38 changes: 25 additions & 13 deletions pkg/workceptor/controlsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ type workceptorCommand struct {
params map[string]interface{}
}

const (
errFieldMissing = "field %s missing"
)

const (
cmdCancel = "cancel"
cmdRelease = "release"
cmdForceRelease = "force-release"
cmdStatus = "status"
cmdList = "list"
)

func (t *workceptorCommandType) InitFromString(params string) (controlsvc.ControlCommand, error) {
tokens := strings.Split(params, " ")
if len(tokens) == 0 {
Expand All @@ -44,11 +56,11 @@ func (t *workceptorCommandType) InitFromString(params string) (controlsvc.Contro
if len(tokens) > 3 {
c.params["params"] = strings.Join(tokens[3:], " ")
}
case "list":
case cmdList:
if len(tokens) > 1 {
c.params["unitid"] = tokens[1]
}
case "status", "cancel", "release", "force-release":
case cmdStatus, cmdCancel, cmdRelease, cmdForceRelease:
if len(tokens) < 2 {
return nil, fmt.Errorf("work %s requires a unit ID", c.subcommand)
}
Expand Down Expand Up @@ -82,7 +94,7 @@ func (t *workceptorCommandType) InitFromString(params string) (controlsvc.Contro
func strFromMap(config map[string]interface{}, name string) (string, error) {
value, ok := config[name]
if !ok {
return "", fmt.Errorf("field %s missing", name)
return "", fmt.Errorf(errFieldMissing, name)
}
valueStr, ok := value.(string)
if !ok {
Expand All @@ -96,7 +108,7 @@ func strFromMap(config map[string]interface{}, name string) (string, error) {
func intFromMap(config map[string]interface{}, name string) (int64, error) {
value, ok := config[name]
if !ok {
return 0, fmt.Errorf("field %s missing", name)
return 0, fmt.Errorf(errFieldMissing, name)
}
valueInt, ok := value.(int64)
if ok {
Expand All @@ -120,7 +132,7 @@ func intFromMap(config map[string]interface{}, name string) (int64, error) {
func boolFromMap(config map[string]interface{}, name string) (bool, error) {
value, ok := config[name]
if !ok {
return false, fmt.Errorf("field %s missing", name)
return false, fmt.Errorf(errFieldMissing, name)
}
valueBoolStr, ok := value.(string)
if !ok {
Expand Down Expand Up @@ -163,7 +175,7 @@ func (t *workceptorCommandType) InitFromJSON(config map[string]interface{}) (con
if err != nil {
return nil, err
}
case "status", "cancel", "release", "force-release":
case cmdStatus, cmdCancel, cmdRelease, cmdForceRelease:
c.params["unitid"], err = strFromMap(config, "unitid")
if err != nil {
return nil, err
Expand All @@ -172,7 +184,7 @@ func (t *workceptorCommandType) InitFromJSON(config map[string]interface{}) (con
if err == nil {
c.params["signature"] = signature
}
case "list":
case cmdList:
unitID, err := strFromMap(config, "unitid")
if err == nil {
c.params["unitid"] = unitID
Expand Down Expand Up @@ -327,7 +339,7 @@ func (c *workceptorCommand) ControlFunc(ctx context.Context, nc controlsvc.Netce
}

return cfr, nil
case "list":
case cmdList:
var unitList []string
targetUnitID, ok := c.params["unitid"].(string)
if ok {
Expand All @@ -346,7 +358,7 @@ func (c *workceptorCommand) ControlFunc(ctx context.Context, nc controlsvc.Netce
}

return cfr, nil
case "status":
case cmdStatus:
unitid, err := strFromMap(c.params, "unitid")
if err != nil {
return nil, err
Expand All @@ -357,7 +369,7 @@ func (c *workceptorCommand) ControlFunc(ctx context.Context, nc controlsvc.Netce
}

return cfr, nil
case "cancel", "release", "force-release":
case cmdCancel, cmdRelease, cmdForceRelease:
unitid, err := strFromMap(c.params, "unitid")
if err != nil {
return nil, err
Expand All @@ -369,7 +381,7 @@ func (c *workceptorCommand) ControlFunc(ctx context.Context, nc controlsvc.Netce
cfr := make(map[string]interface{})
var pendingMsg string
var completeMsg string
if c.subcommand == "cancel" {
if c.subcommand == cmdCancel {
pendingMsg = "cancel pending"
completeMsg = "cancelled"
} else {
Expand All @@ -388,10 +400,10 @@ func (c *workceptorCommand) ControlFunc(ctx context.Context, nc controlsvc.Netce
if err != nil {
return nil, err
}
if c.subcommand == "cancel" {
if c.subcommand == cmdCancel {
err = unit.Cancel()
} else {
err = unit.Release(c.subcommand == "force-release")
err = unit.Release(c.subcommand == cmdForceRelease)
}
if err != nil && !IsPending(err) {
return nil, err
Expand Down
18 changes: 11 additions & 7 deletions pkg/workceptor/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ var ErrPodFailed = fmt.Errorf("pod failed to start")
// ErrImagePullBackOff is returned when the image for the container in the Pod cannot be pulled.
var ErrImagePullBackOff = fmt.Errorf("container failed to start")

const WorkerContainerName = "worker"
const (
WorkerContainerName = "worker"
errOpenStdout = "Error opening stdout file: %s"
statusPodRunning = "Pod Running"
)

// podRunningAndReady is a completion criterion for pod ready to be attached to.
func podRunningAndReady(kw KubeUnit) func(event watch.Event) (bool, error) {
Expand Down Expand Up @@ -765,7 +769,7 @@ func (kw *KubeUnit) CreatePod(env map[string]string) error {
} else if err != nil { // any other error besides ErrPodCompleted
stdout, err2 := NewStdoutWriter(FileSystem{}, kw.UnitDir())
if err2 != nil {
errMsg := fmt.Sprintf("Error opening stdout file: %s", err2)
errMsg := fmt.Sprintf(errOpenStdout, err2)
kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet
kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)

Expand Down Expand Up @@ -962,7 +966,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() {
// open stdout writer that writes to work unit's data directory
stdout, err := NewStdoutWriter(FileSystem{}, kw.UnitDir())
if err != nil {
errMsg := fmt.Sprintf("Error opening stdout file: %s", err)
errMsg := fmt.Sprintf(errOpenStdout, err)
kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet
kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)

Expand All @@ -989,7 +993,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() {
streamWait.Add(2)

if skipStdin {
kw.UpdateBasicStatus(WorkStateRunning, "Pod Running", stdout.Size())
kw.UpdateBasicStatus(WorkStateRunning, statusPodRunning, stdout.Size())
streamWait.Done()
} else {
retryCount := kw.GetKubeRetryCount()
Expand Down Expand Up @@ -1125,7 +1129,7 @@ func (kw *KubeUnit) RunWorkUsingLogger() {
close(stdinErrChan) // signal STDOUT goroutine to stop
} else {
if stdin.Error() == io.EOF {
kw.UpdateBasicStatus(WorkStateRunning, "Pod Running", stdout.Size())
kw.UpdateBasicStatus(WorkStateRunning, statusPodRunning, stdout.Size())
} else {
// this is probably not possible...
errMsg := fmt.Sprintf("Error reading stdin: %s", stdin.Error())
Expand Down Expand Up @@ -1421,7 +1425,7 @@ func (kw *KubeUnit) runWorkUsingTCP() {
// Open stdout writer
stdout, err := NewStdoutWriter(FileSystem{}, kw.UnitDir())
if err != nil {
errMsg := fmt.Sprintf("Error opening stdout file: %s", err)
errMsg := fmt.Sprintf(errOpenStdout, err)
kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet
kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
cancel()
Expand Down Expand Up @@ -1465,7 +1469,7 @@ func (kw *KubeUnit) runWorkUsingTCP() {
case <-stdin.Done():
err := stdin.Error()
if err == io.EOF {
kw.UpdateBasicStatus(WorkStateRunning, "Pod Running", stdout.Size())
kw.UpdateBasicStatus(WorkStateRunning, statusPodRunning, stdout.Size())
} else {
kw.UpdateBasicStatus(WorkStateFailed, fmt.Sprintf("Error reading stdin: %s", err), stdout.Size())
cancel()
Expand Down
11 changes: 7 additions & 4 deletions pkg/workceptor/remote_work.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import (
"github.com/ansible/receptor/pkg/utils"
)

const errMsgRemoteExtraDataMissing = "remote ExtraData missing"
const (
errMsgRemoteExtraDataMissing = "remote ExtraData missing"
errRemoteRead = "read error reading from %s: %s"
)

// remoteUnit implements the WorkUnit interface for the Receptor remote worker plugin.
type remoteUnit struct {
Expand Down Expand Up @@ -205,7 +208,7 @@ func (rw *remoteUnit) StartRemoteUnit(ctx context.Context, conn net.Conn, reader
}
response, err := utils.ReadStringContext(ctx, reader, '\n')
if err != nil {
return fmt.Errorf("read error reading from %s: %s", red.RemoteNode, err)
return fmt.Errorf(errRemoteRead, red.RemoteNode, err)
}
submitIDRegex := regexp.MustCompile(`with ID ([a-zA-Z0-9]+)\.`)
match := submitIDRegex.FindSubmatch([]byte(response))
Expand Down Expand Up @@ -237,7 +240,7 @@ func (rw *remoteUnit) StartRemoteUnit(ctx context.Context, conn net.Conn, reader
}
response, err = utils.ReadStringContext(ctx, reader, '\n')
if err != nil {
return fmt.Errorf("read error reading from %s: %s", red.RemoteNode, err)
return fmt.Errorf(errRemoteRead, red.RemoteNode, err)
}
resultErrorRegex := regexp.MustCompile("ERROR: (.*)")
match = resultErrorRegex.FindSubmatch([]byte(response))
Expand Down Expand Up @@ -286,7 +289,7 @@ func (rw *remoteUnit) cancelOrReleaseRemoteUnit(ctx context.Context, conn net.Co
}
response, err := utils.ReadStringContext(ctx, reader, '\n')
if err != nil {
return fmt.Errorf("read error reading from %s: %s", red.RemoteNode, err)
return fmt.Errorf(errRemoteRead, red.RemoteNode, err)
}
if response[:5] == "ERROR" {
return fmt.Errorf("error cancelling remote unit: %s", response[6:])
Expand Down
Loading