Skip to content

Commit bf86880

Browse files
author
Renato Costa
committed
roachprod: return structured data from roachprod.Monitor
Previously, the `Monitor` function would return semi-structured data in the form of a `(node, message)` pair. The `message` field corresponded to the output (as in, stdout) of the bash script that implements the monitor logic. This tied callers to the specific implementation of the function and to the strings passed to `echo` in that script (e.g., all the checks for the `"dead"` string in the message). This commit updates the function to instead return an event channel. Events can be of different types corresponding to each of the events that the shell script is able to emit (e.g., when the cockroach process is found to be running, when it stops, etc). This makes the parsing of the script output private, and simplifies the logic in most callers. The message returned by the `roachtest` wrapper is also changed slightly in this commit to make it clearer where the error is coming from: the function passed to `monitor.Go` or the monitor process itself. This has been a source of confusion in the past. Finally, the monitor implementation in roachprod is changed slightly to avoid blocking on channel sends if the context has already been canceled. This avoids leaked goroutines in tests, as canceling the context passed to `Monitor()` should cause all goroutines to eventually finish instead of blocking indefinitely. Epic: CRDB-19321 Release note: None
1 parent 4c39761 commit bf86880

File tree

5 files changed

+136
-63
lines changed

5 files changed

+136
-63
lines changed

pkg/cmd/roachprod/main.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -650,21 +650,15 @@ of nodes, outputting a line whenever a change is detected:
650650
`,
651651
Args: cobra.ExactArgs(1),
652652
Run: wrap(func(cmd *cobra.Command, args []string) error {
653-
messages, err := roachprod.Monitor(context.Background(), config.Logger, args[0], monitorOpts)
653+
eventChan, err := roachprod.Monitor(context.Background(), config.Logger, args[0], monitorOpts)
654654
if err != nil {
655655
return err
656656
}
657-
for msg := range messages {
658-
if msg.Err != nil {
659-
msg.Msg += "error: " + msg.Err.Error()
660-
}
661-
thisError := errors.Newf("%d: %s", msg.Node, msg.Msg)
662-
if msg.Err != nil || strings.Contains(msg.Msg, "dead") {
663-
err = errors.CombineErrors(err, thisError)
664-
}
665-
fmt.Println(thisError.Error())
657+
for info := range eventChan {
658+
fmt.Println(info.String())
666659
}
667-
return err
660+
661+
return nil
668662
}),
669663
}
670664

pkg/cmd/roachtest/cluster.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,22 +1528,20 @@ func (c *clusterImpl) assertNoDeadNode(ctx context.Context, t test.Test) error {
15281528
}
15291529

15301530
t.L().Printf("checking for dead nodes")
1531-
ch, err := roachprod.Monitor(ctx, t.L(), c.name, install.MonitorOpts{OneShot: true, IgnoreEmptyNodes: true})
1531+
eventsCh, err := roachprod.Monitor(ctx, t.L(), c.name, install.MonitorOpts{OneShot: true, IgnoreEmptyNodes: true})
15321532

15331533
// An error here means there was a problem initialising a SyncedCluster.
15341534
if err != nil {
15351535
return err
15361536
}
15371537

15381538
deadNodes := 0
1539-
for n := range ch {
1540-
// If there's an error, it means either that the monitor command failed
1541-
// completely, or that it found a dead node worth complaining about.
1542-
if n.Err != nil || strings.HasPrefix(n.Msg, "dead") {
1539+
for info := range eventsCh {
1540+
t.L().Printf("%s", info)
1541+
1542+
if _, isDeath := info.Event.(install.MonitorNodeDead); isDeath {
15431543
deadNodes++
15441544
}
1545-
1546-
t.L().Printf("n%d: err=%v,msg=%s", n.Node, n.Err, n.Msg)
15471545
}
15481546

15491547
if deadNodes > 0 {

pkg/cmd/roachtest/monitor.go

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ package main
1313
import (
1414
"context"
1515
"fmt"
16-
"strings"
1716
"sync"
1817
"sync/atomic"
1918

@@ -178,7 +177,7 @@ func (m *monitorImpl) wait() error {
178177
m.cancel()
179178
wg.Done()
180179
}()
181-
setErr(errors.Wrap(m.g.Wait(), "monitor task failed"))
180+
setErr(errors.Wrap(m.g.Wait(), "function passed to monitor.Go failed"))
182181
}()
183182

184183
// 2. The second goroutine reads from the monitoring channel, watching for any
@@ -190,28 +189,24 @@ func (m *monitorImpl) wait() error {
190189
wg.Done()
191190
}()
192191

193-
messagesChannel, err := roachprod.Monitor(m.ctx, m.l, m.nodes, install.MonitorOpts{})
192+
eventsCh, err := roachprod.Monitor(m.ctx, m.l, m.nodes, install.MonitorOpts{})
194193
if err != nil {
195194
setErr(errors.Wrap(err, "monitor command failure"))
196195
return
197196
}
198-
var monitorErr error
199-
for msg := range messagesChannel {
200-
if msg.Err != nil {
201-
msg.Msg += "error: " + msg.Err.Error()
202-
}
203-
thisError := errors.Newf("%d: %s", msg.Node, msg.Msg)
204-
if msg.Err != nil || strings.Contains(msg.Msg, "dead") {
205-
monitorErr = errors.CombineErrors(monitorErr, thisError)
197+
198+
for info := range eventsCh {
199+
_, isDeath := info.Event.(install.MonitorNodeDead)
200+
isExpectedDeath := isDeath && atomic.AddInt32(&m.expDeaths, -1) >= 0
201+
var expectedDeathStr string
202+
if isExpectedDeath {
203+
expectedDeathStr = ": expected"
206204
}
207-
var id int
208-
var s string
209-
newMsg := thisError.Error()
210-
if n, _ := fmt.Sscanf(newMsg, "%d: %s", &id, &s); n == 2 {
211-
if strings.Contains(s, "dead") && atomic.AddInt32(&m.expDeaths, -1) < 0 {
212-
setErr(errors.Wrap(fmt.Errorf("unexpected node event: %s", newMsg), "monitor command failure"))
213-
return
214-
}
205+
m.l.Printf("Monitor event: %s%s", info, expectedDeathStr)
206+
207+
if isDeath && !isExpectedDeath {
208+
setErr(fmt.Errorf("unexpected node event: %s", info))
209+
return
215210
}
216211
}
217212
}()

pkg/roachprod/install/cluster_synced.go

Lines changed: 112 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -583,20 +583,52 @@ fi
583583
return results, nil
584584
}
585585

586+
// MonitorNodeSkipped represents a node whose status was not checked.
587+
type MonitorNodeSkipped struct{}
588+
589+
// MonitorNodeRunning represents the cockroach process running on a
590+
// node.
591+
type MonitorNodeRunning struct {
592+
PID string
593+
}
594+
595+
// MonitorNodeDead represents the cockroach process dying on a node.
596+
type MonitorNodeDead struct {
597+
ExitCode string
598+
}
599+
600+
type MonitorError struct {
601+
Err error
602+
}
603+
586604
// NodeMonitorInfo is a message describing a cockroach process' status.
587605
type NodeMonitorInfo struct {
588606
// The index of the node (in a SyncedCluster) at which the message originated.
589607
Node Node
590-
// A message about the node. This is either a PID, "dead", "nc exited", or
591-
// "skipped".
592-
// Anything but a PID or "skipped" is an indication that there is some
593-
// problem with the node and that the process is not running.
594-
Msg string
595-
// Err is an error that may occur when trying to probe the status of the node.
596-
// If Err is non-nil, Msg is empty. After an error is returned, the node with
597-
// the given index will no longer be probed. Errors typically indicate networking
598-
// issues or nodes that have (physically) shut down.
599-
Err error
608+
// Event describes what happened to the node; it is one of
609+
// MonitorNodeSkipped (no store directory was found);
610+
// MonitorNodeRunning, sent when cockroach is running on a node;
611+
// MonitorNodeDead, when the cockroach process stops running on a
612+
// node; or MonitorError, which typically indicates networking issues
613+
// or nodes that have (physically) shut down.
614+
Event interface{}
615+
}
616+
617+
func (nmi NodeMonitorInfo) String() string {
618+
var status string
619+
620+
switch event := nmi.Event.(type) {
621+
case MonitorNodeRunning:
622+
status = fmt.Sprintf("cockroach process is running (PID: %s)", event.PID)
623+
case MonitorNodeSkipped:
624+
status = "node skipped"
625+
case MonitorNodeDead:
626+
status = fmt.Sprintf("cockroach process died (exit code %s)", event.ExitCode)
627+
case MonitorError:
628+
status = fmt.Sprintf("error: %s", event.Err.Error())
629+
}
630+
631+
return fmt.Sprintf("n%d: %s", nmi.Node, status)
600632
}
601633

602634
// MonitorOpts is used to pass the options needed by Monitor.
@@ -606,16 +638,16 @@ type MonitorOpts struct {
606638
}
607639

608640
// Monitor writes NodeMonitorInfo for the cluster nodes to the returned channel.
609-
// Infos sent to the channel always have the Index and exactly one of Msg or Err
610-
// set.
641+
// Infos sent to the channel always have the Node the event refers to, and the
642+
// event itself. See documentation for NodeMonitorInfo for possible event types.
611643
//
612-
// If oneShot is true, infos are retrieved only once for each node and the
644+
// If OneShot is true, infos are retrieved only once for each node and the
613645
// channel is subsequently closed; otherwise the process continues indefinitely
614646
// (emitting new information as the status of the cockroach process changes).
615647
//
616-
// If ignoreEmptyNodes is true, nodes on which no CockroachDB data is found
617-
// (in {store-dir}) will not be probed and single message, "skipped", will
618-
// be emitted for them.
648+
// If IgnoreEmptyNodes is true, nodes on which no CockroachDB data is found
649+
// (in {store-dir}) will not be probed and a single event, MonitorNodeSkipped,
650+
// will be emitted for them.
619651
func (c *SyncedCluster) Monitor(
620652
l *logger.Logger, ctx context.Context, opts MonitorOpts,
621653
) chan NodeMonitorInfo {
@@ -624,10 +656,30 @@ func (c *SyncedCluster) Monitor(
624656
var wg sync.WaitGroup
625657
monitorCtx, cancel := context.WithCancel(ctx)
626658

659+
// sendEvent sends the NodeMonitorInfo passed through the channel
660+
// that is listened to by the caller. Bails if the context is
661+
// canceled.
662+
sendEvent := func(info NodeMonitorInfo) {
663+
select {
664+
case ch <- info:
665+
// We were able to send the info through the channel.
666+
case <-monitorCtx.Done():
667+
// Don't block trying to send the info.
668+
}
669+
}
670+
671+
const (
672+
separator = "|"
673+
skippedMsg = "skipped"
674+
runningMsg = "running"
675+
deadMsg = "dead"
676+
)
677+
627678
for i := range nodes {
628679
wg.Add(1)
629680
go func(i int) {
630681
defer wg.Done()
682+
631683
node := nodes[i]
632684

633685
// On each monitored node, we loop looking for a cockroach process.
@@ -637,18 +689,30 @@ func (c *SyncedCluster) Monitor(
637689
Store string
638690
Port int
639691
Local bool
692+
Separator string
693+
SkippedMsg string
694+
RunningMsg string
695+
DeadMsg string
640696
}{
641697
OneShot: opts.OneShot,
642698
IgnoreEmpty: opts.IgnoreEmptyNodes,
643699
Store: c.NodeDir(node, 1 /* storeIndex */),
644700
Port: c.NodePort(node),
645701
Local: c.IsLocal(),
702+
Separator: separator,
703+
SkippedMsg: skippedMsg,
704+
RunningMsg: runningMsg,
705+
DeadMsg: deadMsg,
646706
}
647707

708+
// NB.: we parse the output of every line this script
709+
// prints. Every call to `echo` must match the parsing logic
710+
// down below in order to produce structured results to the
711+
// caller.
648712
snippet := `
649713
{{ if .IgnoreEmpty }}
650714
if ! ls {{.Store}}/marker.* 1> /dev/null 2>&1; then
651-
echo "skipped"
715+
echo "{{.SkippedMsg}}"
652716
exit 0
653717
fi
654718
{{- end}}
@@ -682,10 +746,10 @@ while :; do
682746
# the new incarnation. We lost the actual exit status of the old PID.
683747
status="unknown"
684748
fi
685-
echo "dead (exit status ${status})"
749+
echo "{{.DeadMsg}}{{.Separator}}${status}"
686750
fi
687751
if [ "${pid}" != 0 ]; then
688-
echo "${pid}"
752+
echo "{{.RunningMsg}}{{.Separator}}${pid}"
689753
fi
690754
lastpid=${pid}
691755
fi
@@ -704,7 +768,8 @@ done
704768
t := template.Must(template.New("script").Parse(snippet))
705769
var buf bytes.Buffer
706770
if err := t.Execute(&buf, data); err != nil {
707-
ch <- NodeMonitorInfo{Node: node, Err: err}
771+
err := errors.Wrap(err, "failed to execute template")
772+
sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}})
708773
return
709774
}
710775

@@ -713,14 +778,16 @@ done
713778

714779
p, err := sess.StdoutPipe()
715780
if err != nil {
716-
ch <- NodeMonitorInfo{Node: node, Err: err}
781+
err := errors.Wrap(err, "failed to read stdout pipe")
782+
sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}})
717783
wg.Done()
718784
return
719785
}
720786
// Request a PTY so that the script will receive a SIGPIPE when the
721787
// session is closed.
722788
if err := sess.RequestPty(); err != nil {
723-
ch <- NodeMonitorInfo{Node: node, Err: err}
789+
err := errors.Wrap(err, "failed to request PTY")
790+
sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}})
724791
return
725792
}
726793

@@ -734,12 +801,31 @@ done
734801
if err == io.EOF {
735802
return
736803
}
737-
ch <- NodeMonitorInfo{Node: node, Msg: string(line)}
804+
if err != nil {
805+
err := errors.Wrap(err, "error reading from session")
806+
sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}})
807+
}
808+
809+
parts := strings.Split(string(line), separator)
810+
switch parts[0] {
811+
case skippedMsg:
812+
sendEvent(NodeMonitorInfo{Node: node, Event: MonitorNodeSkipped{}})
813+
case runningMsg:
814+
pid := parts[1]
815+
sendEvent(NodeMonitorInfo{Node: node, Event: MonitorNodeRunning{pid}})
816+
case deadMsg:
817+
exitCode := parts[1]
818+
sendEvent(NodeMonitorInfo{Node: node, Event: MonitorNodeDead{exitCode}})
819+
default:
820+
err := fmt.Errorf("internal error: unrecognized output from monitor: %s", line)
821+
sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}})
822+
}
738823
}
739824
}(p)
740825

741826
if err := sess.Start(); err != nil {
742-
ch <- NodeMonitorInfo{Node: node, Err: err}
827+
err := errors.Wrap(err, "failed to start session")
828+
sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}})
743829
return
744830
}
745831

@@ -755,7 +841,8 @@ done
755841
// pipe. Otherwise it can be closed under us, causing the reader to loop
756842
// infinitely receiving a non-`io.EOF` error.
757843
if err := sess.Wait(); err != nil {
758-
ch <- NodeMonitorInfo{Node: node, Err: err}
844+
err := errors.Wrap(err, "failed to wait for session")
845+
sendEvent(NodeMonitorInfo{Node: node, Event: MonitorError{err}})
759846
return
760847
}
761848
}(i)

pkg/roachprod/install/session.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ func newRemoteSession(l *logger.Logger, command *remoteCommand) *remoteSession {
112112
}
113113
}
114114

115-
//const logfile = ""
116115
args := []string{
117116
command.user + "@" + command.host,
118117

0 commit comments

Comments
 (0)