Skip to content

Conversation

@llogen
Copy link
Contributor

@llogen llogen commented Jan 22, 2026

This change replaces the simple full-file transfer mechanism with a robust chunked protocol that supports:

  • Concurrent uploads/downloads with unique transfer IDs
  • 1MB chunk streaming to support large files
  • Transfer state management with acknowledgments
  • Round-robin scheduling for fair resource usage during concurrent transfers

@llogen llogen requested a review from jenstopp January 22, 2026 12:54
@llogen llogen force-pushed the feat/reworkFileTransfer branch 4 times, most recently from a4c6b8d to da4870b Compare January 22, 2026 13:35
This change replaces the simple full-file transfer mechanism with a robust
chunked protocol that supports:
- Concurrent uploads/downloads with unique transfer IDs
- 1MB chunk streaming to support large files
- Transfer state management with acknowledgments
- Round-robin scheduling for fair resource usage during concurrent transfers

Signed-off-by: llogen <christoph.lange@blindspot.software>
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR replaces the simple full-file transfer mechanism with a robust chunked protocol supporting concurrent transfers with unique IDs, 1MB chunk streaming for large files, transfer state management with acknowledgments, and round-robin scheduling for fair resource usage during concurrent transfers.

Changes:

  • Added new protobuf messages (FileMetadata, FileChunk, FileTransferRequest, FileTransferResponse) with status enum
  • Implemented agent-side chunking with upload/download state tracking and round-robin scheduling
  • Created client-side file transfer manager for handling chunked uploads and downloads
  • Updated buf configuration from v2 to v1 with plugin reference changes
  • Moved google/uuid from indirect to direct dependency

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 13 comments.

Show a summary per file
File Description
protobuf/dutctl/v1/dutctl.proto Defines new chunked file transfer protocol messages and enum
protobuf/gen/dutctl/v1/dutctl.pb.go Auto-generated code from protoc-gen-go v1.36.11 with new message types
protobuf/gen/dutctl/v1/dutctlv1connect/dutctl.connect.go Refactored to inline method descriptor lookups
protobuf/buf.yaml Downgraded from v2 to v1
protobuf/buf.gen.yaml Changed plugin references from local to remote
internal/dutagent/session.go Added upload/download state tracking with mutexes and round-robin scheduling
internal/dutagent/worker.go Modified workers to handle chunked file transfers with metadata
internal/dutagent/broker.go Updated channel initialization with buffers and transfer state maps
cmds/dutctl/rpc.go Integrated file transfer manager for client-side handling
cmds/dutctl/file_transfer.go New file implementing client-side chunked transfer management
go.mod Made google/uuid a direct dependency

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Signed-off-by: llogen <christoph.lange@blindspot.software>
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +1 to +490
// Copyright 2025 Blindspot Software
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
"fmt"
"io"
"log"
"os"
"sync"

"connectrpc.com/connect"

pb "github.com/BlindspotSoftware/dutctl/protobuf/gen/dutctl/v1"
)

const (
clientChunkSize = 1024 * 1024 // 1MB chunks
)

// StreamForClient is a type alias for the stream connection to reduce line length.
type StreamForClient = *connect.BidiStreamForClient[pb.RunRequest, pb.RunResponse]

// clientFileTransferState represents an active file transfer on the client.
type clientFileTransferState struct {
transferID string
path string
file *os.File
direction string // "upload" or "download"
expectedChunkNum int32 // For validating chunk sequence on download
mu sync.Mutex
}

// clientFileTransferManager manages file transfers on the client side.
type clientFileTransferManager struct {
transfers map[string]*clientFileTransferState
mu sync.RWMutex
}

func newClientFileTransferManager() *clientFileTransferManager {
return &clientFileTransferManager{
transfers: make(map[string]*clientFileTransferState),
}
}

func (m *clientFileTransferManager) registerTransfer(transferID, path, direction string) *clientFileTransferState {
m.mu.Lock()
defer m.mu.Unlock()

state := &clientFileTransferState{
transferID: transferID,
path: path,
direction: direction,
}

m.transfers[transferID] = state

return state
}

func (m *clientFileTransferManager) getTransfer(transferID string) *clientFileTransferState {
m.mu.RLock()
defer m.mu.RUnlock()

return m.transfers[transferID]
}

func (m *clientFileTransferManager) removeTransfer(transferID string) {
m.mu.Lock()
defer m.mu.Unlock()

if state, exists := m.transfers[transferID]; exists {
if state.file != nil {
state.file.Close()
}

delete(m.transfers, transferID)
}
}

// sendUploadRejection sends a transfer rejection response to the agent.
func (m *clientFileTransferManager) sendUploadRejection(transferID, message string, stream StreamForClient) error {
res := &pb.RunRequest{
Msg: &pb.RunRequest_FileTransferResponse{
FileTransferResponse: &pb.FileTransferResponse{
TransferId: transferID,
Status: pb.FileTransferResponse_TRANSFER_REJECTED,
ErrorMessage: message,
},
},
}

return stream.Send(res)
}

// sendUploadAcceptance sends a transfer acceptance response to the agent.
func (m *clientFileTransferManager) sendUploadAcceptance(transferID string, stream StreamForClient) error {
res := &pb.RunRequest{
Msg: &pb.RunRequest_FileTransferResponse{
FileTransferResponse: &pb.FileTransferResponse{
TransferId: transferID,
Status: pb.FileTransferResponse_ACCEPTED,
},
},
}

return stream.Send(res)
}

// sendChunkToAgent sends a file chunk to the agent.
func (m *clientFileTransferManager) sendChunkToAgent(
transferID string,
chunkNum int32,
data []byte,
isFinal bool,
stream StreamForClient,
) error {
chunk := &pb.RunRequest{
Msg: &pb.RunRequest_FileChunk{
FileChunk: &pb.FileChunk{
TransferId: transferID,
ChunkNumber: chunkNum,
ChunkData: data,
ChunkOffset: int64(chunkNum) * int64(clientChunkSize),
IsFinal: isFinal,
},
},
}

return stream.Send(chunk)
}

// handleUploadRequest processes a request to upload a file to the agent.
func (m *clientFileTransferManager) handleUploadRequest(transferID, path string, stream StreamForClient) error {
_, statErr := os.Stat(path)
if statErr != nil {
log.Printf("Error accessing file %q: %v", path, statErr)

rejectErr := m.sendUploadRejection(transferID, fmt.Sprintf("cannot access file: %v", statErr), stream)
if rejectErr != nil {
return fmt.Errorf("sending transfer rejection: %w", rejectErr)
}

return nil
}

file, err := os.Open(path)
if err != nil {
log.Printf("Error opening file %q: %v", path, err)

rejectErr := m.sendUploadRejection(transferID, fmt.Sprintf("cannot open file: %v", err), stream)
if rejectErr != nil {
return fmt.Errorf("sending transfer rejection: %w", rejectErr)
}

return nil
}

state := m.registerTransfer(transferID, path, "upload")
state.file = file

acceptErr := m.sendUploadAcceptance(transferID, stream)
if acceptErr != nil {
file.Close()
m.removeTransfer(transferID)

return fmt.Errorf("sending transfer acceptance: %w", acceptErr)
}

m.sendUploadInChunks(transferID, path, file, stream)

return nil
}

// sendUploadInChunks reads and sends a file in chunks to the agent.
func (m *clientFileTransferManager) sendUploadInChunks(transferID, path string, file *os.File, stream StreamForClient) {
go func() {
defer file.Close()
defer m.removeTransfer(transferID)

chunkNum := int32(0)

for {
chunkData := make([]byte, clientChunkSize)
bytesRead, readErr := file.Read(chunkData)

if bytesRead > 0 {
chunkData = chunkData[:bytesRead]
isFinal := readErr == io.EOF

chunkErr := m.sendChunkToAgent(transferID, chunkNum, chunkData, isFinal, stream)
if chunkErr != nil {
log.Printf("Error sending file chunk: %v", chunkErr)

return
}

log.Printf("Sent chunk %d of file %q (%d bytes)", chunkNum, path, bytesRead)
chunkNum++

if isFinal {
log.Printf("Completed sending file %q\n", path)

break
}
}

if readErr != nil && readErr != io.EOF {
log.Printf("Error reading file %q: %v", path, readErr)

return
}

if readErr == io.EOF {
break
}
}
}()
}

// handleFileTransferRequest handles a FileTransferRequest from the agent.
// This can be either:
// 1. A request for the client to upload a file to the agent (with file path in metadata)
// 2. A notification that the agent will send a file download (with destination path in metadata)
// The direction is determined by checking if a transfer for this ID already exists.
func (m *clientFileTransferManager) handleFileTransferRequest(ftReq *pb.FileTransferRequest, stream StreamForClient) error {
transferID := ftReq.GetTransferId()
metadata := ftReq.GetMetadata()
path := metadata.GetPath()

// Check if this is an existing download transfer (agent initiating a file send)
existingState := m.getTransfer(transferID)
if existingState != nil && existingState.direction == "download" {
// This is the metadata notification for a download that's about to start
log.Printf("Agent sending file download: %q (transfer_id=%s)\n", path, transferID)
// Update the path with the actual destination from metadata
existingState.path = path

return nil
}

// This is a request for the client to upload a file
log.Printf("Agent requesting file upload: %q (transfer_id=%s)\n", path, transferID)

return m.handleUploadRequest(transferID, path, stream)
}

// sendDownloadError sends an error response for a download transfer.
func (m *clientFileTransferManager) sendDownloadError(transferID, message string, stream StreamForClient) error {
res := &pb.RunRequest{
Msg: &pb.RunRequest_FileTransferResponse{
FileTransferResponse: &pb.FileTransferResponse{
TransferId: transferID,
Status: pb.FileTransferResponse_TRANSFER_REJECTED,
ErrorMessage: message,
},
},
}

return stream.Send(res)
}

// sendChunkAcknowledgment sends an acknowledgment for a received chunk.
func (m *clientFileTransferManager) sendChunkAcknowledgment(transferID string, nextChunk int32, stream StreamForClient) error {
res := &pb.RunRequest{
Msg: &pb.RunRequest_FileTransferResponse{
FileTransferResponse: &pb.FileTransferResponse{
TransferId: transferID,
Status: pb.FileTransferResponse_CHUNK_RECEIVED,
NextChunkExpected: nextChunk,
},
},
}

return stream.Send(res)
}

// sendTransferComplete sends a transfer completion response.
func (m *clientFileTransferManager) sendTransferComplete(transferID string, stream StreamForClient) error {
res := &pb.RunRequest{
Msg: &pb.RunRequest_FileTransferResponse{
FileTransferResponse: &pb.FileTransferResponse{
TransferId: transferID,
Status: pb.FileTransferResponse_TRANSFER_COMPLETE,
},
},
}

return stream.Send(res)
}

// createDownloadFile creates a file for writing downloaded data.
func (m *clientFileTransferManager) createDownloadFile(transferID, path string, stream StreamForClient) (*os.File, error) {
file, err := os.Create(path)
if err != nil {
log.Printf("Error creating download file: %v", err)

sendErr := m.sendDownloadError(transferID, fmt.Sprintf("cannot create file: %v", err), stream)
if sendErr != nil {
return nil, fmt.Errorf("sending error: %w", sendErr)
}

return nil, err
}

return file, nil
}

// writeChunkToFile writes chunk data to the download file.
func (m *clientFileTransferManager) writeChunkToFile(
transferID string,
state *clientFileTransferState,
chunkData []byte,
stream StreamForClient,
) error {
state.mu.Lock()
file := state.file
state.mu.Unlock()

if file == nil {
return nil
}

_, writeErr := file.Write(chunkData)
if writeErr != nil {
log.Printf("Error writing to file: %v", writeErr)

sendErr := m.sendDownloadError(transferID, fmt.Sprintf("write error: %v", writeErr), stream)
if sendErr != nil {
return fmt.Errorf("sending error response: %w", sendErr)
}

m.removeTransfer(transferID)

return writeErr
}

return nil
}

// validateChunkSequence validates that a chunk arrives in the correct sequence.
// Returns true if valid, false if sequence error (error already sent to stream).
func (m *clientFileTransferManager) validateChunkSequence(
transferID string,
chunk *pb.FileChunk,
state *clientFileTransferState,
stream StreamForClient,
) bool {
state.mu.Lock()
defer state.mu.Unlock()

if chunk.GetChunkNumber() != state.expectedChunkNum {
log.Printf(
"Error: chunk order violation for transfer %s: expected %d, got %d",
transferID, state.expectedChunkNum, chunk.GetChunkNumber(),
)

const errMsg = "chunk sequence error: chunks must arrive in order"

sendErr := m.sendDownloadError(transferID, errMsg, stream)
if sendErr != nil {
log.Printf("Error sending error response: %v", sendErr)
}

m.removeTransfer(transferID)

return false
}

return true
}

// ensureDownloadFileExists creates the download file if needed for the first chunk.
// Returns error if file creation fails (error already sent to stream).
func (m *clientFileTransferManager) ensureDownloadFileExists(
transferID string,
chunk *pb.FileChunk,
state *clientFileTransferState,
stream StreamForClient,
) error {
state.mu.Lock()
needsFile := chunk.GetChunkNumber() == 0 && state.file == nil
state.mu.Unlock()

if !needsFile {
return nil
}

file, err := m.createDownloadFile(transferID, state.path, stream)
if err != nil {
m.removeTransfer(transferID)

// Error response already sent in createDownloadFile.
//nolint:nilerr
return nil
}

state.mu.Lock()
state.file = file
state.mu.Unlock()

return nil
}

// handleFileChunk handles a FileChunk from the agent (file download).
func (m *clientFileTransferManager) handleFileChunk(chunk *pb.FileChunk, stream StreamForClient) error {
transferID := chunk.GetTransferId()

log.Printf("Received file chunk: transfer_id=%s, chunk_number=%d, size=%d bytes, is_final=%v",
transferID, chunk.GetChunkNumber(), len(chunk.GetChunkData()), chunk.GetIsFinal())

// Get file transfer state (must already exist from FileTransferRequest with metadata)
state := m.getTransfer(transferID)
if state == nil {
log.Printf("Error: received chunk for unknown transfer %s", transferID)

const errMsg = "received chunk for unknown transfer, missing FileTransferRequest with metadata"

sendErr := m.sendDownloadError(transferID, errMsg, stream)
if sendErr != nil {
return fmt.Errorf("sending error response: %w", sendErr)
}

// Error response sent successfully; no need to propagate further.
return nil
}

// Validate chunk sequence - chunks must arrive in order.
if !m.validateChunkSequence(transferID, chunk, state, stream) {
return nil
}

// Ensure file exists for first chunk.
err := m.ensureDownloadFileExists(transferID, chunk, state, stream)
if err != nil {
return err
}

// Write chunk to file.
writeErr := m.writeChunkToFile(transferID, state, chunk.GetChunkData(), stream)
if writeErr != nil {
// Error already handled in writeChunkToFile, response already sent to agent.
//nolint:nilerr
return nil
}

// Increment expected chunk for next chunk.
state.mu.Lock()
state.expectedChunkNum++
state.mu.Unlock()

// Send acknowledgment.
ackErr := m.sendChunkAcknowledgment(transferID, chunk.GetChunkNumber()+1, stream)
if ackErr != nil {
return fmt.Errorf("sending chunk ack: %w", ackErr)
}

// If final chunk, close file and send completion.
if chunk.GetIsFinal() {
log.Printf("Download complete: %s\n", state.path)

completeErr := m.sendTransferComplete(transferID, stream)
if completeErr != nil {
return fmt.Errorf("sending completion: %w", completeErr)
}

m.removeTransfer(transferID)
}

return nil
}

// handleFileTransferResponse handles a FileTransferResponse from the agent (acknowledgments).
func (m *clientFileTransferManager) handleFileTransferResponse(ftRes *pb.FileTransferResponse) {
transferID := ftRes.GetTransferId()
status := ftRes.GetStatus()

log.Printf("Received file transfer response: transfer_id=%s, status=%v", transferID, status)

switch status {
case pb.FileTransferResponse_ERROR:
log.Printf("File transfer error for %s: %s", transferID, ftRes.GetErrorMessage())
m.removeTransfer(transferID)
case pb.FileTransferResponse_TRANSFER_COMPLETE:
log.Printf("Agent confirmed transfer complete: %s", transferID)
m.removeTransfer(transferID)
}
}
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for the new chunked file transfer functionality. The new file transfer protocol introduces significant complexity with concurrent transfers, chunk sequencing, and state management, but there are no tests to verify correctness. Critical scenarios that should be tested include:

  • Single file upload/download
  • Concurrent file transfers
  • Chunk sequence validation
  • Error handling and recovery
  • Resource cleanup on transfer completion/failure

Consider adding comprehensive unit and integration tests for the file transfer manager and session components.

Copilot uses AI. Check for mistakes.
Comment on lines +172 to +208
func (s *session) getNextChunk(transferID string) (*pb.FileChunk, bool, error) {
s.downloadMutex.RLock()
state, exists := s.activeDownloads[transferID]
s.downloadMutex.RUnlock()

if !exists {
return nil, false, fmt.Errorf("download not found: %s", transferID)
}

// Read next chunk from reader
chunkData := make([]byte, chunkSize)
n, err := state.reader.Read(chunkData)

if n > 0 {
chunkData = chunkData[:n]
}

isFinal := err == io.EOF
if err != nil && err != io.EOF {
return nil, false, err
}

// Calculate offset and chunk number
chunkOffset := int64(state.chunkNumber) * int64(chunkSize)

chunk := &pb.FileChunk{
TransferId: transferID,
ChunkNumber: state.chunkNumber,
ChunkData: chunkData,
ChunkOffset: chunkOffset,
IsFinal: isFinal,
}

return r, nil
// Increment chunk number for next call
state.chunkNumber++

return chunk, isFinal, nil
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition in getNextChunk: The downloadState.chunkNumber field is modified without holding any mutex. After releasing the downloadMutex at line 175, the state is accessed and modified at line 206 without synchronization. This can lead to race conditions if multiple goroutines call getNextChunk for the same transfer ID concurrently.

Consider adding a mutex to downloadState and protecting all accesses to chunkNumber, or redesign the download flow to ensure only one goroutine processes each download.

Copilot uses AI. Check for mistakes.
func (s *session) removeUpload(transferID string) {
s.uploadMutex.Lock()
defer s.uploadMutex.Unlock()

Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource leak: The removeUpload function doesn't close the pipe reader before removing the upload from tracking. When an upload is removed (either on error or completion), the reader side of the pipe should be closed to prevent resource leaks and ensure any goroutines blocked on reading from the pipe are unblocked.

Add state.reader.Close() before deleting the upload from the map.

Suggested change
state, ok := s.activeUploads[transferID]
if ok && state != nil && state.reader != nil {
if err := state.reader.Close(); err != nil {
log.Printf("failed to close upload reader for transfer %s: %v", transferID, err)
}
}

Copilot uses AI. Check for mistakes.
Comment on lines +154 to 160
default:
// Non-blocking check for pending file transfers.
// Try to send the next chunk for downloads using round-robin scheduling.
activeDownloads := s.getActiveDownloads()
if len(activeDownloads) == 0 {
continue
}
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Busy-loop in default case: The default case in the select statement will continuously loop without any delay when there are no active downloads, consuming excessive CPU. The 'continue' statement at line 159 immediately restarts the loop without blocking, creating a busy-wait condition.

Consider adding a small sleep/delay when there are no active downloads, or restructure the code to use a channel-based notification system for when downloads become available.

Copilot uses AI. Check for mistakes.
Comment on lines +228 to +248
func (m *clientFileTransferManager) handleFileTransferRequest(ftReq *pb.FileTransferRequest, stream StreamForClient) error {
transferID := ftReq.GetTransferId()
metadata := ftReq.GetMetadata()
path := metadata.GetPath()

// Check if this is an existing download transfer (agent initiating a file send)
existingState := m.getTransfer(transferID)
if existingState != nil && existingState.direction == "download" {
// This is the metadata notification for a download that's about to start
log.Printf("Agent sending file download: %q (transfer_id=%s)\n", path, transferID)
// Update the path with the actual destination from metadata
existingState.path = path

return nil
}

// This is a request for the client to upload a file
log.Printf("Agent requesting file upload: %q (transfer_id=%s)\n", path, transferID)

return m.handleUploadRequest(transferID, path, stream)
}
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic error in download handling: The code at line 234-235 expects a pre-existing download transfer with direction "download", but there's no code path that creates such a transfer before the FileTransferRequest arrives from the agent.

When the agent initiates a download (SendFile), it sends a FileTransferRequest with metadata. On the client side, this will always fall through to line 245 and be treated as an upload request instead of a download notification. This means agent-initiated downloads will fail because the client will try to upload the file path back to the agent.

The client should register a download transfer state when it receives a FileTransferRequest that isn't for a known upload.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants