Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions cli/integrationtest/controlplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,98 @@ timeoutSeconds: 600`, getTestConnectivityImage(t))
require.Equal("1234", execStdOut)
}

func TestEndToEndExecWithSocketsAndEphemeralBuffersTrickle(t *testing.T) {
t.Parallel()
skipIfEphemeralBuffersNotSupported(t)
require := require.New(t)

runSpec := fmt.Sprintf(`
job:
codespec:
image: %s
buffers:
inputs: ["input"]
outputs: ["output"]
sockets:
- port: 9002
inputBuffer: input
outputBuffer: output
args:
- socket
- --port
- "9002"
buffers:
input: _
output: _
timeoutSeconds: 600`, getTestConnectivityImage(t))

tempDir := t.TempDir()
runSpecPath := filepath.Join(tempDir, "runspec.yaml")
require.NoError(os.WriteFile(runSpecPath, []byte(runSpec), 0644))

// Create a pipe that trickles data: writes the current date every second for 10 seconds
pr, pw := io.Pipe()
inByteCount := 0
go func() {
defer pw.Close()
for i := range 10 {
n, err := fmt.Fprintf(pw, "%s\n", time.Now().Format(time.RFC3339Nano))
require.NoError(err)
inByteCount += n
if i < 9 {
time.Sleep(1 * time.Second)
}
}
}()

execCmd := exec.Command("tyger", "run", "exec", "--file", runSpecPath, "--log-level", "trace")
execCmd.Stdin = pr

stdErr := &bytes.Buffer{}
execCmd.Stderr = stdErr

execOutPipe, err := execCmd.StdoutPipe()
require.NoError(err)

require.NoError(execCmd.Start())

// Read output and record the start and stop times
var firstReadTime, lastReadTime time.Time
outByteCount := 0
for {
buf := make([]byte, 64*1024)
n, err := execOutPipe.Read(buf)
if n > 0 {
now := time.Now()
if outByteCount == 0 {
firstReadTime = now
}
lastReadTime = now
outByteCount += n
}
if err == io.EOF {
break
}
require.NoError(err)
}

execErr := execCmd.Wait()
t.Log(stdErr.String())
require.NoError(execErr)

require.Equal(inByteCount, outByteCount)

// Validate that output was received as a trickle, not all at once.
// The total duration between first and last received data should be at least 7 seconds.
totalDuration := lastReadTime.Sub(firstReadTime)

t.Logf("First output received at: %s", firstReadTime.Format(time.RFC3339Nano))
t.Logf("Last output received at: %s", lastReadTime.Format(time.RFC3339Nano))

require.Greater(totalDuration.Seconds(), 5.0,
"Expected output to trickle over at least 5 seconds, but total duration was %s", totalDuration)
}

func TestEndToEndCreateWithShortBufferAccessTtl(t *testing.T) {
t.Parallel()
skipIfOnlyFastTests(t)
Expand Down
6 changes: 6 additions & 0 deletions cli/integrationtest/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"errors"
"flag"
"fmt"
"io"
"net/http"
"os/exec"
"slices"
Expand Down Expand Up @@ -65,6 +66,11 @@ func (b *CmdBuilder) Stdin(stdin string) *CmdBuilder {
return b
}

func (b *CmdBuilder) StdinStream(stdin io.Reader) *CmdBuilder {
b.cmd.Stdin = stdin
return b
}

func (b *CmdBuilder) Run() (stdout string, stderr string, err error) {
defer b.cancelFunc()

Expand Down
22 changes: 21 additions & 1 deletion cli/internal/dataplane/relayserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,27 @@
}
}()

_, err := io.Copy(w, inputReader)
var err error
if flusher == nil {
_, err = io.Copy(w, inputReader)
} else {
// If the ResponseWriter supports flushing, we want to flush regularly in the case of a trickle of data.
buf := make([]byte, 32*1024)
for {
n, err := inputReader.Read(buf)
if n > 0 {
w.Write(buf[:n])
flusher.Flush()
}
if err != nil {
if err == io.EOF {
err = nil
}
break
}
}
}

if err != nil {
log.Error().Err(err).Msg("transfer failed")
w.Header().Set(ErrorCodeHeader, transferFailedErrorCode)
Expand Down
Loading