Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func (c *Client) EventStream() *EventStream {

// Stream establishes a new subscription to Nomad's event stream and streams
// results back to the returned channel.
//
// Events stop being emitted once the Events.Err field is non-nil.
func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) {
r, err := e.client.newRequest("GET", "/v1/event/stream")
if err != nil {
Expand Down Expand Up @@ -220,6 +222,11 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind
return
case eventsCh <- &events:
}

// There are no recoverable Decode errors, so return on error.
if events.Err != nil {
return
}
}
}()

Expand Down
37 changes: 37 additions & 0 deletions api/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api
import (
"context"
"encoding/json"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -118,6 +119,42 @@ func TestEvent_Stream(t *testing.T) {
case <-time.After(5 * time.Second):
must.Unreachable(t, must.Sprint("failed waiting for event stream event"))
}

// Stop the server to ensure EOF is returned
s.Stop()

for {
select {
case event, ok := <-streamCh:
if !ok {
must.Unreachable(t, must.Sprintf("chan closed before EOF received"))
}

// Sadly decode doesn't return io.EOF
if event.Err != nil && strings.HasSuffix(event.Err.Error(), "EOF") {
// Succcess! Make sure chan gets closed
select {
case _, ok := <-streamCh:
if ok {
must.Unreachable(t, must.Sprintf("expected chan to close after EOF"))
}

// Success!
return
case <-time.After(5 * time.Second):
must.Unreachable(t, must.Sprint("failed waiting for event stream to close"))
}
}

if event.Err != nil {
must.Unreachable(t, must.Sprintf("unexpected %v (%T)", event.Err, event.Err))
}
must.Len(t, 1, event.Events)
must.Eq(t, "Evaluation", string(event.Events[0].Topic))
case <-time.After(5 * time.Second):
must.Unreachable(t, must.Sprint("failed waiting for event stream EOF"))
}
}
}

func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions api/internal/testutil/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ type TestServer struct {
cmd *exec.Cmd
Config *TestServerConfig
t testing.TB
exited bool

HTTPAddr string
SerfAddr string
Expand Down Expand Up @@ -240,6 +241,12 @@ func NewTestServer(t testing.TB, cb ServerConfigCallback) *TestServer {
// Stop stops the test Nomad server, and removes the Nomad data
// directory once we are done.
func (s *TestServer) Stop() {
if s.exited {
// Allow calling multiple times to allow for tests that use defer s.Stop()
// as well as stop the server during the test to assert behavior.
return
}

s.t.Cleanup(func() {
_ = os.RemoveAll(s.Config.DataDir)
})
Expand All @@ -258,6 +265,7 @@ func (s *TestServer) Stop() {

select {
case <-done:
s.exited = true
return
case <-time.After(5 * time.Second):
s.t.Logf("timed out waiting for process to gracefully terminate")
Expand All @@ -268,6 +276,7 @@ func (s *TestServer) Stop() {

select {
case <-done:
s.exited = true
case <-time.After(5 * time.Second):
s.t.Logf("timed out waiting for process to be killed")
}
Expand Down
Loading