Skip to content

Commit e49b3fe

Browse files
serprexAmogh-Bharadwaj
authored andcommitted
pg qrep: avro schema nullable by default
1 parent 7916790 commit e49b3fe

File tree

1 file changed

+3
-30
lines changed

1 file changed

+3
-30
lines changed

flow/connectors/postgres/qrep_query_executor.go

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"fmt"
77
"log/slog"
8-
"maps"
98
"slices"
109

1110
"github.com/jackc/pgx/v5"
@@ -68,18 +67,13 @@ func (qe *QRepQueryExecutor) cursorToSchema(
6867
tx pgx.Tx,
6968
cursorName string,
7069
) (types.QRecordSchema, error) {
71-
type attId struct {
72-
relid uint32
73-
num uint16
74-
}
75-
7670
rows, err := tx.Query(ctx, "FETCH 0 FROM "+cursorName)
7771
if err != nil {
7872
return types.QRecordSchema{}, fmt.Errorf("failed to fetch 0 for field descriptions: %w", err)
7973
}
74+
defer rows.Close()
8075
fds := rows.FieldDescriptions()
8176
tableOIDset := make(map[uint32]struct{})
82-
nullPointers := make(map[attId]*bool, len(fds))
8377
qfields := make([]types.QField, len(fds))
8478
for i, fd := range fds {
8579
tableOIDset[fd.TableOID] = struct{}{}
@@ -89,38 +83,17 @@ func (qe *QRepQueryExecutor) cursorToSchema(
8983
qfields[i] = types.QField{
9084
Name: fd.Name,
9185
Type: ctype,
92-
Nullable: false,
86+
Nullable: true,
9387
Precision: precision,
9488
Scale: scale,
9589
}
9690
} else {
9791
qfields[i] = types.QField{
9892
Name: fd.Name,
9993
Type: ctype,
100-
Nullable: false,
94+
Nullable: true,
10195
}
10296
}
103-
nullPointers[attId{
104-
relid: fd.TableOID,
105-
num: fd.TableAttributeNumber,
106-
}] = &qfields[i].Nullable
107-
}
108-
rows.Close()
109-
tableOIDs := slices.Collect(maps.Keys(tableOIDset))
110-
111-
rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs)
112-
if err != nil {
113-
return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err)
114-
}
115-
116-
var att attId
117-
if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error {
118-
if nullPointer, ok := nullPointers[att]; ok {
119-
*nullPointer = true
120-
}
121-
return nil
122-
}); err != nil {
123-
return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err)
12497
}
12598

12699
return types.NewQRecordSchema(qfields), nil

0 commit comments

Comments
 (0)