diff --git a/cli/integrationtest/controlplane_test.go b/cli/integrationtest/controlplane_test.go index a95d4411..d652e224 100644 --- a/cli/integrationtest/controlplane_test.go +++ b/cli/integrationtest/controlplane_test.go @@ -483,6 +483,98 @@ timeoutSeconds: 600`, getTestConnectivityImage(t)) require.Equal("1234", execStdOut) } +func TestEndToEndExecWithSocketsAndEphemeralBuffersTrickle(t *testing.T) { + t.Parallel() + skipIfEphemeralBuffersNotSupported(t) + require := require.New(t) + + runSpec := fmt.Sprintf(` +job: + codespec: + image: %s + buffers: + inputs: ["input"] + outputs: ["output"] + sockets: + - port: 9002 + inputBuffer: input + outputBuffer: output + args: + - socket + - --port + - "9002" + buffers: + input: _ + output: _ +timeoutSeconds: 600`, getTestConnectivityImage(t)) + + tempDir := t.TempDir() + runSpecPath := filepath.Join(tempDir, "runspec.yaml") + require.NoError(os.WriteFile(runSpecPath, []byte(runSpec), 0644)) + + // Create a pipe that trickles data: writes the current date every second for 10 seconds + pr, pw := io.Pipe() + inByteCount := 0 + go func() { + defer pw.Close() + for i := range 10 { + n, err := fmt.Fprintf(pw, "%s\n", time.Now().Format(time.RFC3339Nano)) + require.NoError(err) + inByteCount += n + if i < 9 { + time.Sleep(1 * time.Second) + } + } + }() + + execCmd := exec.Command("tyger", "run", "exec", "--file", runSpecPath, "--log-level", "trace") + execCmd.Stdin = pr + + stdErr := &bytes.Buffer{} + execCmd.Stderr = stdErr + + execOutPipe, err := execCmd.StdoutPipe() + require.NoError(err) + + require.NoError(execCmd.Start()) + + // Read output and record the start and stop times + var firstReadTime, lastReadTime time.Time + outByteCount := 0 + for { + buf := make([]byte, 64*1024) + n, err := execOutPipe.Read(buf) + if n > 0 { + now := time.Now() + if outByteCount == 0 { + firstReadTime = now + } + lastReadTime = now + outByteCount += n + } + if err == io.EOF { + break + } + require.NoError(err) + } + + execErr := execCmd.Wait() + t.Log(stdErr.String()) + require.NoError(execErr) + + require.Equal(inByteCount, outByteCount) + + // Validate that output was received as a trickle, not all at once. + // The total duration between first and last received data should be at least 7 seconds. + totalDuration := lastReadTime.Sub(firstReadTime) + + t.Logf("First output received at: %s", firstReadTime.Format(time.RFC3339Nano)) + t.Logf("Last output received at: %s", lastReadTime.Format(time.RFC3339Nano)) + + require.Greater(totalDuration.Seconds(), 5.0, + "Expected output to trickle over at least 5 seconds, but total duration was %s", totalDuration) +} + func TestEndToEndCreateWithShortBufferAccessTtl(t *testing.T) { t.Parallel() skipIfOnlyFastTests(t) diff --git a/cli/integrationtest/testutils.go b/cli/integrationtest/testutils.go index fb004a4d..6b6c90cd 100644 --- a/cli/integrationtest/testutils.go +++ b/cli/integrationtest/testutils.go @@ -12,6 +12,7 @@ import ( "errors" "flag" "fmt" + "io" "net/http" "os/exec" "slices" @@ -65,6 +66,11 @@ func (b *CmdBuilder) Stdin(stdin string) *CmdBuilder { return b } +func (b *CmdBuilder) StdinStream(stdin io.Reader) *CmdBuilder { + b.cmd.Stdin = stdin + return b +} + func (b *CmdBuilder) Run() (stdout string, stderr string, err error) { defer b.cancelFunc() diff --git a/cli/internal/dataplane/relayserver.go b/cli/internal/dataplane/relayserver.go index 108459c1..2a7d537c 100644 --- a/cli/internal/dataplane/relayserver.go +++ b/cli/internal/dataplane/relayserver.go @@ -174,7 +174,27 @@ func RelayOutputServer( } }() - _, err := io.Copy(w, inputReader) + var err error + if flusher == nil { + _, err = io.Copy(w, inputReader) + } else { + // If the ResponseWriter supports flushing, we want to flush regularly in the case of a trickle of data. + buf := make([]byte, 32*1024) + for { + n, err := inputReader.Read(buf) + if n > 0 { + w.Write(buf[:n]) + flusher.Flush() + } + if err != nil { + if err == io.EOF { + err = nil + } + break + } + } + } + if err != nil { log.Error().Err(err).Msg("transfer failed") w.Header().Set(ErrorCodeHeader, transferFailedErrorCode)