Skip to content

Commit c3df3b0

Browse files
authored
Merge pull request #59 from PostHog/fix/copy-stdin-performance
Use DuckDB native COPY for COPY FROM STDIN performance
2 parents 364a075 + 7b4d6ee commit c3df3b0

File tree

1 file changed

+78
-54
lines changed

1 file changed

+78
-54
lines changed

server/conn.go

Lines changed: 78 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,9 @@ func (c *clientConn) handleCopyOut(query, upperQuery string) error {
771771

772772
// handleCopyIn handles COPY ... FROM STDIN
773773
func (c *clientConn) handleCopyIn(query, upperQuery string) error {
774+
copyStartTime := time.Now()
775+
log.Printf("[%s] COPY FROM STDIN: starting", c.username)
776+
774777
matches := copyFromStdinRegex.FindStringSubmatch(query)
775778
if len(matches) < 2 {
776779
c.sendError("ERROR", "42601", "Invalid COPY FROM STDIN syntax")
@@ -785,6 +788,7 @@ func (c *clientConn) handleCopyIn(query, upperQuery string) error {
785788
if len(matches) > 2 && matches[2] != "" {
786789
columnList = fmt.Sprintf("(%s)", matches[2])
787790
}
791+
log.Printf("[%s] COPY FROM STDIN: table=%s columns=%s", c.username, tableName, columnList)
788792

789793
// Parse options
790794
delimiter := "\t"
@@ -819,78 +823,98 @@ func (c *clientConn) handleCopyIn(query, upperQuery string) error {
819823
return err
820824
}
821825
c.writer.Flush()
826+
log.Printf("[%s] COPY FROM STDIN: sent CopyInResponse, waiting for data...", c.username)
827+
828+
// Create temp file upfront and stream data directly to it (avoids memory buffering)
829+
// This approach leverages DuckDB's highly optimized CSV parser which handles
830+
// type conversions automatically and can load millions of rows in seconds.
831+
tmpFile, err := os.CreateTemp("", "duckgres-copy-*.csv")
832+
if err != nil {
833+
log.Printf("[%s] COPY FROM STDIN: failed to create temp file: %v", c.username, err)
834+
c.sendError("ERROR", "58000", fmt.Sprintf("failed to create temp file: %v", err))
835+
c.setTxError()
836+
writeReadyForQuery(c.writer, c.txStatus)
837+
c.writer.Flush()
838+
return nil
839+
}
840+
tmpPath := tmpFile.Name()
841+
defer os.Remove(tmpPath)
822842

823-
// Read COPY data from client
824-
var allData bytes.Buffer
843+
// Stream COPY data directly to temp file (no memory buffering)
825844
rowCount := 0
826-
headerSkipped := false
845+
copyDataMessages := 0
846+
bytesWritten := int64(0)
847+
dataReceiveStart := time.Now()
827848

828849
for {
829850
msgType, body, err := readMessage(c.reader)
830851
if err != nil {
852+
log.Printf("[%s] COPY FROM STDIN: error reading message: %v", c.username, err)
853+
tmpFile.Close()
831854
return err
832855
}
833856

834857
switch msgType {
835858
case msgCopyData:
836-
allData.Write(body)
859+
n, err := tmpFile.Write(body)
860+
if err != nil {
861+
log.Printf("[%s] COPY FROM STDIN: failed to write to temp file: %v", c.username, err)
862+
tmpFile.Close()
863+
c.sendError("ERROR", "58000", fmt.Sprintf("failed to write to temp file: %v", err))
864+
c.setTxError()
865+
writeReadyForQuery(c.writer, c.txStatus)
866+
c.writer.Flush()
867+
return nil
868+
}
869+
bytesWritten += int64(n)
870+
copyDataMessages++
871+
if copyDataMessages%10000 == 0 {
872+
log.Printf("[%s] COPY FROM STDIN: received %d CopyData messages, %d bytes written",
873+
c.username, copyDataMessages, bytesWritten)
874+
}
837875

838876
case msgCopyDone:
839-
// Process all data using proper CSV reader
840-
// This correctly handles multi-line quoted fields (e.g., JSON with embedded newlines)
841-
csvReader := csv.NewReader(&allData)
842-
csvReader.Comma = rune(delimiter[0])
843-
csvReader.LazyQuotes = true
844-
csvReader.FieldsPerRecord = -1 // Allow variable number of fields
845-
846-
for {
847-
values, err := csvReader.Read()
848-
if err == io.EOF {
849-
break
850-
}
851-
if err != nil {
852-
c.sendError("ERROR", "22P02", fmt.Sprintf("invalid CSV input: %v", err))
853-
c.setTxError()
854-
writeReadyForQuery(c.writer, c.txStatus)
855-
c.writer.Flush()
856-
return nil
857-
}
877+
tmpFile.Close()
878+
dataReceiveElapsed := time.Since(dataReceiveStart)
879+
log.Printf("[%s] COPY FROM STDIN: CopyDone received - %d messages, %d bytes in %v",
880+
c.username, copyDataMessages, bytesWritten, dataReceiveElapsed)
881+
882+
// Build DuckDB COPY FROM statement
883+
// DuckDB syntax: COPY table FROM 'file' (FORMAT CSV, HEADER, NULL 'value', DELIMITER ',')
884+
copyOptions := []string{"FORMAT CSV"}
885+
if hasHeader {
886+
copyOptions = append(copyOptions, "HEADER")
887+
}
888+
if nullString != "\\N" {
889+
copyOptions = append(copyOptions, fmt.Sprintf("NULL '%s'", nullString))
890+
}
891+
if delimiter != "," {
892+
copyOptions = append(copyOptions, fmt.Sprintf("DELIMITER '%s'", delimiter))
893+
}
858894

859-
// Skip empty rows
860-
if len(values) == 0 || (len(values) == 1 && values[0] == "") {
861-
continue
862-
}
895+
copySQL := fmt.Sprintf("COPY %s %s FROM '%s' (%s)",
896+
tableName, columnList, tmpPath, strings.Join(copyOptions, ", "))
863897

864-
// Skip header if needed
865-
if hasHeader && !headerSkipped {
866-
headerSkipped = true
867-
continue
868-
}
898+
log.Printf("[%s] COPY FROM STDIN: executing native DuckDB COPY: %s", c.username, copySQL)
899+
loadStart := time.Now()
869900

870-
// Build INSERT statement
871-
placeholders := make([]string, len(values))
872-
args := make([]interface{}, len(values))
873-
for i, v := range values {
874-
placeholders[i] = "?"
875-
if v == nullString || v == "\\N" || v == "" {
876-
args[i] = nil
877-
} else {
878-
args[i] = v
879-
}
880-
}
901+
result, err := c.db.Exec(copySQL)
902+
if err != nil {
903+
log.Printf("[%s] COPY FROM STDIN: DuckDB COPY failed: %v", c.username, err)
904+
c.sendError("ERROR", "22P02", fmt.Sprintf("COPY failed: %v", err))
905+
c.setTxError()
906+
writeReadyForQuery(c.writer, c.txStatus)
907+
c.writer.Flush()
908+
return nil
909+
}
881910

882-
insertSQL := fmt.Sprintf("INSERT INTO %s %s VALUES (%s)",
883-
tableName, columnList, strings.Join(placeholders, ", "))
911+
rowCount64, _ := result.RowsAffected()
912+
rowCount = int(rowCount64)
884913

885-
if _, err := c.db.Exec(insertSQL, args...); err != nil {
886-
c.sendError("ERROR", "22P02", fmt.Sprintf("invalid input: %v", err))
887-
c.setTxError()
888-
writeReadyForQuery(c.writer, c.txStatus)
889-
c.writer.Flush()
890-
return nil
891-
}
892-
rowCount++
893-
}
914+
totalElapsed := time.Since(copyStartTime)
915+
loadElapsed := time.Since(loadStart)
916+
log.Printf("[%s] COPY FROM STDIN: completed - %d rows in %v (DuckDB load: %v)",
917+
c.username, rowCount, totalElapsed, loadElapsed)
894918

895919
writeCommandComplete(c.writer, fmt.Sprintf("COPY %d", rowCount))
896920
writeReadyForQuery(c.writer, c.txStatus)

0 commit comments

Comments
 (0)