Skip to content

Commit 364a075

Browse files
authored
Merge pull request #58 from PostHog/fix/copy-multiline-csv
Fix COPY FROM STDIN to handle multi-line CSV fields
2 parents 8d7ffda + 9c91695 commit 364a075

File tree

2 files changed

+112
-11
lines changed

2 files changed

+112
-11
lines changed

server/conn.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -836,11 +836,28 @@ func (c *clientConn) handleCopyIn(query, upperQuery string) error {
836836
allData.Write(body)
837837

838838
case msgCopyDone:
839-
// Process all data
840-
lines := strings.Split(allData.String(), "\n")
841-
for _, line := range lines {
842-
line = strings.TrimSpace(line)
843-
if line == "" {
839+
// Process all data using a proper CSV reader
840+
// This correctly handles multi-line quoted fields (e.g., JSON with embedded newlines)
841+
csvReader := csv.NewReader(&allData)
842+
csvReader.Comma = rune(delimiter[0])
843+
csvReader.LazyQuotes = true
844+
csvReader.FieldsPerRecord = -1 // Allow variable number of fields
845+
846+
for {
847+
values, err := csvReader.Read()
848+
if err == io.EOF {
849+
break
850+
}
851+
if err != nil {
852+
c.sendError("ERROR", "22P02", fmt.Sprintf("invalid CSV input: %v", err))
853+
c.setTxError()
854+
writeReadyForQuery(c.writer, c.txStatus)
855+
c.writer.Flush()
856+
return nil
857+
}
858+
859+
// Skip empty rows
860+
if len(values) == 0 || (len(values) == 1 && values[0] == "") {
844861
continue
845862
}
846863

@@ -850,12 +867,6 @@ func (c *clientConn) handleCopyIn(query, upperQuery string) error {
850867
continue
851868
}
852869

853-
// Parse values and insert
854-
values := c.parseCopyLine(line, delimiter)
855-
if len(values) == 0 {
856-
continue
857-
}
858-
859870
// Build INSERT statement
860871
placeholders := make([]string, len(values))
861872
args := make([]interface{}, len(values))

server/conn_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package server
22

33
import (
4+
"encoding/csv"
5+
"io"
6+
"strings"
47
"testing"
58
)
69

@@ -642,3 +645,90 @@ func TestParseCopyLine(t *testing.T) {
642645
})
643646
}
644647
}
648+
649+
// TestParseMultiLineCSV pins the encoding/csv behavior that COPY FROM STDIN
// depends on for multi-line quoted fields.
//
// Previously the data was split on newlines first and each line parsed on its
// own, which broke whenever a quoted field contained an embedded newline
// (e.g. pretty-printed JSON). The whole buffer is now fed to a csv.Reader,
// which keeps such fields intact.
//
// The table also covers a non-comma delimiter, empty lines between records,
// and CRLF line endings, so the delimiter wiring and the reader's line
// handling are exercised as well as the plain comma case.
func TestParseMultiLineCSV(t *testing.T) {
	// readRows simulates what handleCopyIn does: run a csv.Reader over the
	// entire buffer with the first byte of delimiter as separator, lazy
	// quotes enabled, and a variable number of fields per record.
	readRows := func(input, delimiter string) ([][]string, error) {
		reader := csv.NewReader(strings.NewReader(input))
		reader.Comma = rune(delimiter[0])
		reader.LazyQuotes = true
		reader.FieldsPerRecord = -1

		var rows [][]string
		for {
			record, err := reader.Read()
			if err == io.EOF {
				return rows, nil
			}
			if err != nil {
				return nil, err
			}
			rows = append(rows, record)
		}
	}

	tests := []struct {
		name      string
		input     string
		delimiter string
		expected  [][]string
	}{
		{
			name:      "simple rows no newlines",
			input:     "a,b,c\n1,2,3\n",
			delimiter: ",",
			expected:  [][]string{{"a", "b", "c"}, {"1", "2", "3"}},
		},
		{
			name:      "quoted field with embedded newline",
			input:     "id,json,status\n1,\"{\"\"key\"\":\n\"\"value\"\"}\",active\n",
			delimiter: ",",
			expected:  [][]string{{"id", "json", "status"}, {"1", "{\"key\":\n\"value\"}", "active"}},
		},
		{
			name:      "multiple fields with newlines",
			input:     "a,\"line1\nline2\",b\nc,\"x\ny\nz\",d\n",
			delimiter: ",",
			expected:  [][]string{{"a", "line1\nline2", "b"}, {"c", "x\ny\nz", "d"}},
		},
		{
			name:      "JSON metadata like Fivetran sends",
			input:     "customer_id,metadata,type\ncust_123,\"{\"\"subscription\"\":\n \"\"active\"\",\n \"\"plan\"\": \"\"pro\"\"}\",invoice\n",
			delimiter: ",",
			expected:  [][]string{{"customer_id", "metadata", "type"}, {"cust_123", "{\"subscription\":\n \"active\",\n \"plan\": \"pro\"}", "invoice"}},
		},
		{
			name:      "13 columns with multiline JSON in middle",
			input:     "c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13\nv1,v2,v3,v4,v5,v6,\"{\"\"a\"\":\n\"\"b\"\"}\",v8,v9,v10,v11,v12,v13\n",
			delimiter: ",",
			expected:  [][]string{{"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13"}, {"v1", "v2", "v3", "v4", "v5", "v6", "{\"a\":\n\"b\"}", "v8", "v9", "v10", "v11", "v12", "v13"}},
		},
		{
			// A comma inside the quoted field must stay data when the
			// configured delimiter is something else.
			name:      "pipe delimiter with embedded newline and comma",
			input:     "a|\"x,\ny\"|b\n",
			delimiter: "|",
			expected:  [][]string{{"a", "x,\ny", "b"}},
		},
		{
			// csv.Reader drops completely empty lines between records.
			name:      "empty lines between records are skipped",
			input:     "a,b\n\n\nc,d\n",
			delimiter: ",",
			expected:  [][]string{{"a", "b"}, {"c", "d"}},
		},
		{
			// csv.Reader removes a carriage return that precedes a newline,
			// including inside a quoted multi-line field.
			name:      "CRLF line endings",
			input:     "a,b\r\nc,\"x\r\ny\"\r\n",
			delimiter: ",",
			expected:  [][]string{{"a", "b"}, {"c", "x\ny"}},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			rows, err := readRows(tt.input, tt.delimiter)
			if err != nil {
				t.Fatalf("csv.Read() error: %v", err)
			}

			// A row-count mismatch makes the per-row comparison meaningless,
			// so stop this subtest here.
			if len(rows) != len(tt.expected) {
				t.Fatalf("got %d rows, want %d\nGot: %v\nWant: %v",
					len(rows), len(tt.expected), rows, tt.expected)
			}

			for i, row := range rows {
				if len(row) != len(tt.expected[i]) {
					t.Errorf("row %d: got %d columns, want %d\nGot: %v\nWant: %v",
						i, len(row), len(tt.expected[i]), row, tt.expected[i])
					continue
				}
				for j, val := range row {
					if val != tt.expected[i][j] {
						t.Errorf("row %d col %d: got %q, want %q",
							i, j, val, tt.expected[i][j])
					}
				}
			}
		})
	}
}

0 commit comments

Comments
 (0)