Skip to content

Commit 52fd907

Browse files
committed
pg qrep: avro schema nullable by default
1 parent 41ca799 commit 52fd907

File tree

1 file changed

+3
-25
lines changed

1 file changed

+3
-25
lines changed

flow/connectors/postgres/qrep_query_executor.go

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"fmt"
77
"log/slog"
8-
"maps"
98
"slices"
109

1110
"github.com/jackc/pgx/v5"
@@ -77,9 +76,9 @@ func (qe *QRepQueryExecutor) cursorToSchema(
7776
if err != nil {
7877
return types.QRecordSchema{}, fmt.Errorf("failed to fetch 0 for field descriptions: %w", err)
7978
}
79+
defer rows.Close()
8080
fds := rows.FieldDescriptions()
8181
tableOIDset := make(map[uint32]struct{})
82-
nullPointers := make(map[attId]*bool, len(fds))
8382
qfields := make([]types.QField, len(fds))
8483
for i, fd := range fds {
8584
tableOIDset[fd.TableOID] = struct{}{}
@@ -89,38 +88,17 @@ func (qe *QRepQueryExecutor) cursorToSchema(
8988
qfields[i] = types.QField{
9089
Name: fd.Name,
9190
Type: ctype,
92-
Nullable: false,
91+
Nullable: true,
9392
Precision: precision,
9493
Scale: scale,
9594
}
9695
} else {
9796
qfields[i] = types.QField{
9897
Name: fd.Name,
9998
Type: ctype,
100-
Nullable: false,
99+
Nullable: true,
101100
}
102101
}
103-
nullPointers[attId{
104-
relid: fd.TableOID,
105-
num: fd.TableAttributeNumber,
106-
}] = &qfields[i].Nullable
107-
}
108-
rows.Close()
109-
tableOIDs := slices.Collect(maps.Keys(tableOIDset))
110-
111-
rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs)
112-
if err != nil {
113-
return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err)
114-
}
115-
116-
var att attId
117-
if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error {
118-
if nullPointer, ok := nullPointers[att]; ok {
119-
*nullPointer = true
120-
}
121-
return nil
122-
}); err != nil {
123-
return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err)
124102
}
125103

126104
return types.NewQRecordSchema(qfields), nil

0 commit comments

Comments
 (0)