Skip to content

Commit 7b4d6ee

Browse files
EDsCODE and claude committed
Stream COPY data directly to temp file to avoid memory buffering
Previously, all COPY data was buffered in memory before being written to a temp file. For large imports (millions of rows), this could exhaust memory. Now we create the temp file upfront and stream data directly to it as CopyData messages arrive, avoiding memory accumulation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent c6bf444 commit 7b4d6ee

File tree

1 file changed

+32
-36
lines changed

1 file changed

+32
-36
lines changed

server/conn.go

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -825,63 +825,59 @@ func (c *clientConn) handleCopyIn(query, upperQuery string) error {
825825
c.writer.Flush()
826826
log.Printf("[%s] COPY FROM STDIN: sent CopyInResponse, waiting for data...", c.username)
827827

828-
// Read COPY data from client
829-
var allData bytes.Buffer
828+
// Create temp file upfront and stream data directly to it (avoids memory buffering)
829+
// This approach leverages DuckDB's highly optimized CSV parser which handles
830+
// type conversions automatically and can load millions of rows in seconds.
831+
tmpFile, err := os.CreateTemp("", "duckgres-copy-*.csv")
832+
if err != nil {
833+
log.Printf("[%s] COPY FROM STDIN: failed to create temp file: %v", c.username, err)
834+
c.sendError("ERROR", "58000", fmt.Sprintf("failed to create temp file: %v", err))
835+
c.setTxError()
836+
writeReadyForQuery(c.writer, c.txStatus)
837+
c.writer.Flush()
838+
return nil
839+
}
840+
tmpPath := tmpFile.Name()
841+
defer os.Remove(tmpPath)
842+
843+
// Stream COPY data directly to temp file (no memory buffering)
830844
rowCount := 0
831845
copyDataMessages := 0
846+
bytesWritten := int64(0)
832847
dataReceiveStart := time.Now()
833848

834849
for {
835850
msgType, body, err := readMessage(c.reader)
836851
if err != nil {
837852
log.Printf("[%s] COPY FROM STDIN: error reading message: %v", c.username, err)
853+
tmpFile.Close()
838854
return err
839855
}
840856

841857
switch msgType {
842858
case msgCopyData:
843-
allData.Write(body)
844-
copyDataMessages++
845-
if copyDataMessages%10000 == 0 {
846-
log.Printf("[%s] COPY FROM STDIN: received %d CopyData messages, %d bytes total",
847-
c.username, copyDataMessages, allData.Len())
848-
}
849-
850-
case msgCopyDone:
851-
dataReceiveElapsed := time.Since(dataReceiveStart)
852-
log.Printf("[%s] COPY FROM STDIN: CopyDone received - %d messages, %d bytes in %v",
853-
c.username, copyDataMessages, allData.Len(), dataReceiveElapsed)
854-
855-
// Write data to a temp file and use DuckDB's native COPY FROM for performance.
856-
// This approach leverages DuckDB's highly optimized CSV parser which handles
857-
// type conversions automatically and can load millions of rows in seconds.
858-
// The temp file is created, used, and deleted within this single request.
859-
tmpFile, err := os.CreateTemp("", "duckgres-copy-*.csv")
859+
n, err := tmpFile.Write(body)
860860
if err != nil {
861-
log.Printf("[%s] COPY FROM STDIN: failed to create temp file: %v", c.username, err)
862-
c.sendError("ERROR", "58000", fmt.Sprintf("failed to create temp file: %v", err))
861+
log.Printf("[%s] COPY FROM STDIN: failed to write to temp file: %v", c.username, err)
862+
tmpFile.Close()
863+
c.sendError("ERROR", "58000", fmt.Sprintf("failed to write to temp file: %v", err))
863864
c.setTxError()
864865
writeReadyForQuery(c.writer, c.txStatus)
865866
c.writer.Flush()
866867
return nil
867868
}
868-
tmpPath := tmpFile.Name()
869-
defer os.Remove(tmpPath)
869+
bytesWritten += int64(n)
870+
copyDataMessages++
871+
if copyDataMessages%10000 == 0 {
872+
log.Printf("[%s] COPY FROM STDIN: received %d CopyData messages, %d bytes written",
873+
c.username, copyDataMessages, bytesWritten)
874+
}
870875

871-
// Write data to temp file
872-
writeStart := time.Now()
873-
bytesWritten, err := tmpFile.Write(allData.Bytes())
876+
case msgCopyDone:
874877
tmpFile.Close()
875-
if err != nil {
876-
log.Printf("[%s] COPY FROM STDIN: failed to write temp file: %v", c.username, err)
877-
c.sendError("ERROR", "58000", fmt.Sprintf("failed to write temp file: %v", err))
878-
c.setTxError()
879-
writeReadyForQuery(c.writer, c.txStatus)
880-
c.writer.Flush()
881-
return nil
882-
}
883-
log.Printf("[%s] COPY FROM STDIN: wrote %d bytes to temp file in %v",
884-
c.username, bytesWritten, time.Since(writeStart))
878+
dataReceiveElapsed := time.Since(dataReceiveStart)
879+
log.Printf("[%s] COPY FROM STDIN: CopyDone received - %d messages, %d bytes in %v",
880+
c.username, copyDataMessages, bytesWritten, dataReceiveElapsed)
885881

886882
// Build DuckDB COPY FROM statement
887883
// DuckDB syntax: COPY table FROM 'file' (FORMAT CSV, HEADER, NULL 'value', DELIMITER ',')

0 commit comments

Comments
 (0)