Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func corePullQRepRecords(

if partition.FullTablePartition {
c.logger.Info("pulling full table partition", partitionIdLog)
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName,
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version, config.SnapshotName,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, 0, fmt.Errorf("failed to create query executor: %w", err)
Expand Down Expand Up @@ -460,7 +460,8 @@ func corePullQRepRecords(
return 0, 0, err
}

executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName, config.FlowJobName, partition.PartitionId)
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version,
config.SnapshotName, config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, 0, fmt.Errorf("failed to create query executor: %w", err)
}
Expand Down Expand Up @@ -749,7 +750,7 @@ func pullXminRecordStream(
queryArgs = []any{strconv.FormatInt(partition.Range.Range.(*protos.PartitionRange_IntRange).IntRange.Start&0xffffffff, 10)}
}

executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName,
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version, config.SnapshotName,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to create query executor: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) {
defer connector.Close()

// Create a new QRepQueryExecutor instance
qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
require.NoError(b, err, "error while creating QRepQueryExecutor")

// Run the benchmark
Expand Down
59 changes: 37 additions & 22 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/internal"
"github.com/PeerDB-io/peerdb/flow/model"
"github.com/PeerDB-io/peerdb/flow/shared"
"github.com/PeerDB-io/peerdb/flow/shared/datatypes"
Expand All @@ -24,19 +25,20 @@ import (
// QRepQueryExecutor pairs a PostgresConnector with the per-partition state
// supplied to NewQRepQueryExecutorSnapshot.
type QRepQueryExecutor struct {
// Embedded connector; its methods and fields are promoted onto the executor.
*PostgresConnector
// Connector logger with the partition ID attached as an attribute.
logger log.Logger
// Settings map handed to dynamic-config lookups such as internal.PeerDBNullableStrict.
env map[string]string
// NOTE(review): presumably the snapshot argument of NewQRepQueryExecutorSnapshot
// (NewQRepQueryExecutor passes "") — confirm against the constructor body.
snapshot string
flowJobName string
partitionID string
// Version value supplied by the caller of the constructor.
version uint32
}

func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, version uint32,
func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, env map[string]string, version uint32,
flowJobName string, partitionID string,
) (*QRepQueryExecutor, error) {
return c.NewQRepQueryExecutorSnapshot(ctx, version, "", flowJobName, partitionID)
return c.NewQRepQueryExecutorSnapshot(ctx, env, version, "", flowJobName, partitionID)
}

func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, version uint32,
func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, env map[string]string, version uint32,
snapshot string, flowJobName string, partitionID string,
) (*QRepQueryExecutor, error) {
if _, err := c.fetchCustomTypeMapping(ctx); err != nil {
Expand All @@ -49,6 +51,7 @@ func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, ve
flowJobName: flowJobName,
partitionID: partitionID,
logger: log.With(c.logger, slog.String(string(shared.PartitionIDKey), partitionID)),
env: env,
version: version,
}, nil
}
Expand All @@ -73,13 +76,21 @@ func (qe *QRepQueryExecutor) cursorToSchema(
num uint16
}

strictNullable, err := internal.PeerDBNullableStrict(ctx, qe.env)
if err != nil {
return types.QRecordSchema{}, err
}

rows, err := tx.Query(ctx, "FETCH 0 FROM "+cursorName)
if err != nil {
return types.QRecordSchema{}, fmt.Errorf("failed to fetch 0 for field descriptions: %w", err)
}
fds := rows.FieldDescriptions()
tableOIDset := make(map[uint32]struct{})
nullPointers := make(map[attId]*bool, len(fds))
var nullPointers map[attId]*bool
if strictNullable {
nullPointers = make(map[attId]*bool, len(fds))
}
qfields := make([]types.QField, len(fds))
for i, fd := range fds {
tableOIDset[fd.TableOID] = struct{}{}
Expand All @@ -89,38 +100,42 @@ func (qe *QRepQueryExecutor) cursorToSchema(
qfields[i] = types.QField{
Name: fd.Name,
Type: ctype,
Nullable: false,
Nullable: !strictNullable,
Precision: precision,
Scale: scale,
}
} else {
qfields[i] = types.QField{
Name: fd.Name,
Type: ctype,
Nullable: false,
Nullable: !strictNullable,
}
}
nullPointers[attId{
relid: fd.TableOID,
num: fd.TableAttributeNumber,
}] = &qfields[i].Nullable
if nullPointers != nil {
nullPointers[attId{
relid: fd.TableOID,
num: fd.TableAttributeNumber,
}] = &qfields[i].Nullable
}
}
rows.Close()
tableOIDs := slices.Collect(maps.Keys(tableOIDset))

rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs)
if err != nil {
return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err)
}
if nullPointers != nil {
tableOIDs := slices.Collect(maps.Keys(tableOIDset))
rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs)
if err != nil {
return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err)
}

var att attId
if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error {
if nullPointer, ok := nullPointers[att]; ok {
*nullPointer = true
var att attId
if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error {
if nullPointer, ok := nullPointers[att]; ok {
*nullPointer = true
}
return nil
}); err != nil {
return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err)
}
return nil
}); err != nil {
return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err)
}

return types.NewQRecordSchema(qfields), nil
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/postgres/qrep_query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestExecuteAndProcessQuery(t *testing.T) {
fmt.Sprintf("INSERT INTO %s.test(data) VALUES ('testdata')", utils.QuoteIdentifier(schemaName)))
require.NoError(t, err, "error while inserting data")

qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
require.NoError(t, err, "error while creating QRepQueryExecutor")

batch, err := qe.ExecuteAndProcessQuery(t.Context(), fmt.Sprintf("SELECT * FROM %s.test", utils.QuoteIdentifier(schemaName)))
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestSupportedDataTypes(t *testing.T) {
)
require.NoError(t, err, "error while inserting into test table")

qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
require.NoError(t, err, "error while creating QRepQueryExecutor")
// Select the row back out of the table
batch, err := qe.ExecuteAndProcessQuery(t.Context(),
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestStringDataTypes(t *testing.T) {
_, err = conn.Exec(ctx, query)
require.NoError(t, err)

qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
require.NoError(t, err)
// Select the row back out of the table
batch, err := qe.ExecuteAndProcessQuery(t.Context(),
Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (s *PostgresSource) Exec(ctx context.Context, sql string) error {
}

func (s *PostgresSource) GetRows(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error) {
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart")
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart")
if err != nil {
return nil, err
}
Expand All @@ -211,7 +211,7 @@ func (s *PostgresSource) GetRows(ctx context.Context, suffix string, table strin

// GetRowsOnly avoids fetching rows from "child" tables, à la Postgres table inheritance.
func (s *PostgresSource) GetRowsOnly(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error) {
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart")
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func RevokePermissionForTableColumns(ctx context.Context, conn *pgx.Conn, tableI
}

func (s *PostgresSource) Query(ctx context.Context, query string) (*model.QRecordBatch, error) {
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart")
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart")
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s PeerFlowE2ETestSuitePG) Exec(ctx context.Context, sql string) error {

func (s PeerFlowE2ETestSuitePG) GetRows(table string, cols string) (*model.QRecordBatch, error) {
s.t.Helper()
pgQueryExecutor, err := s.conn.NewQRepQueryExecutor(s.t.Context(), shared.InternalVersion_Latest, "testflow", "testpart")
pgQueryExecutor, err := s.conn.NewQRepQueryExecutor(s.t.Context(), nil, shared.InternalVersion_Latest, "testflow", "testpart")
if err != nil {
return nil, err
}
Expand Down
12 changes: 12 additions & 0 deletions flow/internal/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_NULLABLE_STRICT",
Description: "Propagate nullability in avro schema during initial load",
DefaultValue: "false",
ValueType: protos.DynconfValueType_BOOL,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_SNOWFLAKE_MERGE_PARALLELISM",
Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit",
Expand Down Expand Up @@ -588,6 +596,10 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) {
return dynamicConfBool(ctx, env, "PEERDB_NULLABLE")
}

// PeerDBNullableStrict reports the boolean value of the PEERDB_NULLABLE_STRICT
// dynamic setting, looked up through dynamicConfBool with the given env.
// Any lookup error is returned to the caller unchanged.
func PeerDBNullableStrict(ctx context.Context, env map[string]string) (bool, error) {
	const settingName = "PEERDB_NULLABLE_STRICT"
	strict, err := dynamicConfBool(ctx, env, settingName)
	return strict, err
}

func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) {
format, err := dynLookup(ctx, env, "PEERDB_CLICKHOUSE_BINARY_FORMAT")
if err != nil {
Expand Down
Loading