Skip to content

Commit b4d81fc

Browse files
authored
Ensure a trickle of output ephemeral buffer data is flushed regularly (#292)
1 parent 8439999 commit b4d81fc

File tree

3 files changed

+119
-1
lines changed

3 files changed

+119
-1
lines changed

cli/integrationtest/controlplane_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,98 @@ timeoutSeconds: 600`, getTestConnectivityImage(t))
483483
require.Equal("1234", execStdOut)
484484
}
485485

486+
func TestEndToEndExecWithSocketsAndEphemeralBuffersTrickle(t *testing.T) {
487+
t.Parallel()
488+
skipIfEphemeralBuffersNotSupported(t)
489+
require := require.New(t)
490+
491+
runSpec := fmt.Sprintf(`
492+
job:
493+
codespec:
494+
image: %s
495+
buffers:
496+
inputs: ["input"]
497+
outputs: ["output"]
498+
sockets:
499+
- port: 9002
500+
inputBuffer: input
501+
outputBuffer: output
502+
args:
503+
- socket
504+
- --port
505+
- "9002"
506+
buffers:
507+
input: _
508+
output: _
509+
timeoutSeconds: 600`, getTestConnectivityImage(t))
510+
511+
tempDir := t.TempDir()
512+
runSpecPath := filepath.Join(tempDir, "runspec.yaml")
513+
require.NoError(os.WriteFile(runSpecPath, []byte(runSpec), 0644))
514+
515+
// Create a pipe that trickles data: writes the current date every second for 10 seconds
516+
pr, pw := io.Pipe()
517+
inByteCount := 0
518+
go func() {
519+
defer pw.Close()
520+
for i := range 10 {
521+
n, err := fmt.Fprintf(pw, "%s\n", time.Now().Format(time.RFC3339Nano))
522+
require.NoError(err)
523+
inByteCount += n
524+
if i < 9 {
525+
time.Sleep(1 * time.Second)
526+
}
527+
}
528+
}()
529+
530+
execCmd := exec.Command("tyger", "run", "exec", "--file", runSpecPath, "--log-level", "trace")
531+
execCmd.Stdin = pr
532+
533+
stdErr := &bytes.Buffer{}
534+
execCmd.Stderr = stdErr
535+
536+
execOutPipe, err := execCmd.StdoutPipe()
537+
require.NoError(err)
538+
539+
require.NoError(execCmd.Start())
540+
541+
// Read output and record the start and stop times
542+
var firstReadTime, lastReadTime time.Time
543+
outByteCount := 0
544+
for {
545+
buf := make([]byte, 64*1024)
546+
n, err := execOutPipe.Read(buf)
547+
if n > 0 {
548+
now := time.Now()
549+
if outByteCount == 0 {
550+
firstReadTime = now
551+
}
552+
lastReadTime = now
553+
outByteCount += n
554+
}
555+
if err == io.EOF {
556+
break
557+
}
558+
require.NoError(err)
559+
}
560+
561+
execErr := execCmd.Wait()
562+
t.Log(stdErr.String())
563+
require.NoError(execErr)
564+
565+
require.Equal(inByteCount, outByteCount)
566+
567+
// Validate that output was received as a trickle, not all at once.
568+
// The total duration between first and last received data should be at least 7 seconds.
569+
totalDuration := lastReadTime.Sub(firstReadTime)
570+
571+
t.Logf("First output received at: %s", firstReadTime.Format(time.RFC3339Nano))
572+
t.Logf("Last output received at: %s", lastReadTime.Format(time.RFC3339Nano))
573+
574+
require.Greater(totalDuration.Seconds(), 5.0,
575+
"Expected output to trickle over at least 5 seconds, but total duration was %s", totalDuration)
576+
}
577+
486578
func TestEndToEndCreateWithShortBufferAccessTtl(t *testing.T) {
487579
t.Parallel()
488580
skipIfOnlyFastTests(t)

cli/integrationtest/testutils.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"errors"
1313
"flag"
1414
"fmt"
15+
"io"
1516
"net/http"
1617
"os/exec"
1718
"slices"
@@ -65,6 +66,11 @@ func (b *CmdBuilder) Stdin(stdin string) *CmdBuilder {
6566
return b
6667
}
6768

69+
func (b *CmdBuilder) StdinStream(stdin io.Reader) *CmdBuilder {
70+
b.cmd.Stdin = stdin
71+
return b
72+
}
73+
6874
func (b *CmdBuilder) Run() (stdout string, stderr string, err error) {
6975
defer b.cancelFunc()
7076

cli/internal/dataplane/relayserver.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,27 @@ func RelayOutputServer(
174174
}
175175
}()
176176

177-
_, err := io.Copy(w, inputReader)
177+
var err error
178+
if flusher == nil {
179+
_, err = io.Copy(w, inputReader)
180+
} else {
181+
// If the ResponseWriter supports flushing, we want to flush regularly in the case of a trickle of data.
182+
buf := make([]byte, 32*1024)
183+
for {
184+
n, err := inputReader.Read(buf)
185+
if n > 0 {
186+
w.Write(buf[:n])
187+
flusher.Flush()
188+
}
189+
if err != nil {
190+
if err == io.EOF {
191+
err = nil
192+
}
193+
break
194+
}
195+
}
196+
}
197+
178198
if err != nil {
179199
log.Error().Err(err).Msg("transfer failed")
180200
w.Header().Set(ErrorCodeHeader, transferFailedErrorCode)

0 commit comments

Comments
 (0)